Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/07/05 07:53:58 UTC

[GitHub] [kafka] hachikuji opened a new pull request #10974: KAFKA-12979; Implement command to find hanging transactions

hachikuji opened a new pull request #10974:
URL: https://github.com/apache/kafka/pull/10974


   This patch implements the `find-hanging` command described in KIP-664: https://cwiki.apache.org/confluence/display/KAFKA/KIP-664%3A+Provide+tooling+to+detect+and+abort+hanging+transactions#KIP664:Providetoolingtodetectandaborthangingtransactions-FindingHangingTransactions.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji merged pull request #10974: KAFKA-12979; Implement command to find hanging transactions

Posted by GitBox <gi...@apache.org>.
hachikuji merged pull request #10974:
URL: https://github.com/apache/kafka/pull/10974


   





[GitHub] [kafka] hachikuji commented on a change in pull request #10974: KAFKA-12979; Implement command to find hanging transactions

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10974:
URL: https://github.com/apache/kafka/pull/10974#discussion_r664156720



##########
File path: tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
##########
@@ -461,6 +469,417 @@ public void execute(Admin admin, Namespace ns, PrintStream out) throws Exception
         }
     }
 
+    static class FindHangingTransactionsCommand extends TransactionsCommand {
+        private static final int MAX_BATCH_SIZE = 500;
+
+        static final String[] HEADERS = new String[] {
+            "Topic",
+            "Partition",
+            "ProducerId",
+            "ProducerEpoch",
+            "CoordinatorEpoch",
+            "StartOffset",
+            "LastTimestamp",
+            "Duration(min)"
+        };
+
+        FindHangingTransactionsCommand(Time time) {
+            super(time);
+        }
+
+        @Override
+        String name() {
+            return "find-hanging";
+        }
+
+        @Override
+        void addSubparser(Subparsers subparsers) {
+            Subparser subparser = subparsers.addParser(name())
+                .help("find hanging transactions");
+
+            subparser.addArgument("--broker-id")
+                .help("broker id to search for hanging transactions")
+                .action(store())
+                .type(Integer.class)
+                .required(false);
+
+            subparser.addArgument("--max-transaction-timeout")
+                .help("maximum transaction timeout in minutes to limit the scope of the search")
+                .action(store())
+                .type(Integer.class)
+                .setDefault(15)
+                .required(false);
+
+            subparser.addArgument("--topic")
+                .help("topic name to limit search to")
+                .action(store())
+                .type(String.class)
+                .required(false);
+
+            subparser.addArgument("--partition")
+                .help("partition number")
+                .action(store())
+                .type(Integer.class)
+                .required(false);
+        }
+
+        @Override
+        void execute(Admin admin, Namespace ns, PrintStream out) throws Exception {
+            Optional<Integer> brokerId = Optional.ofNullable(ns.getInt("broker_id"));
+            Optional<String> topic = Optional.ofNullable(ns.getString("topic"));
+
+            if (!topic.isPresent() && !brokerId.isPresent()) {
+                printErrorAndExit("The `find-hanging` command requires either --topic " +
+                    "or --broker-id to limit the scope of the search");
+                return;
+            }
+
+            Optional<Integer> partition = Optional.ofNullable(ns.getInt("partition"));
+            if (partition.isPresent() && !topic.isPresent()) {
+                printErrorAndExit("The --partition argument requires --topic to be provided");
+                return;
+            }
+
+            long maxTransactionTimeoutMs = TimeUnit.MINUTES.toMillis(
+                ns.getInt("max_transaction_timeout"));
+
+            List<TopicPartition> topicPartitions = collectTopicPartitionsToSearch(
+                admin,
+                topic,
+                partition,
+                brokerId
+            );
+
+            List<OpenTransaction> candidates = collectCandidateOpenTransactions(
+                admin,
+                brokerId,
+                maxTransactionTimeoutMs,
+                topicPartitions
+            );
+
+            if (candidates.isEmpty()) {
+                printHangingTransactions(Collections.emptyList(), out);
+            } else {
+                Map<Long, List<OpenTransaction>> openTransactionsByProducerId = groupByProducerId(candidates);
+
+                Map<Long, String> transactionalIds = lookupTransactionalIds(
+                    admin,
+                    openTransactionsByProducerId.keySet()
+                );
+
+                Map<String, TransactionDescription> descriptions = describeTransactions(
+                    admin,
+                    transactionalIds.values()
+                );
+
+                List<OpenTransaction> hangingTransactions = filterHangingTransactions(
+                    openTransactionsByProducerId,
+                    transactionalIds,
+                    descriptions
+                );
+
+                printHangingTransactions(hangingTransactions, out);
+            }
+        }
+
+        private List<TopicPartition> collectTopicPartitionsToSearch(
+            Admin admin,
+            Optional<String> topic,
+            Optional<Integer> partition,
+            Optional<Integer> brokerId
+        ) throws Exception {
+            final List<String> topics;
+
+            if (topic.isPresent()) {
+                if (partition.isPresent()) {
+                    return Collections.singletonList(new TopicPartition(topic.get(), partition.get()));
+                } else {
+                    topics = Collections.singletonList(topic.get());
+                }
+            } else {
+                topics = listTopics(admin);
+            }
+
+            return findTopicPartitions(
+                admin,
+                brokerId,
+                topics
+            );
+        }
+
+        private List<OpenTransaction> filterHangingTransactions(
+            Map<Long, List<OpenTransaction>> openTransactionsByProducerId,
+            Map<Long, String> transactionalIds,
+            Map<String, TransactionDescription> descriptions
+        ) {
+            List<OpenTransaction> hangingTransactions = new ArrayList<>();
+
+            openTransactionsByProducerId.forEach((producerId, openTransactions) -> {
+                String transactionalId = transactionalIds.get(producerId);
+                if (transactionalId == null) {
+                    // If we could not find the transactionalId corresponding to the
+                    // producerId of an open transaction, then the transaction is hanging.
+                    hangingTransactions.addAll(openTransactions);
+                } else {
+                    // Otherwise, we need to check the current transaction state
+                    TransactionDescription description = descriptions.get(transactionalId);
+                    if (description == null) {
+                        hangingTransactions.addAll(openTransactions);
+                    } else {
+                        for (OpenTransaction openTransaction : openTransactions) {
+                            if (description.producerEpoch() > openTransaction.producerState.producerEpoch()
+                                || !description.topicPartitions().contains(openTransaction.topicPartition)) {

Review comment:
       Thanks, added a comment. I decided to remove the epoch check. The producer epoch could be bumped as a result of a transaction timeout, so the check did not seem quite right.
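
       With the epoch check removed, the remaining conditions can be sketched roughly as below. This is a minimal illustration and not the merged code: `OpenTxn`, `TxnDescription`, and `HangingFilterSketch` are hypothetical, simplified stand-ins for the tool's `OpenTransaction` and the admin client's `TransactionDescription`, and partitions are represented as plain strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the tool's OpenTransaction (not the real class).
final class OpenTxn {
    final String topicPartition;
    final long producerId;
    final short producerEpoch;

    OpenTxn(String topicPartition, long producerId, short producerEpoch) {
        this.topicPartition = topicPartition;
        this.producerId = producerId;
        this.producerEpoch = producerEpoch;
    }
}

// Hypothetical stand-in for the coordinator's view of a transaction.
final class TxnDescription {
    // Kept only to show that the filter below ignores it.
    final short producerEpoch;
    final Set<String> topicPartitions;

    TxnDescription(short producerEpoch, Set<String> topicPartitions) {
        this.producerEpoch = producerEpoch;
        this.topicPartitions = topicPartitions;
    }
}

final class HangingFilterSketch {
    static List<OpenTxn> filterHanging(
        Map<Long, List<OpenTxn>> openByProducerId,
        Map<Long, String> transactionalIds,
        Map<String, TxnDescription> descriptions
    ) {
        List<OpenTxn> hanging = new ArrayList<>();
        openByProducerId.forEach((producerId, openTxns) -> {
            String transactionalId = transactionalIds.get(producerId);
            TxnDescription description =
                transactionalId == null ? null : descriptions.get(transactionalId);
            if (description == null) {
                // No transactionalId, or no coordinator state for it:
                // every open transaction of this producer is reported.
                hanging.addAll(openTxns);
                return;
            }
            for (OpenTxn txn : openTxns) {
                // No epoch comparison: a transaction timeout can bump the
                // producer epoch, so a newer epoch alone proves nothing.
                if (!description.topicPartitions.contains(txn.topicPartition)) {
                    hanging.add(txn);
                }
            }
        });
        return hanging;
    }
}
```

       In this sketch, an open transaction whose partition is still listed by the coordinator is not reported even if the coordinator's epoch is newer, which is the case the removed check would have misclassified.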







[GitHub] [kafka] hachikuji commented on a change in pull request #10974: KAFKA-12979; Implement command to find hanging transactions

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10974:
URL: https://github.com/apache/kafka/pull/10974#discussion_r664154672



##########
File path: tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java
##########
@@ -437,6 +442,536 @@ public void testOldBrokerAbortTransactionWithUnknownCoordinatorEpoch(int coordin
         assertNormalExit();
     }
 
+    @Test
+    public void testFindHangingRequiresEitherBrokerIdOrTopic() throws Exception {
+        assertCommandFailure(new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging"
+        });
+    }
+
+    @Test
+    public void testFindHangingRequiresTopicIfPartitionIsSpecified() throws Exception {
+        assertCommandFailure(new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging",
+            "--broker-id",
+            "0",
+            "--partition",
+            "5"
+        });
+    }
+
+    private void expectListTransactions(
+        Map<Integer, Collection<TransactionListing>> listingsByBroker
+    ) {
+        expectListTransactions(null, listingsByBroker);

Review comment:
       Yeah, I was trying to deal with the fact that `TransactionsCommand` invokes both variants of `listTransactions`. I dealt with it by modifying the no-arg call to provide the options explicitly.
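
       The idea can be illustrated with a small sketch: if the caller always uses the options-taking variant, a test double only has to handle that one overload, and default options can be matched by equality instead of `null`. `TxnAdmin`, `ListOptions`, `StubAdmin`, and `ListCallSketch` are hypothetical stand-ins, not the real `Admin` API or the code in this PR.

```java
import java.util.Collections;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for ListTransactionsOptions, with value equality
// so that two default instances compare equal.
final class ListOptions {
    final List<Long> filteredProducerIds;

    ListOptions() {
        this(Collections.emptyList());
    }

    ListOptions(List<Long> filteredProducerIds) {
        this.filteredProducerIds = filteredProducerIds;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof ListOptions
            && filteredProducerIds.equals(((ListOptions) o).filteredProducerIds);
    }

    @Override
    public int hashCode() {
        return Objects.hash(filteredProducerIds);
    }
}

// Hypothetical stand-in for an admin client with both variants.
interface TxnAdmin {
    List<String> listTransactions();

    List<String> listTransactions(ListOptions options);
}

// Test double that only supports the options-taking variant.
final class StubAdmin implements TxnAdmin {
    private final ListOptions expected;
    private final List<String> result;

    StubAdmin(ListOptions expected, List<String> result) {
        this.expected = expected;
        this.result = result;
    }

    @Override
    public List<String> listTransactions() {
        throw new UnsupportedOperationException("no-arg variant is not stubbed");
    }

    @Override
    public List<String> listTransactions(ListOptions options) {
        if (!expected.equals(options)) {
            throw new IllegalArgumentException("unexpected options");
        }
        return result;
    }
}

final class ListCallSketch {
    // The caller passes default options explicitly rather than
    // relying on the no-arg overload.
    static List<String> listAll(TxnAdmin admin) {
        return admin.listTransactions(new ListOptions());
    }
}
```

       The trade-off in this sketch is that the options type needs a meaningful `equals`, which is an assumption here and may not hold for the real options class.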







[GitHub] [kafka] hachikuji commented on a change in pull request #10974: KAFKA-12979; Implement command to find hanging transactions

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10974:
URL: https://github.com/apache/kafka/pull/10974#discussion_r664714525



##########
File path: tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
##########
@@ -461,6 +471,437 @@ public void execute(Admin admin, Namespace ns, PrintStream out) throws Exception
         }
     }
 
+    static class FindHangingTransactionsCommand extends TransactionsCommand {
+        private static final int MAX_BATCH_SIZE = 500;
+
+        static final String[] HEADERS = new String[] {
+            "Topic",
+            "Partition",
+            "ProducerId",
+            "ProducerEpoch",
+            "CoordinatorEpoch",
+            "StartOffset",
+            "LastTimestamp",
+            "Duration(min)"
+        };
+
+        FindHangingTransactionsCommand(Time time) {
+            super(time);
+        }
+
+        @Override
+        String name() {
+            return "find-hanging";
+        }
+
+        @Override
+        void addSubparser(Subparsers subparsers) {
+            Subparser subparser = subparsers.addParser(name())
+                .help("find hanging transactions");
+
+            subparser.addArgument("--broker-id")
+                .help("broker id to search for hanging transactions")
+                .action(store())
+                .type(Integer.class)
+                .required(false);
+
+            subparser.addArgument("--max-transaction-timeout")
+                .help("maximum transaction timeout in minutes to limit the scope of the search")
+                .action(store())
+                .type(Integer.class)
+                .setDefault(15)
+                .required(false);
+
+            subparser.addArgument("--topic")
+                .help("topic name to limit search to")
+                .action(store())
+                .type(String.class)
+                .required(false);
+
+            subparser.addArgument("--partition")
+                .help("partition number")
+                .action(store())
+                .type(Integer.class)
+                .required(false);
+        }
+
+        @Override
+        void execute(Admin admin, Namespace ns, PrintStream out) throws Exception {
+            Optional<Integer> brokerId = Optional.ofNullable(ns.getInt("broker_id"));
+            Optional<String> topic = Optional.ofNullable(ns.getString("topic"));
+
+            if (!topic.isPresent() && !brokerId.isPresent()) {
+                printErrorAndExit("The `find-hanging` command requires either --topic " +
+                    "or --broker-id to limit the scope of the search");

Review comment:
       Added a test case for this.







[GitHub] [kafka] dajac commented on a change in pull request #10974: KAFKA-12979; Implement command to find hanging transactions

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #10974:
URL: https://github.com/apache/kafka/pull/10974#discussion_r663770706



##########
File path: tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java
##########
@@ -437,6 +442,536 @@ public void testOldBrokerAbortTransactionWithUnknownCoordinatorEpoch(int coordin
         assertNormalExit();
     }
 
+    @Test
+    public void testFindHangingRequiresEitherBrokerIdOrTopic() throws Exception {
+        assertCommandFailure(new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging"
+        });
+    }
+
+    @Test
+    public void testFindHangingRequiresTopicIfPartitionIsSpecified() throws Exception {
+        assertCommandFailure(new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging",
+            "--broker-id",
+            "0",
+            "--partition",
+            "5"
+        });
+    }
+
+    private void expectListTransactions(
+        Map<Integer, Collection<TransactionListing>> listingsByBroker
+    ) {
+        expectListTransactions(null, listingsByBroker);

Review comment:
       nit: We could instantiate a new `ListTransactionsOptions` here instead of using `null`. This would remove the `null` check below.

##########
File path: tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
##########
@@ -461,6 +469,417 @@ public void execute(Admin admin, Namespace ns, PrintStream out) throws Exception
         }
     }
 
+    static class FindHangingTransactionsCommand extends TransactionsCommand {
+        private static final int MAX_BATCH_SIZE = 500;
+
+        static final String[] HEADERS = new String[] {
+            "Topic",
+            "Partition",
+            "ProducerId",
+            "ProducerEpoch",
+            "CoordinatorEpoch",
+            "StartOffset",
+            "LastTimestamp",
+            "Duration(min)"
+        };
+
+        FindHangingTransactionsCommand(Time time) {
+            super(time);
+        }
+
+        @Override
+        String name() {
+            return "find-hanging";
+        }
+
+        @Override
+        void addSubparser(Subparsers subparsers) {
+            Subparser subparser = subparsers.addParser(name())
+                .help("find hanging transactions");
+
+            subparser.addArgument("--broker-id")
+                .help("broker id to search for hanging transactions")
+                .action(store())
+                .type(Integer.class)
+                .required(false);
+
+            subparser.addArgument("--max-transaction-timeout")
+                .help("maximum transaction timeout in minutes to limit the scope of the search")
+                .action(store())
+                .type(Integer.class)
+                .setDefault(15)
+                .required(false);
+
+            subparser.addArgument("--topic")
+                .help("topic name to limit search to")
+                .action(store())
+                .type(String.class)
+                .required(false);
+
+            subparser.addArgument("--partition")
+                .help("partition number")
+                .action(store())
+                .type(Integer.class)
+                .required(false);
+        }
+
+        @Override
+        void execute(Admin admin, Namespace ns, PrintStream out) throws Exception {
+            Optional<Integer> brokerId = Optional.ofNullable(ns.getInt("broker_id"));
+            Optional<String> topic = Optional.ofNullable(ns.getString("topic"));
+
+            if (!topic.isPresent() && !brokerId.isPresent()) {
+                printErrorAndExit("The `find-hanging` command requires either --topic " +
+                    "or --broker-id to limit the scope of the search");
+                return;
+            }
+
+            Optional<Integer> partition = Optional.ofNullable(ns.getInt("partition"));
+            if (partition.isPresent() && !topic.isPresent()) {
+                printErrorAndExit("The --partition argument requires --topic to be provided");
+                return;
+            }
+
+            long maxTransactionTimeoutMs = TimeUnit.MINUTES.toMillis(
+                ns.getInt("max_transaction_timeout"));
+
+            List<TopicPartition> topicPartitions = collectTopicPartitionsToSearch(
+                admin,
+                topic,
+                partition,
+                brokerId
+            );
+
+            List<OpenTransaction> candidates = collectCandidateOpenTransactions(
+                admin,
+                brokerId,
+                maxTransactionTimeoutMs,
+                topicPartitions
+            );
+
+            if (candidates.isEmpty()) {
+                printHangingTransactions(Collections.emptyList(), out);
+            } else {
+                Map<Long, List<OpenTransaction>> openTransactionsByProducerId = groupByProducerId(candidates);
+
+                Map<Long, String> transactionalIds = lookupTransactionalIds(
+                    admin,
+                    openTransactionsByProducerId.keySet()
+                );
+
+                Map<String, TransactionDescription> descriptions = describeTransactions(
+                    admin,
+                    transactionalIds.values()
+                );
+
+                List<OpenTransaction> hangingTransactions = filterHangingTransactions(
+                    openTransactionsByProducerId,
+                    transactionalIds,
+                    descriptions
+                );
+
+                printHangingTransactions(hangingTransactions, out);
+            }
+        }
+
+        private List<TopicPartition> collectTopicPartitionsToSearch(
+            Admin admin,
+            Optional<String> topic,
+            Optional<Integer> partition,
+            Optional<Integer> brokerId
+        ) throws Exception {
+            final List<String> topics;
+
+            if (topic.isPresent()) {
+                if (partition.isPresent()) {
+                    return Collections.singletonList(new TopicPartition(topic.get(), partition.get()));
+                } else {
+                    topics = Collections.singletonList(topic.get());
+                }
+            } else {
+                topics = listTopics(admin);
+            }
+
+            return findTopicPartitions(
+                admin,
+                brokerId,
+                topics
+            );
+        }
+
+        private List<OpenTransaction> filterHangingTransactions(
+            Map<Long, List<OpenTransaction>> openTransactionsByProducerId,
+            Map<Long, String> transactionalIds,
+            Map<String, TransactionDescription> descriptions
+        ) {
+            List<OpenTransaction> hangingTransactions = new ArrayList<>();
+
+            openTransactionsByProducerId.forEach((producerId, openTransactions) -> {
+                String transactionalId = transactionalIds.get(producerId);
+                if (transactionalId == null) {
+                    // If we could not find the transactionalId corresponding to the
+                    // producerId of an open transaction, then the transaction is hanging.
+                    hangingTransactions.addAll(openTransactions);
+                } else {
+                    // Otherwise, we need to check the current transaction state
+                    TransactionDescription description = descriptions.get(transactionalId);
+                    if (description == null) {
+                        hangingTransactions.addAll(openTransactions);
+                    } else {
+                        for (OpenTransaction openTransaction : openTransactions) {
+                            if (description.producerEpoch() > openTransaction.producerState.producerEpoch()
+                                || !description.topicPartitions().contains(openTransaction.topicPartition)) {

Review comment:
       Should we also add a small comment here to be complete? Otherwise, we could add a javadoc which explains all the conditions.

##########
File path: tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
##########
@@ -461,6 +469,417 @@ public void execute(Admin admin, Namespace ns, PrintStream out) throws Exception
         }
     }
 
+    static class FindHangingTransactionsCommand extends TransactionsCommand {
+        private static final int MAX_BATCH_SIZE = 500;
+
+        static final String[] HEADERS = new String[] {
+            "Topic",
+            "Partition",
+            "ProducerId",
+            "ProducerEpoch",
+            "CoordinatorEpoch",
+            "StartOffset",
+            "LastTimestamp",
+            "Duration(min)"
+        };
+
+        FindHangingTransactionsCommand(Time time) {
+            super(time);
+        }
+
+        @Override
+        String name() {
+            return "find-hanging";
+        }
+
+        @Override
+        void addSubparser(Subparsers subparsers) {
+            Subparser subparser = subparsers.addParser(name())
+                .help("find hanging transactions");
+
+            subparser.addArgument("--broker-id")
+                .help("broker id to search for hanging transactions")
+                .action(store())
+                .type(Integer.class)
+                .required(false);
+
+            subparser.addArgument("--max-transaction-timeout")
+                .help("maximum transaction timeout in minutes to limit the scope of the search")
+                .action(store())
+                .type(Integer.class)
+                .setDefault(15)
+                .required(false);
+
+            subparser.addArgument("--topic")
+                .help("topic name to limit search to")
+                .action(store())
+                .type(String.class)
+                .required(false);
+
+            subparser.addArgument("--partition")
+                .help("partition number")
+                .action(store())
+                .type(Integer.class)
+                .required(false);
+        }
+
+        @Override
+        void execute(Admin admin, Namespace ns, PrintStream out) throws Exception {
+            Optional<Integer> brokerId = Optional.ofNullable(ns.getInt("broker_id"));
+            Optional<String> topic = Optional.ofNullable(ns.getString("topic"));
+
+            if (!topic.isPresent() && !brokerId.isPresent()) {
+                printErrorAndExit("The `find-hanging` command requires either --topic " +
+                    "or --broker-id to limit the scope of the search");
+                return;
+            }
+
+            Optional<Integer> partition = Optional.ofNullable(ns.getInt("partition"));
+            if (partition.isPresent() && !topic.isPresent()) {
+                printErrorAndExit("The --partition argument requires --topic to be provided");
+                return;
+            }
+
+            long maxTransactionTimeoutMs = TimeUnit.MINUTES.toMillis(
+                ns.getInt("max_transaction_timeout"));
+
+            List<TopicPartition> topicPartitions = collectTopicPartitionsToSearch(
+                admin,
+                topic,
+                partition,
+                brokerId
+            );
+
+            List<OpenTransaction> candidates = collectCandidateOpenTransactions(
+                admin,
+                brokerId,
+                maxTransactionTimeoutMs,
+                topicPartitions
+            );
+
+            if (candidates.isEmpty()) {
+                printHangingTransactions(Collections.emptyList(), out);
+            } else {
+                Map<Long, List<OpenTransaction>> openTransactionsByProducerId = groupByProducerId(candidates);
+
+                Map<Long, String> transactionalIds = lookupTransactionalIds(
+                    admin,
+                    openTransactionsByProducerId.keySet()
+                );
+
+                Map<String, TransactionDescription> descriptions = describeTransactions(
+                    admin,
+                    transactionalIds.values()
+                );
+
+                List<OpenTransaction> hangingTransactions = filterHangingTransactions(
+                    openTransactionsByProducerId,
+                    transactionalIds,
+                    descriptions
+                );
+
+                printHangingTransactions(hangingTransactions, out);
+            }
+        }
+
+        private List<TopicPartition> collectTopicPartitionsToSearch(
+            Admin admin,
+            Optional<String> topic,
+            Optional<Integer> partition,
+            Optional<Integer> brokerId
+        ) throws Exception {
+            final List<String> topics;
+
+            if (topic.isPresent()) {
+                if (partition.isPresent()) {
+                    return Collections.singletonList(new TopicPartition(topic.get(), partition.get()));
+                } else {
+                    topics = Collections.singletonList(topic.get());
+                }
+            } else {
+                topics = listTopics(admin);
+            }
+
+            return findTopicPartitions(
+                admin,
+                brokerId,
+                topics
+            );
+        }
+
+        private List<OpenTransaction> filterHangingTransactions(
+            Map<Long, List<OpenTransaction>> openTransactionsByProducerId,
+            Map<Long, String> transactionalIds,
+            Map<String, TransactionDescription> descriptions
+        ) {
+            List<OpenTransaction> hangingTransactions = new ArrayList<>();
+
+            openTransactionsByProducerId.forEach((producerId, openTransactions) -> {
+                String transactionalId = transactionalIds.get(producerId);
+                if (transactionalId == null) {
+                    // If we could not find the transactionalId corresponding to the
+                    // producerId of an open transaction, then the transaction is hanging.
+                    hangingTransactions.addAll(openTransactions);
+                } else {
+                    // Otherwise, we need to check the current transaction state

Review comment:
       nit: Add `.` at the end of the sentence.

##########
File path: tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
##########
@@ -461,6 +469,417 @@ public void execute(Admin admin, Namespace ns, PrintStream out) throws Exception
         }
     }
 
+    static class FindHangingTransactionsCommand extends TransactionsCommand {
+        private static final int MAX_BATCH_SIZE = 500;
+
+        static final String[] HEADERS = new String[] {
+            "Topic",
+            "Partition",
+            "ProducerId",
+            "ProducerEpoch",
+            "CoordinatorEpoch",
+            "StartOffset",
+            "LastTimestamp",
+            "Duration(min)"
+        };
+
+        FindHangingTransactionsCommand(Time time) {
+            super(time);
+        }
+
+        @Override
+        String name() {
+            return "find-hanging";
+        }
+
+        @Override
+        void addSubparser(Subparsers subparsers) {
+            Subparser subparser = subparsers.addParser(name())
+                .help("find hanging transactions");
+
+            subparser.addArgument("--broker-id")
+                .help("broker id to search for hanging transactions")
+                .action(store())
+                .type(Integer.class)
+                .required(false);
+
+            subparser.addArgument("--max-transaction-timeout")
+                .help("maximum transaction timeout in minutes to limit the scope of the search")
+                .action(store())
+                .type(Integer.class)
+                .setDefault(15)
+                .required(false);
+
+            subparser.addArgument("--topic")
+                .help("topic name to limit search to")
+                .action(store())
+                .type(String.class)
+                .required(false);
+
+            subparser.addArgument("--partition")
+                .help("partition number")
+                .action(store())
+                .type(Integer.class)
+                .required(false);
+        }
+
+        @Override
+        void execute(Admin admin, Namespace ns, PrintStream out) throws Exception {
+            Optional<Integer> brokerId = Optional.ofNullable(ns.getInt("broker_id"));
+            Optional<String> topic = Optional.ofNullable(ns.getString("topic"));
+
+            if (!topic.isPresent() && !brokerId.isPresent()) {
+                printErrorAndExit("The `find-hanging` command requires either --topic " +
+                    "or --broker-id to limit the scope of the search");
+                return;
+            }
+
+            Optional<Integer> partition = Optional.ofNullable(ns.getInt("partition"));
+            if (partition.isPresent() && !topic.isPresent()) {
+                printErrorAndExit("The --partition argument requires --topic to be provided");
+                return;
+            }
+
+            long maxTransactionTimeoutMs = TimeUnit.MINUTES.toMillis(
+                ns.getInt("max_transaction_timeout"));
+
+            List<TopicPartition> topicPartitions = collectTopicPartitionsToSearch(
+                admin,
+                topic,
+                partition,
+                brokerId
+            );
+
+            List<OpenTransaction> candidates = collectCandidateOpenTransactions(
+                admin,
+                brokerId,
+                maxTransactionTimeoutMs,
+                topicPartitions
+            );
+
+            if (candidates.isEmpty()) {
+                printHangingTransactions(Collections.emptyList(), out);
+            } else {
+                Map<Long, List<OpenTransaction>> openTransactionsByProducerId = groupByProducerId(candidates);
+
+                Map<Long, String> transactionalIds = lookupTransactionalIds(
+                    admin,
+                    openTransactionsByProducerId.keySet()
+                );
+
+                Map<String, TransactionDescription> descriptions = describeTransactions(
+                    admin,
+                    transactionalIds.values()
+                );
+
+                List<OpenTransaction> hangingTransactions = filterHangingTransactions(
+                    openTransactionsByProducerId,
+                    transactionalIds,
+                    descriptions
+                );
+
+                printHangingTransactions(hangingTransactions, out);
+            }
+        }
+
+        private List<TopicPartition> collectTopicPartitionsToSearch(
+            Admin admin,
+            Optional<String> topic,
+            Optional<Integer> partition,
+            Optional<Integer> brokerId
+        ) throws Exception {
+            final List<String> topics;
+
+            if (topic.isPresent()) {
+                if (partition.isPresent()) {
+                    return Collections.singletonList(new TopicPartition(topic.get(), partition.get()));
+                } else {
+                    topics = Collections.singletonList(topic.get());
+                }
+            } else {
+                topics = listTopics(admin);
+            }
+
+            return findTopicPartitions(
+                admin,
+                brokerId,
+                topics
+            );
+        }
+
+        private List<OpenTransaction> filterHangingTransactions(
+            Map<Long, List<OpenTransaction>> openTransactionsByProducerId,
+            Map<Long, String> transactionalIds,
+            Map<String, TransactionDescription> descriptions
+        ) {
+            List<OpenTransaction> hangingTransactions = new ArrayList<>();
+
+            openTransactionsByProducerId.forEach((producerId, openTransactions) -> {
+                String transactionalId = transactionalIds.get(producerId);
+                if (transactionalId == null) {
+                    // If we could not find the transactionalId corresponding to the
+                    // producerId of an open transaction, then the transaction is hanging.
+                    hangingTransactions.addAll(openTransactions);
+                } else {
+                    // Otherwise, we need to check the current transaction state
+                    TransactionDescription description = descriptions.get(transactionalId);
+                    if (description == null) {
+                        hangingTransactions.addAll(openTransactions);
+                    } else {
+                        for (OpenTransaction openTransaction : openTransactions) {
+                            if (description.producerEpoch() > openTransaction.producerState.producerEpoch()
+                                || !description.topicPartitions().contains(openTransaction.topicPartition)) {
+                                hangingTransactions.add(openTransaction);
+                            }
+                        }
+                    }
+                }
+            });
+
+            return hangingTransactions;
+        }
+
+        private void printHangingTransactions(
+            List<OpenTransaction> hangingTransactions,
+            PrintStream out
+        ) {
+            long currentTimeMs = time.milliseconds();
+            List<String[]> rows = new ArrayList<>(hangingTransactions.size());
+
+            for (OpenTransaction transaction : hangingTransactions) {
+                long transactionDurationMinutes = TimeUnit.MILLISECONDS.toMinutes(
+                    currentTimeMs - transaction.producerState.lastTimestamp());
+
+                rows.add(new String[] {
+                    transaction.topicPartition.topic(),
+                    String.valueOf(transaction.topicPartition.partition()),
+                    String.valueOf(transaction.producerState.producerId()),
+                    String.valueOf(transaction.producerState.producerEpoch()),
+                    String.valueOf(transaction.producerState.coordinatorEpoch().orElse(-1)),
+                    String.valueOf(transaction.producerState.currentTransactionStartOffset().orElse(-1)),
+                    String.valueOf(transaction.producerState.lastTimestamp()),
+                    String.valueOf(transactionDurationMinutes)
+                });
+            }
+
+            prettyPrintTable(HEADERS, rows, out);
+        }
+
+        private Map<String, TransactionDescription> describeTransactions(
+            Admin admin,
+            Collection<String> transactionalIds
+        ) throws Exception {
+            try {
+                return admin.describeTransactions(new HashSet<>(transactionalIds)).all().get();
+            } catch (ExecutionException e) {
+                printErrorAndExit("Failed to describe " + transactionalIds.size()
+                    + " transactions", e.getCause());
+                return Collections.emptyMap();
+            }
+        }
+
+        private Map<Long, List<OpenTransaction>> groupByProducerId(
+            List<OpenTransaction> openTransactions
+        ) {
+            Map<Long, List<OpenTransaction>> res = new HashMap<>();
+            for (OpenTransaction transaction : openTransactions) {
+                List<OpenTransaction> states = res.computeIfAbsent(
+                    transaction.producerState.producerId(),
+                    __ -> new ArrayList<>()
+                );
+                states.add(transaction);
+            }
+            return res;
+        }
+
+        private List<String> listTopics(
+            Admin admin
+        ) throws Exception {
+            try {
+                return new ArrayList<>(admin.listTopics().names().get());
+            } catch (ExecutionException e) {
+                printErrorAndExit("Failed to list topics", e.getCause());
+                return Collections.emptyList();
+            }
+        }
+
+        private List<TopicPartition> findTopicPartitions(
+            Admin admin,
+            Optional<Integer> brokerId,
+            List<String> topics
+        ) throws Exception {
+            List<TopicPartition> topicPartitions = new ArrayList<>();
+            consumeInBatches(topics, MAX_BATCH_SIZE, batch -> {
+                findTopicPartitions(
+                    admin,
+                    brokerId,
+                    batch,
+                    topicPartitions
+                );
+            });
+            return topicPartitions;
+        }
+
+        private void findTopicPartitions(
+            Admin admin,
+            Optional<Integer> brokerId,
+            List<String> topics,
+            List<TopicPartition> topicPartitions
+        ) throws Exception {
+            try {
+                Map<String, TopicDescription> topicDescriptions = admin.describeTopics(topics).all().get();
+                topicDescriptions.forEach((topic, description) -> {
+                    description.partitions().forEach(partitionInfo -> {
+                        if (!brokerId.isPresent() || hasReplica(brokerId.get(), partitionInfo)) {
+                            topicPartitions.add(new TopicPartition(topic, partitionInfo.partition()));
+                        }
+                    });
+                });
+            } catch (ExecutionException e) {
+                printErrorAndExit("Failed to describe " + topics.size() + " topics", e.getCause());
+            }
+        }
+
+        private boolean hasReplica(
+            int brokerId,
+            TopicPartitionInfo partitionInfo
+        ) {
+            return partitionInfo.replicas().stream().anyMatch(node -> node.id() == brokerId);
+        }
+
+        private List<OpenTransaction> collectCandidateOpenTransactions(
+            Admin admin,
+            Optional<Integer> brokerId,
+            long maxTransactionTimeoutMs,
+            List<TopicPartition> topicPartitions
+        ) throws Exception {
+            // We have to check all partitions on the broker. In order to avoid
+            // overwhelming it with a giant request, we break the requests into
+            // smaller batches.
+
+            List<OpenTransaction> candidateTransactions = new ArrayList<>();
+
+            consumeInBatches(topicPartitions, MAX_BATCH_SIZE, batch -> {
+                collectCandidateOpenTransactions(
+                    admin,
+                    brokerId,
+                    maxTransactionTimeoutMs,
+                    batch,
+                    candidateTransactions
+                );
+            });
+
+            return candidateTransactions;
+        }
+
+        @FunctionalInterface
+        private interface ThrowableConsumer<T> {
+            void accept(T t) throws Exception;
+        }
+
+        private static class OpenTransaction {
+            private final TopicPartition topicPartition;
+            private final ProducerState producerState;
+
+            private OpenTransaction(
+                TopicPartition topicPartition,
+                ProducerState producerState
+            ) {
+                this.topicPartition = topicPartition;
+                this.producerState = producerState;
+            }
+        }
+
+        private void collectCandidateOpenTransactions(
+            Admin admin,
+            Optional<Integer> brokerId,
+            long maxTransactionTimeoutMs,
+            List<TopicPartition> topicPartitions,
+            List<OpenTransaction> candidateTransactions
+        ) throws Exception {
+            try {
+                DescribeProducersOptions describeOptions = new DescribeProducersOptions();
+                brokerId.ifPresent(describeOptions::brokerId);
+
+                Map<TopicPartition, DescribeProducersResult.PartitionProducerState> producersByPartition =
+                    admin.describeProducers(topicPartitions, describeOptions).all().get();
+
+                long currentTimeMs = time.milliseconds();
+
+                producersByPartition.forEach((topicPartition, producersStates) -> {
+                    producersStates.activeProducers().forEach(activeProducer -> {
+                        if (activeProducer.currentTransactionStartOffset().isPresent()) {
+                            long transactionDurationMs = currentTimeMs - activeProducer.lastTimestamp();
+                            if (transactionDurationMs > maxTransactionTimeoutMs) {
+                                candidateTransactions.add(new OpenTransaction(
+                                    topicPartition,
+                                    activeProducer
+                                ));
+                            }
+                        }
+                    });
+                });
+            } catch (ExecutionException e) {
+                printErrorAndExit("Failed to describe producers for " + topicPartitions.size() +
+                    " partitions on broker " + brokerId, e.getCause());
+            }
+        }
+
+        private Map<Long, String> lookupTransactionalIds(
+            Admin admin,
+            Set<Long> producerIds
+        ) throws Exception {
+            try {
+                ListTransactionsOptions listTransactionsOptions = new ListTransactionsOptions()
+                    .filterProducerIds(producerIds);
+
+                Collection<TransactionListing> transactionListings =
+                    admin.listTransactions(listTransactionsOptions).all().get();
+
+                Map<Long, String> transactionalIdMap = new HashMap<>();
+
+                transactionListings.forEach(listing -> {
+                    if (!producerIds.contains(listing.producerId())) {
+                        log.debug("Received transaction listing {} which has a producerId " +
+                            "which was not requested", listing);
+                    } else {
+                        transactionalIdMap.put(
+                            listing.producerId(),
+                            listing.transactionalId()
+                        );
+                    }
+                });
+
+                return transactionalIdMap;
+            } catch (ExecutionException e) {
+                printErrorAndExit("Failed to list transactions for " + producerIds.size() +
+                    " producers", e.getCause());
+                return Collections.emptyMap();
+            }
+        }
+
+
+        private <T> void consumeInBatches(
+            List<T> list,
+            int batchSize,
+            ThrowableConsumer<List<T>> consumer
+        ) throws Exception {
+            int batchStartIndex = 0;
+            int limitIndex = list.size();
+
+            while (batchStartIndex < limitIndex) {
+                int batchEndIndex = Math.min(
+                    limitIndex,
+                    batchStartIndex + batchSize
+                );
+
+                List<T> batch = list.subList(batchStartIndex, batchEndIndex);
+                consumer.accept(batch);

Review comment:
       nit: As we don't reuse `batch`, we could directly pass `list.subList(batchStartIndex, batchEndIndex)` to `accept`.
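
       A minimal sketch of the suggested form, using only JDK types. The quoted diff is cut off right after `consumer.accept(batch)`, so the step that advances `batchStartIndex` is an assumption here, and the `BatchSketch` class name is hypothetical:

```java
import java.util.Arrays;
import java.util.List;

class BatchSketch {
    @FunctionalInterface
    interface ThrowableConsumer<T> {
        void accept(T t) throws Exception;
    }

    // Hands consecutive slices of at most batchSize elements to the consumer.
    static <T> void consumeInBatches(
        List<T> list,
        int batchSize,
        ThrowableConsumer<List<T>> consumer
    ) throws Exception {
        int batchStartIndex = 0;
        int limitIndex = list.size();

        while (batchStartIndex < limitIndex) {
            int batchEndIndex = Math.min(limitIndex, batchStartIndex + batchSize);
            // The sublist is passed straight through instead of via a local variable.
            consumer.accept(list.subList(batchStartIndex, batchEndIndex));
            // Assumed: the next batch starts where this one ended.
            batchStartIndex = batchEndIndex;
        }
    }

    public static void main(String[] args) throws Exception {
        // Prints [1, 2], then [3, 4], then [5].
        consumeInBatches(Arrays.asList(1, 2, 3, 4, 5), 2, batch -> System.out.println(batch));
    }
}
```

       Since `subList` returns a view backed by the original list, a consumer that keeps a batch beyond the callback would likely want to copy it first.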

##########
File path: tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java
##########
@@ -437,6 +442,536 @@ public void testOldBrokerAbortTransactionWithUnknownCoordinatorEpoch(int coordin
         assertNormalExit();
     }
 
+    @Test
+    public void testFindHangingRequiresEitherBrokerIdOrTopic() throws Exception {
+        assertCommandFailure(new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging"
+        });
+    }
+
+    @Test
+    public void testFindHangingRequiresTopicIfPartitionIsSpecified() throws Exception {
+        assertCommandFailure(new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging",
+            "--broker-id",
+            "0",
+            "--partition",
+            "5"
+        });
+    }
+
+    private void expectListTransactions(
+        Map<Integer, Collection<TransactionListing>> listingsByBroker
+    ) {
+        expectListTransactions(null, listingsByBroker);
+    }
+
+    private void expectListTransactions(
+        ListTransactionsOptions options,
+        Map<Integer, Collection<TransactionListing>> listingsByBroker
+    ) {
+        ListTransactionsResult listResult = Mockito.mock(ListTransactionsResult.class);
+
+        if (options == null) {
+            Mockito.when(admin.listTransactions()).thenReturn(listResult);
+        } else {
+            Mockito.when(admin.listTransactions(options)).thenReturn(listResult);
+        }
+
+        List<TransactionListing> allListings = new ArrayList<>();
+        listingsByBroker.values().forEach(allListings::addAll);
+
+        Mockito.when(listResult.all()).thenReturn(completedFuture(allListings));
+        Mockito.when(listResult.allByBrokerId()).thenReturn(completedFuture(listingsByBroker));
+    }
+
+    private void expectDescribeProducers(
+        TopicPartition topicPartition,
+        long producerId,
+        short producerEpoch,
+        long lastTimestamp,
+        OptionalInt coordinatorEpoch,
+        OptionalLong txnStartOffset
+    ) {
+        PartitionProducerState partitionProducerState = new PartitionProducerState(singletonList(
+            new ProducerState(
+                producerId,
+                producerEpoch,
+                500,
+                lastTimestamp,
+                coordinatorEpoch,
+                txnStartOffset
+            )
+        ));
+
+        DescribeProducersResult result = Mockito.mock(DescribeProducersResult.class);
+        Mockito.when(result.all()).thenReturn(
+            completedFuture(singletonMap(topicPartition, partitionProducerState))
+        );
+
+        Mockito.when(admin.describeProducers(
+            Collections.singletonList(topicPartition),
+            new DescribeProducersOptions()
+        )).thenReturn(result);
+    }
+
+    private void expectDescribeTransactions(
+        Map<String, TransactionDescription> descriptions
+    ) {
+        DescribeTransactionsResult result = Mockito.mock(DescribeTransactionsResult.class);
+        Mockito.when(result.all()).thenReturn(completedFuture(descriptions));
+        Mockito.when(admin.describeTransactions(descriptions.keySet())).thenReturn(result);
+    }
+
+    private void expectListTopics(
+        Set<String> topics
+    ) {
+        ListTopicsResult result = Mockito.mock(ListTopicsResult.class);
+        Mockito.when(result.names()).thenReturn(completedFuture(topics));
+        Mockito.when(admin.listTopics()).thenReturn(result);
+    }
+
+    private void expectDescribeTopics(
+        Map<String, TopicDescription> descriptions
+    ) {
+        DescribeTopicsResult result = Mockito.mock(DescribeTopicsResult.class);
+        Mockito.when(result.all()).thenReturn(completedFuture(descriptions));
+        Mockito.when(admin.describeTopics(new ArrayList<>(descriptions.keySet()))).thenReturn(result);
+    }
+
+    @Test
+    public void testFindHangingLookupTopicPartitionsForBroker() throws Exception {
+        int brokerId = 5;
+
+        String[] args = new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging",
+            "--broker-id",
+            String.valueOf(brokerId)
+        };
+
+        String topic = "foo";
+        expectListTopics(singleton(topic));
+
+        Node node0 = new Node(0, "localhost", 9092);
+        Node node1 = new Node(1, "localhost", 9093);
+        Node node5 = new Node(5, "localhost", 9097);
+
+        TopicPartitionInfo partition0 = new TopicPartitionInfo(
+            0,
+            node0,
+            Arrays.asList(node0, node1),
+            Arrays.asList(node0, node1)
+        );
+        TopicPartitionInfo partition1 = new TopicPartitionInfo(
+            1,
+            node1,
+            Arrays.asList(node1, node5),
+            Arrays.asList(node1, node5)
+        );
+
+        TopicDescription description = new TopicDescription(
+            topic,
+            false,
+            Arrays.asList(partition0, partition1)
+        );
+        expectDescribeTopics(singletonMap(topic, description));
+
+        DescribeProducersResult result = Mockito.mock(DescribeProducersResult.class);
+        Mockito.when(result.all()).thenReturn(completedFuture(emptyMap()));
+
+        Mockito.when(admin.describeProducers(
+            Collections.singletonList(new TopicPartition(topic, 1)),
+            new DescribeProducersOptions().brokerId(brokerId)
+        )).thenReturn(result);
+
+        execute(args);
+        assertNormalExit();
+
+        List<List<String>> table = readOutputAsTable();
+        assertEquals(1, table.size());
+
+        List<String> expectedHeaders = asList(TransactionsCommand.FindHangingTransactionsCommand.HEADERS);
+        assertEquals(expectedHeaders, table.get(0));
+    }
+
+    @Test
+    public void testFindHangingLookupTopicPartitionsForTopic() throws Exception {
+        String topic = "foo";
+
+        String[] args = new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging",
+            "--topic",
+            topic
+        };
+
+        Node node0 = new Node(0, "localhost", 9092);
+        Node node1 = new Node(1, "localhost", 9093);
+        Node node5 = new Node(5, "localhost", 9097);
+
+        TopicPartitionInfo partition0 = new TopicPartitionInfo(
+            0,
+            node0,
+            Arrays.asList(node0, node1),
+            Arrays.asList(node0, node1)
+        );
+        TopicPartitionInfo partition1 = new TopicPartitionInfo(
+            1,
+            node1,
+            Arrays.asList(node1, node5),
+            Arrays.asList(node1, node5)
+        );
+
+        TopicDescription description = new TopicDescription(
+            topic,
+            false,
+            Arrays.asList(partition0, partition1)
+        );
+        expectDescribeTopics(singletonMap(topic, description));
+
+        DescribeProducersResult result = Mockito.mock(DescribeProducersResult.class);
+        Mockito.when(result.all()).thenReturn(completedFuture(emptyMap()));
+
+        Mockito.when(admin.describeProducers(
+            Arrays.asList(new TopicPartition(topic, 0), new TopicPartition(topic, 1)),
+            new DescribeProducersOptions()
+        )).thenReturn(result);
+
+        execute(args);
+        assertNormalExit();
+
+        List<List<String>> table = readOutputAsTable();
+        assertEquals(1, table.size());
+
+        List<String> expectedHeaders = asList(TransactionsCommand.FindHangingTransactionsCommand.HEADERS);
+        assertEquals(expectedHeaders, table.get(0));
+    }
+
+    @Test
+    public void testFindHangingSpecifiedTopicPartition() throws Exception {
+        TopicPartition topicPartition = new TopicPartition("foo", 5);
+
+        String[] args = new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging",
+            "--topic",
+            topicPartition.topic(),
+            "--partition",
+            String.valueOf(topicPartition.partition())
+        };
+
+        long producerId = 132L;
+        short producerEpoch = 5;
+        long lastTimestamp = time.milliseconds();
+        OptionalInt coordinatorEpoch = OptionalInt.of(19);
+        OptionalLong txnStartOffset = OptionalLong.of(29384L);
+
+        expectDescribeProducers(
+            topicPartition,
+            producerId,
+            producerEpoch,
+            lastTimestamp,
+            coordinatorEpoch,
+            txnStartOffset
+        );
+
+        execute(args);
+        assertNormalExit();
+
+        List<List<String>> table = readOutputAsTable();
+        assertEquals(1, table.size());
+
+        List<String> expectedHeaders = asList(TransactionsCommand.FindHangingTransactionsCommand.HEADERS);
+        assertEquals(expectedHeaders, table.get(0));
+    }
+
+    @Test
+    public void testFindHangingNoMappedTransactionalId() throws Exception {

Review comment:
       Should we also add a test with no mapped `TransactionDescription`?
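
       For reference, the case in question is the `description == null` branch of `filterHangingTransactions`. A rough sketch of that rule with simplified stand-in types (`HangingFilterSketch`, `Description` and `isHanging` are hypothetical names, and partitions are plain strings here rather than `TopicPartition`):

```java
import java.util.Collections;
import java.util.Set;

class HangingFilterSketch {
    // Simplified stand-in for the fields of TransactionDescription used by the filter.
    static class Description {
        final int producerEpoch;
        final Set<String> partitions;

        Description(int producerEpoch, Set<String> partitions) {
            this.producerEpoch = producerEpoch;
            this.partitions = partitions;
        }
    }

    // Mirrors the check in the patch: an open transaction is reported as hanging if
    // the coordinator has no description for it, has moved on to a newer producer
    // epoch, or does not list the partition as part of the transaction.
    static boolean isHanging(int openEpoch, String openPartition, Description description) {
        if (description == null) {
            return true;
        }
        return description.producerEpoch > openEpoch
            || !description.partitions.contains(openPartition);
    }

    public static void main(String[] args) {
        Description current = new Description(5, Collections.singleton("foo-5"));
        System.out.println(isHanging(5, "foo-5", null));     // no description at all
        System.out.println(isHanging(5, "foo-5", current));  // same epoch, partition included
        System.out.println(isHanging(4, "foo-5", current));  // coordinator has a newer epoch
    }
}
```

       A test for the missing-description case would then only need to stub the describe step so that it yields nothing for the transactional id and assert that the open transaction shows up in the output table.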







[GitHub] [kafka] hachikuji commented on pull request #10974: KAFKA-12979; Implement command to find hanging transactions

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #10974:
URL: https://github.com/apache/kafka/pull/10974#issuecomment-874954633


   I will go ahead and merge since the 3.0 branch is about to be cut. I tested locally.





[GitHub] [kafka] hachikuji commented on a change in pull request #10974: KAFKA-12979; Implement command to find hanging transactions

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10974:
URL: https://github.com/apache/kafka/pull/10974#discussion_r664159092



##########
File path: tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java
##########
@@ -437,6 +442,536 @@ public void testOldBrokerAbortTransactionWithUnknownCoordinatorEpoch(int coordin
         assertNormalExit();
     }
 
+    @Test
+    public void testFindHangingNoMappedTransactionalId() throws Exception {

Review comment:
       Yeah. In fact, the `describeTransactions` API would return `TRANSACTIONAL_ID_NOT_FOUND`, so I needed to do a little refactoring to handle this properly.
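
For readers following along, here is a minimal sketch of one way to tolerate a missing transactional id while collecting descriptions. It is a stand-alone model and not the refactoring from this patch: `IdNotFoundException`, `collectDescriptions`, and the `String` payload are hypothetical stand-ins, and plain `CompletableFuture`s stand in for the admin client's result futures.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class NotFoundHandlingSketch {

    /** Hypothetical stand-in for the error raised when a transactional id is unknown. */
    static final class IdNotFoundException extends RuntimeException {
        IdNotFoundException(String message) {
            super(message);
        }
    }

    /**
     * Collects the results that completed successfully. Ids whose future failed
     * with IdNotFoundException are left out of the returned map; any other
     * failure is rethrown to the caller.
     */
    static Map<String, String> collectDescriptions(
        Map<String, CompletableFuture<String>> futures
    ) {
        Map<String, String> descriptions = new HashMap<>();
        for (Map.Entry<String, CompletableFuture<String>> entry : futures.entrySet()) {
            try {
                descriptions.put(entry.getKey(), entry.getValue().join());
            } catch (CompletionException e) {
                if (!(e.getCause() instanceof IdNotFoundException)) {
                    throw e;
                }
                // Not found: leave the id unmapped so the caller can decide
                // how to classify the corresponding open transactions.
            }
        }
        return descriptions;
    }
}
```

With this shape, a caller that looks up a description and gets `null` can treat the missing entry as a distinct case instead of failing the whole command.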







[GitHub] [kafka] showuon commented on a change in pull request #10974: KAFKA-12979; Implement command to find hanging transactions

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #10974:
URL: https://github.com/apache/kafka/pull/10974#discussion_r664276058



##########
File path: tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
##########
@@ -461,6 +471,437 @@ public void execute(Admin admin, Namespace ns, PrintStream out) throws Exception
         }
     }
 
+    static class FindHangingTransactionsCommand extends TransactionsCommand {
+        private static final int MAX_BATCH_SIZE = 500;
+
+        static final String[] HEADERS = new String[] {
+            "Topic",
+            "Partition",
+            "ProducerId",
+            "ProducerEpoch",
+            "CoordinatorEpoch",
+            "StartOffset",
+            "LastTimestamp",
+            "Duration(min)"
+        };
+
+        FindHangingTransactionsCommand(Time time) {
+            super(time);
+        }
+
+        @Override
+        String name() {
+            return "find-hanging";
+        }
+
+        @Override
+        void addSubparser(Subparsers subparsers) {
+            Subparser subparser = subparsers.addParser(name())
+                .help("find hanging transactions");
+
+            subparser.addArgument("--broker-id")
+                .help("broker id to search for hanging transactions")
+                .action(store())
+                .type(Integer.class)
+                .required(false);
+
+            subparser.addArgument("--max-transaction-timeout")
+                .help("maximum transaction timeout in minutes to limit the scope of the search")
+                .action(store())
+                .type(Integer.class)
+                .setDefault(15)
+                .required(false);
+
+            subparser.addArgument("--topic")
+                .help("topic name to limit search to")

Review comment:
       nit: "topic name to limit search to. REQUIRED if --partition is specified."

##########
File path: tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
##########
@@ -461,6 +471,437 @@ public void execute(Admin admin, Namespace ns, PrintStream out) throws Exception
         }
     }
 
+    static class FindHangingTransactionsCommand extends TransactionsCommand {
+        private static final int MAX_BATCH_SIZE = 500;
+
+        static final String[] HEADERS = new String[] {
+            "Topic",
+            "Partition",
+            "ProducerId",
+            "ProducerEpoch",
+            "CoordinatorEpoch",
+            "StartOffset",
+            "LastTimestamp",
+            "Duration(min)"
+        };
+
+        FindHangingTransactionsCommand(Time time) {
+            super(time);
+        }
+
+        @Override
+        String name() {
+            return "find-hanging";
+        }
+
+        @Override
+        void addSubparser(Subparsers subparsers) {
+            Subparser subparser = subparsers.addParser(name())
+                .help("find hanging transactions");
+
+            subparser.addArgument("--broker-id")
+                .help("broker id to search for hanging transactions")
+                .action(store())
+                .type(Integer.class)
+                .required(false);
+
+            subparser.addArgument("--max-transaction-timeout")
+                .help("maximum transaction timeout in minutes to limit the scope of the search")

Review comment:
       Could we also mention that the default is 15 minutes if not set?

##########
File path: tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
##########
@@ -461,6 +471,437 @@ public void execute(Admin admin, Namespace ns, PrintStream out) throws Exception
         }
     }
 
+    static class FindHangingTransactionsCommand extends TransactionsCommand {
+        private static final int MAX_BATCH_SIZE = 500;
+
+        static final String[] HEADERS = new String[] {
+            "Topic",
+            "Partition",
+            "ProducerId",
+            "ProducerEpoch",
+            "CoordinatorEpoch",
+            "StartOffset",
+            "LastTimestamp",
+            "Duration(min)"
+        };
+
+        FindHangingTransactionsCommand(Time time) {
+            super(time);
+        }
+
+        @Override
+        String name() {
+            return "find-hanging";
+        }
+
+        @Override
+        void addSubparser(Subparsers subparsers) {
+            Subparser subparser = subparsers.addParser(name())
+                .help("find hanging transactions");
+
+            subparser.addArgument("--broker-id")
+                .help("broker id to search for hanging transactions")
+                .action(store())
+                .type(Integer.class)
+                .required(false);
+
+            subparser.addArgument("--max-transaction-timeout")
+                .help("maximum transaction timeout in minutes to limit the scope of the search")
+                .action(store())
+                .type(Integer.class)
+                .setDefault(15)
+                .required(false);
+
+            subparser.addArgument("--topic")
+                .help("topic name to limit search to")
+                .action(store())
+                .type(String.class)
+                .required(false);
+
+            subparser.addArgument("--partition")
+                .help("partition number")
+                .action(store())
+                .type(Integer.class)
+                .required(false);
+        }
+
+        @Override
+        void execute(Admin admin, Namespace ns, PrintStream out) throws Exception {
+            Optional<Integer> brokerId = Optional.ofNullable(ns.getInt("broker_id"));
+            Optional<String> topic = Optional.ofNullable(ns.getString("topic"));
+
+            if (!topic.isPresent() && !brokerId.isPresent()) {
+                printErrorAndExit("The `find-hanging` command requires either --topic " +
+                    "or --broker-id to limit the scope of the search");

Review comment:
       Could we handle the case where both `topic` and `brokerId` are provided? Otherwise, could we end up searching the specified topic without restricting the search to the given broker?
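
To make the question concrete, here is a minimal sketch of restricting one topic's partitions to a given broker. It is a stand-alone model: `BrokerFilterSketch`, `partitionsToSearch`, and the `partition -> replica broker ids` input shape are hypothetical, and matching on replicas (rather than, say, only the leader) is an assumption, since the helper that does this in the patch is not shown in this excerpt.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

final class BrokerFilterSketch {

    /**
     * Returns the partitions of a single topic that should be searched.
     * replicasByPartition maps partition number -> ids of brokers hosting a
     * replica (a hypothetical input shape chosen for this sketch).
     */
    static List<Integer> partitionsToSearch(
        Map<Integer, List<Integer>> replicasByPartition,
        Optional<Integer> brokerId
    ) {
        List<Integer> result = new ArrayList<>();
        for (Map.Entry<Integer, List<Integer>> entry : replicasByPartition.entrySet()) {
            // Without --broker-id every partition qualifies; otherwise keep
            // only the partitions that have a replica on that broker.
            if (!brokerId.isPresent() || entry.getValue().contains(brokerId.get())) {
                result.add(entry.getKey());
            }
        }
        return result;
    }
}
```

Under this model, passing both a topic and a broker id that hosts none of its partitions yields an empty search set, which the command could either report as an error or print as an empty table.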







[GitHub] [kafka] dajac commented on a change in pull request #10974: KAFKA-12979; Implement command to find hanging transactions

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #10974:
URL: https://github.com/apache/kafka/pull/10974#discussion_r663770706



##########
File path: tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java
##########
@@ -437,6 +442,536 @@ public void testOldBrokerAbortTransactionWithUnknownCoordinatorEpoch(int coordin
         assertNormalExit();
     }
 
+    @Test
+    public void testFindHangingRequiresEitherBrokerIdOrTopic() throws Exception {
+        assertCommandFailure(new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging"
+        });
+    }
+
+    @Test
+    public void testFindHangingRequiresTopicIfPartitionIsSpecified() throws Exception {
+        assertCommandFailure(new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging",
+            "--broker-id",
+            "0",
+            "--partition",
+            "5"
+        });
+    }
+
+    private void expectListTransactions(
+        Map<Integer, Collection<TransactionListing>> listingsByBroker
+    ) {
+        expectListTransactions(null, listingsByBroker);

Review comment:
       nit: We could instantiate a new `ListTransactionsOptions` here instead of using `null`. This would remove the `null` check below.
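
A minimal sketch of the shape being suggested, written as a stand-alone model. `DefaultOptionsSketch` and its nested `Options` class are hypothetical stand-ins for the test helper and the real options type; only the delegation pattern is the point.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

final class DefaultOptionsSketch {

    /** Hypothetical stand-in for an options class with value equality. */
    static final class Options {
        @Override
        public boolean equals(Object other) {
            return other instanceof Options;
        }

        @Override
        public int hashCode() {
            return 0;
        }
    }

    /** Expectations registered so far, keyed by the options they apply to. */
    final Map<Options, Map<Integer, Collection<String>>> expectations = new HashMap<>();

    void expectListTransactions(Map<Integer, Collection<String>> listingsByBroker) {
        // Delegate with default options instead of null, so the overload
        // below never has to branch on a missing argument.
        expectListTransactions(new Options(), listingsByBroker);
    }

    void expectListTransactions(
        Options options,
        Map<Integer, Collection<String>> listingsByBroker
    ) {
        expectations.put(Objects.requireNonNull(options), listingsByBroker);
    }
}
```

This only works cleanly if the options type compares by value, which is assumed here.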

##########
File path: tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
##########
@@ -461,6 +469,417 @@ public void execute(Admin admin, Namespace ns, PrintStream out) throws Exception
         }
     }
 
+    static class FindHangingTransactionsCommand extends TransactionsCommand {
+        private static final int MAX_BATCH_SIZE = 500;
+
+        static final String[] HEADERS = new String[] {
+            "Topic",
+            "Partition",
+            "ProducerId",
+            "ProducerEpoch",
+            "CoordinatorEpoch",
+            "StartOffset",
+            "LastTimestamp",
+            "Duration(min)"
+        };
+
+        FindHangingTransactionsCommand(Time time) {
+            super(time);
+        }
+
+        @Override
+        String name() {
+            return "find-hanging";
+        }
+
+        @Override
+        void addSubparser(Subparsers subparsers) {
+            Subparser subparser = subparsers.addParser(name())
+                .help("find hanging transactions");
+
+            subparser.addArgument("--broker-id")
+                .help("broker id to search for hanging transactions")
+                .action(store())
+                .type(Integer.class)
+                .required(false);
+
+            subparser.addArgument("--max-transaction-timeout")
+                .help("maximum transaction timeout in minutes to limit the scope of the search")
+                .action(store())
+                .type(Integer.class)
+                .setDefault(15)
+                .required(false);
+
+            subparser.addArgument("--topic")
+                .help("topic name to limit search to")
+                .action(store())
+                .type(String.class)
+                .required(false);
+
+            subparser.addArgument("--partition")
+                .help("partition number")
+                .action(store())
+                .type(Integer.class)
+                .required(false);
+        }
+
+        @Override
+        void execute(Admin admin, Namespace ns, PrintStream out) throws Exception {
+            Optional<Integer> brokerId = Optional.ofNullable(ns.getInt("broker_id"));
+            Optional<String> topic = Optional.ofNullable(ns.getString("topic"));
+
+            if (!topic.isPresent() && !brokerId.isPresent()) {
+                printErrorAndExit("The `find-hanging` command requires either --topic " +
+                    "or --broker-id to limit the scope of the search");
+                return;
+            }
+
+            Optional<Integer> partition = Optional.ofNullable(ns.getInt("partition"));
+            if (partition.isPresent() && !topic.isPresent()) {
+                printErrorAndExit("The --partition argument requires --topic to be provided");
+                return;
+            }
+
+            long maxTransactionTimeoutMs = TimeUnit.MINUTES.toMillis(
+                ns.getInt("max_transaction_timeout"));
+
+            List<TopicPartition> topicPartitions = collectTopicPartitionsToSearch(
+                admin,
+                topic,
+                partition,
+                brokerId
+            );
+
+            List<OpenTransaction> candidates = collectCandidateOpenTransactions(
+                admin,
+                brokerId,
+                maxTransactionTimeoutMs,
+                topicPartitions
+            );
+
+            if (candidates.isEmpty()) {
+                printHangingTransactions(Collections.emptyList(), out);
+            } else {
+                Map<Long, List<OpenTransaction>> openTransactionsByProducerId = groupByProducerId(candidates);
+
+                Map<Long, String> transactionalIds = lookupTransactionalIds(
+                    admin,
+                    openTransactionsByProducerId.keySet()
+                );
+
+                Map<String, TransactionDescription> descriptions = describeTransactions(
+                    admin,
+                    transactionalIds.values()
+                );
+
+                List<OpenTransaction> hangingTransactions = filterHangingTransactions(
+                    openTransactionsByProducerId,
+                    transactionalIds,
+                    descriptions
+                );
+
+                printHangingTransactions(hangingTransactions, out);
+            }
+        }
+
+        private List<TopicPartition> collectTopicPartitionsToSearch(
+            Admin admin,
+            Optional<String> topic,
+            Optional<Integer> partition,
+            Optional<Integer> brokerId
+        ) throws Exception {
+            final List<String> topics;
+
+            if (topic.isPresent()) {
+                if (partition.isPresent()) {
+                    return Collections.singletonList(new TopicPartition(topic.get(), partition.get()));
+                } else {
+                    topics = Collections.singletonList(topic.get());
+                }
+            } else {
+                topics = listTopics(admin);
+            }
+
+            return findTopicPartitions(
+                admin,
+                brokerId,
+                topics
+            );
+        }
+
+        private List<OpenTransaction> filterHangingTransactions(
+            Map<Long, List<OpenTransaction>> openTransactionsByProducerId,
+            Map<Long, String> transactionalIds,
+            Map<String, TransactionDescription> descriptions
+        ) {
+            List<OpenTransaction> hangingTransactions = new ArrayList<>();
+
+            openTransactionsByProducerId.forEach((producerId, openTransactions) -> {
+                String transactionalId = transactionalIds.get(producerId);
+                if (transactionalId == null) {
+                    // If we could not find the transactionalId corresponding to the
+                    // producerId of an open transaction, then the transaction is hanging.
+                    hangingTransactions.addAll(openTransactions);
+                } else {
+                    // Otherwise, we need to check the current transaction state
+                    TransactionDescription description = descriptions.get(transactionalId);
+                    if (description == null) {
+                        hangingTransactions.addAll(openTransactions);
+                    } else {
+                        for (OpenTransaction openTransaction : openTransactions) {
+                            if (description.producerEpoch() > openTransaction.producerState.producerEpoch()
+                                || !description.topicPartitions().contains(openTransaction.topicPartition)) {

Review comment:
       Should we also add a small comment here to be complete? Otherwise, we could add a javadoc which explains all the conditions.
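
As an illustration of what such a javadoc could cover, here is a minimal stand-alone sketch that spells out the conditions visible in the hunk above. The types are hypothetical simplifications (`HangingCheckSketch`, `Description`, partitions as plain strings), and it assumes the truncated branch marks the transaction as hanging, which this excerpt does not show.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

final class HangingCheckSketch {

    /** Simplified stand-in for the coordinator's view of a transaction. */
    static final class Description {
        final int producerEpoch;
        final Set<String> topicPartitions;

        Description(int producerEpoch, String... topicPartitions) {
            this.producerEpoch = producerEpoch;
            this.topicPartitions = new HashSet<>(Arrays.asList(topicPartitions));
        }
    }

    /**
     * An open transaction on a partition is treated as hanging when any of
     * the following holds:
     * 1. no transactionalId maps to its producerId;
     * 2. the coordinator has no description for that transactionalId;
     * 3. the coordinator's producer epoch is newer than the epoch seen on the partition;
     * 4. the coordinator does not list the partition as part of the transaction.
     */
    static boolean isHanging(
        String transactionalId,
        Description description,
        int openProducerEpoch,
        String openTopicPartition
    ) {
        if (transactionalId == null) {
            return true;
        }
        if (description == null) {
            return true;
        }
        if (description.producerEpoch > openProducerEpoch) {
            return true;
        }
        return !description.topicPartitions.contains(openTopicPartition);
    }
}
```

Listing the four cases in one place, whether as a javadoc or as inline comments, makes it easier to check that each has a matching test.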

##########
File path: tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
##########
@@ -461,6 +469,417 @@ public void execute(Admin admin, Namespace ns, PrintStream out) throws Exception
         }
     }
 
+    static class FindHangingTransactionsCommand extends TransactionsCommand {
+        private static final int MAX_BATCH_SIZE = 500;
+
+        static final String[] HEADERS = new String[] {
+            "Topic",
+            "Partition",
+            "ProducerId",
+            "ProducerEpoch",
+            "CoordinatorEpoch",
+            "StartOffset",
+            "LastTimestamp",
+            "Duration(min)"
+        };
+
+        FindHangingTransactionsCommand(Time time) {
+            super(time);
+        }
+
+        @Override
+        String name() {
+            return "find-hanging";
+        }
+
+        @Override
+        void addSubparser(Subparsers subparsers) {
+            Subparser subparser = subparsers.addParser(name())
+                .help("find hanging transactions");
+
+            subparser.addArgument("--broker-id")
+                .help("broker id to search for hanging transactions")
+                .action(store())
+                .type(Integer.class)
+                .required(false);
+
+            subparser.addArgument("--max-transaction-timeout")
+                .help("maximum transaction timeout in minutes to limit the scope of the search")
+                .action(store())
+                .type(Integer.class)
+                .setDefault(15)
+                .required(false);
+
+            subparser.addArgument("--topic")
+                .help("topic name to limit search to")
+                .action(store())
+                .type(String.class)
+                .required(false);
+
+            subparser.addArgument("--partition")
+                .help("partition number")
+                .action(store())
+                .type(Integer.class)
+                .required(false);
+        }
+
+        @Override
+        void execute(Admin admin, Namespace ns, PrintStream out) throws Exception {
+            Optional<Integer> brokerId = Optional.ofNullable(ns.getInt("broker_id"));
+            Optional<String> topic = Optional.ofNullable(ns.getString("topic"));
+
+            if (!topic.isPresent() && !brokerId.isPresent()) {
+                printErrorAndExit("The `find-hanging` command requires either --topic " +
+                    "or --broker-id to limit the scope of the search");
+                return;
+            }
+
+            Optional<Integer> partition = Optional.ofNullable(ns.getInt("partition"));
+            if (partition.isPresent() && !topic.isPresent()) {
+                printErrorAndExit("The --partition argument requires --topic to be provided");
+                return;
+            }
+
+            long maxTransactionTimeoutMs = TimeUnit.MINUTES.toMillis(
+                ns.getInt("max_transaction_timeout"));
+
+            List<TopicPartition> topicPartitions = collectTopicPartitionsToSearch(
+                admin,
+                topic,
+                partition,
+                brokerId
+            );
+
+            List<OpenTransaction> candidates = collectCandidateOpenTransactions(
+                admin,
+                brokerId,
+                maxTransactionTimeoutMs,
+                topicPartitions
+            );
+
+            if (candidates.isEmpty()) {
+                printHangingTransactions(Collections.emptyList(), out);
+            } else {
+                Map<Long, List<OpenTransaction>> openTransactionsByProducerId = groupByProducerId(candidates);
+
+                Map<Long, String> transactionalIds = lookupTransactionalIds(
+                    admin,
+                    openTransactionsByProducerId.keySet()
+                );
+
+                Map<String, TransactionDescription> descriptions = describeTransactions(
+                    admin,
+                    transactionalIds.values()
+                );
+
+                List<OpenTransaction> hangingTransactions = filterHangingTransactions(
+                    openTransactionsByProducerId,
+                    transactionalIds,
+                    descriptions
+                );
+
+                printHangingTransactions(hangingTransactions, out);
+            }
+        }
+
+        private List<TopicPartition> collectTopicPartitionsToSearch(
+            Admin admin,
+            Optional<String> topic,
+            Optional<Integer> partition,
+            Optional<Integer> brokerId
+        ) throws Exception {
+            final List<String> topics;
+
+            if (topic.isPresent()) {
+                if (partition.isPresent()) {
+                    return Collections.singletonList(new TopicPartition(topic.get(), partition.get()));
+                } else {
+                    topics = Collections.singletonList(topic.get());
+                }
+            } else {
+                topics = listTopics(admin);
+            }
+
+            return findTopicPartitions(
+                admin,
+                brokerId,
+                topics
+            );
+        }
+
+        private List<OpenTransaction> filterHangingTransactions(
+            Map<Long, List<OpenTransaction>> openTransactionsByProducerId,
+            Map<Long, String> transactionalIds,
+            Map<String, TransactionDescription> descriptions
+        ) {
+            List<OpenTransaction> hangingTransactions = new ArrayList<>();
+
+            openTransactionsByProducerId.forEach((producerId, openTransactions) -> {
+                String transactionalId = transactionalIds.get(producerId);
+                if (transactionalId == null) {
+                    // If we could not find the transactionalId corresponding to the
+                    // producerId of an open transaction, then the transaction is hanging.
+                    hangingTransactions.addAll(openTransactions);
+                } else {
+                    // Otherwise, we need to check the current transaction state

Review comment:
       nit: Add `.` at the end of the sentence.

##########
File path: tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
##########
@@ -461,6 +469,417 @@ public void execute(Admin admin, Namespace ns, PrintStream out) throws Exception
         }
     }
 
+    static class FindHangingTransactionsCommand extends TransactionsCommand {
+        private static final int MAX_BATCH_SIZE = 500;
+
+        static final String[] HEADERS = new String[] {
+            "Topic",
+            "Partition",
+            "ProducerId",
+            "ProducerEpoch",
+            "CoordinatorEpoch",
+            "StartOffset",
+            "LastTimestamp",
+            "Duration(min)"
+        };
+
+        FindHangingTransactionsCommand(Time time) {
+            super(time);
+        }
+
+        @Override
+        String name() {
+            return "find-hanging";
+        }
+
+        @Override
+        void addSubparser(Subparsers subparsers) {
+            Subparser subparser = subparsers.addParser(name())
+                .help("find hanging transactions");
+
+            subparser.addArgument("--broker-id")
+                .help("broker id to search for hanging transactions")
+                .action(store())
+                .type(Integer.class)
+                .required(false);
+
+            subparser.addArgument("--max-transaction-timeout")
+                .help("maximum transaction timeout in minutes to limit the scope of the search")
+                .action(store())
+                .type(Integer.class)
+                .setDefault(15)
+                .required(false);
+
+            subparser.addArgument("--topic")
+                .help("topic name to limit search to")
+                .action(store())
+                .type(String.class)
+                .required(false);
+
+            subparser.addArgument("--partition")
+                .help("partition number")
+                .action(store())
+                .type(Integer.class)
+                .required(false);
+        }
+
+        @Override
+        void execute(Admin admin, Namespace ns, PrintStream out) throws Exception {
+            Optional<Integer> brokerId = Optional.ofNullable(ns.getInt("broker_id"));
+            Optional<String> topic = Optional.ofNullable(ns.getString("topic"));
+
+            if (!topic.isPresent() && !brokerId.isPresent()) {
+                printErrorAndExit("The `find-hanging` command requires either --topic " +
+                    "or --broker-id to limit the scope of the search");
+                return;
+            }
+
+            Optional<Integer> partition = Optional.ofNullable(ns.getInt("partition"));
+            if (partition.isPresent() && !topic.isPresent()) {
+                printErrorAndExit("The --partition argument requires --topic to be provided");
+                return;
+            }
+
+            long maxTransactionTimeoutMs = TimeUnit.MINUTES.toMillis(
+                ns.getInt("max_transaction_timeout"));
+
+            List<TopicPartition> topicPartitions = collectTopicPartitionsToSearch(
+                admin,
+                topic,
+                partition,
+                brokerId
+            );
+
+            List<OpenTransaction> candidates = collectCandidateOpenTransactions(
+                admin,
+                brokerId,
+                maxTransactionTimeoutMs,
+                topicPartitions
+            );
+
+            if (candidates.isEmpty()) {
+                printHangingTransactions(Collections.emptyList(), out);
+            } else {
+                Map<Long, List<OpenTransaction>> openTransactionsByProducerId = groupByProducerId(candidates);
+
+                Map<Long, String> transactionalIds = lookupTransactionalIds(
+                    admin,
+                    openTransactionsByProducerId.keySet()
+                );
+
+                Map<String, TransactionDescription> descriptions = describeTransactions(
+                    admin,
+                    transactionalIds.values()
+                );
+
+                List<OpenTransaction> hangingTransactions = filterHangingTransactions(
+                    openTransactionsByProducerId,
+                    transactionalIds,
+                    descriptions
+                );
+
+                printHangingTransactions(hangingTransactions, out);
+            }
+        }
+
+        private List<TopicPartition> collectTopicPartitionsToSearch(
+            Admin admin,
+            Optional<String> topic,
+            Optional<Integer> partition,
+            Optional<Integer> brokerId
+        ) throws Exception {
+            final List<String> topics;
+
+            if (topic.isPresent()) {
+                if (partition.isPresent()) {
+                    return Collections.singletonList(new TopicPartition(topic.get(), partition.get()));
+                } else {
+                    topics = Collections.singletonList(topic.get());
+                }
+            } else {
+                topics = listTopics(admin);
+            }
+
+            return findTopicPartitions(
+                admin,
+                brokerId,
+                topics
+            );
+        }
+
+        private List<OpenTransaction> filterHangingTransactions(
+            Map<Long, List<OpenTransaction>> openTransactionsByProducerId,
+            Map<Long, String> transactionalIds,
+            Map<String, TransactionDescription> descriptions
+        ) {
+            List<OpenTransaction> hangingTransactions = new ArrayList<>();
+
+            openTransactionsByProducerId.forEach((producerId, openTransactions) -> {
+                String transactionalId = transactionalIds.get(producerId);
+                if (transactionalId == null) {
+                    // If we could not find the transactionalId corresponding to the
+                    // producerId of an open transaction, then the transaction is hanging.
+                    hangingTransactions.addAll(openTransactions);
+                } else {
+                    // Otherwise, we need to check the current transaction state
+                    TransactionDescription description = descriptions.get(transactionalId);
+                    if (description == null) {
+                        hangingTransactions.addAll(openTransactions);
+                    } else {
+                        for (OpenTransaction openTransaction : openTransactions) {
+                            if (description.producerEpoch() > openTransaction.producerState.producerEpoch()
+                                || !description.topicPartitions().contains(openTransaction.topicPartition)) {
+                                hangingTransactions.add(openTransaction);
+                            }
+                        }
+                    }
+                }
+            });
+
+            return hangingTransactions;
+        }
+
+        private void printHangingTransactions(
+            List<OpenTransaction> hangingTransactions,
+            PrintStream out
+        ) {
+            long currentTimeMs = time.milliseconds();
+            List<String[]> rows = new ArrayList<>(hangingTransactions.size());
+
+            for (OpenTransaction transaction : hangingTransactions) {
+                long transactionDurationMinutes = TimeUnit.MILLISECONDS.toMinutes(
+                    currentTimeMs - transaction.producerState.lastTimestamp());
+
+                rows.add(new String[] {
+                    transaction.topicPartition.topic(),
+                    String.valueOf(transaction.topicPartition.partition()),
+                    String.valueOf(transaction.producerState.producerId()),
+                    String.valueOf(transaction.producerState.producerEpoch()),
+                    String.valueOf(transaction.producerState.coordinatorEpoch().orElse(-1)),
+                    String.valueOf(transaction.producerState.currentTransactionStartOffset().orElse(-1)),
+                    String.valueOf(transaction.producerState.lastTimestamp()),
+                    String.valueOf(transactionDurationMinutes)
+                });
+            }
+
+            prettyPrintTable(HEADERS, rows, out);
+        }
+
+        private Map<String, TransactionDescription> describeTransactions(
+            Admin admin,
+            Collection<String> transactionalIds
+        ) throws Exception {
+            try {
+                return admin.describeTransactions(new HashSet<>(transactionalIds)).all().get();
+            } catch (ExecutionException e) {
+                printErrorAndExit("Failed to describe " + transactionalIds.size()
+                    + " transactions", e.getCause());
+                return Collections.emptyMap();
+            }
+        }
+
+        private Map<Long, List<OpenTransaction>> groupByProducerId(
+            List<OpenTransaction> openTransactions
+        ) {
+            Map<Long, List<OpenTransaction>> res = new HashMap<>();
+            for (OpenTransaction transaction : openTransactions) {
+                List<OpenTransaction> states = res.computeIfAbsent(
+                    transaction.producerState.producerId(),
+                    __ -> new ArrayList<>()
+                );
+                states.add(transaction);
+            }
+            return res;
+        }
+
+        private List<String> listTopics(
+            Admin admin
+        ) throws Exception {
+            try {
+                return new ArrayList<>(admin.listTopics().names().get());
+            } catch (ExecutionException e) {
+                printErrorAndExit("Failed to list topics", e.getCause());
+                return Collections.emptyList();
+            }
+        }
+
+        private List<TopicPartition> findTopicPartitions(
+            Admin admin,
+            Optional<Integer> brokerId,
+            List<String> topics
+        ) throws Exception {
+            List<TopicPartition> topicPartitions = new ArrayList<>();
+            consumeInBatches(topics, MAX_BATCH_SIZE, batch -> {
+                findTopicPartitions(
+                    admin,
+                    brokerId,
+                    batch,
+                    topicPartitions
+                );
+            });
+            return topicPartitions;
+        }
+
+        private void findTopicPartitions(
+            Admin admin,
+            Optional<Integer> brokerId,
+            List<String> topics,
+            List<TopicPartition> topicPartitions
+        ) throws Exception {
+            try {
+                Map<String, TopicDescription> topicDescriptions = admin.describeTopics(topics).all().get();
+                topicDescriptions.forEach((topic, description) -> {
+                    description.partitions().forEach(partitionInfo -> {
+                        if (!brokerId.isPresent() || hasReplica(brokerId.get(), partitionInfo)) {
+                            topicPartitions.add(new TopicPartition(topic, partitionInfo.partition()));
+                        }
+                    });
+                });
+            } catch (ExecutionException e) {
+                printErrorAndExit("Failed to describe " + topics.size() + " topics", e.getCause());
+            }
+        }
+
+        private boolean hasReplica(
+            int brokerId,
+            TopicPartitionInfo partitionInfo
+        ) {
+            return partitionInfo.replicas().stream().anyMatch(node -> node.id() == brokerId);
+        }
+
+        private List<OpenTransaction> collectCandidateOpenTransactions(
+            Admin admin,
+            Optional<Integer> brokerId,
+            long maxTransactionTimeoutMs,
+            List<TopicPartition> topicPartitions
+        ) throws Exception {
+            // We have to check all partitions on the broker. In order to avoid
+            // overwhelming it with a giant request, we break the requests into
+            // smaller batches.
+
+            List<OpenTransaction> candidateTransactions = new ArrayList<>();
+
+            consumeInBatches(topicPartitions, MAX_BATCH_SIZE, batch -> {
+                collectCandidateOpenTransactions(
+                    admin,
+                    brokerId,
+                    maxTransactionTimeoutMs,
+                    batch,
+                    candidateTransactions
+                );
+            });
+
+            return candidateTransactions;
+        }
+
+        @FunctionalInterface
+        private interface ThrowableConsumer<T> {
+            void accept(T t) throws Exception;
+        }
+
+        private static class OpenTransaction {
+            private final TopicPartition topicPartition;
+            private final ProducerState producerState;
+
+            private OpenTransaction(
+                TopicPartition topicPartition,
+                ProducerState producerState
+            ) {
+                this.topicPartition = topicPartition;
+                this.producerState = producerState;
+            }
+        }
+
+        private void collectCandidateOpenTransactions(
+            Admin admin,
+            Optional<Integer> brokerId,
+            long maxTransactionTimeoutMs,
+            List<TopicPartition> topicPartitions,
+            List<OpenTransaction> candidateTransactions
+        ) throws Exception {
+            try {
+                DescribeProducersOptions describeOptions = new DescribeProducersOptions();
+                brokerId.ifPresent(describeOptions::brokerId);
+
+                Map<TopicPartition, DescribeProducersResult.PartitionProducerState> producersByPartition =
+                    admin.describeProducers(topicPartitions, describeOptions).all().get();
+
+                long currentTimeMs = time.milliseconds();
+
+                producersByPartition.forEach((topicPartition, producersStates) -> {
+                    producersStates.activeProducers().forEach(activeProducer -> {
+                        if (activeProducer.currentTransactionStartOffset().isPresent()) {
+                            long transactionDurationMs = currentTimeMs - activeProducer.lastTimestamp();
+                            if (transactionDurationMs > maxTransactionTimeoutMs) {
+                                candidateTransactions.add(new OpenTransaction(
+                                    topicPartition,
+                                    activeProducer
+                                ));
+                            }
+                        }
+                    });
+                });
+            } catch (ExecutionException e) {
+                printErrorAndExit("Failed to describe producers for " + topicPartitions.size() +
+                    " partitions on broker " + brokerId, e.getCause());
+            }
+        }
+
+        private Map<Long, String> lookupTransactionalIds(
+            Admin admin,
+            Set<Long> producerIds
+        ) throws Exception {
+            try {
+                ListTransactionsOptions listTransactionsOptions = new ListTransactionsOptions()
+                    .filterProducerIds(producerIds);
+
+                Collection<TransactionListing> transactionListings =
+                    admin.listTransactions(listTransactionsOptions).all().get();
+
+                Map<Long, String> transactionalIdMap = new HashMap<>();
+
+                transactionListings.forEach(listing -> {
+                    if (!producerIds.contains(listing.producerId())) {
+                        log.debug("Received transaction listing {} which has a producerId " +
+                            "which was not requested", listing);
+                    } else {
+                        transactionalIdMap.put(
+                            listing.producerId(),
+                            listing.transactionalId()
+                        );
+                    }
+                });
+
+                return transactionalIdMap;
+            } catch (ExecutionException e) {
+                printErrorAndExit("Failed to list transactions for " + producerIds.size() +
+                    " producers", e.getCause());
+                return Collections.emptyMap();
+            }
+        }
+
+
+        private <T> void consumeInBatches(
+            List<T> list,
+            int batchSize,
+            ThrowableConsumer<List<T>> consumer
+        ) throws Exception {
+            int batchStartIndex = 0;
+            int limitIndex = list.size();
+
+            while (batchStartIndex < limitIndex) {
+                int batchEndIndex = Math.min(
+                    limitIndex,
+                    batchStartIndex + batchSize
+                );
+
+                List<T> batch = list.subList(batchStartIndex, batchEndIndex);
+                consumer.accept(batch);

Review comment:
       nit: As we don't reuse `batch`, we could directly pass `list.subList(batchStartIndex, batchEndIndex)` to `accept`.
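
A minimal sketch of that simplification, for illustration only. The class name `BatchSketch` and the `demo` helper are hypothetical, and the loop's final `batchStartIndex = batchEndIndex` step is an assumption, since the quoted diff is cut off before the end of the loop.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BatchSketch {
    @FunctionalInterface
    interface ThrowableConsumer<T> {
        void accept(T t) throws Exception;
    }

    // Same shape as consumeInBatches in the diff, with the subList view
    // passed straight to the consumer instead of via a local variable.
    static <T> void consumeInBatches(
        List<T> list,
        int batchSize,
        ThrowableConsumer<List<T>> consumer
    ) throws Exception {
        int batchStartIndex = 0;
        int limitIndex = list.size();

        while (batchStartIndex < limitIndex) {
            int batchEndIndex = Math.min(limitIndex, batchStartIndex + batchSize);
            consumer.accept(list.subList(batchStartIndex, batchEndIndex));
            // Assumed loop step: the quoted diff ends before this line.
            batchStartIndex = batchEndIndex;
        }
    }

    // Hypothetical helper: splits five elements into batches of at most two.
    static List<List<Integer>> demo() {
        List<List<Integer>> seen = new ArrayList<>();
        try {
            // Copy each batch because subList returns a view of the source list.
            consumeInBatches(Arrays.asList(1, 2, 3, 4, 5), 2, batch -> seen.add(new ArrayList<>(batch)));
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return seen;
    }
}
```

Behavior is unchanged either way; the local variable only matters if the batch is referenced again after `accept`.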

##########
File path: tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java
##########
@@ -437,6 +442,536 @@ public void testOldBrokerAbortTransactionWithUnknownCoordinatorEpoch(int coordin
         assertNormalExit();
     }
 
+    @Test
+    public void testFindHangingRequiresEitherBrokerIdOrTopic() throws Exception {
+        assertCommandFailure(new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging"
+        });
+    }
+
+    @Test
+    public void testFindHangingRequiresTopicIfPartitionIsSpecified() throws Exception {
+        assertCommandFailure(new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging",
+            "--broker-id",
+            "0",
+            "--partition",
+            "5"
+        });
+    }
+
+    private void expectListTransactions(
+        Map<Integer, Collection<TransactionListing>> listingsByBroker
+    ) {
+        expectListTransactions(null, listingsByBroker);
+    }
+
+    private void expectListTransactions(
+        ListTransactionsOptions options,
+        Map<Integer, Collection<TransactionListing>> listingsByBroker
+    ) {
+        ListTransactionsResult listResult = Mockito.mock(ListTransactionsResult.class);
+
+        if (options == null) {
+            Mockito.when(admin.listTransactions()).thenReturn(listResult);
+        } else {
+            Mockito.when(admin.listTransactions(options)).thenReturn(listResult);
+        }
+
+        List<TransactionListing> allListings = new ArrayList<>();
+        listingsByBroker.values().forEach(allListings::addAll);
+
+        Mockito.when(listResult.all()).thenReturn(completedFuture(allListings));
+        Mockito.when(listResult.allByBrokerId()).thenReturn(completedFuture(listingsByBroker));
+    }
+
+    private void expectDescribeProducers(
+        TopicPartition topicPartition,
+        long producerId,
+        short producerEpoch,
+        long lastTimestamp,
+        OptionalInt coordinatorEpoch,
+        OptionalLong txnStartOffset
+    ) {
+        PartitionProducerState partitionProducerState = new PartitionProducerState(singletonList(
+            new ProducerState(
+                producerId,
+                producerEpoch,
+                500,
+                lastTimestamp,
+                coordinatorEpoch,
+                txnStartOffset
+            )
+        ));
+
+        DescribeProducersResult result = Mockito.mock(DescribeProducersResult.class);
+        Mockito.when(result.all()).thenReturn(
+            completedFuture(singletonMap(topicPartition, partitionProducerState))
+        );
+
+        Mockito.when(admin.describeProducers(
+            Collections.singletonList(topicPartition),
+            new DescribeProducersOptions()
+        )).thenReturn(result);
+    }
+
+    private void expectDescribeTransactions(
+        Map<String, TransactionDescription> descriptions
+    ) {
+        DescribeTransactionsResult result = Mockito.mock(DescribeTransactionsResult.class);
+        Mockito.when(result.all()).thenReturn(completedFuture(descriptions));
+        Mockito.when(admin.describeTransactions(descriptions.keySet())).thenReturn(result);
+    }
+
+    private void expectListTopics(
+        Set<String> topics
+    ) {
+        ListTopicsResult result = Mockito.mock(ListTopicsResult.class);
+        Mockito.when(result.names()).thenReturn(completedFuture(topics));
+        Mockito.when(admin.listTopics()).thenReturn(result);
+    }
+
+    private void expectDescribeTopics(
+        Map<String, TopicDescription> descriptions
+    ) {
+        DescribeTopicsResult result = Mockito.mock(DescribeTopicsResult.class);
+        Mockito.when(result.all()).thenReturn(completedFuture(descriptions));
+        Mockito.when(admin.describeTopics(new ArrayList<>(descriptions.keySet()))).thenReturn(result);
+    }
+
+    @Test
+    public void testFindHangingLookupTopicPartitionsForBroker() throws Exception {
+        int brokerId = 5;
+
+        String[] args = new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging",
+            "--broker-id",
+            String.valueOf(brokerId)
+        };
+
+        String topic = "foo";
+        expectListTopics(singleton(topic));
+
+        Node node0 = new Node(0, "localhost", 9092);
+        Node node1 = new Node(1, "localhost", 9093);
+        Node node5 = new Node(5, "localhost", 9097);
+
+        TopicPartitionInfo partition0 = new TopicPartitionInfo(
+            0,
+            node0,
+            Arrays.asList(node0, node1),
+            Arrays.asList(node0, node1)
+        );
+        TopicPartitionInfo partition1 = new TopicPartitionInfo(
+            1,
+            node1,
+            Arrays.asList(node1, node5),
+            Arrays.asList(node1, node5)
+        );
+
+        TopicDescription description = new TopicDescription(
+            topic,
+            false,
+            Arrays.asList(partition0, partition1)
+        );
+        expectDescribeTopics(singletonMap(topic, description));
+
+        DescribeProducersResult result = Mockito.mock(DescribeProducersResult.class);
+        Mockito.when(result.all()).thenReturn(completedFuture(emptyMap()));
+
+        Mockito.when(admin.describeProducers(
+            Collections.singletonList(new TopicPartition(topic, 1)),
+            new DescribeProducersOptions().brokerId(brokerId)
+        )).thenReturn(result);
+
+        execute(args);
+        assertNormalExit();
+
+        List<List<String>> table = readOutputAsTable();
+        assertEquals(1, table.size());
+
+        List<String> expectedHeaders = asList(TransactionsCommand.FindHangingTransactionsCommand.HEADERS);
+        assertEquals(expectedHeaders, table.get(0));
+    }
+
+    @Test
+    public void testFindHangingLookupTopicPartitionsForTopic() throws Exception {
+        String topic = "foo";
+
+        String[] args = new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging",
+            "--topic",
+            topic
+        };
+
+        Node node0 = new Node(0, "localhost", 9092);
+        Node node1 = new Node(1, "localhost", 9093);
+        Node node5 = new Node(5, "localhost", 9097);
+
+        TopicPartitionInfo partition0 = new TopicPartitionInfo(
+            0,
+            node0,
+            Arrays.asList(node0, node1),
+            Arrays.asList(node0, node1)
+        );
+        TopicPartitionInfo partition1 = new TopicPartitionInfo(
+            1,
+            node1,
+            Arrays.asList(node1, node5),
+            Arrays.asList(node1, node5)
+        );
+
+        TopicDescription description = new TopicDescription(
+            topic,
+            false,
+            Arrays.asList(partition0, partition1)
+        );
+        expectDescribeTopics(singletonMap(topic, description));
+
+        DescribeProducersResult result = Mockito.mock(DescribeProducersResult.class);
+        Mockito.when(result.all()).thenReturn(completedFuture(emptyMap()));
+
+        Mockito.when(admin.describeProducers(
+            Arrays.asList(new TopicPartition(topic, 0), new TopicPartition(topic, 1)),
+            new DescribeProducersOptions()
+        )).thenReturn(result);
+
+        execute(args);
+        assertNormalExit();
+
+        List<List<String>> table = readOutputAsTable();
+        assertEquals(1, table.size());
+
+        List<String> expectedHeaders = asList(TransactionsCommand.FindHangingTransactionsCommand.HEADERS);
+        assertEquals(expectedHeaders, table.get(0));
+    }
+
+    @Test
+    public void testFindHangingSpecifiedTopicPartition() throws Exception {
+        TopicPartition topicPartition = new TopicPartition("foo", 5);
+
+        String[] args = new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging",
+            "--topic",
+            topicPartition.topic(),
+            "--partition",
+            String.valueOf(topicPartition.partition())
+        };
+
+        long producerId = 132L;
+        short producerEpoch = 5;
+        long lastTimestamp = time.milliseconds();
+        OptionalInt coordinatorEpoch = OptionalInt.of(19);
+        OptionalLong txnStartOffset = OptionalLong.of(29384L);
+
+        expectDescribeProducers(
+            topicPartition,
+            producerId,
+            producerEpoch,
+            lastTimestamp,
+            coordinatorEpoch,
+            txnStartOffset
+        );
+
+        execute(args);
+        assertNormalExit();
+
+        List<List<String>> table = readOutputAsTable();
+        assertEquals(1, table.size());
+
+        List<String> expectedHeaders = asList(TransactionsCommand.FindHangingTransactionsCommand.HEADERS);
+        assertEquals(expectedHeaders, table.get(0));
+    }
+
+    @Test
+    public void testFindHangingNoMappedTransactionalId() throws Exception {

Review comment:
       Should we also add a test with no mapped `TransactionDescription`?
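
To make the missing case concrete, here is a rough, self-contained sketch of the lookup chain that `filterHangingTransactions` walks in the diff: producerId to transactionalId, then transactionalId to description. The class `MissingDescriptionSketch`, the method `lookupFails`, and the use of a plain list of partition names in place of `TransactionDescription` are all hypothetical stand-ins, not the tool's API.

```java
import java.util.List;
import java.util.Map;

public class MissingDescriptionSketch {
    // Returns true when every open transaction of producerId would be
    // reported as hanging because the lookup chain breaks: either no
    // transactionalId is mapped, or no description exists for it.
    static boolean lookupFails(
        long producerId,
        Map<Long, String> transactionalIds,
        Map<String, List<String>> descriptions
    ) {
        String transactionalId = transactionalIds.get(producerId);
        if (transactionalId == null) {
            // Case covered by testFindHangingNoMappedTransactionalId.
            return true;
        }
        // Case this comment asks about: the id is mapped, but the
        // coordinator returned no description for it.
        return descriptions.get(transactionalId) == null;
    }
}
```

A test for the second branch would presumably mock `listTransactions` to return a listing for the producerId and mock `describeTransactions` to return an empty map, then assert that the open transaction appears in the output table.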







[GitHub] [kafka] hachikuji commented on a change in pull request #10974: KAFKA-12979; Implement command to find hanging transactions

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10974:
URL: https://github.com/apache/kafka/pull/10974#discussion_r664154672



##########
File path: tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java
##########
@@ -437,6 +442,536 @@ public void testOldBrokerAbortTransactionWithUnknownCoordinatorEpoch(int coordin
         assertNormalExit();
     }
 
+    @Test
+    public void testFindHangingRequiresEitherBrokerIdOrTopic() throws Exception {
+        assertCommandFailure(new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging"
+        });
+    }
+
+    @Test
+    public void testFindHangingRequiresTopicIfPartitionIsSpecified() throws Exception {
+        assertCommandFailure(new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging",
+            "--broker-id",
+            "0",
+            "--partition",
+            "5"
+        });
+    }
+
+    private void expectListTransactions(
+        Map<Integer, Collection<TransactionListing>> listingsByBroker
+    ) {
+        expectListTransactions(null, listingsByBroker);

Review comment:
       Yeah, I was trying to deal with the fact that `TransactionsCommand` invokes both variants of `listTransactions`. I dealt with it by modifying the no-arg call to provide the options explicitly.

##########
File path: tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
##########
@@ -461,6 +469,417 @@ public void execute(Admin admin, Namespace ns, PrintStream out) throws Exception
         }
     }
 
+    static class FindHangingTransactionsCommand extends TransactionsCommand {
+        private static final int MAX_BATCH_SIZE = 500;
+
+        static final String[] HEADERS = new String[] {
+            "Topic",
+            "Partition",
+            "ProducerId",
+            "ProducerEpoch",
+            "CoordinatorEpoch",
+            "StartOffset",
+            "LastTimestamp",
+            "Duration(min)"
+        };
+
+        FindHangingTransactionsCommand(Time time) {
+            super(time);
+        }
+
+        @Override
+        String name() {
+            return "find-hanging";
+        }
+
+        @Override
+        void addSubparser(Subparsers subparsers) {
+            Subparser subparser = subparsers.addParser(name())
+                .help("find hanging transactions");
+
+            subparser.addArgument("--broker-id")
+                .help("broker id to search for hanging transactions")
+                .action(store())
+                .type(Integer.class)
+                .required(false);
+
+            subparser.addArgument("--max-transaction-timeout")
+                .help("maximum transaction timeout in minutes to limit the scope of the search")
+                .action(store())
+                .type(Integer.class)
+                .setDefault(15)
+                .required(false);
+
+            subparser.addArgument("--topic")
+                .help("topic name to limit search to")
+                .action(store())
+                .type(String.class)
+                .required(false);
+
+            subparser.addArgument("--partition")
+                .help("partition number")
+                .action(store())
+                .type(Integer.class)
+                .required(false);
+        }
+
+        @Override
+        void execute(Admin admin, Namespace ns, PrintStream out) throws Exception {
+            Optional<Integer> brokerId = Optional.ofNullable(ns.getInt("broker_id"));
+            Optional<String> topic = Optional.ofNullable(ns.getString("topic"));
+
+            if (!topic.isPresent() && !brokerId.isPresent()) {
+                printErrorAndExit("The `find-hanging` command requires either --topic " +
+                    "or --broker-id to limit the scope of the search");
+                return;
+            }
+
+            Optional<Integer> partition = Optional.ofNullable(ns.getInt("partition"));
+            if (partition.isPresent() && !topic.isPresent()) {
+                printErrorAndExit("The --partition argument requires --topic to be provided");
+                return;
+            }
+
+            long maxTransactionTimeoutMs = TimeUnit.MINUTES.toMillis(
+                ns.getInt("max_transaction_timeout"));
+
+            List<TopicPartition> topicPartitions = collectTopicPartitionsToSearch(
+                admin,
+                topic,
+                partition,
+                brokerId
+            );
+
+            List<OpenTransaction> candidates = collectCandidateOpenTransactions(
+                admin,
+                brokerId,
+                maxTransactionTimeoutMs,
+                topicPartitions
+            );
+
+            if (candidates.isEmpty()) {
+                printHangingTransactions(Collections.emptyList(), out);
+            } else {
+                Map<Long, List<OpenTransaction>> openTransactionsByProducerId = groupByProducerId(candidates);
+
+                Map<Long, String> transactionalIds = lookupTransactionalIds(
+                    admin,
+                    openTransactionsByProducerId.keySet()
+                );
+
+                Map<String, TransactionDescription> descriptions = describeTransactions(
+                    admin,
+                    transactionalIds.values()
+                );
+
+                List<OpenTransaction> hangingTransactions = filterHangingTransactions(
+                    openTransactionsByProducerId,
+                    transactionalIds,
+                    descriptions
+                );
+
+                printHangingTransactions(hangingTransactions, out);
+            }
+        }
+
+        private List<TopicPartition> collectTopicPartitionsToSearch(
+            Admin admin,
+            Optional<String> topic,
+            Optional<Integer> partition,
+            Optional<Integer> brokerId
+        ) throws Exception {
+            final List<String> topics;
+
+            if (topic.isPresent()) {
+                if (partition.isPresent()) {
+                    return Collections.singletonList(new TopicPartition(topic.get(), partition.get()));
+                } else {
+                    topics = Collections.singletonList(topic.get());
+                }
+            } else {
+                topics = listTopics(admin);
+            }
+
+            return findTopicPartitions(
+                admin,
+                brokerId,
+                topics
+            );
+        }
+
+        private List<OpenTransaction> filterHangingTransactions(
+            Map<Long, List<OpenTransaction>> openTransactionsByProducerId,
+            Map<Long, String> transactionalIds,
+            Map<String, TransactionDescription> descriptions
+        ) {
+            List<OpenTransaction> hangingTransactions = new ArrayList<>();
+
+            openTransactionsByProducerId.forEach((producerId, openTransactions) -> {
+                String transactionalId = transactionalIds.get(producerId);
+                if (transactionalId == null) {
+                    // If we could not find the transactionalId corresponding to the
+                    // producerId of an open transaction, then the transaction is hanging.
+                    hangingTransactions.addAll(openTransactions);
+                } else {
+                    // Otherwise, we need to check the current transaction state
+                    TransactionDescription description = descriptions.get(transactionalId);
+                    if (description == null) {
+                        hangingTransactions.addAll(openTransactions);
+                    } else {
+                        for (OpenTransaction openTransaction : openTransactions) {
+                            if (description.producerEpoch() > openTransaction.producerState.producerEpoch()
+                                || !description.topicPartitions().contains(openTransaction.topicPartition)) {

Review comment:
       Thanks, added a comment. I decided to remove the epoch check. The producer epoch could be bumped as a result of a transaction timeout, so the check did not seem quite right.
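
For illustration, a sketch of what the per-transaction check might look like once the epoch comparison is dropped, assuming the partition-membership condition from the quoted diff is the only one left. `HangingFilterSketch`, `OpenTxn`, and the string partition names are hypothetical stand-ins; the code actually merged may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class HangingFilterSketch {
    // Hypothetical stand-in for the tool's OpenTransaction.
    static final class OpenTxn {
        final String topicPartition;
        final short producerEpoch;

        OpenTxn(String topicPartition, short producerEpoch) {
            this.topicPartition = topicPartition;
            this.producerEpoch = producerEpoch;
        }
    }

    // An open transaction is treated as hanging only if the coordinator's
    // current transaction does not include its partition. The producer
    // epoch is deliberately ignored, since it may have been bumped by a
    // transaction timeout.
    static List<OpenTxn> filterHanging(List<OpenTxn> open, Set<String> coordinatorPartitions) {
        List<OpenTxn> hanging = new ArrayList<>();
        for (OpenTxn txn : open) {
            if (!coordinatorPartitions.contains(txn.topicPartition)) {
                hanging.add(txn);
            }
        }
        return hanging;
    }

    // Hypothetical helper: the coordinator knows about foo-0 only, so the
    // open transaction on foo-1 is the one reported.
    static List<String> demo() {
        Set<String> coordinatorPartitions = new HashSet<>(Collections.singletonList("foo-0"));
        List<OpenTxn> open = Arrays.asList(
            new OpenTxn("foo-0", (short) 5),
            new OpenTxn("foo-1", (short) 5)
        );
        List<String> result = new ArrayList<>();
        for (OpenTxn txn : filterHanging(open, coordinatorPartitions)) {
            result.add(txn.topicPartition);
        }
        return result;
    }
}
```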

##########
File path: tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java
##########
@@ -437,6 +442,536 @@ public void testOldBrokerAbortTransactionWithUnknownCoordinatorEpoch(int coordin
         assertNormalExit();
     }
 
+    @Test
+    public void testFindHangingRequiresEitherBrokerIdOrTopic() throws Exception {
+        assertCommandFailure(new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging"
+        });
+    }
+
+    @Test
+    public void testFindHangingRequiresTopicIfPartitionIsSpecified() throws Exception {
+        assertCommandFailure(new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging",
+            "--broker-id",
+            "0",
+            "--partition",
+            "5"
+        });
+    }
+
+    private void expectListTransactions(
+        Map<Integer, Collection<TransactionListing>> listingsByBroker
+    ) {
+        expectListTransactions(null, listingsByBroker);
+    }
+
+    private void expectListTransactions(
+        ListTransactionsOptions options,
+        Map<Integer, Collection<TransactionListing>> listingsByBroker
+    ) {
+        ListTransactionsResult listResult = Mockito.mock(ListTransactionsResult.class);
+
+        if (options == null) {
+            Mockito.when(admin.listTransactions()).thenReturn(listResult);
+        } else {
+            Mockito.when(admin.listTransactions(options)).thenReturn(listResult);
+        }
+
+        List<TransactionListing> allListings = new ArrayList<>();
+        listingsByBroker.values().forEach(allListings::addAll);
+
+        Mockito.when(listResult.all()).thenReturn(completedFuture(allListings));
+        Mockito.when(listResult.allByBrokerId()).thenReturn(completedFuture(listingsByBroker));
+    }
+
+    private void expectDescribeProducers(
+        TopicPartition topicPartition,
+        long producerId,
+        short producerEpoch,
+        long lastTimestamp,
+        OptionalInt coordinatorEpoch,
+        OptionalLong txnStartOffset
+    ) {
+        PartitionProducerState partitionProducerState = new PartitionProducerState(singletonList(
+            new ProducerState(
+                producerId,
+                producerEpoch,
+                500,
+                lastTimestamp,
+                coordinatorEpoch,
+                txnStartOffset
+            )
+        ));
+
+        DescribeProducersResult result = Mockito.mock(DescribeProducersResult.class);
+        Mockito.when(result.all()).thenReturn(
+            completedFuture(singletonMap(topicPartition, partitionProducerState))
+        );
+
+        Mockito.when(admin.describeProducers(
+            Collections.singletonList(topicPartition),
+            new DescribeProducersOptions()
+        )).thenReturn(result);
+    }
+
+    private void expectDescribeTransactions(
+        Map<String, TransactionDescription> descriptions
+    ) {
+        DescribeTransactionsResult result = Mockito.mock(DescribeTransactionsResult.class);
+        Mockito.when(result.all()).thenReturn(completedFuture(descriptions));
+        Mockito.when(admin.describeTransactions(descriptions.keySet())).thenReturn(result);
+    }
+
+    private void expectListTopics(
+        Set<String> topics
+    ) {
+        ListTopicsResult result = Mockito.mock(ListTopicsResult.class);
+        Mockito.when(result.names()).thenReturn(completedFuture(topics));
+        Mockito.when(admin.listTopics()).thenReturn(result);
+    }
+
+    private void expectDescribeTopics(
+        Map<String, TopicDescription> descriptions
+    ) {
+        DescribeTopicsResult result = Mockito.mock(DescribeTopicsResult.class);
+        Mockito.when(result.all()).thenReturn(completedFuture(descriptions));
+        Mockito.when(admin.describeTopics(new ArrayList<>(descriptions.keySet()))).thenReturn(result);
+    }
+
+    @Test
+    public void testFindHangingLookupTopicPartitionsForBroker() throws Exception {
+        int brokerId = 5;
+
+        String[] args = new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging",
+            "--broker-id",
+            String.valueOf(brokerId)
+        };
+
+        String topic = "foo";
+        expectListTopics(singleton(topic));
+
+        Node node0 = new Node(0, "localhost", 9092);
+        Node node1 = new Node(1, "localhost", 9093);
+        Node node5 = new Node(5, "localhost", 9097);
+
+        TopicPartitionInfo partition0 = new TopicPartitionInfo(
+            0,
+            node0,
+            Arrays.asList(node0, node1),
+            Arrays.asList(node0, node1)
+        );
+        TopicPartitionInfo partition1 = new TopicPartitionInfo(
+            1,
+            node1,
+            Arrays.asList(node1, node5),
+            Arrays.asList(node1, node5)
+        );
+
+        TopicDescription description = new TopicDescription(
+            topic,
+            false,
+            Arrays.asList(partition0, partition1)
+        );
+        expectDescribeTopics(singletonMap(topic, description));
+
+        DescribeProducersResult result = Mockito.mock(DescribeProducersResult.class);
+        Mockito.when(result.all()).thenReturn(completedFuture(emptyMap()));
+
+        Mockito.when(admin.describeProducers(
+            Collections.singletonList(new TopicPartition(topic, 1)),
+            new DescribeProducersOptions().brokerId(brokerId)
+        )).thenReturn(result);
+
+        execute(args);
+        assertNormalExit();
+
+        List<List<String>> table = readOutputAsTable();
+        assertEquals(1, table.size());
+
+        List<String> expectedHeaders = asList(TransactionsCommand.FindHangingTransactionsCommand.HEADERS);
+        assertEquals(expectedHeaders, table.get(0));
+    }
+
+    @Test
+    public void testFindHangingLookupTopicPartitionsForTopic() throws Exception {
+        String topic = "foo";
+
+        String[] args = new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging",
+            "--topic",
+            topic
+        };
+
+        Node node0 = new Node(0, "localhost", 9092);
+        Node node1 = new Node(1, "localhost", 9093);
+        Node node5 = new Node(5, "localhost", 9097);
+
+        TopicPartitionInfo partition0 = new TopicPartitionInfo(
+            0,
+            node0,
+            Arrays.asList(node0, node1),
+            Arrays.asList(node0, node1)
+        );
+        TopicPartitionInfo partition1 = new TopicPartitionInfo(
+            1,
+            node1,
+            Arrays.asList(node1, node5),
+            Arrays.asList(node1, node5)
+        );
+
+        TopicDescription description = new TopicDescription(
+            topic,
+            false,
+            Arrays.asList(partition0, partition1)
+        );
+        expectDescribeTopics(singletonMap(topic, description));
+
+        DescribeProducersResult result = Mockito.mock(DescribeProducersResult.class);
+        Mockito.when(result.all()).thenReturn(completedFuture(emptyMap()));
+
+        Mockito.when(admin.describeProducers(
+            Arrays.asList(new TopicPartition(topic, 0), new TopicPartition(topic, 1)),
+            new DescribeProducersOptions()
+        )).thenReturn(result);
+
+        execute(args);
+        assertNormalExit();
+
+        List<List<String>> table = readOutputAsTable();
+        assertEquals(1, table.size());
+
+        List<String> expectedHeaders = asList(TransactionsCommand.FindHangingTransactionsCommand.HEADERS);
+        assertEquals(expectedHeaders, table.get(0));
+    }
+
+    @Test
+    public void testFindHangingSpecifiedTopicPartition() throws Exception {
+        TopicPartition topicPartition = new TopicPartition("foo", 5);
+
+        String[] args = new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging",
+            "--topic",
+            topicPartition.topic(),
+            "--partition",
+            String.valueOf(topicPartition.partition())
+        };
+
+        long producerId = 132L;
+        short producerEpoch = 5;
+        long lastTimestamp = time.milliseconds();
+        OptionalInt coordinatorEpoch = OptionalInt.of(19);
+        OptionalLong txnStartOffset = OptionalLong.of(29384L);
+
+        expectDescribeProducers(
+            topicPartition,
+            producerId,
+            producerEpoch,
+            lastTimestamp,
+            coordinatorEpoch,
+            txnStartOffset
+        );
+
+        execute(args);
+        assertNormalExit();
+
+        List<List<String>> table = readOutputAsTable();
+        assertEquals(1, table.size());
+
+        List<String> expectedHeaders = asList(TransactionsCommand.FindHangingTransactionsCommand.HEADERS);
+        assertEquals(expectedHeaders, table.get(0));
+    }
+
+    @Test
+    public void testFindHangingNoMappedTransactionalId() throws Exception {

Review comment:
       Yeah. In fact, the `describeTransactions` API would return `TRANSACTIONAL_ID_NOT_FOUND`, so I needed to do a little refactoring to handle this properly.
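
       The shape of that handling can be sketched roughly as follows. This is an illustration under assumptions, not the patch itself: `IdNotFoundException` is a hypothetical stand-in for Kafka's `TransactionalIdNotFoundException`, descriptions are plain strings, and `java.util.concurrent.Future` replaces `KafkaFuture`. Each per-id future is resolved on its own, so a missing id becomes a `null` entry instead of failing the whole lookup.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

final class DescribeSketch {
    // Hypothetical stand-in for Kafka's TransactionalIdNotFoundException.
    static final class IdNotFoundException extends RuntimeException {
    }

    // Resolves each future individually. A "not found" failure is recorded
    // as a null description; any other failure is rethrown unchecked.
    static Map<String, String> describeAll(Map<String, Future<String>> futures) {
        Map<String, String> descriptions = new HashMap<>();
        for (Map.Entry<String, Future<String>> entry : futures.entrySet()) {
            try {
                descriptions.put(entry.getKey(), entry.getValue().get());
            } catch (ExecutionException e) {
                if (e.getCause() instanceof IdNotFoundException) {
                    descriptions.put(entry.getKey(), null);
                } else {
                    throw new IllegalStateException(e.getCause());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return descriptions;
    }
}
```

       Keeping the `null` entry lets the caller distinguish "looked up and not found" from "never looked up" with `containsKey`.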







[GitHub] [kafka] dajac commented on a change in pull request #10974: KAFKA-12979; Implement command to find hanging transactions

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #10974:
URL: https://github.com/apache/kafka/pull/10974#discussion_r664469428



##########
File path: tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
##########
@@ -461,6 +471,437 @@ public void execute(Admin admin, Namespace ns, PrintStream out) throws Exception
         }
     }
 
+    static class FindHangingTransactionsCommand extends TransactionsCommand {
+        private static final int MAX_BATCH_SIZE = 500;
+
+        static final String[] HEADERS = new String[] {
+            "Topic",
+            "Partition",
+            "ProducerId",
+            "ProducerEpoch",
+            "CoordinatorEpoch",
+            "StartOffset",
+            "LastTimestamp",
+            "Duration(min)"
+        };
+
+        FindHangingTransactionsCommand(Time time) {
+            super(time);
+        }
+
+        @Override
+        String name() {
+            return "find-hanging";
+        }
+
+        @Override
+        void addSubparser(Subparsers subparsers) {
+            Subparser subparser = subparsers.addParser(name())
+                .help("find hanging transactions");
+
+            subparser.addArgument("--broker-id")
+                .help("broker id to search for hanging transactions")
+                .action(store())
+                .type(Integer.class)
+                .required(false);
+
+            subparser.addArgument("--max-transaction-timeout")
+                .help("maximum transaction timeout in minutes to limit the scope of the search")
+                .action(store())
+                .type(Integer.class)
+                .setDefault(15)
+                .required(false);
+
+            subparser.addArgument("--topic")
+                .help("topic name to limit search to")
+                .action(store())
+                .type(String.class)
+                .required(false);
+
+            subparser.addArgument("--partition")
+                .help("partition number")
+                .action(store())
+                .type(Integer.class)
+                .required(false);
+        }
+
+        @Override
+        void execute(Admin admin, Namespace ns, PrintStream out) throws Exception {
+            Optional<Integer> brokerId = Optional.ofNullable(ns.getInt("broker_id"));
+            Optional<String> topic = Optional.ofNullable(ns.getString("topic"));
+
+            if (!topic.isPresent() && !brokerId.isPresent()) {
+                printErrorAndExit("The `find-hanging` command requires either --topic " +
+                    "or --broker-id to limit the scope of the search");
+                return;
+            }
+
+            Optional<Integer> partition = Optional.ofNullable(ns.getInt("partition"));
+            if (partition.isPresent() && !topic.isPresent()) {
+                printErrorAndExit("The --partition argument requires --topic to be provided");
+                return;
+            }
+
+            long maxTransactionTimeoutMs = TimeUnit.MINUTES.toMillis(
+                ns.getInt("max_transaction_timeout"));
+
+            List<TopicPartition> topicPartitions = collectTopicPartitionsToSearch(
+                admin,
+                topic,
+                partition,
+                brokerId
+            );
+
+            List<OpenTransaction> candidates = collectCandidateOpenTransactions(
+                admin,
+                brokerId,
+                maxTransactionTimeoutMs,
+                topicPartitions
+            );
+
+            if (candidates.isEmpty()) {
+                printHangingTransactions(Collections.emptyList(), out);
+            } else {
+                Map<Long, List<OpenTransaction>> openTransactionsByProducerId = groupByProducerId(candidates);
+
+                Map<Long, String> transactionalIds = lookupTransactionalIds(
+                    admin,
+                    openTransactionsByProducerId.keySet()
+                );
+
+                Map<String, TransactionDescription> descriptions = describeTransactions(
+                    admin,
+                    transactionalIds.values()
+                );
+
+                List<OpenTransaction> hangingTransactions = filterHangingTransactions(
+                    openTransactionsByProducerId,
+                    transactionalIds,
+                    descriptions
+                );
+
+                printHangingTransactions(hangingTransactions, out);
+            }
+        }
+
+        private List<TopicPartition> collectTopicPartitionsToSearch(
+            Admin admin,
+            Optional<String> topic,
+            Optional<Integer> partition,
+            Optional<Integer> brokerId
+        ) throws Exception {
+            final List<String> topics;
+
+            if (topic.isPresent()) {
+                if (partition.isPresent()) {
+                    return Collections.singletonList(new TopicPartition(topic.get(), partition.get()));
+                } else {
+                    topics = Collections.singletonList(topic.get());
+                }
+            } else {
+                topics = listTopics(admin);
+            }
+
+            return findTopicPartitions(
+                admin,
+                brokerId,
+                topics
+            );
+        }
+
+        private List<OpenTransaction> filterHangingTransactions(
+            Map<Long, List<OpenTransaction>> openTransactionsByProducerId,
+            Map<Long, String> transactionalIds,
+            Map<String, TransactionDescription> descriptions
+        ) {
+            List<OpenTransaction> hangingTransactions = new ArrayList<>();
+
+            openTransactionsByProducerId.forEach((producerId, openTransactions) -> {
+                String transactionalId = transactionalIds.get(producerId);
+                if (transactionalId == null) {
+                    // If we could not find the transactionalId corresponding to the
+                    // producerId of an open transaction, then the transaction is hanging.
+                    hangingTransactions.addAll(openTransactions);
+                } else {
+                    // Otherwise, we need to check the current transaction state.
+                    TransactionDescription description = descriptions.get(transactionalId);
+                    if (description == null) {
+                        hangingTransactions.addAll(openTransactions);
+                    } else {
+                        for (OpenTransaction openTransaction : openTransactions) {
+                            // The `DescribeTransactions` API returns all partitions being
+                            // written to in an ongoing transaction and any partition which
+                            // does not yet have markers written when in the `PendingAbort` or
+                            // `PendingCommit` states. If the topic partition that we found is
+                            // among these, then we can still expect the coordinator to write
+                            // the marker. Otherwise, it is a hanging transaction.
+                            if (!description.topicPartitions().contains(openTransaction.topicPartition)) {
+                                hangingTransactions.add(openTransaction);
+                            }
+                        }
+                    }
+                }
+            });
+
+            return hangingTransactions;
+        }
+
+        private void printHangingTransactions(
+            List<OpenTransaction> hangingTransactions,
+            PrintStream out
+        ) {
+            long currentTimeMs = time.milliseconds();
+            List<String[]> rows = new ArrayList<>(hangingTransactions.size());
+
+            for (OpenTransaction transaction : hangingTransactions) {
+                long transactionDurationMinutes = TimeUnit.MILLISECONDS.toMinutes(
+                    currentTimeMs - transaction.producerState.lastTimestamp());
+
+                rows.add(new String[] {
+                    transaction.topicPartition.topic(),
+                    String.valueOf(transaction.topicPartition.partition()),
+                    String.valueOf(transaction.producerState.producerId()),
+                    String.valueOf(transaction.producerState.producerEpoch()),
+                    String.valueOf(transaction.producerState.coordinatorEpoch().orElse(-1)),
+                    String.valueOf(transaction.producerState.currentTransactionStartOffset().orElse(-1)),
+                    String.valueOf(transaction.producerState.lastTimestamp()),
+                    String.valueOf(transactionDurationMinutes)
+                });
+            }
+
+            prettyPrintTable(HEADERS, rows, out);
+        }
+
+        private Map<String, TransactionDescription> describeTransactions(
+            Admin admin,
+            Collection<String> transactionalIds
+        ) throws Exception {
+            try {
+                DescribeTransactionsResult result = admin.describeTransactions(new HashSet<>(transactionalIds));
+                Map<String, TransactionDescription> descriptions = new HashMap<>();
+
+                for (String transactionalId : transactionalIds) {
+                    try {
+                        TransactionDescription description = result.description(transactionalId).get();
+                        descriptions.put(transactionalId, description);
+                    } catch (ExecutionException e) {
+                        if (e.getCause() instanceof TransactionalIdNotFoundException) {
+                            descriptions.put(transactionalId, null);
+                        } else {
+                            throw e;
+                        }
+                    }
+                }
+
+                return descriptions;
+            } catch (ExecutionException e) {
+                printErrorAndExit("Failed to describe " + transactionalIds.size()
+                    + " transactions", e.getCause());
+                return Collections.emptyMap();
+            }
+        }
+
+        private Map<Long, List<OpenTransaction>> groupByProducerId(
+            List<OpenTransaction> openTransactions
+        ) {
+            Map<Long, List<OpenTransaction>> res = new HashMap<>();
+            for (OpenTransaction transaction : openTransactions) {
+                List<OpenTransaction> states = res.computeIfAbsent(
+                    transaction.producerState.producerId(),
+                    __ -> new ArrayList<>()
+                );
+                states.add(transaction);
+            }
+            return res;
+        }
+
+        private List<String> listTopics(
+            Admin admin
+        ) throws Exception {
+            try {
+                return new ArrayList<>(admin.listTopics().names().get());
+            } catch (ExecutionException e) {
+                printErrorAndExit("Failed to list topics", e.getCause());
+                return Collections.emptyList();
+            }
+        }
+
+        private List<TopicPartition> findTopicPartitions(
+            Admin admin,
+            Optional<Integer> brokerId,
+            List<String> topics
+        ) throws Exception {
+            List<TopicPartition> topicPartitions = new ArrayList<>();
+            consumeInBatches(topics, MAX_BATCH_SIZE, batch -> {
+                findTopicPartitions(
+                    admin,
+                    brokerId,
+                    batch,
+                    topicPartitions
+                );
+            });
+            return topicPartitions;
+        }
+
+        private void findTopicPartitions(
+            Admin admin,
+            Optional<Integer> brokerId,
+            List<String> topics,
+            List<TopicPartition> topicPartitions
+        ) throws Exception {
+            try {
+                Map<String, TopicDescription> topicDescriptions = admin.describeTopics(topics).all().get();
+                topicDescriptions.forEach((topic, description) -> {
+                    description.partitions().forEach(partitionInfo -> {
+                        if (!brokerId.isPresent() || hasReplica(brokerId.get(), partitionInfo)) {
+                            topicPartitions.add(new TopicPartition(topic, partitionInfo.partition()));
+                        }
+                    });
+                });
+            } catch (ExecutionException e) {
+                printErrorAndExit("Failed to describe " + topics.size() + " topics", e.getCause());
+            }
+        }
+
+        private boolean hasReplica(
+            int brokerId,
+            TopicPartitionInfo partitionInfo
+        ) {
+            return partitionInfo.replicas().stream().anyMatch(node -> node.id() == brokerId);
+        }
+
+        private List<OpenTransaction> collectCandidateOpenTransactions(
+            Admin admin,
+            Optional<Integer> brokerId,
+            long maxTransactionTimeoutMs,
+            List<TopicPartition> topicPartitions
+        ) throws Exception {
+            // We have to check all partitions on the broker. In order to avoid
+            // overwhelming it with a giant request, we break the requests into
+            // smaller batches.
+
+            List<OpenTransaction> candidateTransactions = new ArrayList<>();
+
+            consumeInBatches(topicPartitions, MAX_BATCH_SIZE, batch -> {
+                collectCandidateOpenTransactions(
+                    admin,
+                    brokerId,
+                    maxTransactionTimeoutMs,
+                    batch,
+                    candidateTransactions
+                );
+            });
+
+            return candidateTransactions;
+        }
+
+        @FunctionalInterface
+        private interface ThrowableConsumer<T> {
+            void accept(T t) throws Exception;
+        }

Review comment:
       nit: I would move this one next to `consumeInBatches` as they are used together.
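
       For context, the pairing could look something like the sketch below. The body of `consumeInBatches` is not shown in this part of the diff, so the implementation here is an assumption about its behavior (split a list into fixed-size slices and hand each slice to a consumer that may throw), not the patch's actual code.

```java
import java.util.List;

final class BatchSketch {
    // Consumer variant whose accept method may throw a checked exception.
    @FunctionalInterface
    interface ThrowableConsumer<T> {
        void accept(T t) throws Exception;
    }

    // Assumed behavior: pass consecutive slices of at most batchSize
    // elements to the consumer, in order.
    static <T> void consumeInBatches(
        List<T> items,
        int batchSize,
        ThrowableConsumer<List<T>> consumer
    ) throws Exception {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        for (int start = 0; start < items.size(); start += batchSize) {
            int end = Math.min(start + batchSize, items.size());
            consumer.accept(items.subList(start, end));
        }
    }
}
```

       Declaring the two side by side makes it clearer that the interface exists only so the batch callback can propagate checked exceptions.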

##########
File path: tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java
##########
@@ -437,6 +444,532 @@ public void testOldBrokerAbortTransactionWithUnknownCoordinatorEpoch(int coordin
         assertNormalExit();
     }
 
+    @Test
+    public void testFindHangingRequiresEitherBrokerIdOrTopic() throws Exception {
+        assertCommandFailure(new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging"
+        });
+    }
+
+    @Test
+    public void testFindHangingRequiresTopicIfPartitionIsSpecified() throws Exception {
+        assertCommandFailure(new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging",
+            "--broker-id",
+            "0",
+            "--partition",
+            "5"
+        });
+    }
+
+    private void expectListTransactions(
+        Map<Integer, Collection<TransactionListing>> listingsByBroker
+    ) {
+        expectListTransactions(new ListTransactionsOptions(), listingsByBroker);
+    }
+
+    private void expectListTransactions(
+        ListTransactionsOptions options,
+        Map<Integer, Collection<TransactionListing>> listingsByBroker
+    ) {
+        ListTransactionsResult listResult = Mockito.mock(ListTransactionsResult.class);
+        Mockito.when(admin.listTransactions(options)).thenReturn(listResult);
+
+        List<TransactionListing> allListings = new ArrayList<>();
+        listingsByBroker.values().forEach(allListings::addAll);
+
+        Mockito.when(listResult.all()).thenReturn(completedFuture(allListings));
+        Mockito.when(listResult.allByBrokerId()).thenReturn(completedFuture(listingsByBroker));
+    }
+
+    private void expectDescribeProducers(
+        TopicPartition topicPartition,
+        long producerId,
+        short producerEpoch,
+        long lastTimestamp,
+        OptionalInt coordinatorEpoch,
+        OptionalLong txnStartOffset
+    ) {
+        PartitionProducerState partitionProducerState = new PartitionProducerState(singletonList(
+            new ProducerState(
+                producerId,
+                producerEpoch,
+                500,
+                lastTimestamp,
+                coordinatorEpoch,
+                txnStartOffset
+            )
+        ));
+
+        DescribeProducersResult result = Mockito.mock(DescribeProducersResult.class);
+        Mockito.when(result.all()).thenReturn(
+            completedFuture(singletonMap(topicPartition, partitionProducerState))
+        );
+
+        Mockito.when(admin.describeProducers(
+            Collections.singletonList(topicPartition),
+            new DescribeProducersOptions()
+        )).thenReturn(result);
+    }
+
+    private void expectDescribeTransactions(
+        Map<String, TransactionDescription> descriptions
+    ) {
+        DescribeTransactionsResult result = Mockito.mock(DescribeTransactionsResult.class);
+        descriptions.forEach((transactionalId, description) -> {
+            Mockito.when(result.description(transactionalId))
+                .thenReturn(completedFuture(description));
+        });
+        Mockito.when(result.all()).thenReturn(completedFuture(descriptions));
+        Mockito.when(admin.describeTransactions(descriptions.keySet())).thenReturn(result);
+    }
+
+    private void expectListTopics(
+        Set<String> topics
+    ) {
+        ListTopicsResult result = Mockito.mock(ListTopicsResult.class);
+        Mockito.when(result.names()).thenReturn(completedFuture(topics));
+        Mockito.when(admin.listTopics()).thenReturn(result);
+    }
+
+    private void expectDescribeTopics(
+        Map<String, TopicDescription> descriptions
+    ) {
+        DescribeTopicsResult result = Mockito.mock(DescribeTopicsResult.class);
+        Mockito.when(result.all()).thenReturn(completedFuture(descriptions));
+        Mockito.when(admin.describeTopics(new ArrayList<>(descriptions.keySet()))).thenReturn(result);
+    }
+
+    @Test
+    public void testFindHangingLookupTopicPartitionsForBroker() throws Exception {
+        int brokerId = 5;
+
+        String[] args = new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging",
+            "--broker-id",
+            String.valueOf(brokerId)
+        };
+
+        String topic = "foo";
+        expectListTopics(singleton(topic));
+
+        Node node0 = new Node(0, "localhost", 9092);
+        Node node1 = new Node(1, "localhost", 9093);
+        Node node5 = new Node(5, "localhost", 9097);
+
+        TopicPartitionInfo partition0 = new TopicPartitionInfo(
+            0,
+            node0,
+            Arrays.asList(node0, node1),
+            Arrays.asList(node0, node1)
+        );
+        TopicPartitionInfo partition1 = new TopicPartitionInfo(
+            1,
+            node1,
+            Arrays.asList(node1, node5),
+            Arrays.asList(node1, node5)
+        );
+
+        TopicDescription description = new TopicDescription(
+            topic,
+            false,
+            Arrays.asList(partition0, partition1)
+        );
+        expectDescribeTopics(singletonMap(topic, description));
+
+        DescribeProducersResult result = Mockito.mock(DescribeProducersResult.class);
+        Mockito.when(result.all()).thenReturn(completedFuture(emptyMap()));
+
+        Mockito.when(admin.describeProducers(
+            Collections.singletonList(new TopicPartition(topic, 1)),
+            new DescribeProducersOptions().brokerId(brokerId)
+        )).thenReturn(result);
+
+        execute(args);
+        assertNormalExit();
+
+        List<List<String>> table = readOutputAsTable();
+        assertEquals(1, table.size());
+
+        List<String> expectedHeaders = asList(TransactionsCommand.FindHangingTransactionsCommand.HEADERS);
+        assertEquals(expectedHeaders, table.get(0));
+    }
+
+    @Test
+    public void testFindHangingLookupTopicPartitionsForTopic() throws Exception {
+        String topic = "foo";
+
+        String[] args = new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging",
+            "--topic",
+            topic
+        };
+
+        Node node0 = new Node(0, "localhost", 9092);
+        Node node1 = new Node(1, "localhost", 9093);
+        Node node5 = new Node(5, "localhost", 9097);
+
+        TopicPartitionInfo partition0 = new TopicPartitionInfo(
+            0,
+            node0,
+            Arrays.asList(node0, node1),
+            Arrays.asList(node0, node1)
+        );
+        TopicPartitionInfo partition1 = new TopicPartitionInfo(
+            1,
+            node1,
+            Arrays.asList(node1, node5),
+            Arrays.asList(node1, node5)
+        );
+
+        TopicDescription description = new TopicDescription(
+            topic,
+            false,
+            Arrays.asList(partition0, partition1)
+        );
+        expectDescribeTopics(singletonMap(topic, description));
+
+        DescribeProducersResult result = Mockito.mock(DescribeProducersResult.class);
+        Mockito.when(result.all()).thenReturn(completedFuture(emptyMap()));
+
+        Mockito.when(admin.describeProducers(
+            Arrays.asList(new TopicPartition(topic, 0), new TopicPartition(topic, 1)),
+            new DescribeProducersOptions()
+        )).thenReturn(result);
+
+        execute(args);
+        assertNormalExit();
+
+        List<List<String>> table = readOutputAsTable();
+        assertEquals(1, table.size());
+
+        List<String> expectedHeaders = asList(TransactionsCommand.FindHangingTransactionsCommand.HEADERS);
+        assertEquals(expectedHeaders, table.get(0));
+    }
+
+    @Test
+    public void testFindHangingSpecifiedTopicPartition() throws Exception {
+        TopicPartition topicPartition = new TopicPartition("foo", 5);
+
+        String[] args = new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging",
+            "--topic",
+            topicPartition.topic(),
+            "--partition",
+            String.valueOf(topicPartition.partition())
+        };
+
+        long producerId = 132L;
+        short producerEpoch = 5;
+        long lastTimestamp = time.milliseconds();
+        OptionalInt coordinatorEpoch = OptionalInt.of(19);
+        OptionalLong txnStartOffset = OptionalLong.of(29384L);
+
+        expectDescribeProducers(
+            topicPartition,
+            producerId,
+            producerEpoch,
+            lastTimestamp,
+            coordinatorEpoch,
+            txnStartOffset
+        );
+
+        execute(args);
+        assertNormalExit();
+
+        List<List<String>> table = readOutputAsTable();
+        assertEquals(1, table.size());
+
+        List<String> expectedHeaders = asList(TransactionsCommand.FindHangingTransactionsCommand.HEADERS);
+        assertEquals(expectedHeaders, table.get(0));
+    }
+
+    @Test
+    public void testFindHangingNoMappedTransactionalId() throws Exception {
+        TopicPartition topicPartition = new TopicPartition("foo", 5);
+
+        String[] args = new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging",
+            "--topic",
+            topicPartition.topic(),
+            "--partition",
+            String.valueOf(topicPartition.partition())
+        };
+
+        long producerId = 132L;
+        short producerEpoch = 5;
+        long lastTimestamp = time.milliseconds() - TimeUnit.MINUTES.toMillis(60);
+        int coordinatorEpoch = 19;
+        long txnStartOffset = 29384L;
+
+        expectDescribeProducers(
+            topicPartition,
+            producerId,
+            producerEpoch,
+            lastTimestamp,
+            OptionalInt.of(coordinatorEpoch),
+            OptionalLong.of(txnStartOffset)
+        );
+
+        expectListTransactions(
+            new ListTransactionsOptions().filterProducerIds(singleton(producerId)),
+            singletonMap(1, Collections.emptyList())
+        );
+
+        expectDescribeTransactions(Collections.emptyMap());
+
+        execute(args);
+        assertNormalExit();
+
+        List<List<String>> table = readOutputAsTable();
+        assertEquals(2, table.size());
+
+        List<String> expectedHeaders = asList(TransactionsCommand.FindHangingTransactionsCommand.HEADERS);
+        assertEquals(expectedHeaders, table.get(0));
+
+        List<String> expectedRow = asList(
+            topicPartition.topic(),
+            String.valueOf(topicPartition.partition()),
+            String.valueOf(producerId),
+            String.valueOf(producerEpoch),
+            String.valueOf(coordinatorEpoch),
+            String.valueOf(txnStartOffset),
+            String.valueOf(lastTimestamp),
+            "60"
+        );
+        assertEquals(expectedRow, table.get(1));

Review comment:
       nit: We could also define a helper method for this one to avoid the code repetition.
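
       One possible shape for such a helper, as a rough sketch only. The names `expectedRow` and `assertSingleHangingTransaction` are hypothetical and not part of the patch, and plain `AssertionError` checks stand in for JUnit's `assertEquals` so that the snippet compiles on its own:

```java
import java.util.Arrays;
import java.util.List;

final class HangingRowCheck {
    // Hypothetical helper: renders the expected output row in the same column
    // order as the HEADERS array (topic, partition, producerId, producerEpoch,
    // coordinatorEpoch, startOffset, lastTimestamp, duration in minutes).
    static List<String> expectedRow(
        String topic,
        int partition,
        long producerId,
        short producerEpoch,
        int coordinatorEpoch,
        long txnStartOffset,
        long lastTimestamp,
        long durationMinutes
    ) {
        return Arrays.asList(
            topic,
            String.valueOf(partition),
            String.valueOf(producerId),
            String.valueOf(producerEpoch),
            String.valueOf(coordinatorEpoch),
            String.valueOf(txnStartOffset),
            String.valueOf(lastTimestamp),
            String.valueOf(durationMinutes)
        );
    }

    // Hypothetical helper: the table must contain the header row followed by
    // exactly one data row equal to the expected row.
    static void assertSingleHangingTransaction(
        List<List<String>> table,
        List<String> expectedHeaders,
        List<String> expectedRow
    ) {
        if (table.size() != 2) {
            throw new AssertionError("Expected 2 rows, found " + table.size());
        }
        if (!expectedHeaders.equals(table.get(0))) {
            throw new AssertionError("Unexpected headers: " + table.get(0));
        }
        if (!expectedRow.equals(table.get(1))) {
            throw new AssertionError("Unexpected row: " + table.get(1));
        }
    }

    public static void main(String[] args) {
        // Values mirror the ones used in testFindHangingNoMappedTransactionalId;
        // the timestamp is an arbitrary placeholder.
        List<String> headers = Arrays.asList(
            "Topic", "Partition", "ProducerId", "ProducerEpoch",
            "CoordinatorEpoch", "StartOffset", "LastTimestamp", "Duration(min)");
        List<String> row = expectedRow("foo", 5, 132L, (short) 5, 19, 29384L, 1000L, 60L);
        assertSingleHangingTransaction(Arrays.asList(headers, row), headers, row);
        System.out.println(row);
    }
}
```

       In the real test class the checks would presumably delegate to `assertEquals`, and the headers would come from `FindHangingTransactionsCommand.HEADERS` rather than a local list.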

##########
File path: tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java
##########
@@ -437,6 +444,532 @@ public void testOldBrokerAbortTransactionWithUnknownCoordinatorEpoch(int coordin
         assertNormalExit();
     }
 
+    @Test
+    public void testFindHangingRequiresEitherBrokerIdOrTopic() throws Exception {
+        assertCommandFailure(new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging"
+        });
+    }
+
+    @Test
+    public void testFindHangingRequiresTopicIfPartitionIsSpecified() throws Exception {
+        assertCommandFailure(new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging",
+            "--broker-id",
+            "0",
+            "--partition",
+            "5"
+        });
+    }
+
+    private void expectListTransactions(
+        Map<Integer, Collection<TransactionListing>> listingsByBroker
+    ) {
+        expectListTransactions(new ListTransactionsOptions(), listingsByBroker);
+    }
+
+    private void expectListTransactions(
+        ListTransactionsOptions options,
+        Map<Integer, Collection<TransactionListing>> listingsByBroker
+    ) {
+        ListTransactionsResult listResult = Mockito.mock(ListTransactionsResult.class);
+        Mockito.when(admin.listTransactions(options)).thenReturn(listResult);
+
+        List<TransactionListing> allListings = new ArrayList<>();
+        listingsByBroker.values().forEach(allListings::addAll);
+
+        Mockito.when(listResult.all()).thenReturn(completedFuture(allListings));
+        Mockito.when(listResult.allByBrokerId()).thenReturn(completedFuture(listingsByBroker));
+    }
+
+    private void expectDescribeProducers(
+        TopicPartition topicPartition,
+        long producerId,
+        short producerEpoch,
+        long lastTimestamp,
+        OptionalInt coordinatorEpoch,
+        OptionalLong txnStartOffset
+    ) {
+        PartitionProducerState partitionProducerState = new PartitionProducerState(singletonList(
+            new ProducerState(
+                producerId,
+                producerEpoch,
+                500,
+                lastTimestamp,
+                coordinatorEpoch,
+                txnStartOffset
+            )
+        ));
+
+        DescribeProducersResult result = Mockito.mock(DescribeProducersResult.class);
+        Mockito.when(result.all()).thenReturn(
+            completedFuture(singletonMap(topicPartition, partitionProducerState))
+        );
+
+        Mockito.when(admin.describeProducers(
+            Collections.singletonList(topicPartition),
+            new DescribeProducersOptions()
+        )).thenReturn(result);
+    }
+
+    private void expectDescribeTransactions(
+        Map<String, TransactionDescription> descriptions
+    ) {
+        DescribeTransactionsResult result = Mockito.mock(DescribeTransactionsResult.class);
+        descriptions.forEach((transactionalId, description) -> {
+            Mockito.when(result.description(transactionalId))
+                .thenReturn(completedFuture(description));
+        });
+        Mockito.when(result.all()).thenReturn(completedFuture(descriptions));
+        Mockito.when(admin.describeTransactions(descriptions.keySet())).thenReturn(result);
+    }
+
+    private void expectListTopics(
+        Set<String> topics
+    ) {
+        ListTopicsResult result = Mockito.mock(ListTopicsResult.class);
+        Mockito.when(result.names()).thenReturn(completedFuture(topics));
+        Mockito.when(admin.listTopics()).thenReturn(result);
+    }
+
+    private void expectDescribeTopics(
+        Map<String, TopicDescription> descriptions
+    ) {
+        DescribeTopicsResult result = Mockito.mock(DescribeTopicsResult.class);
+        Mockito.when(result.all()).thenReturn(completedFuture(descriptions));
+        Mockito.when(admin.describeTopics(new ArrayList<>(descriptions.keySet()))).thenReturn(result);
+    }
+
+    @Test
+    public void testFindHangingLookupTopicPartitionsForBroker() throws Exception {
+        int brokerId = 5;
+
+        String[] args = new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "find-hanging",
+            "--broker-id",
+            String.valueOf(brokerId)
+        };
+
+        String topic = "foo";
+        expectListTopics(singleton(topic));
+
+        Node node0 = new Node(0, "localhost", 9092);
+        Node node1 = new Node(1, "localhost", 9093);
+        Node node5 = new Node(5, "localhost", 9097);
+
+        TopicPartitionInfo partition0 = new TopicPartitionInfo(
+            0,
+            node0,
+            Arrays.asList(node0, node1),
+            Arrays.asList(node0, node1)
+        );
+        TopicPartitionInfo partition1 = new TopicPartitionInfo(
+            1,
+            node1,
+            Arrays.asList(node1, node5),
+            Arrays.asList(node1, node5)
+        );
+
+        TopicDescription description = new TopicDescription(
+            topic,
+            false,
+            Arrays.asList(partition0, partition1)
+        );
+        expectDescribeTopics(singletonMap(topic, description));
+
+        DescribeProducersResult result = Mockito.mock(DescribeProducersResult.class);
+        Mockito.when(result.all()).thenReturn(completedFuture(emptyMap()));
+
+        Mockito.when(admin.describeProducers(
+            Collections.singletonList(new TopicPartition(topic, 1)),
+            new DescribeProducersOptions().brokerId(brokerId)
+        )).thenReturn(result);
+
+        execute(args);
+        assertNormalExit();
+
+        List<List<String>> table = readOutputAsTable();
+        assertEquals(1, table.size());
+
+        List<String> expectedHeaders = asList(TransactionsCommand.FindHangingTransactionsCommand.HEADERS);
+        assertEquals(expectedHeaders, table.get(0));

Review comment:
       nit: This block is repeated in many tests. I wonder if we could define a helper for it.
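
       As a rough illustration of what I have in mind (the name `assertNoHangingTransactions` is hypothetical, and the headers are copied locally only so that the sketch compiles without the Kafka or JUnit dependencies):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class HeaderOnlyTableCheck {
    // Stand-in for TransactionsCommand.FindHangingTransactionsCommand.HEADERS,
    // duplicated here only to keep the sketch self-contained.
    static final String[] HEADERS = {
        "Topic", "Partition", "ProducerId", "ProducerEpoch",
        "CoordinatorEpoch", "StartOffset", "LastTimestamp", "Duration(min)"
    };

    // Hypothetical helper: fails unless the parsed output consists of the
    // header row and nothing else, i.e. no hanging transactions were reported.
    static void assertNoHangingTransactions(List<List<String>> table) {
        if (table.size() != 1) {
            throw new AssertionError(
                "Expected only the header row, found " + table.size() + " rows");
        }
        List<String> expectedHeaders = Arrays.asList(HEADERS);
        if (!expectedHeaders.equals(table.get(0))) {
            throw new AssertionError("Unexpected headers: " + table.get(0));
        }
    }

    public static void main(String[] args) {
        List<List<String>> table = Collections.singletonList(Arrays.asList(HEADERS));
        assertNoHangingTransactions(table);
        System.out.println("header-only table accepted");
    }
}
```

       Each test that currently repeats the size and header assertions could then end with a single call such as `assertNoHangingTransactions(readOutputAsTable())`, assuming the helper lives in the test class next to `readOutputAsTable`.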

##########
File path: tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
##########
@@ -461,6 +471,437 @@ public void execute(Admin admin, Namespace ns, PrintStream out) throws Exception
         }
     }
 
+    static class FindHangingTransactionsCommand extends TransactionsCommand {
+        private static final int MAX_BATCH_SIZE = 500;
+
+        static final String[] HEADERS = new String[] {
+            "Topic",
+            "Partition",
+            "ProducerId",
+            "ProducerEpoch",
+            "CoordinatorEpoch",
+            "StartOffset",
+            "LastTimestamp",
+            "Duration(min)"
+        };
+
+        FindHangingTransactionsCommand(Time time) {
+            super(time);
+        }
+
+        @Override
+        String name() {
+            return "find-hanging";
+        }
+
+        @Override
+        void addSubparser(Subparsers subparsers) {
+            Subparser subparser = subparsers.addParser(name())
+                .help("find hanging transactions");
+
+            subparser.addArgument("--broker-id")
+                .help("broker id to search for hanging transactions")
+                .action(store())
+                .type(Integer.class)
+                .required(false);
+
+            subparser.addArgument("--max-transaction-timeout")
+                .help("maximum transaction timeout in minutes to limit the scope of the search")
+                .action(store())
+                .type(Integer.class)
+                .setDefault(15)
+                .required(false);
+
+            subparser.addArgument("--topic")
+                .help("topic name to limit search to")
+                .action(store())
+                .type(String.class)
+                .required(false);
+
+            subparser.addArgument("--partition")
+                .help("partition number")
+                .action(store())
+                .type(Integer.class)
+                .required(false);
+        }
+
+        @Override
+        void execute(Admin admin, Namespace ns, PrintStream out) throws Exception {
+            Optional<Integer> brokerId = Optional.ofNullable(ns.getInt("broker_id"));
+            Optional<String> topic = Optional.ofNullable(ns.getString("topic"));
+
+            if (!topic.isPresent() && !brokerId.isPresent()) {
+                printErrorAndExit("The `find-hanging` command requires either --topic " +
+                    "or --broker-id to limit the scope of the search");
+                return;
+            }
+
+            Optional<Integer> partition = Optional.ofNullable(ns.getInt("partition"));
+            if (partition.isPresent() && !topic.isPresent()) {
+                printErrorAndExit("The --partition argument requires --topic to be provided");
+                return;
+            }
+
+            long maxTransactionTimeoutMs = TimeUnit.MINUTES.toMillis(
+                ns.getInt("max_transaction_timeout"));
+
+            List<TopicPartition> topicPartitions = collectTopicPartitionsToSearch(
+                admin,
+                topic,
+                partition,
+                brokerId
+            );
+
+            List<OpenTransaction> candidates = collectCandidateOpenTransactions(
+                admin,
+                brokerId,
+                maxTransactionTimeoutMs,
+                topicPartitions
+            );
+
+            if (candidates.isEmpty()) {
+                printHangingTransactions(Collections.emptyList(), out);
+            } else {
+                Map<Long, List<OpenTransaction>> openTransactionsByProducerId = groupByProducerId(candidates);
+
+                Map<Long, String> transactionalIds = lookupTransactionalIds(
+                    admin,
+                    openTransactionsByProducerId.keySet()
+                );
+
+                Map<String, TransactionDescription> descriptions = describeTransactions(
+                    admin,
+                    transactionalIds.values()
+                );
+
+                List<OpenTransaction> hangingTransactions = filterHangingTransactions(
+                    openTransactionsByProducerId,
+                    transactionalIds,
+                    descriptions
+                );
+
+                printHangingTransactions(hangingTransactions, out);
+            }
+        }
+
+        private List<TopicPartition> collectTopicPartitionsToSearch(
+            Admin admin,
+            Optional<String> topic,
+            Optional<Integer> partition,
+            Optional<Integer> brokerId
+        ) throws Exception {
+            final List<String> topics;
+
+            if (topic.isPresent()) {
+                if (partition.isPresent()) {
+                    return Collections.singletonList(new TopicPartition(topic.get(), partition.get()));
+                } else {
+                    topics = Collections.singletonList(topic.get());
+                }
+            } else {
+                topics = listTopics(admin);
+            }
+
+            return findTopicPartitions(
+                admin,
+                brokerId,
+                topics
+            );
+        }
+
+        private List<OpenTransaction> filterHangingTransactions(
+            Map<Long, List<OpenTransaction>> openTransactionsByProducerId,
+            Map<Long, String> transactionalIds,
+            Map<String, TransactionDescription> descriptions
+        ) {
+            List<OpenTransaction> hangingTransactions = new ArrayList<>();
+
+            openTransactionsByProducerId.forEach((producerId, openTransactions) -> {
+                String transactionalId = transactionalIds.get(producerId);
+                if (transactionalId == null) {
+                    // If we could not find the transactionalId corresponding to the
+                    // producerId of an open transaction, then the transaction is hanging.
+                    hangingTransactions.addAll(openTransactions);
+                } else {
+                    // Otherwise, we need to check the current transaction state.
+                    TransactionDescription description = descriptions.get(transactionalId);
+                    if (description == null) {
+                        hangingTransactions.addAll(openTransactions);
+                    } else {
+                        for (OpenTransaction openTransaction : openTransactions) {
+                            // The `DescribeTransactions` API returns all partitions being
+                            // written to in an ongoing transaction and any partition which
+                            // does not yet have markers written when in the `PendingAbort` or
+                            // `PendingCommit` states. If the topic partition that we found is
+                            // among these, then we can still expect the coordinator to write
+                            // the marker. Otherwise, it is a hanging transaction.
+                            if (!description.topicPartitions().contains(openTransaction.topicPartition)) {
+                                hangingTransactions.add(openTransaction);
+                            }
+                        }
+                    }
+                }
+            });
+
+            return hangingTransactions;
+        }
+
+        private void printHangingTransactions(
+            List<OpenTransaction> hangingTransactions,
+            PrintStream out
+        ) {
+            long currentTimeMs = time.milliseconds();
+            List<String[]> rows = new ArrayList<>(hangingTransactions.size());
+
+            for (OpenTransaction transaction : hangingTransactions) {
+                long transactionDurationMinutes = TimeUnit.MILLISECONDS.toMinutes(
+                    currentTimeMs - transaction.producerState.lastTimestamp());
+
+                rows.add(new String[] {
+                    transaction.topicPartition.topic(),
+                    String.valueOf(transaction.topicPartition.partition()),
+                    String.valueOf(transaction.producerState.producerId()),
+                    String.valueOf(transaction.producerState.producerEpoch()),
+                    String.valueOf(transaction.producerState.coordinatorEpoch().orElse(-1)),
+                    String.valueOf(transaction.producerState.currentTransactionStartOffset().orElse(-1)),
+                    String.valueOf(transaction.producerState.lastTimestamp()),
+                    String.valueOf(transactionDurationMinutes)
+                });
+            }
+
+            prettyPrintTable(HEADERS, rows, out);
+        }
+
+        private Map<String, TransactionDescription> describeTransactions(
+            Admin admin,
+            Collection<String> transactionalIds
+        ) throws Exception {
+            try {
+                DescribeTransactionsResult result = admin.describeTransactions(new HashSet<>(transactionalIds));
+                Map<String, TransactionDescription> descriptions = new HashMap<>();
+
+                for (String transactionalId : transactionalIds) {
+                    try {
+                        TransactionDescription description = result.description(transactionalId).get();
+                        descriptions.put(transactionalId, description);
+                    } catch (ExecutionException e) {
+                        if (e.getCause() instanceof TransactionalIdNotFoundException) {
+                            descriptions.put(transactionalId, null);
+                        } else {
+                            throw e;
+                        }
+                    }
+                }
+
+                return descriptions;
+            } catch (ExecutionException e) {
+                printErrorAndExit("Failed to describe " + transactionalIds.size()
+                    + " transactions", e.getCause());
+                return Collections.emptyMap();
+            }
+        }
+
+        private Map<Long, List<OpenTransaction>> groupByProducerId(
+            List<OpenTransaction> openTransactions
+        ) {
+            Map<Long, List<OpenTransaction>> res = new HashMap<>();
+            for (OpenTransaction transaction : openTransactions) {
+                List<OpenTransaction> states = res.computeIfAbsent(
+                    transaction.producerState.producerId(),
+                    __ -> new ArrayList<>()
+                );
+                states.add(transaction);
+            }
+            return res;
+        }
+
+        private List<String> listTopics(
+            Admin admin
+        ) throws Exception {
+            try {
+                return new ArrayList<>(admin.listTopics().names().get());
+            } catch (ExecutionException e) {
+                printErrorAndExit("Failed to list topics", e.getCause());
+                return Collections.emptyList();
+            }
+        }
+
+        private List<TopicPartition> findTopicPartitions(
+            Admin admin,
+            Optional<Integer> brokerId,
+            List<String> topics
+        ) throws Exception {
+            List<TopicPartition> topicPartitions = new ArrayList<>();
+            consumeInBatches(topics, MAX_BATCH_SIZE, batch -> {
+                findTopicPartitions(
+                    admin,
+                    brokerId,
+                    batch,
+                    topicPartitions
+                );
+            });
+            return topicPartitions;
+        }
+
+        private void findTopicPartitions(
+            Admin admin,
+            Optional<Integer> brokerId,
+            List<String> topics,
+            List<TopicPartition> topicPartitions
+        ) throws Exception {
+            try {
+                Map<String, TopicDescription> topicDescriptions = admin.describeTopics(topics).all().get();
+                topicDescriptions.forEach((topic, description) -> {
+                    description.partitions().forEach(partitionInfo -> {
+                        if (!brokerId.isPresent() || hasReplica(brokerId.get(), partitionInfo)) {
+                            topicPartitions.add(new TopicPartition(topic, partitionInfo.partition()));
+                        }
+                    });
+                });
+            } catch (ExecutionException e) {
+                printErrorAndExit("Failed to describe " + topics.size() + " topics", e.getCause());
+            }
+        }
+
+        private boolean hasReplica(
+            int brokerId,
+            TopicPartitionInfo partitionInfo
+        ) {
+            return partitionInfo.replicas().stream().anyMatch(node -> node.id() == brokerId);
+        }
+
+        private List<OpenTransaction> collectCandidateOpenTransactions(
+            Admin admin,
+            Optional<Integer> brokerId,
+            long maxTransactionTimeoutMs,
+            List<TopicPartition> topicPartitions
+        ) throws Exception {
+            // We have to check all partitions on the broker. In order to avoid
+            // overwhelming it with a giant request, we break the requests into
+            // smaller batches.
+
+            List<OpenTransaction> candidateTransactions = new ArrayList<>();
+
+            consumeInBatches(topicPartitions, MAX_BATCH_SIZE, batch -> {
+                collectCandidateOpenTransactions(
+                    admin,
+                    brokerId,
+                    maxTransactionTimeoutMs,
+                    batch,
+                    candidateTransactions
+                );
+            });
+
+            return candidateTransactions;
+        }
+
+        @FunctionalInterface
+        private interface ThrowableConsumer<T> {
+            void accept(T t) throws Exception;
+        }
+
+        private static class OpenTransaction {
+            private final TopicPartition topicPartition;
+            private final ProducerState producerState;
+
+            private OpenTransaction(
+                TopicPartition topicPartition,
+                ProducerState producerState
+            ) {
+                this.topicPartition = topicPartition;
+                this.producerState = producerState;
+            }
+        }
+
+        private void collectCandidateOpenTransactions(
+            Admin admin,
+            Optional<Integer> brokerId,
+            long maxTransactionTimeoutMs,
+            List<TopicPartition> topicPartitions,
+            List<OpenTransaction> candidateTransactions
+        ) throws Exception {
+            try {
+                DescribeProducersOptions describeOptions = new DescribeProducersOptions();
+                brokerId.ifPresent(describeOptions::brokerId);
+
+                Map<TopicPartition, DescribeProducersResult.PartitionProducerState> producersByPartition =
+                    admin.describeProducers(topicPartitions, describeOptions).all().get();
+
+                long currentTimeMs = time.milliseconds();
+
+                producersByPartition.forEach((topicPartition, producersStates) -> {
+                    producersStates.activeProducers().forEach(activeProducer -> {
+                        if (activeProducer.currentTransactionStartOffset().isPresent()) {
+                            long transactionDurationMs = currentTimeMs - activeProducer.lastTimestamp();
+                            if (transactionDurationMs > maxTransactionTimeoutMs) {
+                                candidateTransactions.add(new OpenTransaction(
+                                    topicPartition,
+                                    activeProducer
+                                ));
+                            }
+                        }
+                    });
+                });
+            } catch (ExecutionException e) {
+                printErrorAndExit("Failed to describe producers for " + topicPartitions.size() +
+                    " partitions on broker " + brokerId, e.getCause());
+            }
+        }
+
+        private Map<Long, String> lookupTransactionalIds(
+            Admin admin,
+            Set<Long> producerIds
+        ) throws Exception {
+            try {
+                ListTransactionsOptions listTransactionsOptions = new ListTransactionsOptions()
+                    .filterProducerIds(producerIds);
+
+                Collection<TransactionListing> transactionListings =
+                    admin.listTransactions(listTransactionsOptions).all().get();
+
+                Map<Long, String> transactionalIdMap = new HashMap<>();
+
+                transactionListings.forEach(listing -> {
+                    if (!producerIds.contains(listing.producerId())) {
+                        log.debug("Received transaction listing {} which has a producerId " +
+                            "which was not requested", listing);
+                    } else {
+                        transactionalIdMap.put(
+                            listing.producerId(),
+                            listing.transactionalId()
+                        );
+                    }
+                });
+
+                return transactionalIdMap;
+            } catch (ExecutionException e) {
+                printErrorAndExit("Failed to list transactions for " + producerIds.size() +
+                    " producers", e.getCause());
+                return Collections.emptyMap();
+            }
+        }
+

Review comment:
       nit: This empty line could be removed.



