You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2021/06/22 16:48:50 UTC
[kafka] branch trunk updated: KAFKA-12888;
Add transaction tool from KIP-664 (#10814)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new fce7715 KAFKA-12888; Add transaction tool from KIP-664 (#10814)
fce7715 is described below
commit fce771579c3e20f20949c4c7e0a5e3a16c57c7f0
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Tue Jun 22 09:47:30 2021 -0700
KAFKA-12888; Add transaction tool from KIP-664 (#10814)
This patch adds the transaction tool specified in KIP-664: https://cwiki.apache.org/confluence/display/KAFKA/KIP-664%3A+Provide+tooling+to+detect+and+abort+hanging+transactions. This includes all of the logic for describing transactional state and for aborting transactions. The only thing that is left out is the `--find-hanging` implementation, which will be left for a subsequent patch.
Reviewers: Boyang Chen <bo...@apache.org>, David Jacot <dj...@confluent.io>
---
bin/kafka-transactions.sh | 17 +
bin/windows/kafka-transactions.bat | 17 +
.../kafka/tools/PrintVersionAndExitAction.java | 52 ++
.../apache/kafka/tools/TransactionsCommand.java | 623 +++++++++++++++++++++
.../kafka/tools/TransactionsCommandTest.java | 492 ++++++++++++++++
5 files changed, 1201 insertions(+)
diff --git a/bin/kafka-transactions.sh b/bin/kafka-transactions.sh
new file mode 100755
index 0000000..6fb5233
--- /dev/null
+++ b/bin/kafka-transactions.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Delegate to the JVM entry point; "$@" forwards all CLI arguments verbatim.
+exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.TransactionsCommand "$@"
diff --git a/bin/windows/kafka-transactions.bat b/bin/windows/kafka-transactions.bat
new file mode 100644
index 0000000..9bb7585
--- /dev/null
+++ b/bin/windows/kafka-transactions.bat
@@ -0,0 +1,17 @@
+@echo off
+rem Licensed to the Apache Software Foundation (ASF) under one or more
+rem contributor license agreements. See the NOTICE file distributed with
+rem this work for additional information regarding copyright ownership.
+rem The ASF licenses this file to You under the Apache License, Version 2.0
+rem (the "License"); you may not use this file except in compliance with
+rem the License. You may obtain a copy of the License at
+rem
+rem http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem Unless required by applicable law or agreed to in writing, software
+rem distributed under the License is distributed on an "AS IS" BASIS,
+rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+rem See the License for the specific language governing permissions and
+rem limitations under the License.
+
+rem Windows counterpart of bin/kafka-transactions.sh; %* forwards all arguments.
+"%~dp0kafka-run-class.bat" org.apache.kafka.tools.TransactionsCommand %*
diff --git a/tools/src/main/java/org/apache/kafka/tools/PrintVersionAndExitAction.java b/tools/src/main/java/org/apache/kafka/tools/PrintVersionAndExitAction.java
new file mode 100644
index 0000000..6846c7f
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/PrintVersionAndExitAction.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import net.sourceforge.argparse4j.inf.Argument;
+import net.sourceforge.argparse4j.inf.ArgumentAction;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.utils.Exit;
+
+import java.util.Map;
+
+// argparse4j action bound to `-v`/`--version`: prints the Kafka version and
+// commit id, then terminates through Exit.exit(0) so tests can intercept the
+// exit. The flag consumes no argument value.
+class PrintVersionAndExitAction implements ArgumentAction {
+
+ @Override
+ public void run(
+ ArgumentParser parser,
+ Argument arg,
+ Map<String, Object> attrs,
+ String flag,
+ Object value
+ ) {
+ String version = AppInfoParser.getVersion();
+ String commitId = AppInfoParser.getCommitId();
+ System.out.println(version + " (Commit:" + commitId + ")");
+ // Exit.exit (not System.exit) so a test-installed exit procedure is honored.
+ Exit.exit(0);
+ }
+
+ @Override
+ public void onAttach(Argument arg) {
+ // No attach-time setup required for this action.
+
+ }
+
+ @Override
+ public boolean consumeArgument() {
+ // The version flag is a bare switch; it takes no value from the command line.
+ return false;
+ }
+}
diff --git a/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
new file mode 100644
index 0000000..2419460
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
@@ -0,0 +1,623 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentGroup;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import net.sourceforge.argparse4j.inf.Namespace;
+import net.sourceforge.argparse4j.inf.Subparser;
+import net.sourceforge.argparse4j.inf.Subparsers;
+import org.apache.kafka.clients.admin.AbortTransactionSpec;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.DescribeProducersOptions;
+import org.apache.kafka.clients.admin.DescribeProducersResult;
+import org.apache.kafka.clients.admin.ProducerState;
+import org.apache.kafka.clients.admin.TransactionDescription;
+import org.apache.kafka.clients.admin.TransactionListing;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.singleton;
+import static java.util.Collections.singletonList;
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+// Entry point for bin/kafka-transactions.sh (KIP-664). Each subcommand
+// (list / describe / describe-producers / abort) is a nested subclass that
+// registers its own argparse4j subparser and runs against an Admin client.
+public abstract class TransactionsCommand {
+ private static final Logger log = LoggerFactory.getLogger(TransactionsCommand.class);
+
+ protected final Time time;
+
+ protected TransactionsCommand(Time time) {
+ this.time = time;
+ }
+
+ /**
+ * Get the name of this command (e.g. `describe-producers`).
+ */
+ abstract String name();
+
+ /**
+ * Specify the arguments needed for this command.
+ */
+ abstract void addSubparser(Subparsers subparsers);
+
+ /**
+ * Execute the command logic.
+ */
+ abstract void execute(Admin admin, Namespace ns, PrintStream out) throws Exception;
+
+
+ static class AbortTransactionCommand extends TransactionsCommand {
+
+ AbortTransactionCommand(Time time) {
+ super(time);
+ }
+
+ @Override
+ String name() {
+ return "abort";
+ }
+
+ @Override
+ void addSubparser(Subparsers subparsers) {
+ Subparser subparser = subparsers.addParser(name())
+ .help("abort a hanging transaction (requires administrative privileges)");
+
+ subparser.addArgument("--topic")
+ .help("topic name")
+ .action(store())
+ .type(String.class)
+ .required(true);
+
+ subparser.addArgument("--partition")
+ .help("partition number")
+ .action(store())
+ .type(Integer.class)
+ .required(true);
+
+ // Two mutually-alternative ways to identify the transaction: new brokers
+ // need only --start-offset; older brokers need the full producer triple.
+ ArgumentGroup newBrokerArgumentGroup = subparser
+ .addArgumentGroup("Brokers on versions 3.0 and above")
+ .description("For newer brokers, only the start offset of the transaction " +
+ "to be aborted is required");
+
+ newBrokerArgumentGroup.addArgument("--start-offset")
+ .help("start offset of the transaction to abort")
+ .action(store())
+ .type(Long.class);
+
+ ArgumentGroup olderBrokerArgumentGroup = subparser
+ .addArgumentGroup("Brokers on versions older than 3.0")
+ .description("For older brokers, you must provide all of these arguments");
+
+ olderBrokerArgumentGroup.addArgument("--producer-id")
+ .help("producer id")
+ .action(store())
+ .type(Long.class);
+
+ olderBrokerArgumentGroup.addArgument("--producer-epoch")
+ .help("producer epoch")
+ .action(store())
+ .type(Short.class);
+
+ olderBrokerArgumentGroup.addArgument("--coordinator-epoch")
+ .help("coordinator epoch")
+ .action(store())
+ .type(Integer.class);
+ }
+
+ // Resolve the abort spec for the --start-offset path: describe the active
+ // producers on the partition and find the one whose open transaction starts
+ // at exactly `startOffset`. On failure printErrorAndExit terminates the
+ // process, so the `return null` lines below are reachable only when a test
+ // has replaced the exit procedure.
+ private AbortTransactionSpec buildAbortSpec(
+ Admin admin,
+ TopicPartition topicPartition,
+ long startOffset
+ ) throws Exception {
+ final DescribeProducersResult.PartitionProducerState result;
+ try {
+ result = admin.describeProducers(singleton(topicPartition))
+ .partitionResult(topicPartition)
+ .get();
+ } catch (ExecutionException e) {
+ printErrorAndExit("Failed to validate producer state for partition "
+ + topicPartition, e.getCause());
+ return null;
+ }
+
+ Optional<ProducerState> foundProducerState = result.activeProducers().stream()
+ .filter(producerState -> {
+ OptionalLong txnStartOffsetOpt = producerState.currentTransactionStartOffset();
+ return txnStartOffsetOpt.isPresent() && txnStartOffsetOpt.getAsLong() == startOffset;
+ })
+ .findFirst();
+
+ if (!foundProducerState.isPresent()) {
+ printErrorAndExit("Could not find any open transactions starting at offset " +
+ startOffset + " on partition " + topicPartition);
+ return null;
+ }
+
+ ProducerState producerState = foundProducerState.get();
+ return new AbortTransactionSpec(
+ topicPartition,
+ producerState.producerId(),
+ (short) producerState.producerEpoch(),
+ producerState.coordinatorEpoch().orElse(0)
+ );
+ }
+
+ // Issue the abort and block on the result; errors terminate via printErrorAndExit.
+ private void abortTransaction(
+ Admin admin,
+ AbortTransactionSpec abortSpec
+ ) throws Exception {
+ try {
+ admin.abortTransaction(abortSpec).all().get();
+ } catch (ExecutionException e) {
+ TransactionsCommand.printErrorAndExit("Failed to abort transaction " + abortSpec, e.getCause());
+ }
+ }
+
+ @Override
+ void execute(Admin admin, Namespace ns, PrintStream out) throws Exception {
+ String topicName = ns.getString("topic");
+ Integer partitionId = ns.getInt("partition");
+ TopicPartition topicPartition = new TopicPartition(topicName, partitionId);
+
+ // argparse4j stores `--start-offset` under the key "start_offset", etc.
+ Long startOffset = ns.getLong("start_offset");
+ Long producerId = ns.getLong("producer_id");
+
+ if (startOffset == null && producerId == null) {
+ printErrorAndExit("The transaction to abort must be identified either with " +
+ "--start-offset (for brokers on 3.0 or above) or with " +
+ "--producer-id, --producer-epoch, and --coordinator-epoch (for older brokers)");
+ return;
+ }
+
+ final AbortTransactionSpec abortSpec;
+ if (startOffset == null) {
+ // Older-broker path: all three identifiers must be given explicitly.
+ Short producerEpoch = ns.getShort("producer_epoch");
+ if (producerEpoch == null) {
+ printErrorAndExit("Missing required argument --producer-epoch");
+ return;
+ }
+
+ Integer coordinatorEpoch = ns.getInt("coordinator_epoch");
+ if (coordinatorEpoch == null) {
+ printErrorAndExit("Missing required argument --coordinator-epoch");
+ return;
+ }
+
+ // If a transaction was started by a new producerId and became hanging
+ // before the initial commit/abort, then the coordinator epoch will be -1
+ // as seen in the `DescribeProducers` output. In this case, we conservatively
+ // use a coordinator epoch of 0, which is less than or equal to any possible
+ // leader epoch.
+ if (coordinatorEpoch < 0) {
+ coordinatorEpoch = 0;
+ }
+
+ abortSpec = new AbortTransactionSpec(
+ topicPartition,
+ producerId,
+ producerEpoch,
+ coordinatorEpoch
+ );
+ } else {
+ abortSpec = buildAbortSpec(admin, topicPartition, startOffset);
+ }
+
+ abortTransaction(admin, abortSpec);
+ }
+ }
+
+ // `describe-producers`: tabulates the active producer states of a single
+ // topic partition, optionally from a specific replica via --broker-id.
+ static class DescribeProducersCommand extends TransactionsCommand {
+ static final String[] HEADERS = new String[]{
+ "ProducerId",
+ "ProducerEpoch",
+ "LatestCoordinatorEpoch",
+ "LastSequence",
+ "LastTimestamp",
+ "CurrentTransactionStartOffset"
+ };
+
+ DescribeProducersCommand(Time time) {
+ super(time);
+ }
+
+ @Override
+ public String name() {
+ return "describe-producers";
+ }
+
+ @Override
+ public void addSubparser(Subparsers subparsers) {
+ Subparser subparser = subparsers.addParser(name())
+ .help("describe the states of active producers for a topic partition");
+
+ subparser.addArgument("--broker-id")
+ .help("optional broker id to describe the producer state on a specific replica")
+ .action(store())
+ .type(Integer.class)
+ .required(false);
+
+ subparser.addArgument("--topic")
+ .help("topic name")
+ .action(store())
+ .type(String.class)
+ .required(true);
+
+ subparser.addArgument("--partition")
+ .help("partition number")
+ .action(store())
+ .type(Integer.class)
+ .required(true);
+ }
+
+ @Override
+ public void execute(Admin admin, Namespace ns, PrintStream out) throws Exception {
+ DescribeProducersOptions options = new DescribeProducersOptions();
+ // --broker-id is optional; when absent the partition leader is queried.
+ Optional.ofNullable(ns.getInt("broker_id")).ifPresent(options::brokerId);
+
+ String topicName = ns.getString("topic");
+ Integer partitionId = ns.getInt("partition");
+ TopicPartition topicPartition = new TopicPartition(topicName, partitionId);
+
+ final DescribeProducersResult.PartitionProducerState result;
+
+ try {
+ result = admin.describeProducers(singleton(topicPartition), options)
+ .partitionResult(topicPartition)
+ .get();
+ } catch (ExecutionException e) {
+ String brokerClause = options.brokerId().isPresent() ?
+ "broker " + options.brokerId().getAsInt() :
+ "leader";
+ printErrorAndExit("Failed to describe producers for partition " +
+ topicPartition + " on " + brokerClause, e.getCause());
+ return;
+ }
+
+ // One row per active producer; "None" marks a producer with no open transaction.
+ List<String[]> rows = result.activeProducers().stream().map(producerState -> {
+ String currentTransactionStartOffsetColumnValue =
+ producerState.currentTransactionStartOffset().isPresent() ?
+ String.valueOf(producerState.currentTransactionStartOffset().getAsLong()) :
+ "None";
+
+ return new String[] {
+ String.valueOf(producerState.producerId()),
+ String.valueOf(producerState.producerEpoch()),
+ String.valueOf(producerState.coordinatorEpoch().orElse(-1)),
+ String.valueOf(producerState.lastSequence()),
+ String.valueOf(producerState.lastTimestamp()),
+ currentTransactionStartOffsetColumnValue
+ };
+ }).collect(Collectors.toList());
+
+ prettyPrintTable(HEADERS, rows, out);
+ }
+ }
+
+ // `describe`: prints the coordinator-side state of one transactional id,
+ // including a duration computed from the injected Time source.
+ static class DescribeTransactionsCommand extends TransactionsCommand {
+ static final String[] HEADERS = new String[]{
+ "CoordinatorId",
+ "TransactionalId",
+ "ProducerId",
+ "ProducerEpoch",
+ "TransactionState",
+ "TransactionTimeoutMs",
+ "CurrentTransactionStartTimeMs",
+ "TransactionDurationMs",
+ "TopicPartitions"
+ };
+
+ DescribeTransactionsCommand(Time time) {
+ super(time);
+ }
+
+ @Override
+ public String name() {
+ return "describe";
+ }
+
+ @Override
+ public void addSubparser(Subparsers subparsers) {
+ Subparser subparser = subparsers.addParser(name())
+ .description("Describe the state of an active transactional-id.")
+ .help("describe the state of an active transactional-id");
+
+ subparser.addArgument("--transactional-id")
+ .help("transactional id")
+ .action(store())
+ .type(String.class)
+ .required(true);
+ }
+
+ @Override
+ public void execute(Admin admin, Namespace ns, PrintStream out) throws Exception {
+ String transactionalId = ns.getString("transactional_id");
+
+ final TransactionDescription result;
+ try {
+ result = admin.describeTransactions(singleton(transactionalId))
+ .description(transactionalId)
+ .get();
+ } catch (ExecutionException e) {
+ printErrorAndExit("Failed to describe transaction state of " +
+ "transactional-id `" + transactionalId + "`", e.getCause());
+ return;
+ }
+
+ final String transactionDurationMsColumnValue;
+ final String transactionStartTimeMsColumnValue;
+
+ // Duration is measured against `time` so tests with MockTime see a
+ // deterministic value; "None" when no transaction is in progress.
+ if (result.transactionStartTimeMs().isPresent()) {
+ long transactionStartTimeMs = result.transactionStartTimeMs().getAsLong();
+ transactionStartTimeMsColumnValue = String.valueOf(transactionStartTimeMs);
+ transactionDurationMsColumnValue = String.valueOf(time.milliseconds() - transactionStartTimeMs);
+ } else {
+ transactionStartTimeMsColumnValue = "None";
+ transactionDurationMsColumnValue = "None";
+ }
+
+ String[] row = new String[]{
+ String.valueOf(result.coordinatorId()),
+ transactionalId,
+ String.valueOf(result.producerId()),
+ String.valueOf(result.producerEpoch()),
+ result.state().toString(),
+ String.valueOf(result.transactionTimeoutMs()),
+ transactionStartTimeMsColumnValue,
+ transactionDurationMsColumnValue,
+ Utils.join(result.topicPartitions(), ",")
+ };
+
+ prettyPrintTable(HEADERS, singletonList(row), out);
+ }
+ }
+
+ // `list`: tabulates all transactions known to each broker's coordinator.
+ static class ListTransactionsCommand extends TransactionsCommand {
+ static final String[] HEADERS = new String[] {
+ "TransactionalId",
+ "Coordinator",
+ "ProducerId",
+ "TransactionState"
+ };
+
+ ListTransactionsCommand(Time time) {
+ super(time);
+ }
+
+ @Override
+ public String name() {
+ return "list";
+ }
+
+ @Override
+ public void addSubparser(Subparsers subparsers) {
+ subparsers.addParser(name())
+ .help("list transactions");
+ }
+
+ @Override
+ public void execute(Admin admin, Namespace ns, PrintStream out) throws Exception {
+ final Map<Integer, Collection<TransactionListing>> result;
+
+ try {
+ result = admin.listTransactions()
+ .allByBrokerId()
+ .get();
+ } catch (ExecutionException e) {
+ printErrorAndExit("Failed to list transactions", e.getCause());
+ return;
+ }
+
+ // Flatten the per-broker listings into rows keyed by coordinator id.
+ List<String[]> rows = new ArrayList<>();
+ for (Map.Entry<Integer, Collection<TransactionListing>> brokerListingsEntry : result.entrySet()) {
+ String coordinatorIdString = brokerListingsEntry.getKey().toString();
+ Collection<TransactionListing> listings = brokerListingsEntry.getValue();
+
+ for (TransactionListing listing : listings) {
+ rows.add(new String[] {
+ listing.transactionalId(),
+ coordinatorIdString,
+ String.valueOf(listing.producerId()),
+ listing.state().toString()
+ });
+ }
+ }
+
+ prettyPrintTable(HEADERS, rows, out);
+ }
+ }
+
+ // Append `value` left-justified, space-padded out to `length` characters.
+ private static void appendColumnValue(
+ StringBuilder rowBuilder,
+ String value,
+ int length
+ ) {
+ int padLength = length - value.length();
+ rowBuilder.append(value);
+ for (int i = 0; i < padLength; i++)
+ rowBuilder.append(' ');
+ }
+
+ // Print one tab-separated row, padding each cell to its column width.
+ private static void printRow(
+ List<Integer> columnLengths,
+ String[] row,
+ PrintStream out
+ ) {
+ StringBuilder rowBuilder = new StringBuilder();
+ for (int i = 0; i < row.length; i++) {
+ Integer columnLength = columnLengths.get(i);
+ String columnValue = row[i];
+ appendColumnValue(rowBuilder, columnValue, columnLength);
+ rowBuilder.append('\t');
+ }
+ out.println(rowBuilder);
+ }
+
+ // Print headers and rows with each column sized to its widest value.
+ private static void prettyPrintTable(
+ String[] headers,
+ List<String[]> rows,
+ PrintStream out
+ ) {
+ List<Integer> columnLengths = Arrays.stream(headers)
+ .map(String::length)
+ .collect(Collectors.toList());
+
+ for (String[] row : rows) {
+ for (int i = 0; i < headers.length; i++) {
+ columnLengths.set(i, Math.max(columnLengths.get(i), row[i].length()));
+ }
+ }
+
+ printRow(columnLengths, headers, out);
+ rows.forEach(row -> printRow(columnLengths, row, out));
+ }
+
+ // Report a failure with its cause (full stack only at debug level) and exit.
+ private static void printErrorAndExit(String message, Throwable t) {
+ log.debug(message, t);
+
+ String exitMessage = message + ": " + t.getMessage() + "." +
+ " Enable debug logging for additional detail.";
+
+ printErrorAndExit(exitMessage);
+ }
+
+ // Print to stderr and terminate with status 1 via the pluggable Exit procedure.
+ private static void printErrorAndExit(String message) {
+ System.err.println(message);
+ Exit.exit(1, message);
+ }
+
+ // Build the Admin client from --command-config (optional) plus --bootstrap-server.
+ // The `return null` is reachable only when tests override the exit procedure.
+ private static Admin buildAdminClient(Namespace ns) {
+ final Properties properties;
+
+ String configFile = ns.getString("command_config");
+ if (configFile == null) {
+ properties = new Properties();
+ } else {
+ try {
+ properties = Utils.loadProps(configFile);
+ } catch (IOException e) {
+ printErrorAndExit("Failed to load admin client properties", e);
+ return null;
+ }
+ }
+
+ String bootstrapServers = ns.getString("bootstrap_server");
+ properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+
+ return Admin.create(properties);
+ }
+
+ // Construct the top-level parser with the options common to all subcommands.
+ static ArgumentParser buildBaseParser() {
+ ArgumentParser parser = ArgumentParsers
+ .newArgumentParser("kafka-transactions.sh");
+
+ parser.description("This tool is used to analyze the transactional state of producers in the cluster. " +
+ "It can be used to detect and recover from hanging transactions.");
+
+ parser.addArgument("-v", "--version")
+ .action(new PrintVersionAndExitAction())
+ .help("show the version of this Kafka distribution and exit");
+
+ parser.addArgument("--command-config")
+ .help("property file containing configs to be passed to admin client")
+ .action(store())
+ .type(String.class)
+ .metavar("FILE")
+ .required(false);
+
+ parser.addArgument("--bootstrap-server")
+ .help("hostname and port for the broker to connect to, in the form `host:port` " +
+ "(multiple comma-separated entries can be given)")
+ .action(store())
+ .type(String.class)
+ .metavar("host:port")
+ .required(true);
+
+ return parser;
+ }
+
+ // Parse `args`, dispatch to the matching subcommand, and exit(0) on success.
+ // The admin client and output stream are injected for testability.
+ static void execute(
+ String[] args,
+ Function<Namespace, Admin> adminSupplier,
+ PrintStream out,
+ Time time
+ ) throws Exception {
+ List<TransactionsCommand> commands = Arrays.asList(
+ new ListTransactionsCommand(time),
+ new DescribeTransactionsCommand(time),
+ new DescribeProducersCommand(time),
+ new AbortTransactionCommand(time)
+ );
+
+ ArgumentParser parser = buildBaseParser();
+ Subparsers subparsers = parser.addSubparsers()
+ .dest("command")
+ .title("commands")
+ .metavar("COMMAND");
+ commands.forEach(command -> command.addSubparser(subparsers));
+
+ final Namespace ns;
+
+ try {
+ ns = parser.parseArgs(args);
+ } catch (ArgumentParserException e) {
+ parser.handleError(e);
+ Exit.exit(1);
+ return;
+ }
+
+ Admin admin = adminSupplier.apply(ns);
+ String commandName = ns.getString("command");
+
+ // Look the subcommand up by name; argparse4j guarantees a valid name in
+ // production, so the error branch guards against programming errors.
+ Optional<TransactionsCommand> commandOpt = commands.stream()
+ .filter(cmd -> cmd.name().equals(commandName))
+ .findFirst();
+
+ if (!commandOpt.isPresent()) {
+ printErrorAndExit("Unexpected command " + commandName);
+ }
+
+ TransactionsCommand command = commandOpt.get();
+ command.execute(admin, ns, out);
+ Exit.exit(0);
+ }
+
+ public static void main(String[] args) throws Exception {
+ execute(args, TransactionsCommand::buildAdminClient, System.out, Time.SYSTEM);
+ }
+
+}
diff --git a/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java
new file mode 100644
index 0000000..b5d7b93
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java
@@ -0,0 +1,492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.admin.AbortTransactionResult;
+import org.apache.kafka.clients.admin.AbortTransactionSpec;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.DescribeProducersOptions;
+import org.apache.kafka.clients.admin.DescribeProducersResult;
+import org.apache.kafka.clients.admin.DescribeProducersResult.PartitionProducerState;
+import org.apache.kafka.clients.admin.DescribeTransactionsResult;
+import org.apache.kafka.clients.admin.ListTransactionsResult;
+import org.apache.kafka.clients.admin.ProducerState;
+import org.apache.kafka.clients.admin.TransactionDescription;
+import org.apache.kafka.clients.admin.TransactionListing;
+import org.apache.kafka.clients.admin.TransactionState;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Utils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.Mockito;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import java.util.Set;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.singleton;
+import static java.util.Collections.singletonList;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TransactionsCommandTest {
+
+ private final MockExitProcedure exitProcedure = new MockExitProcedure();
+ private final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ private final PrintStream out = new PrintStream(outputStream);
+ private final MockTime time = new MockTime();
+ private final Admin admin = Mockito.mock(Admin.class);
+
+ // Install a capturing exit procedure so Exit.exit in the tool does not kill the JVM.
+ @BeforeEach
+ public void setupExitProcedure() {
+ Exit.setExitProcedure(exitProcedure);
+ }
+
+ // Restore the default (real) exit procedure after each test.
+ @AfterEach
+ public void resetExitProcedure() {
+ Exit.resetExitProcedure();
+ }
+
+ // describe-producers without --topic must fail argument parsing.
+ @Test
+ public void testDescribeProducersTopicRequired() throws Exception {
+ assertCommandFailure(new String[]{
+ "--bootstrap-server",
+ "localhost:9092",
+ "describe-producers",
+ "--partition",
+ "0"
+ });
+ }
+
+ // describe-producers without --partition must fail argument parsing.
+ @Test
+ public void testDescribeProducersPartitionRequired() throws Exception {
+ assertCommandFailure(new String[]{
+ "--bootstrap-server",
+ "localhost:9092",
+ "describe-producers",
+ "--topic",
+ "foo"
+ });
+ }
+
+ // With no --broker-id, describe-producers should query the partition leader
+ // (i.e. default DescribeProducersOptions).
+ @Test
+ public void testDescribeProducersLeader() throws Exception {
+ TopicPartition topicPartition = new TopicPartition("foo", 5);
+ String[] args = new String[] {
+ "--bootstrap-server",
+ "localhost:9092",
+ "describe-producers",
+ "--topic",
+ topicPartition.topic(),
+ "--partition",
+ String.valueOf(topicPartition.partition())
+ };
+
+ testDescribeProducers(topicPartition, args, new DescribeProducersOptions());
+ }
+
+ // --broker-id must be forwarded to DescribeProducersOptions so a specific
+ // replica (not the leader) is queried.
+ @Test
+ public void testDescribeProducersSpecificReplica() throws Exception {
+ TopicPartition topicPartition = new TopicPartition("foo", 5);
+ int brokerId = 5;
+
+ String[] args = new String[] {
+ "--bootstrap-server",
+ "localhost:9092",
+ "describe-producers",
+ "--topic",
+ topicPartition.topic(),
+ "--partition",
+ String.valueOf(topicPartition.partition()),
+ "--broker-id",
+ String.valueOf(brokerId)
+ };
+
+ testDescribeProducers(topicPartition, args, new DescribeProducersOptions().brokerId(brokerId));
+ }
+
+ // Shared driver for the describe-producers tests: mocks two producer states
+ // (one with an open transaction, one without) and verifies the printed table.
+ private void testDescribeProducers(
+ TopicPartition topicPartition,
+ String[] args,
+ DescribeProducersOptions expectedOptions
+ ) throws Exception {
+ DescribeProducersResult describeResult = Mockito.mock(DescribeProducersResult.class);
+ KafkaFuture<PartitionProducerState> describeFuture = KafkaFutureImpl.completedFuture(
+ new PartitionProducerState(asList(
+ new ProducerState(12345L, 15, 1300, 1599509565L,
+ OptionalInt.of(20), OptionalLong.of(990)),
+ new ProducerState(98765L, 30, 2300, 1599509599L,
+ OptionalInt.empty(), OptionalLong.empty())
+ )));
+
+
+ Mockito.when(describeResult.partitionResult(topicPartition)).thenReturn(describeFuture);
+ Mockito.when(admin.describeProducers(singleton(topicPartition), expectedOptions)).thenReturn(describeResult);
+
+ execute(args);
+ assertNormalExit();
+
+ // Header row plus one row per producer state.
+ List<List<String>> table = readOutputAsTable();
+ assertEquals(3, table.size());
+
+ List<String> expectedHeaders = asList(TransactionsCommand.DescribeProducersCommand.HEADERS);
+ assertEquals(expectedHeaders, table.get(0));
+
+ // Row order is not guaranteed, so compare as a set. Absent coordinator
+ // epoch renders as -1 and absent txn start offset as "None".
+ Set<List<String>> expectedRows = Utils.mkSet(
+ asList("12345", "15", "20", "1300", "1599509565", "990"),
+ asList("98765", "30", "-1", "2300", "1599509599", "None")
+ );
+ assertEquals(expectedRows, new HashSet<>(table.subList(1, table.size())));
+ }
+
+ // `list` should flatten per-broker listings into one row per transaction,
+ // with the broker id shown as the coordinator.
+ @Test
+ public void testListTransactions() throws Exception {
+ String[] args = new String[] {
+ "--bootstrap-server",
+ "localhost:9092",
+ "list"
+ };
+
+ ListTransactionsResult listResult = Mockito.mock(ListTransactionsResult.class);
+
+ Map<Integer, Collection<TransactionListing>> transactions = new HashMap<>();
+ transactions.put(0, asList(
+ new TransactionListing("foo", 12345L, TransactionState.ONGOING),
+ new TransactionListing("bar", 98765L, TransactionState.PREPARE_ABORT)
+ ));
+ transactions.put(1, singletonList(
+ new TransactionListing("baz", 13579L, TransactionState.COMPLETE_COMMIT)
+ ));
+
+ KafkaFuture<Map<Integer, Collection<TransactionListing>>> listTransactionsFuture =
+ KafkaFutureImpl.completedFuture(transactions);
+
+ Mockito.when(admin.listTransactions()).thenReturn(listResult);
+ Mockito.when(listResult.allByBrokerId()).thenReturn(listTransactionsFuture);
+
+ execute(args);
+ assertNormalExit();
+
+ // Header row plus three transaction rows.
+ List<List<String>> table = readOutputAsTable();
+ assertEquals(4, table.size());
+
+ // Assert expected headers
+ List<String> expectedHeaders = asList(TransactionsCommand.ListTransactionsCommand.HEADERS);
+ assertEquals(expectedHeaders, table.get(0));
+
+ // Row order across brokers is not guaranteed; compare as a set.
+ Set<List<String>> expectedRows = Utils.mkSet(
+ asList("foo", "0", "12345", "Ongoing"),
+ asList("bar", "0", "98765", "PrepareAbort"),
+ asList("baz", "1", "13579", "CompleteCommit")
+ );
+ assertEquals(expectedRows, new HashSet<>(table.subList(1, table.size())));
+ }
+
+ @Test
+ public void testDescribeTransactionsTransactionalIdRequired() throws Exception {
+ assertCommandFailure(new String[]{
+ "--bootstrap-server",
+ "localhost:9092",
+ "describe"
+ });
+ }
+
+ @Test
+ public void testDescribeTransaction() throws Exception {
+ String transactionalId = "foo";
+ String[] args = new String[] {
+ "--bootstrap-server",
+ "localhost:9092",
+ "describe",
+ "--transactional-id",
+ transactionalId
+ };
+
+ int coordinatorId = 5;
+ long startTimeMs = time.milliseconds();
+
+ TransactionDescription description = new TransactionDescription(
+ coordinatorId,
+ TransactionState.ONGOING,
+ 12345L,
+ 15,
+ 10000,
+ OptionalLong.of(startTimeMs),
+ singleton(new TopicPartition("bar", 0))
+ );
+
+ DescribeTransactionsResult mockResult = Mockito.mock(DescribeTransactionsResult.class);
+ Mockito.when(mockResult.description(transactionalId))
+ .thenReturn(KafkaFutureImpl.completedFuture(description));
+ Mockito.when(admin.describeTransactions(singleton(transactionalId))).thenReturn(mockResult);
+
+ // Advance the mock clock so the reported transaction duration is positive
+ time.sleep(5000);
+
+ execute(args);
+ assertNormalExit();
+
+ List<List<String>> table = readOutputAsTable();
+ assertEquals(2, table.size());
+ assertEquals(asList(TransactionsCommand.DescribeTransactionsCommand.HEADERS), table.get(0));
+
+ // Duration column reflects the 5000ms the clock advanced after the start time.
+ List<String> expectedRow = asList(
+ String.valueOf(coordinatorId),
+ transactionalId,
+ "12345",
+ "15",
+ "Ongoing",
+ "10000",
+ String.valueOf(startTimeMs),
+ "5000",
+ "bar-0"
+ );
+ assertEquals(expectedRow, table.get(1));
+ }
+
+ @Test
+ public void testAbortTransactionStartOffsetOrProducerIdRequired() throws Exception {
+ // `abort` needs either --start-offset (new brokers) or --producer-id
+ // (old brokers); providing neither must fail argument validation.
+ assertCommandFailure(new String[]{
+ "--bootstrap-server",
+ "localhost:9092",
+ "abort",
+ "--topic",
+ "foo",
+ "--partition",
+ "0"
+ });
+ }
+
+ @Test
+ public void testAbortTransactionTopicRequired() throws Exception {
+ // `abort` must fail argument validation when --topic is missing.
+ assertCommandFailure(new String[]{
+ "--bootstrap-server",
+ "localhost:9092",
+ "abort",
+ "--partition",
+ "0",
+ "--start-offset",
+ "9990"
+ });
+ }
+
+ @Test
+ public void testAbortTransactionPartitionRequired() throws Exception {
+ // `abort` must fail argument validation when --partition is missing.
+ assertCommandFailure(new String[]{
+ "--bootstrap-server",
+ "localhost:9092",
+ "abort",
+ "--topic",
+ "foo",
+ "--start-offset",
+ "9990"
+ });
+ }
+
+ @Test
+ public void testAbortTransactionProducerEpochRequiredWithProducerId() throws Exception {
+ // When --producer-id is given (old-broker mode), --producer-epoch is
+ // also required; omitting it must fail argument validation.
+ assertCommandFailure(new String[]{
+ "--bootstrap-server",
+ "localhost:9092",
+ "abort",
+ "--topic",
+ "foo",
+ "--partition",
+ "0",
+ "--producer-id",
+ "12345"
+ });
+ }
+
+ @Test
+ public void testAbortTransactionCoordinatorEpochRequiredWithProducerId() throws Exception {
+ // When --producer-id is given (old-broker mode), --coordinator-epoch is
+ // also required; omitting it must fail argument validation.
+ assertCommandFailure(new String[]{
+ "--bootstrap-server",
+ "localhost:9092",
+ "abort",
+ "--topic",
+ "foo",
+ "--partition",
+ "0",
+ "--producer-id",
+ "12345",
+ "--producer-epoch",
+ "15"
+ });
+ }
+
+ @Test
+ public void testNewBrokerAbortTransaction() throws Exception {
+ // On newer brokers the tool resolves the producer state (including the
+ // coordinator epoch) through DescribeProducers before sending the abort.
+ TopicPartition topicPartition = new TopicPartition("foo", 5);
+ long startOffset = 9173;
+ long producerId = 12345L;
+ short producerEpoch = 15;
+ int coordinatorEpoch = 76;
+
+ String[] args = new String[] {
+ "--bootstrap-server",
+ "localhost:9092",
+ "abort",
+ "--topic",
+ topicPartition.topic(),
+ "--partition",
+ String.valueOf(topicPartition.partition()),
+ "--start-offset",
+ String.valueOf(startOffset)
+ };
+
+ ProducerState producerState = new ProducerState(producerId, producerEpoch, 1300, 1599509565L,
+ OptionalInt.of(coordinatorEpoch), OptionalLong.of(startOffset));
+
+ DescribeProducersResult describeResult = Mockito.mock(DescribeProducersResult.class);
+ Mockito.when(describeResult.partitionResult(topicPartition))
+ .thenReturn(KafkaFutureImpl.completedFuture(new PartitionProducerState(singletonList(producerState))));
+ Mockito.when(admin.describeProducers(singleton(topicPartition))).thenReturn(describeResult);
+
+ // The abort spec must carry the coordinator epoch discovered above.
+ AbortTransactionSpec expectedAbortSpec = new AbortTransactionSpec(
+ topicPartition, producerId, producerEpoch, coordinatorEpoch);
+ AbortTransactionResult abortResult = Mockito.mock(AbortTransactionResult.class);
+ Mockito.when(abortResult.all()).thenReturn(KafkaFutureImpl.<Void>completedFuture(null));
+ Mockito.when(admin.abortTransaction(expectedAbortSpec)).thenReturn(abortResult);
+
+ execute(args);
+ assertNormalExit();
+ }
+
+ @ParameterizedTest
+ @ValueSource(ints = {29, -1})
+ public void testOldBrokerAbortTransactionWithUnknownCoordinatorEpoch(int coordinatorEpoch) throws Exception {
+ // When the coordinator epoch is passed as a negative value ("unknown"),
+ // the tool substitutes 0 in the resulting AbortTransactionSpec.
+ TopicPartition topicPartition = new TopicPartition("foo", 5);
+ long producerId = 12345L;
+ short producerEpoch = 15;
+
+ String[] args = new String[] {
+ "--bootstrap-server",
+ "localhost:9092",
+ "abort",
+ "--topic",
+ topicPartition.topic(),
+ "--partition",
+ String.valueOf(topicPartition.partition()),
+ "--producer-id",
+ String.valueOf(producerId),
+ "--producer-epoch",
+ String.valueOf(producerEpoch),
+ "--coordinator-epoch",
+ String.valueOf(coordinatorEpoch)
+ };
+
+ // Negative epochs map to 0; non-negative values pass through unchanged.
+ final int expectedCoordinatorEpoch = Math.max(coordinatorEpoch, 0);
+ AbortTransactionSpec expectedAbortSpec = new AbortTransactionSpec(
+ topicPartition, producerId, producerEpoch, expectedCoordinatorEpoch);
+
+ AbortTransactionResult abortResult = Mockito.mock(AbortTransactionResult.class);
+ Mockito.when(abortResult.all()).thenReturn(KafkaFutureImpl.<Void>completedFuture(null));
+ Mockito.when(admin.abortTransaction(expectedAbortSpec)).thenReturn(abortResult);
+
+ execute(args);
+ assertNormalExit();
+ }
+
+ // Runs the transactions tool with the given CLI args against the mocked
+ // admin client, the captured stdout stream, and the mock clock.
+ private void execute(String[] args) throws Exception {
+ TransactionsCommand.execute(args, ns -> admin, out, time);
+ }
+
+ /**
+  * Parses everything the command wrote to stdout as a whitespace-delimited
+  * table: one inner list of cells per output line.
+  */
+ private List<List<String>> readOutputAsTable() throws IOException {
+ List<List<String>> table = new ArrayList<>();
+ ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
+ // try-with-resources guarantees the reader is closed even if parsing throws
+ try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
+ List<String> row = readRow(reader);
+ while (row != null) {
+ table.add(row);
+ row = readRow(reader);
+ }
+ }
+ return table;
+ }
+
+ // Splits the next output line into whitespace-delimited cells,
+ // or returns null at end of stream.
+ private List<String> readRow(BufferedReader reader) throws IOException {
+ String line = reader.readLine();
+ return (line == null) ? null : asList(line.split("\\s+"));
+ }
+
+ // Verifies the command terminated through the mocked Exit procedure
+ // with a success status (0).
+ private void assertNormalExit() {
+ assertTrue(exitProcedure.hasExited);
+ assertEquals(0, exitProcedure.statusCode);
+ }
+
+ // Executes the command and verifies it exited with status 1,
+ // e.g. for invalid or missing arguments.
+ private void assertCommandFailure(String[] args) throws Exception {
+ execute(args);
+ assertTrue(exitProcedure.hasExited);
+ assertEquals(1, exitProcedure.statusCode);
+ }
+
+ /**
+  * Records the first Exit invocation instead of terminating the JVM,
+  * so tests can assert on the captured status code.
+  */
+ private static class MockExitProcedure implements Exit.Procedure {
+ private boolean hasExited = false;
+ private int statusCode;
+
+ @Override
+ public void execute(int statusCode, String message) {
+ // Only the first exit call is recorded; later calls are ignored.
+ if (this.hasExited) {
+ return;
+ }
+ this.hasExited = true;
+ this.statusCode = statusCode;
+ }
+ }
+
+}