You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2021/06/22 16:48:50 UTC

[kafka] branch trunk updated: KAFKA-12888; Add transaction tool from KIP-664 (#10814)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fce7715  KAFKA-12888; Add transaction tool from KIP-664 (#10814)
fce7715 is described below

commit fce771579c3e20f20949c4c7e0a5e3a16c57c7f0
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Tue Jun 22 09:47:30 2021 -0700

    KAFKA-12888; Add transaction tool from KIP-664 (#10814)
    
    This patch adds the transaction tool specified in KIP-664: https://cwiki.apache.org/confluence/display/KAFKA/KIP-664%3A+Provide+tooling+to+detect+and+abort+hanging+transactions. This includes all of the logic for describing transactional state and for aborting transactions. The only thing that is left out is the `--find-hanging` implementation, which will be left for a subsequent patch.
    
    Reviewers: Boyang Chen <bo...@apache.org>, David Jacot <dj...@confluent.io>
---
 bin/kafka-transactions.sh                          |  17 +
 bin/windows/kafka-transactions.bat                 |  17 +
 .../kafka/tools/PrintVersionAndExitAction.java     |  52 ++
 .../apache/kafka/tools/TransactionsCommand.java    | 623 +++++++++++++++++++++
 .../kafka/tools/TransactionsCommandTest.java       | 492 ++++++++++++++++
 5 files changed, 1201 insertions(+)

diff --git a/bin/kafka-transactions.sh b/bin/kafka-transactions.sh
new file mode 100755
index 0000000..6fb5233
--- /dev/null
+++ b/bin/kafka-transactions.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.TransactionsCommand "$@"
diff --git a/bin/windows/kafka-transactions.bat b/bin/windows/kafka-transactions.bat
new file mode 100644
index 0000000..9bb7585
--- /dev/null
+++ b/bin/windows/kafka-transactions.bat
@@ -0,0 +1,17 @@
+@echo off
+rem Licensed to the Apache Software Foundation (ASF) under one or more
+rem contributor license agreements.  See the NOTICE file distributed with
+rem this work for additional information regarding copyright ownership.
+rem The ASF licenses this file to You under the Apache License, Version 2.0
+rem (the "License"); you may not use this file except in compliance with
+rem the License.  You may obtain a copy of the License at
+rem
+rem     http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem Unless required by applicable law or agreed to in writing, software
+rem distributed under the License is distributed on an "AS IS" BASIS,
+rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+rem See the License for the specific language governing permissions and
+rem limitations under the License.
+
+rem Run TransactionsCommand through kafka-run-class.bat located next to this
+rem script (%~dp0), forwarding every command-line argument (%*).
+"%~dp0kafka-run-class.bat" org.apache.kafka.tools.TransactionsCommand %*
diff --git a/tools/src/main/java/org/apache/kafka/tools/PrintVersionAndExitAction.java b/tools/src/main/java/org/apache/kafka/tools/PrintVersionAndExitAction.java
new file mode 100644
index 0000000..6846c7f
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/PrintVersionAndExitAction.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import net.sourceforge.argparse4j.inf.Argument;
+import net.sourceforge.argparse4j.inf.ArgumentAction;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.utils.Exit;
+
+import java.util.Map;
+
+/**
+ * Argument action that prints the Kafka version and commit id to standard
+ * output and then calls `Exit.exit(0)`. It does not consume an argument value.
+ */
+class PrintVersionAndExitAction implements ArgumentAction {
+
+    /**
+     * This action is a pure flag, so no value is taken from the command line.
+     */
+    @Override
+    public boolean consumeArgument() {
+        return false;
+    }
+
+    /**
+     * Nothing to set up when the action is attached to an argument.
+     */
+    @Override
+    public void onAttach(Argument arg) {
+    }
+
+    /**
+     * Print `<version> (Commit:<commitId>)` and exit with status 0. None of the
+     * parser-supplied parameters are used.
+     */
+    @Override
+    public void run(
+        ArgumentParser parser,
+        Argument arg,
+        Map<String, Object> attrs,
+        String flag,
+        Object value
+    ) {
+        StringBuilder banner = new StringBuilder()
+            .append(AppInfoParser.getVersion())
+            .append(" (Commit:")
+            .append(AppInfoParser.getCommitId())
+            .append(")");
+        System.out.println(banner);
+        Exit.exit(0);
+    }
+}
diff --git a/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
new file mode 100644
index 0000000..2419460
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
@@ -0,0 +1,623 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentGroup;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import net.sourceforge.argparse4j.inf.Namespace;
+import net.sourceforge.argparse4j.inf.Subparser;
+import net.sourceforge.argparse4j.inf.Subparsers;
+import org.apache.kafka.clients.admin.AbortTransactionSpec;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.DescribeProducersOptions;
+import org.apache.kafka.clients.admin.DescribeProducersResult;
+import org.apache.kafka.clients.admin.ProducerState;
+import org.apache.kafka.clients.admin.TransactionDescription;
+import org.apache.kafka.clients.admin.TransactionListing;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.singleton;
+import static java.util.Collections.singletonList;
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+public abstract class TransactionsCommand {
+    private static final Logger log = LoggerFactory.getLogger(TransactionsCommand.class);
+
+    protected final Time time;
+
+    protected TransactionsCommand(Time time) {
+        this.time = time;
+    }
+
+    /**
+     * Get the name of this command (e.g. `describe-producers`).
+     */
+    abstract String name();
+
+    /**
+     * Specify the arguments needed for this command.
+     */
+    abstract void addSubparser(Subparsers subparsers);
+
+    /**
+     * Execute the command logic.
+     */
+    abstract void execute(Admin admin, Namespace ns, PrintStream out) throws Exception;
+
+
+    static class AbortTransactionCommand extends TransactionsCommand {
+
+        AbortTransactionCommand(Time time) {
+            super(time);
+        }
+
+        @Override
+        String name() {
+            return "abort";
+        }
+
+        /**
+         * Register the `abort` subcommand and its arguments.
+         *
+         * `--topic` and `--partition` are always required. The transaction itself is
+         * identified either by `--start-offset` alone, or by `--producer-id`,
+         * `--producer-epoch` and `--coordinator-epoch` together. None of those four
+         * are marked required here; the valid combinations are checked in `execute`.
+         */
+        @Override
+        void addSubparser(Subparsers subparsers) {
+            Subparser subparser = subparsers.addParser(name())
+                .help("abort a hanging transaction (requires administrative privileges)");
+
+            subparser.addArgument("--topic")
+                .help("topic name")
+                .action(store())
+                .type(String.class)
+                .required(true);
+
+            subparser.addArgument("--partition")
+                .help("partition number")
+                .action(store())
+                .type(Integer.class)
+                .required(true);
+
+            // Argument groups only affect how the help text is organized; see the
+            // group titles and descriptions below.
+            ArgumentGroup newBrokerArgumentGroup = subparser
+                .addArgumentGroup("Brokers on versions 3.0 and above")
+                .description("For newer brokers, only the start offset of the transaction " +
+                    "to be aborted is required");
+
+            newBrokerArgumentGroup.addArgument("--start-offset")
+                .help("start offset of the transaction to abort")
+                .action(store())
+                .type(Long.class);
+
+            ArgumentGroup olderBrokerArgumentGroup = subparser
+                .addArgumentGroup("Brokers on versions older than 3.0")
+                .description("For older brokers, you must provide all of these arguments");
+
+            olderBrokerArgumentGroup.addArgument("--producer-id")
+                .help("producer id")
+                .action(store())
+                .type(Long.class);
+
+            olderBrokerArgumentGroup.addArgument("--producer-epoch")
+                .help("producer epoch")
+                .action(store())
+                .type(Short.class);
+
+            olderBrokerArgumentGroup.addArgument("--coordinator-epoch")
+                .help("coordinator epoch")
+                .action(store())
+                .type(Integer.class);
+        }
+
+        /**
+         * Build the abort spec for the open transaction that starts at `startOffset`
+         * on `topicPartition`, using the producer state reported by the cluster.
+         *
+         * The first active producer whose current transaction start offset equals
+         * `startOffset` supplies the producer id and epoch; a missing coordinator
+         * epoch is replaced by 0. If the lookup fails or no producer matches, an
+         * error is printed through `printErrorAndExit` and `null` is returned.
+         */
+        private AbortTransactionSpec buildAbortSpec(
+            Admin admin,
+            TopicPartition topicPartition,
+            long startOffset
+        ) throws Exception {
+            final DescribeProducersResult.PartitionProducerState partitionState;
+            try {
+                partitionState = admin.describeProducers(singleton(topicPartition))
+                    .partitionResult(topicPartition)
+                    .get();
+            } catch (ExecutionException e) {
+                printErrorAndExit("Failed to validate producer state for partition "
+                    + topicPartition, e.getCause());
+                return null;
+            }
+
+            for (ProducerState candidate : partitionState.activeProducers()) {
+                OptionalLong txnStartOffset = candidate.currentTransactionStartOffset();
+                boolean startsAtRequestedOffset =
+                    txnStartOffset.isPresent() && txnStartOffset.getAsLong() == startOffset;
+
+                if (startsAtRequestedOffset) {
+                    return new AbortTransactionSpec(
+                        topicPartition,
+                        candidate.producerId(),
+                        (short) candidate.producerEpoch(),
+                        candidate.coordinatorEpoch().orElse(0)
+                    );
+                }
+            }
+
+            printErrorAndExit("Could not find any open transactions starting at offset " +
+                startOffset + " on partition " + topicPartition);
+            return null;
+        }
+
+        /**
+         * Submit the abort request described by `abortSpec` and wait for it to finish.
+         * A failure reported through the result future is printed, together with the
+         * spec, via `printErrorAndExit`.
+         */
+        private void abortTransaction(
+            Admin admin,
+            AbortTransactionSpec abortSpec
+        ) throws Exception {
+            try {
+                admin.abortTransaction(abortSpec)
+                    .all()
+                    .get();
+            } catch (ExecutionException failure) {
+                printErrorAndExit("Failed to abort transaction " + abortSpec, failure.getCause());
+            }
+        }
+
+        /**
+         * Abort the transaction identified by the parsed arguments.
+         *
+         * When `--start-offset` is given, the remaining producer details are looked up
+         * from the cluster. Otherwise `--producer-id`, `--producer-epoch` and
+         * `--coordinator-epoch` must all be present and are used as given. `out` is
+         * not written to by this command.
+         */
+        @Override
+        void execute(Admin admin, Namespace ns, PrintStream out) throws Exception {
+            TopicPartition topicPartition = new TopicPartition(
+                ns.getString("topic"),
+                ns.getInt("partition")
+            );
+
+            Long startOffset = ns.getLong("start_offset");
+            Long producerId = ns.getLong("producer_id");
+
+            if (startOffset == null && producerId == null) {
+                printErrorAndExit("The transaction to abort must be identified either with " +
+                    "--start-offset (for brokers on 3.0 or above) or with " +
+                    "--producer-id, --producer-epoch, and --coordinator-epoch (for older brokers)");
+                return;
+            }
+
+            final AbortTransactionSpec abortSpec;
+            if (startOffset != null) {
+                abortSpec = buildAbortSpec(admin, topicPartition, startOffset);
+            } else {
+                Short producerEpoch = ns.getShort("producer_epoch");
+                if (producerEpoch == null) {
+                    printErrorAndExit("Missing required argument --producer-epoch");
+                    return;
+                }
+
+                Integer coordinatorEpoch = ns.getInt("coordinator_epoch");
+                if (coordinatorEpoch == null) {
+                    printErrorAndExit("Missing required argument --coordinator-epoch");
+                    return;
+                }
+
+                // If a transaction was started by a new producerId and became hanging
+                // before the initial commit/abort, then the coordinator epoch will be -1
+                // as seen in the `DescribeProducers` output. In this case, we conservatively
+                // use a coordinator epoch of 0, which is less than or equal to any possible
+                // leader epoch.
+                int effectiveCoordinatorEpoch = Math.max(coordinatorEpoch, 0);
+
+                abortSpec = new AbortTransactionSpec(
+                    topicPartition,
+                    producerId,
+                    producerEpoch,
+                    effectiveCoordinatorEpoch
+                );
+            }
+
+            abortTransaction(admin, abortSpec);
+        }
+    }
+
+    static class DescribeProducersCommand extends TransactionsCommand {
+        static final String[] HEADERS = new String[]{
+            "ProducerId",
+            "ProducerEpoch",
+            "LatestCoordinatorEpoch",
+            "LastSequence",
+            "LastTimestamp",
+            "CurrentTransactionStartOffset"
+        };
+
+        DescribeProducersCommand(Time time) {
+            super(time);
+        }
+
+        @Override
+        public String name() {
+            return "describe-producers";
+        }
+
+        /**
+         * Register the `describe-producers` subcommand. `--topic` and `--partition`
+         * are required; `--broker-id` is optional and selects a specific replica.
+         */
+        @Override
+        public void addSubparser(Subparsers subparsers) {
+            Subparser subparser = subparsers.addParser(name())
+                .help("describe the states of active producers for a topic partition");
+
+            subparser.addArgument("--broker-id")
+                .help("optional broker id to describe the producer state on a specific replica")
+                .action(store())
+                .type(Integer.class)
+                .required(false);
+
+            subparser.addArgument("--topic")
+                .help("topic name")
+                .action(store())
+                .type(String.class)
+                .required(true);
+
+            subparser.addArgument("--partition")
+                .help("partition number")
+                .action(store())
+                .type(Integer.class)
+                .required(true);
+        }
+
+        /**
+         * Print a table with one row per active producer on the requested partition.
+         *
+         * If `--broker-id` was given it is passed through the request options. A
+         * missing coordinator epoch is shown as -1 and a producer without an open
+         * transaction shows "None" as its start offset. On failure an error naming
+         * the partition and the target ("broker N" or "leader") is printed instead.
+         */
+        @Override
+        public void execute(Admin admin, Namespace ns, PrintStream out) throws Exception {
+            DescribeProducersOptions options = new DescribeProducersOptions();
+            Integer requestedBrokerId = ns.getInt("broker_id");
+            if (requestedBrokerId != null) {
+                options.brokerId(requestedBrokerId);
+            }
+
+            TopicPartition topicPartition = new TopicPartition(
+                ns.getString("topic"),
+                ns.getInt("partition")
+            );
+
+            final DescribeProducersResult.PartitionProducerState partitionState;
+            try {
+                partitionState = admin.describeProducers(singleton(topicPartition), options)
+                    .partitionResult(topicPartition)
+                    .get();
+            } catch (ExecutionException e) {
+                final String brokerClause;
+                if (options.brokerId().isPresent()) {
+                    brokerClause = "broker " + options.brokerId().getAsInt();
+                } else {
+                    brokerClause = "leader";
+                }
+                printErrorAndExit("Failed to describe producers for partition " +
+                    topicPartition + " on " + brokerClause, e.getCause());
+                return;
+            }
+
+            List<String[]> rows = new ArrayList<>();
+            for (ProducerState producer : partitionState.activeProducers()) {
+                OptionalLong txnStartOffset = producer.currentTransactionStartOffset();
+                String txnStartOffsetColumn = "None";
+                if (txnStartOffset.isPresent()) {
+                    txnStartOffsetColumn = String.valueOf(txnStartOffset.getAsLong());
+                }
+
+                rows.add(new String[] {
+                    String.valueOf(producer.producerId()),
+                    String.valueOf(producer.producerEpoch()),
+                    String.valueOf(producer.coordinatorEpoch().orElse(-1)),
+                    String.valueOf(producer.lastSequence()),
+                    String.valueOf(producer.lastTimestamp()),
+                    txnStartOffsetColumn
+                });
+            }
+
+            prettyPrintTable(HEADERS, rows, out);
+        }
+    }
+
+    static class DescribeTransactionsCommand extends TransactionsCommand {
+        static final String[] HEADERS = new String[]{
+            "CoordinatorId",
+            "TransactionalId",
+            "ProducerId",
+            "ProducerEpoch",
+            "TransactionState",
+            "TransactionTimeoutMs",
+            "CurrentTransactionStartTimeMs",
+            "TransactionDurationMs",
+            "TopicPartitions"
+        };
+
+        DescribeTransactionsCommand(Time time) {
+            super(time);
+        }
+
+        @Override
+        public String name() {
+            return "describe";
+        }
+
+        /**
+         * Register the `describe` subcommand, which takes a single required
+         * `--transactional-id` argument.
+         */
+        @Override
+        public void addSubparser(Subparsers subparsers) {
+            Subparser subparser = subparsers.addParser(name())
+                .description("Describe the state of an active transactional-id.")
+                .help("describe the state of an active transactional-id");
+
+            subparser.addArgument("--transactional-id")
+                .help("transactional id")
+                .action(store())
+                .type(String.class)
+                .required(true);
+        }
+
+        /**
+         * Print a single-row table describing the state of `--transactional-id`.
+         *
+         * When the description carries a transaction start time, the duration column
+         * is the current time from `time` minus that start time; otherwise both the
+         * start-time and duration columns show "None".
+         */
+        @Override
+        public void execute(Admin admin, Namespace ns, PrintStream out) throws Exception {
+            String transactionalId = ns.getString("transactional_id");
+
+            final TransactionDescription description;
+            try {
+                description = admin.describeTransactions(singleton(transactionalId))
+                    .description(transactionalId)
+                    .get();
+            } catch (ExecutionException e) {
+                printErrorAndExit("Failed to describe transaction state of " +
+                    "transactional-id `" + transactionalId + "`", e.getCause());
+                return;
+            }
+
+            String startTimeMsColumn = "None";
+            String durationMsColumn = "None";
+
+            OptionalLong startTimeMs = description.transactionStartTimeMs();
+            if (startTimeMs.isPresent()) {
+                long startedAtMs = startTimeMs.getAsLong();
+                startTimeMsColumn = String.valueOf(startedAtMs);
+                durationMsColumn = String.valueOf(time.milliseconds() - startedAtMs);
+            }
+
+            String[] row = new String[]{
+                String.valueOf(description.coordinatorId()),
+                transactionalId,
+                String.valueOf(description.producerId()),
+                String.valueOf(description.producerEpoch()),
+                description.state().toString(),
+                String.valueOf(description.transactionTimeoutMs()),
+                startTimeMsColumn,
+                durationMsColumn,
+                Utils.join(description.topicPartitions(), ",")
+            };
+
+            prettyPrintTable(HEADERS, singletonList(row), out);
+        }
+    }
+
+    static class ListTransactionsCommand extends TransactionsCommand {
+        static final String[] HEADERS = new String[] {
+            "TransactionalId",
+            "Coordinator",
+            "ProducerId",
+            "TransactionState"
+        };
+
+        ListTransactionsCommand(Time time) {
+            super(time);
+        }
+
+        @Override
+        public String name() {
+            return "list";
+        }
+
+        /**
+         * Register the `list` subcommand. It defines no arguments of its own.
+         */
+        @Override
+        public void addSubparser(Subparsers subparsers) {
+            subparsers.addParser(name())
+                .help("list transactions");
+        }
+
+        /**
+         * Print a table with one row per transaction listing returned by the
+         * cluster. Listings arrive grouped by broker id, and that id fills the
+         * `Coordinator` column of every row in the group.
+         */
+        @Override
+        public void execute(Admin admin, Namespace ns, PrintStream out) throws Exception {
+            final Map<Integer, Collection<TransactionListing>> listingsByBroker;
+
+            try {
+                listingsByBroker = admin.listTransactions()
+                    .allByBrokerId()
+                    .get();
+            } catch (ExecutionException e) {
+                printErrorAndExit("Failed to list transactions", e.getCause());
+                return;
+            }
+
+            List<String[]> rows = new ArrayList<>();
+            listingsByBroker.forEach((brokerId, listings) -> {
+                String coordinatorColumn = brokerId.toString();
+                for (TransactionListing listing : listings) {
+                    String[] row = {
+                        listing.transactionalId(),
+                        coordinatorColumn,
+                        String.valueOf(listing.producerId()),
+                        listing.state().toString()
+                    };
+                    rows.add(row);
+                }
+            });
+
+            prettyPrintTable(HEADERS, rows, out);
+        }
+    }
+
+    /**
+     * Append `value` to `rowBuilder`, followed by enough spaces to reach `length`
+     * characters. Values already at least `length` long are appended unpadded.
+     */
+    private static void appendColumnValue(
+        StringBuilder rowBuilder,
+        String value,
+        int length
+    ) {
+        int spacesRemaining = length - value.length();
+        rowBuilder.append(value);
+        while (spacesRemaining > 0) {
+            rowBuilder.append(' ');
+            spacesRemaining--;
+        }
+    }
+
+    /**
+     * Write one table row to `out`: each cell is padded to the width at the same
+     * index of `columnLengths` and followed by a tab, including the last one.
+     */
+    private static void printRow(
+        List<Integer> columnLengths,
+        String[] row,
+        PrintStream out
+    ) {
+        StringBuilder line = new StringBuilder();
+        int column = 0;
+        for (String cell : row) {
+            int width = columnLengths.get(column);
+            appendColumnValue(line, cell, width);
+            line.append('\t');
+            column++;
+        }
+        out.println(line);
+    }
+
+    /**
+     * Print `headers` followed by every row in `rows`, with each column padded to
+     * the widest value (header included) found in that column. Each row is read
+     * at every header index, so it needs at least `headers.length` cells.
+     */
+    private static void prettyPrintTable(
+        String[] headers,
+        List<String[]> rows,
+        PrintStream out
+    ) {
+        // Start from the header widths, then widen each column as rows demand.
+        List<Integer> widths = new ArrayList<>(headers.length);
+        for (String header : headers) {
+            widths.add(header.length());
+        }
+
+        for (String[] row : rows) {
+            for (int col = 0; col < headers.length; col++) {
+                int cellWidth = row[col].length();
+                if (cellWidth > widths.get(col)) {
+                    widths.set(col, cellWidth);
+                }
+            }
+        }
+
+        printRow(widths, headers, out);
+        for (String[] row : rows) {
+            printRow(widths, row, out);
+        }
+    }
+
+    private static void printErrorAndExit(String message, Throwable t) {
+        log.debug(message, t);
+
+        String exitMessage = message + ": " + t.getMessage() + "." +
+            " Enable debug logging for additional detail.";
+
+        printErrorAndExit(exitMessage);
+    }
+
+    /**
+     * Print `message` to standard error and call `Exit.exit` with status 1 and the
+     * same message. Callers in this file `return` right after invoking this method,
+     * so they do not rely on the call never returning.
+     */
+    private static void printErrorAndExit(String message) {
+        System.err.println(message);
+        Exit.exit(1, message);
+    }
+
+    /**
+     * Create the admin client used by the commands.
+     *
+     * Properties are loaded from the `--command-config` file when one is given and
+     * start out empty otherwise; the `--bootstrap-server` value is then stored under
+     * the bootstrap servers config key, replacing any value from the file. If the
+     * file cannot be loaded, an error is printed and `null` is returned.
+     */
+    private static Admin buildAdminClient(Namespace ns) {
+        String configFile = ns.getString("command_config");
+
+        Properties adminProps = new Properties();
+        if (configFile != null) {
+            try {
+                adminProps = Utils.loadProps(configFile);
+            } catch (IOException e) {
+                printErrorAndExit("Failed to load admin client properties", e);
+                return null;
+            }
+        }
+
+        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, ns.getString("bootstrap_server"));
+        return Admin.create(adminProps);
+    }
+
+    /**
+     * Build the top-level parser for `kafka-transactions.sh` with the arguments
+     * shared by every subcommand: `-v`/`--version`, the optional `--command-config`
+     * file and the required `--bootstrap-server`. Subcommands are not added here.
+     */
+    static ArgumentParser buildBaseParser() {
+        ArgumentParser parser = ArgumentParsers
+            .newArgumentParser("kafka-transactions.sh");
+
+        parser.description("This tool is used to analyze the transactional state of producers in the cluster. " +
+            "It can be used to detect and recover from hanging transactions.");
+
+        // Handled by a custom action that prints the version and exits.
+        parser.addArgument("-v", "--version")
+            .action(new PrintVersionAndExitAction())
+            .help("show the version of this Kafka distribution and exit");
+
+        parser.addArgument("--command-config")
+            .help("property file containing configs to be passed to admin client")
+            .action(store())
+            .type(String.class)
+            .metavar("FILE")
+            .required(false);
+
+        parser.addArgument("--bootstrap-server")
+            .help("hostname and port for the broker to connect to, in the form `host:port`  " +
+                "(multiple comma-separated entries can be given)")
+            .action(store())
+            .type(String.class)
+            .metavar("host:port")
+            .required(true);
+
+        return parser;
+    }
+
+    static void execute(
+        String[] args,
+        Function<Namespace, Admin> adminSupplier,
+        PrintStream out,
+        Time time
+    ) throws Exception {
+        List<TransactionsCommand> commands = Arrays.asList(
+            new ListTransactionsCommand(time),
+            new DescribeTransactionsCommand(time),
+            new DescribeProducersCommand(time),
+            new AbortTransactionCommand(time)
+        );
+
+        ArgumentParser parser = buildBaseParser();
+        Subparsers subparsers = parser.addSubparsers()
+            .dest("command")
+            .title("commands")
+            .metavar("COMMAND");
+        commands.forEach(command -> command.addSubparser(subparsers));
+
+        final Namespace ns;
+
+        try {
+            ns = parser.parseArgs(args);
+        } catch (ArgumentParserException e) {
+            parser.handleError(e);
+            Exit.exit(1);
+            return;
+        }
+
+        Admin admin = adminSupplier.apply(ns);
+        String commandName = ns.getString("command");
+
+        Optional<TransactionsCommand> commandOpt = commands.stream()
+            .filter(cmd -> cmd.name().equals(commandName))
+            .findFirst();
+
+        if (!commandOpt.isPresent()) {
+            printErrorAndExit("Unexpected command " + commandName);
+        }
+
+        TransactionsCommand command = commandOpt.get();
+        command.execute(admin, ns, out);
+        Exit.exit(0);
+    }
+
+    /**
+     * Command-line entry point. Runs the tool with an admin client built by
+     * `buildAdminClient`, standard output, and the system clock.
+     */
+    public static void main(String[] args) throws Exception {
+        execute(args, TransactionsCommand::buildAdminClient, System.out, Time.SYSTEM);
+    }
+
+}
diff --git a/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java
new file mode 100644
index 0000000..b5d7b93
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java
@@ -0,0 +1,492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.admin.AbortTransactionResult;
+import org.apache.kafka.clients.admin.AbortTransactionSpec;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.DescribeProducersOptions;
+import org.apache.kafka.clients.admin.DescribeProducersResult;
+import org.apache.kafka.clients.admin.DescribeProducersResult.PartitionProducerState;
+import org.apache.kafka.clients.admin.DescribeTransactionsResult;
+import org.apache.kafka.clients.admin.ListTransactionsResult;
+import org.apache.kafka.clients.admin.ProducerState;
+import org.apache.kafka.clients.admin.TransactionDescription;
+import org.apache.kafka.clients.admin.TransactionListing;
+import org.apache.kafka.clients.admin.TransactionState;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Utils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.Mockito;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import java.util.Set;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.singleton;
+import static java.util.Collections.singletonList;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Unit tests for {@link TransactionsCommand}.
+ *
+ * <p>Every test drives {@code TransactionsCommand.execute} with a Mockito mock of {@link Admin},
+ * a {@link MockTime} clock and an in-memory output stream. The process exit procedure is
+ * replaced for the duration of each test so that the exit status can be asserted instead of
+ * terminating the JVM.
+ */
+public class TransactionsCommandTest {
+
+    private final MockExitProcedure exitProcedure = new MockExitProcedure();
+    // Captures everything the command prints so that it can be parsed back as a table
+    private final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    private final PrintStream out = new PrintStream(outputStream);
+    private final MockTime time = new MockTime();
+    private final Admin admin = Mockito.mock(Admin.class);
+
+    @BeforeEach
+    public void setupExitProcedure() {
+        Exit.setExitProcedure(exitProcedure);
+    }
+
+    @AfterEach
+    public void resetExitProcedure() {
+        Exit.resetExitProcedure();
+    }
+
+    // `describe-producers` without `--topic` must fail
+    @Test
+    public void testDescribeProducersTopicRequired() throws Exception {
+        assertCommandFailure(new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "describe-producers",
+            "--partition",
+            "0"
+        });
+    }
+
+    // `describe-producers` without `--partition` must fail
+    @Test
+    public void testDescribeProducersPartitionRequired() throws Exception {
+        assertCommandFailure(new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "describe-producers",
+            "--topic",
+            "foo"
+        });
+    }
+
+    // Without `--broker-id` the command is expected to use default DescribeProducersOptions
+    @Test
+    public void testDescribeProducersLeader() throws Exception {
+        TopicPartition topicPartition = new TopicPartition("foo", 5);
+        String[] args = new String[] {
+            "--bootstrap-server",
+            "localhost:9092",
+            "describe-producers",
+            "--topic",
+            topicPartition.topic(),
+            "--partition",
+            String.valueOf(topicPartition.partition())
+        };
+
+        testDescribeProducers(topicPartition, args, new DescribeProducersOptions());
+    }
+
+    // With `--broker-id` the broker id is expected to be carried in the DescribeProducersOptions
+    @Test
+    public void testDescribeProducersSpecificReplica() throws Exception {
+        TopicPartition topicPartition = new TopicPartition("foo", 5);
+        int brokerId = 5;
+
+        String[] args = new String[] {
+            "--bootstrap-server",
+            "localhost:9092",
+            "describe-producers",
+            "--topic",
+            topicPartition.topic(),
+            "--partition",
+            String.valueOf(topicPartition.partition()),
+            "--broker-id",
+            String.valueOf(brokerId)
+        };
+
+        testDescribeProducers(topicPartition, args, new DescribeProducersOptions().brokerId(brokerId));
+    }
+
+    /**
+     * Shared body of the `describe-producers` tests: stubs the admin client to return two
+     * producer states for {@code topicPartition} when called with {@code expectedOptions},
+     * runs the command and checks the printed header and rows (row order is not asserted).
+     */
+    private void testDescribeProducers(
+        TopicPartition topicPartition,
+        String[] args,
+        DescribeProducersOptions expectedOptions
+    ) throws Exception {
+        DescribeProducersResult describeResult = Mockito.mock(DescribeProducersResult.class);
+        KafkaFuture<PartitionProducerState> describeFuture = KafkaFutureImpl.completedFuture(
+            new PartitionProducerState(asList(
+                new ProducerState(12345L, 15, 1300, 1599509565L,
+                    OptionalInt.of(20), OptionalLong.of(990)),
+                new ProducerState(98765L, 30, 2300, 1599509599L,
+                    OptionalInt.empty(), OptionalLong.empty())
+            )));
+
+        Mockito.when(describeResult.partitionResult(topicPartition)).thenReturn(describeFuture);
+        Mockito.when(admin.describeProducers(singleton(topicPartition), expectedOptions)).thenReturn(describeResult);
+
+        execute(args);
+        assertNormalExit();
+
+        // One header line plus one line per producer
+        List<List<String>> table = readOutputAsTable();
+        assertEquals(3, table.size());
+
+        List<String> expectedHeaders = asList(TransactionsCommand.DescribeProducersCommand.HEADERS);
+        assertEquals(expectedHeaders, table.get(0));
+
+        // The second producer has no coordinator epoch / transaction start offset, which are
+        // expected to be rendered as "-1" and "None" respectively
+        Set<List<String>> expectedRows = Utils.mkSet(
+            asList("12345", "15", "20", "1300", "1599509565", "990"),
+            asList("98765", "30", "-1", "2300", "1599509599", "None")
+        );
+        assertEquals(expectedRows, new HashSet<>(table.subList(1, table.size())));
+    }
+
+    // `list` prints one row per transaction across all brokers (row order is not asserted)
+    @Test
+    public void testListTransactions() throws Exception {
+        String[] args = new String[] {
+            "--bootstrap-server",
+            "localhost:9092",
+            "list"
+        };
+
+        ListTransactionsResult listResult = Mockito.mock(ListTransactionsResult.class);
+
+        // Transactions keyed by the id of the broker which reported them
+        Map<Integer, Collection<TransactionListing>> transactions = new HashMap<>();
+        transactions.put(0, asList(
+            new TransactionListing("foo", 12345L, TransactionState.ONGOING),
+            new TransactionListing("bar", 98765L, TransactionState.PREPARE_ABORT)
+        ));
+        transactions.put(1, singletonList(
+            new TransactionListing("baz", 13579L, TransactionState.COMPLETE_COMMIT)
+        ));
+
+        KafkaFuture<Map<Integer, Collection<TransactionListing>>> listTransactionsFuture =
+            KafkaFutureImpl.completedFuture(transactions);
+
+        Mockito.when(admin.listTransactions()).thenReturn(listResult);
+        Mockito.when(listResult.allByBrokerId()).thenReturn(listTransactionsFuture);
+
+        execute(args);
+        assertNormalExit();
+
+        List<List<String>> table = readOutputAsTable();
+        assertEquals(4, table.size());
+
+        // Assert expected headers
+        List<String> expectedHeaders = asList(TransactionsCommand.ListTransactionsCommand.HEADERS);
+        assertEquals(expectedHeaders, table.get(0));
+
+        Set<List<String>> expectedRows = Utils.mkSet(
+            asList("foo", "0", "12345", "Ongoing"),
+            asList("bar", "0", "98765", "PrepareAbort"),
+            asList("baz", "1", "13579", "CompleteCommit")
+        );
+        assertEquals(expectedRows, new HashSet<>(table.subList(1, table.size())));
+    }
+
+    // `describe` without `--transactional-id` must fail
+    @Test
+    public void testDescribeTransactionsTransactionalIdRequired() throws Exception {
+        assertCommandFailure(new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "describe"
+        });
+    }
+
+    // `describe` prints a single row for the requested transactional id
+    @Test
+    public void testDescribeTransaction() throws Exception {
+        String transactionalId = "foo";
+        String[] args = new String[] {
+            "--bootstrap-server",
+            "localhost:9092",
+            "describe",
+            "--transactional-id",
+            transactionalId
+        };
+
+        DescribeTransactionsResult describeResult = Mockito.mock(DescribeTransactionsResult.class);
+
+        int coordinatorId = 5;
+        long transactionStartTime = time.milliseconds();
+
+        KafkaFuture<TransactionDescription> describeFuture = KafkaFutureImpl.completedFuture(
+            new TransactionDescription(
+                coordinatorId,
+                TransactionState.ONGOING,
+                12345L,
+                15,
+                10000,
+                OptionalLong.of(transactionStartTime),
+                singleton(new TopicPartition("bar", 0))
+        ));
+
+        Mockito.when(describeResult.description(transactionalId)).thenReturn(describeFuture);
+        Mockito.when(admin.describeTransactions(singleton(transactionalId))).thenReturn(describeResult);
+
+        // Add a little time so that we can see a positive transaction duration in the output
+        time.sleep(5000);
+
+        execute(args);
+        assertNormalExit();
+
+        List<List<String>> table = readOutputAsTable();
+        assertEquals(2, table.size());
+
+        List<String> expectedHeaders = asList(TransactionsCommand.DescribeTransactionsCommand.HEADERS);
+        assertEquals(expectedHeaders, table.get(0));
+
+        // "5000" is the time advanced on the mock clock since the transaction start time
+        List<String> expectedRow = asList(
+            String.valueOf(coordinatorId),
+            transactionalId,
+            "12345",
+            "15",
+            "Ongoing",
+            "10000",
+            String.valueOf(transactionStartTime),
+            "5000",
+            "bar-0"
+        );
+        assertEquals(expectedRow, table.get(1));
+    }
+
+    // `abort` with neither `--start-offset` nor `--producer-id` must fail
+    @Test
+    public void testAbortTransactionStartOffsetOrProducerIdRequired() throws Exception {
+        assertCommandFailure(new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "abort",
+            "--topic",
+            "foo",
+            "--partition",
+            "0"
+        });
+    }
+
+    // `abort` without `--topic` must fail
+    @Test
+    public void testAbortTransactionTopicRequired() throws Exception {
+        assertCommandFailure(new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "abort",
+            "--partition",
+            "0",
+            "--start-offset",
+            "9990"
+        });
+    }
+
+    // `abort` without `--partition` must fail
+    @Test
+    public void testAbortTransactionPartitionRequired() throws Exception {
+        assertCommandFailure(new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "abort",
+            "--topic",
+            "foo",
+            "--start-offset",
+            "9990"
+        });
+    }
+
+    // `abort` with `--producer-id` but no `--producer-epoch` must fail
+    @Test
+    public void testAbortTransactionProducerEpochRequiredWithProducerId() throws Exception {
+        assertCommandFailure(new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "abort",
+            "--topic",
+            "foo",
+            "--partition",
+            "0",
+            "--producer-id",
+            "12345"
+        });
+    }
+
+    // `abort` with `--producer-id` and `--producer-epoch` but no `--coordinator-epoch` must fail
+    @Test
+    public void testAbortTransactionCoordinatorEpochRequiredWithProducerId() throws Exception {
+        assertCommandFailure(new String[]{
+            "--bootstrap-server",
+            "localhost:9092",
+            "abort",
+            "--topic",
+            "foo",
+            "--partition",
+            "0",
+            "--producer-id",
+            "12345",
+            "--producer-epoch",
+            "15"
+        });
+    }
+
+    // `abort --start-offset`: the producer id and epochs for the abort are expected to be taken
+    // from the described producer state whose transaction starts at the given offset
+    @Test
+    public void testNewBrokerAbortTransaction() throws Exception {
+        TopicPartition topicPartition = new TopicPartition("foo", 5);
+        long startOffset = 9173;
+        long producerId = 12345L;
+        short producerEpoch = 15;
+        int coordinatorEpoch = 76;
+
+        String[] args = new String[] {
+            "--bootstrap-server",
+            "localhost:9092",
+            "abort",
+            "--topic",
+            topicPartition.topic(),
+            "--partition",
+            String.valueOf(topicPartition.partition()),
+            "--start-offset",
+            String.valueOf(startOffset)
+        };
+
+        DescribeProducersResult describeResult = Mockito.mock(DescribeProducersResult.class);
+        KafkaFuture<PartitionProducerState> describeFuture = KafkaFutureImpl.completedFuture(
+            new PartitionProducerState(singletonList(
+                new ProducerState(producerId, producerEpoch, 1300, 1599509565L,
+                    OptionalInt.of(coordinatorEpoch), OptionalLong.of(startOffset))
+            )));
+
+        AbortTransactionResult abortTransactionResult = Mockito.mock(AbortTransactionResult.class);
+        KafkaFuture<Void> abortFuture = KafkaFutureImpl.completedFuture(null);
+        AbortTransactionSpec expectedAbortSpec = new AbortTransactionSpec(
+            topicPartition, producerId, producerEpoch, coordinatorEpoch);
+
+        Mockito.when(describeResult.partitionResult(topicPartition)).thenReturn(describeFuture);
+        Mockito.when(admin.describeProducers(singleton(topicPartition))).thenReturn(describeResult);
+
+        // The abort stub only matches the expected spec
+        Mockito.when(abortTransactionResult.all()).thenReturn(abortFuture);
+        Mockito.when(admin.abortTransaction(expectedAbortSpec)).thenReturn(abortTransactionResult);
+
+        execute(args);
+        assertNormalExit();
+    }
+
+    // `abort` with explicit producer id and epochs; runs once with a non-negative coordinator
+    // epoch (29) and once with a negative one (-1)
+    @ParameterizedTest
+    @ValueSource(ints = {29, -1})
+    public void testOldBrokerAbortTransactionWithUnknownCoordinatorEpoch(int coordinatorEpoch) throws Exception {
+        TopicPartition topicPartition = new TopicPartition("foo", 5);
+        long producerId = 12345L;
+        short producerEpoch = 15;
+
+        String[] args = new String[] {
+            "--bootstrap-server",
+            "localhost:9092",
+            "abort",
+            "--topic",
+            topicPartition.topic(),
+            "--partition",
+            String.valueOf(topicPartition.partition()),
+            "--producer-id",
+            String.valueOf(producerId),
+            "--producer-epoch",
+            String.valueOf(producerEpoch),
+            "--coordinator-epoch",
+            String.valueOf(coordinatorEpoch)
+        };
+
+        AbortTransactionResult abortTransactionResult = Mockito.mock(AbortTransactionResult.class);
+        KafkaFuture<Void> abortFuture = KafkaFutureImpl.completedFuture(null);
+
+        // A negative coordinator epoch on the command line is expected to reach the admin
+        // client as 0; any other value is expected to be passed through unchanged
+        final int expectedCoordinatorEpoch;
+        if (coordinatorEpoch < 0) {
+            expectedCoordinatorEpoch = 0;
+        } else {
+            expectedCoordinatorEpoch = coordinatorEpoch;
+        }
+
+        AbortTransactionSpec expectedAbortSpec = new AbortTransactionSpec(
+            topicPartition, producerId, producerEpoch, expectedCoordinatorEpoch);
+
+        Mockito.when(abortTransactionResult.all()).thenReturn(abortFuture);
+        Mockito.when(admin.abortTransaction(expectedAbortSpec)).thenReturn(abortTransactionResult);
+
+        execute(args);
+        assertNormalExit();
+    }
+
+    /**
+     * Run the command with the given arguments against the mocked admin client, the captured
+     * output stream and the mock clock.
+     */
+    private void execute(String[] args) throws Exception {
+        TransactionsCommand.execute(args, ns -> admin, out, time);
+    }
+
+    /**
+     * Parse everything the command has printed so far into rows of whitespace-separated cells.
+     *
+     * @return one list of cells per output line, in output order
+     */
+    private List<List<String>> readOutputAsTable() throws IOException {
+        // Flush first so that the parsed table does not depend on buffering inside the PrintStream
+        out.flush();
+
+        List<List<String>> table = new ArrayList<>();
+        ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
+
+        // Decoded with the platform default charset, matching how `out` was constructed
+        try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
+            while (true) {
+                List<String> row = readRow(reader);
+                if (row == null) {
+                    break;
+                }
+                table.add(row);
+            }
+        }
+        return table;
+    }
+
+    /**
+     * Read the next line and split it on runs of whitespace.
+     *
+     * @return the cells of the line, or null once the end of the output has been reached
+     */
+    private List<String> readRow(BufferedReader reader) throws IOException {
+        String line = reader.readLine();
+        if (line == null) {
+            return null;
+        } else {
+            return asList(line.split("\\s+"));
+        }
+    }
+
+    // The command must have invoked the exit procedure, and the first recorded status must be 0
+    private void assertNormalExit() {
+        assertTrue(exitProcedure.hasExited);
+        assertEquals(0, exitProcedure.statusCode);
+    }
+
+    // Runs the command and requires that the first recorded exit status is 1
+    private void assertCommandFailure(String[] args) throws Exception {
+        execute(args);
+        assertTrue(exitProcedure.hasExited);
+        assertEquals(1, exitProcedure.statusCode);
+    }
+
+    /**
+     * Exit procedure which records the exit status instead of terminating the JVM. Only the
+     * first invocation is recorded; later invocations are ignored. Presumably this is because
+     * execution continues after a mocked exit, so a later exit call could otherwise overwrite
+     * the status of an earlier failure.
+     */
+    private static class MockExitProcedure implements Exit.Procedure {
+        private boolean hasExited = false;
+        private int statusCode;
+
+        @Override
+        public void execute(int statusCode, String message) {
+            if (!this.hasExited) {
+                this.hasExited = true;
+                this.statusCode = statusCode;
+            }
+        }
+    }
+
+}