You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2022/04/08 19:54:32 UTC

[kafka] branch trunk updated: MINOR: Fix DescribeLogDirs API error handling for older API versions (#12017)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7a5f0cfaef MINOR: Fix DescribeLogDirs API error handling for older API versions (#12017)
7a5f0cfaef is described below

commit 7a5f0cfaefb57cefa70c81d63c60316075c6dd97
Author: Alok Nikhil <an...@confluent.io>
AuthorDate: Fri Apr 8 12:54:09 2022 -0700

    MINOR: Fix DescribeLogDirs API error handling for older API versions (#12017)
    
    With KAFKA-13527 / KIP-784 we introduced a new top-level error code for
    the DescribeLogDirs API for versions 3 and above. However, that change
    regressed the error handling for versions lower than 3: the response
    converter (rightly) refuses to write a non-zero error code for those
    versions, so the response is dropped and the client eventually times
    out instead of receiving an empty log dirs response and processing
    that as a Cluster Auth failure.
    
    With this change, the API propagates the top-level error code to the
    client only if the request API version is 3 or above. This keeps the
    semantics of the error handling the same for all versions and restores
    the behavior for older versions.
    
    See current behavior in the broker log:
    ```bash
    [ERROR] 2022-04-08 01:22:56,406 [data-plane-kafka-request-handler-10] kafka.server.KafkaApis - [KafkaApi-0] Unexpected error handling request RequestHeader(apiKey=DESCRIBE_LOG_DIRS, apiVersion=0, clientId=sarama, correlationId=1) -- DescribeLogDirsRequestData(topics=null)
    org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default errorCode at version 0
    [ERROR] 2022-04-08 01:22:56,407 [data-plane-kafka-request-handler-10] kafka.server.KafkaRequestHandler - [Kafka Request Handler 10 on Broker 0], Exception when handling request
    org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default errorCode at version 0
    ```
    
    Reviewers: Ismael Juma <is...@juma.me.uk>
---
 .../common/message/DescribeLogDirsResponse.json    |  3 ++-
 .../kafka/network/RequestConvertToJsonTest.scala   | 30 ++++++++++++++++++++--
 2 files changed, 30 insertions(+), 3 deletions(-)

diff --git a/clients/src/main/resources/common/message/DescribeLogDirsResponse.json b/clients/src/main/resources/common/message/DescribeLogDirsResponse.json
index 0171a16481..c79e756aad 100644
--- a/clients/src/main/resources/common/message/DescribeLogDirsResponse.json
+++ b/clients/src/main/resources/common/message/DescribeLogDirsResponse.json
@@ -25,7 +25,8 @@
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
       "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
-    { "name": "ErrorCode", "type": "int16", "versions": "3+", "about": "The error code, or 0 if there was no error." },
+    { "name": "ErrorCode", "type": "int16", "versions": "3+",
+      "ignorable": true, "about": "The error code, or 0 if there was no error." },
     { "name": "Results", "type": "[]DescribeLogDirsResult", "versions": "0+",
       "about": "The log directories.", "fields": [
       { "name": "ErrorCode", "type": "int16", "versions": "0+",
diff --git a/core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala b/core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala
index 09dceac151..0ce8448a4f 100644
--- a/core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala
+++ b/core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala
@@ -19,14 +19,13 @@ package kafka.network
 
 import java.net.InetAddress
 import java.nio.ByteBuffer
-
 import com.fasterxml.jackson.databind.node.{BooleanNode, DoubleNode, JsonNodeFactory, LongNode, ObjectNode, TextNode}
 import kafka.network
 import kafka.network.RequestConvertToJson.requestHeaderNode
 import org.apache.kafka.common.memory.MemoryPool
 import org.apache.kafka.common.message._
 import org.apache.kafka.common.network.{ClientInformation, ListenerName, NetworkSend}
-import org.apache.kafka.common.protocol.{ApiKeys, MessageUtil}
+import org.apache.kafka.common.protocol.{ApiKeys, Errors, MessageUtil}
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.junit.jupiter.api.Assertions.assertEquals
@@ -61,6 +60,33 @@ class RequestConvertToJsonTest {
     assertEquals(ArrayBuffer.empty, unhandledKeys, "Unhandled request keys")
   }
 
+  @Test
+  def testAllApiVersionsResponseHandled(): Unit = {
+
+    ApiKeys.values().foreach { key => {
+      val unhandledVersions = ArrayBuffer[java.lang.Short]()
+      key.allVersions().forEach { version => {
+        val message = key match {
+          // Specify top-level error handling for verifying compatibility across versions
+          case ApiKeys.DESCRIBE_LOG_DIRS =>
+            ApiMessageType.fromApiKey(key.id).newResponse().asInstanceOf[DescribeLogDirsResponseData]
+              .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code())
+          case _ =>
+            ApiMessageType.fromApiKey(key.id).newResponse()
+        }
+
+        val bytes = MessageUtil.toByteBuffer(message, version)
+        val response = AbstractResponse.parseResponse(key, bytes, version)
+        try {
+          RequestConvertToJson.response(response, version)
+        } catch {
+          case _ : IllegalStateException => unhandledVersions += version
+        }}
+      }
+      assertEquals(ArrayBuffer.empty, unhandledVersions, s"API: ${key.toString} - Unhandled request versions")
+    }}
+  }
+
   @Test
   def testAllResponseTypesHandled(): Unit = {
     val unhandledKeys = ArrayBuffer[String]()