You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2021/09/10 22:12:17 UTC

[kafka] branch 3.0 updated: KAFKA-13288; Include internal topics when searching hanging transactions (#11319)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 901725f  KAFKA-13288; Include internal topics when searching hanging transactions (#11319)
901725f is described below

commit 901725fbe6f01d73c2efcad5e9bace2e473d4f6e
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Fri Sep 10 14:33:37 2021 -0700

    KAFKA-13288; Include internal topics when searching hanging transactions (#11319)
    
    This patch ensures that internal topics are included when searching for hanging transactions with the `--broker-id` argument in `kafka-transactions.sh`.
    
    Reviewers: David Jacot <dj...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
 .../kafka/clients/admin/ListTopicsOptions.java     | 22 ++++++++++++++++++++++
 .../apache/kafka/tools/TransactionsCommand.java    |  4 +++-
 .../kafka/tools/TransactionsCommandTest.java       |  4 +++-
 3 files changed, 28 insertions(+), 2 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java
index 5a8f2b4..4ffa66d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java
@@ -19,6 +19,8 @@ package org.apache.kafka.clients.admin;
 
 import org.apache.kafka.common.annotation.InterfaceStability;
 
+import java.util.Objects;
+
 /**
  * Options for {@link Admin#listTopics()}.
  *
@@ -58,4 +60,24 @@ public class ListTopicsOptions extends AbstractOptions<ListTopicsOptions> {
     public boolean shouldListInternal() {
         return listInternal;
     }
+
+    @Override
+    public String toString() {
+        return "ListTopicsOptions(" +
+            "listInternal=" + listInternal +
+            ')';
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        ListTopicsOptions that = (ListTopicsOptions) o;
+        return listInternal == that.listInternal;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(listInternal);
+    }
 }
diff --git a/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
index b8f1c9d..45e8c71 100644
--- a/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
@@ -29,6 +29,7 @@ import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.DescribeProducersOptions;
 import org.apache.kafka.clients.admin.DescribeProducersResult;
 import org.apache.kafka.clients.admin.DescribeTransactionsResult;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
 import org.apache.kafka.clients.admin.ListTransactionsOptions;
 import org.apache.kafka.clients.admin.ProducerState;
 import org.apache.kafka.clients.admin.TopicDescription;
@@ -719,7 +720,8 @@ public abstract class TransactionsCommand {
             Admin admin
         ) throws Exception {
             try {
-                return new ArrayList<>(admin.listTopics().names().get());
+                ListTopicsOptions listOptions = new ListTopicsOptions().listInternal(true);
+                return new ArrayList<>(admin.listTopics(listOptions).names().get());
             } catch (ExecutionException e) {
                 printErrorAndExit("Failed to list topics", e.getCause());
                 return Collections.emptyList();
diff --git a/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java
index 968c34a..d5c6c45 100644
--- a/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.clients.admin.DescribeProducersResult;
 import org.apache.kafka.clients.admin.DescribeProducersResult.PartitionProducerState;
 import org.apache.kafka.clients.admin.DescribeTopicsResult;
 import org.apache.kafka.clients.admin.DescribeTransactionsResult;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
 import org.apache.kafka.clients.admin.ListTopicsResult;
 import org.apache.kafka.clients.admin.ListTransactionsOptions;
 import org.apache.kafka.clients.admin.ListTransactionsResult;
@@ -533,7 +534,8 @@ public class TransactionsCommandTest {
     ) {
         ListTopicsResult result = Mockito.mock(ListTopicsResult.class);
         Mockito.when(result.names()).thenReturn(completedFuture(topics));
-        Mockito.when(admin.listTopics()).thenReturn(result);
+        ListTopicsOptions listOptions = new ListTopicsOptions().listInternal(true);
+        Mockito.when(admin.listTopics(listOptions)).thenReturn(result);
     }
 
     private void expectDescribeTopics(