You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2021/09/10 22:12:17 UTC
[kafka] branch 3.0 updated: KAFKA-13288; Include internal topics when searching hanging transactions (#11319)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 901725f KAFKA-13288; Include internal topics when searching hanging transactions (#11319)
901725f is described below
commit 901725fbe6f01d73c2efcad5e9bace2e473d4f6e
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Fri Sep 10 14:33:37 2021 -0700
KAFKA-13288; Include internal topics when searching hanging transactions (#11319)
This patch ensures that internal topics are included when searching for hanging transactions with the `--broker-id` argument in `kafka-transactions.sh`.
Reviewers: David Jacot <dj...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
.../kafka/clients/admin/ListTopicsOptions.java | 22 ++++++++++++++++++++++
.../apache/kafka/tools/TransactionsCommand.java | 4 +++-
.../kafka/tools/TransactionsCommandTest.java | 4 +++-
3 files changed, 28 insertions(+), 2 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java
index 5a8f2b4..4ffa66d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java
@@ -19,6 +19,8 @@ package org.apache.kafka.clients.admin;
import org.apache.kafka.common.annotation.InterfaceStability;
+import java.util.Objects;
+
/**
* Options for {@link Admin#listTopics()}.
*
@@ -58,4 +60,24 @@ public class ListTopicsOptions extends AbstractOptions<ListTopicsOptions> {
public boolean shouldListInternal() {
return listInternal;
}
+
+ @Override
+ public String toString() {
+ return "ListTopicsOptions(" +
+ "listInternal=" + listInternal +
+ ')';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ListTopicsOptions that = (ListTopicsOptions) o;
+ return listInternal == that.listInternal;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(listInternal);
+ }
}
diff --git a/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
index b8f1c9d..45e8c71 100644
--- a/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
@@ -29,6 +29,7 @@ import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeProducersOptions;
import org.apache.kafka.clients.admin.DescribeProducersResult;
import org.apache.kafka.clients.admin.DescribeTransactionsResult;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.ListTransactionsOptions;
import org.apache.kafka.clients.admin.ProducerState;
import org.apache.kafka.clients.admin.TopicDescription;
@@ -719,7 +720,8 @@ public abstract class TransactionsCommand {
Admin admin
) throws Exception {
try {
- return new ArrayList<>(admin.listTopics().names().get());
+ ListTopicsOptions listOptions = new ListTopicsOptions().listInternal(true);
+ return new ArrayList<>(admin.listTopics(listOptions).names().get());
} catch (ExecutionException e) {
printErrorAndExit("Failed to list topics", e.getCause());
return Collections.emptyList();
diff --git a/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java
index 968c34a..d5c6c45 100644
--- a/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.clients.admin.DescribeProducersResult;
import org.apache.kafka.clients.admin.DescribeProducersResult.PartitionProducerState;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.DescribeTransactionsResult;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.ListTransactionsOptions;
import org.apache.kafka.clients.admin.ListTransactionsResult;
@@ -533,7 +534,8 @@ public class TransactionsCommandTest {
) {
ListTopicsResult result = Mockito.mock(ListTopicsResult.class);
Mockito.when(result.names()).thenReturn(completedFuture(topics));
- Mockito.when(admin.listTopics()).thenReturn(result);
+ ListTopicsOptions listOptions = new ListTopicsOptions().listInternal(true);
+ Mockito.when(admin.listTopics(listOptions)).thenReturn(result);
}
private void expectDescribeTopics(