You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2021/06/28 17:38:09 UTC

[kafka] branch 2.8 updated: KAFKA-12976; Remove UNSUPPORTED_VERSION error from delete topics call (#10923)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new 379ea45  KAFKA-12976; Remove UNSUPPORTED_VERSION error from delete topics call (#10923)
379ea45 is described below

commit 379ea4561ac64e28f96c1e3efd5df5ef0343737c
Author: Justine Olshan <jo...@confluent.io>
AuthorDate: Mon Jun 28 10:15:50 2021 -0700

    KAFKA-12976; Remove UNSUPPORTED_VERSION error from delete topics call (#10923)
    
    Removed the condition to throw the error. Now we return UNKNOWN_TOPIC_ID which allows clients to retry instead of failing. Updated the test for IBP < 2.8 that tries to delete topics using ID.
    
    Reviewers: Luke Chen <sh...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
 core/src/main/scala/kafka/server/KafkaApis.scala                     | 5 +----
 .../unit/kafka/server/TopicIdWithOldInterBrokerProtocolTest.scala    | 2 +-
 2 files changed, 2 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index e3b4947..124c5a9 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1892,10 +1892,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         results.asScala.filter(result => result.name() != null))(_.name)
       results.forEach { topic =>
         val unresolvedTopicId = topic.topicId() != Uuid.ZERO_UUID && topic.name() == null
-        if (!config.usesTopicId && topicIdsFromRequest.contains(topic.topicId)) {
-          topic.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
-          topic.setErrorMessage("Topic IDs are not supported on the server.")
-        } else if (unresolvedTopicId) {
+        if (unresolvedTopicId) {
           topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
         } else if (topicIdsFromRequest.contains(topic.topicId) && !authorizedDescribeTopics.contains(topic.name)) {
 
diff --git a/core/src/test/scala/unit/kafka/server/TopicIdWithOldInterBrokerProtocolTest.scala b/core/src/test/scala/unit/kafka/server/TopicIdWithOldInterBrokerProtocolTest.scala
index e8f87bb..94fc699 100644
--- a/core/src/test/scala/unit/kafka/server/TopicIdWithOldInterBrokerProtocolTest.scala
+++ b/core/src/test/scala/unit/kafka/server/TopicIdWithOldInterBrokerProtocolTest.scala
@@ -94,7 +94,7 @@ class TopicIdWithOldInterBrokerProtocolTest extends BaseRequestTest {
         )).setTimeoutMs(timeout)).build()
     val response = sendDeleteTopicsRequest(request)
     val error = response.errorCounts.asScala
-    assertEquals(2, error(Errors.UNSUPPORTED_VERSION))
+    assertEquals(2, error(Errors.UNKNOWN_TOPIC_ID))
   }
 
   private def sendMetadataRequest(request: MetadataRequest, destination: Option[SocketServer]): MetadataResponse = {