You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jo...@apache.org on 2024/01/04 01:56:25 UTC

(kafka) branch trunk updated: KAFKA-15373: fix exception thrown in Admin#describeTopics for unknown ID (#14599)

This is an automated email from the ASF dual-hosted git repository.

jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 105db82956e KAFKA-15373: fix exception thrown in Admin#describeTopics for unknown ID (#14599)
105db82956e is described below

commit 105db82956e8a2a22d0a16aaae57fa69a5aeeec4
Author: Michael Edgar <mi...@xlate.io>
AuthorDate: Wed Jan 3 20:56:17 2024 -0500

    KAFKA-15373: fix exception thrown in Admin#describeTopics for unknown ID (#14599)
    
    Throw UnknownTopicIdException instead of InvalidTopicException when no name is found for the topic ID.
    
    This is similar to #6124, which covered describeTopics using a topic name. MockAdminClient already uses UnknownTopicIdException for this case.
    
    Reviewers: Justine Olshan <jo...@confluent.io>, Ashwin Pankaj <ap...@confluent.io>
---
 .../org/apache/kafka/clients/admin/KafkaAdminClient.java |  3 ++-
 .../apache/kafka/clients/admin/KafkaAdminClientTest.java | 16 ++++++++--------
 .../kafka/api/PlaintextAdminIntegrationTest.scala        |  2 +-
 3 files changed, 11 insertions(+), 10 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index f663d6efc60..85c82e25144 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -90,6 +90,7 @@ import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.UnacceptableCredentialException;
 import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnknownTopicIdException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.errors.UnsupportedEndpointTypeException;
 import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
@@ -2228,7 +2229,7 @@ public class KafkaAdminClient extends AdminClient {
 
                     String topicName = cluster.topicName(topicId);
                     if (topicName == null) {
-                        future.completeExceptionally(new InvalidTopicException("TopicId " + topicId + " not found."));
+                        future.completeExceptionally(new UnknownTopicIdException("TopicId " + topicId + " not found."));
                         continue;
                     }
                     Errors topicError = errors.get(topicId);
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 19bff2ea5e8..e5a06282ae3 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -1093,7 +1093,7 @@ public class KafkaAdminClientTest {
             future = env.adminClient().deleteTopics(singletonList("myTopic"),
                 new DeleteTopicsOptions()).all();
             TestUtils.assertFutureError(future, UnknownTopicOrPartitionException.class);
-            
+
             // With topic IDs
             Uuid topicId = Uuid.randomUuid();
 
@@ -1119,7 +1119,7 @@ public class KafkaAdminClientTest {
             TestUtils.assertFutureError(future, UnknownTopicIdException.class);
         }
     }
-    
+
 
     @Test
     public void testDeleteTopicsPartialResponse() throws Exception {
@@ -1136,7 +1136,7 @@ public class KafkaAdminClientTest {
 
             result.topicNameValues().get("myTopic").get();
             TestUtils.assertFutureThrows(result.topicNameValues().get("myOtherTopic"), ApiException.class);
-            
+
             // With topic IDs
             Uuid topicId1 = Uuid.randomUuid();
             Uuid topicId2 = Uuid.randomUuid();
@@ -1182,12 +1182,12 @@ public class KafkaAdminClientTest {
             assertNull(result.topicNameValues().get("topic1").get());
             assertNull(result.topicNameValues().get("topic2").get());
             TestUtils.assertFutureThrows(result.topicNameValues().get("topic3"), TopicExistsException.class);
-            
+
             // With topic IDs
             Uuid topicId1 = Uuid.randomUuid();
             Uuid topicId2 = Uuid.randomUuid();
             Uuid topicId3 = Uuid.randomUuid();
-            
+
             env.kafkaClient().prepareResponse(
                     expectDeleteTopicsRequestWithTopicIds(topicId1, topicId2, topicId3),
                     prepareDeleteTopicsResponse(1000,
@@ -1257,7 +1257,7 @@ public class KafkaAdminClientTest {
                 ThrottlingQuotaExceededException.class);
             assertEquals(0, e.throttleTimeMs());
             TestUtils.assertFutureThrows(result.topicNameValues().get("topic3"), TopicExistsException.class);
-            
+
             // With topic IDs
             Uuid topicId1 = Uuid.randomUuid();
             Uuid topicId2 = Uuid.randomUuid();
@@ -2500,12 +2500,12 @@ public class KafkaAdminClientTest {
             try {
                 DescribeTopicsResult result = env.adminClient().describeTopics(
                         TopicCollection.ofTopicIds(singletonList(nonExistID)));
-                TestUtils.assertFutureError(result.allTopicIds(), InvalidTopicException.class);
+                TestUtils.assertFutureError(result.allTopicIds(), UnknownTopicIdException.class);
                 result.allTopicIds().get();
                 fail("describe with non-exist topic ID should throw exception");
             } catch (Exception e) {
                 assertEquals(
-                        String.format("org.apache.kafka.common.errors.InvalidTopicException: TopicId %s not found.", nonExistID),
+                        String.format("org.apache.kafka.common.errors.UnknownTopicIdException: TopicId %s not found.", nonExistID),
                         e.getMessage());
             }
 
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 23fb314fa04..5b2dffb1a56 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -197,7 +197,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
 
     val results = client.describeTopics(TopicCollection.ofTopicIds(Seq(existingTopicId, nonExistingTopicId).asJava)).topicIdValues()
     assertEquals(existingTopicId, results.get(existingTopicId).get.topicId())
-    assertThrows(classOf[ExecutionException], () => results.get(nonExistingTopicId).get).getCause.isInstanceOf[UnknownTopicIdException]
+    assertFutureExceptionTypeEquals(results.get(nonExistingTopicId), classOf[UnknownTopicIdException])
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)