You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2022/06/21 16:24:42 UTC

[kafka] branch trunk updated: MINOR: Fix AlterPartitionManager topic id handling in response handler (#12317)

This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3072b3d23e MINOR: Fix AlterPartitionManager topic id handling in response handler (#12317)
3072b3d23e is described below

commit 3072b3d23e43f3bc3478eca1087312edb620dc6b
Author: David Jacot <dj...@confluent.io>
AuthorDate: Tue Jun 21 18:24:33 2022 +0200

    MINOR: Fix AlterPartitionManager topic id handling in response handler (#12317)
    
    https://github.com/apache/kafka/commit/f83d95d9a28267f7ef7a7b1e584dcdb4aa842210 introduced topic ids in the AlterPartitionRequest/Response and we just found a bug in the request handling logic. The issue is the following.
    
    When the `AlterPartitionManager` receives the response, it builds the `partitionResponses` mapping `TopicIdPartition` to its result. `TopicIdPartition` is built from the response. Therefore if version < 2 is used, `TopicIdPartition` will have the `ZERO` topic id. Then the `AlterPartitionManager` iterates over the item sent to find their response. If an item has a topic id in its `TopicIdPartition` and version < 2 was used, it cannot find it because one has it and the other one has not.
    
    This patch fixes the issue by using `TopicPartition` as a key in the `partitionResponses` map. This ensures that the result can be found regardless of the topic id being set or not.
    
    Note that the case where version 2 is used is handled correctly because we already have logic to get back the topic name from the topic id in order to construct the `TopicPartition`.
    
    `testPartialTopicIds` test was supposed to catch this but it didn't due to the ignorable topic id field being present. This patch fixes the test as well.
    
    Reviewers: Kvicii <42...@users.noreply.github.com>, Jason Gustafson <ja...@confluent.io>
---
 .../scala/kafka/server/AlterPartitionManager.scala   |  6 +++---
 .../kafka/server/AlterPartitionManagerTest.scala     | 20 +++++++++++++++-----
 2 files changed, 18 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/kafka/server/AlterPartitionManager.scala b/core/src/main/scala/kafka/server/AlterPartitionManager.scala
index d791e0a0dd..9f89f47e82 100644
--- a/core/src/main/scala/kafka/server/AlterPartitionManager.scala
+++ b/core/src/main/scala/kafka/server/AlterPartitionManager.scala
@@ -318,7 +318,7 @@ class DefaultAlterPartitionManager(
 
       case Errors.NONE =>
         // Collect partition-level responses to pass to the callbacks
-        val partitionResponses = new mutable.HashMap[TopicIdPartition, Either[Errors, LeaderAndIsr]]()
+        val partitionResponses = new mutable.HashMap[TopicPartition, Either[Errors, LeaderAndIsr]]()
         data.topics.forEach { topic =>
           // Topic IDs are used since version 2 of the AlterPartition API.
           val topicName = if (requestHeader.apiVersion > 1) topicNamesByIds.get(topic.topicId).orNull else topic.topicName
@@ -326,7 +326,7 @@ class DefaultAlterPartitionManager(
             error(s"Received an unexpected topic $topic in the alter partition response, ignoring it.")
           } else {
             topic.partitions.forEach { partition =>
-              val tp = new TopicIdPartition(topic.topicId, partition.partitionIndex, topicName)
+              val tp = new TopicPartition(topicName, partition.partitionIndex)
               val apiError = Errors.forCode(partition.errorCode)
               debug(s"Controller successfully handled AlterPartition request for $tp: $partition")
               if (apiError == Errors.NONE) {
@@ -357,7 +357,7 @@ class DefaultAlterPartitionManager(
         // partition was somehow erroneously excluded from the response. Note that these callbacks are run from
         // the leaderIsrUpdateLock write lock in Partition#sendAlterPartitionRequest
         inflightAlterPartitionItems.foreach { inflightAlterPartition =>
-          partitionResponses.get(inflightAlterPartition.topicIdPartition) match {
+          partitionResponses.get(inflightAlterPartition.topicIdPartition.topicPartition) match {
             case Some(leaderAndIsrOrError) =>
               try {
                 leaderAndIsrOrError match {
diff --git a/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala b/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala
index c35bdc0193..af177961d3 100644
--- a/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala
@@ -28,8 +28,8 @@ import org.apache.kafka.common.Uuid
 import org.apache.kafka.common.errors.{AuthenticationException, InvalidUpdateVersionException, OperationNotAttemptedException, UnknownServerException, UnsupportedVersionException}
 import org.apache.kafka.common.message.AlterPartitionResponseData
 import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.protocol.MessageUtil
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.requests.AbstractResponse
 import org.apache.kafka.common.requests.RequestHeader
 import org.apache.kafka.common.requests.{AbstractRequest, AlterPartitionRequest, AlterPartitionResponse}
 import org.apache.kafka.metadata.LeaderRecoveryState
@@ -491,12 +491,22 @@ class AlterPartitionManagerTest {
   }
 
   private def makeClientResponse(
-    response: AbstractResponse,
+    response: AlterPartitionResponse,
     version: Short
   ): ClientResponse = {
-    val requestHeader = new RequestHeader(response.apiKey, version, "", 0)
-    new ClientResponse(requestHeader, null, "", 0L, 0L,
-      false, null, null, response)
+    new ClientResponse(
+      new RequestHeader(response.apiKey, version, "", 0),
+      null,
+      "",
+      0L,
+      0L,
+      false,
+      null,
+      null,
+      // Response is serialized and deserialized to ensure that its does
+      // not contain ignorable fields used by other versions.
+      AlterPartitionResponse.parse(MessageUtil.toByteBuffer(response.data, version), version)
+    )
   }
 
   private def makeAlterPartition(