You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2022/06/21 16:24:42 UTC
[kafka] branch trunk updated: MINOR: Fix AlterPartitionManager topic id handling in response handler (#12317)
This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3072b3d23e MINOR: Fix AlterPartitionManager topic id handling in response handler (#12317)
3072b3d23e is described below
commit 3072b3d23e43f3bc3478eca1087312edb620dc6b
Author: David Jacot <dj...@confluent.io>
AuthorDate: Tue Jun 21 18:24:33 2022 +0200
MINOR: Fix AlterPartitionManager topic id handling in response handler (#12317)
https://github.com/apache/kafka/commit/f83d95d9a28267f7ef7a7b1e584dcdb4aa842210 introduced topic ids in the AlterPartitionRequest/Response, and we just found a bug in the response handling logic. The issue is the following.
When the `AlterPartitionManager` receives the response, it builds the `partitionResponses` map, which maps each `TopicIdPartition` to its result. The `TopicIdPartition` is built from the response. Therefore, if version < 2 is used, the `TopicIdPartition` will have the `ZERO` topic id. The `AlterPartitionManager` then iterates over the items that were sent in order to find their responses. If an item has a topic id in its `TopicIdPartition` and version < 2 was used, its response cannot be found because the key built from the response carries the `ZERO` topic id while the item's key carries the real topic id.
This patch fixes the issue by using `TopicPartition` as a key in the `partitionResponses` map. This ensures that the result can be found regardless of the topic id being set or not.
Note that the case where version 2 is used is handled correctly because we already have logic to get back the topic name from the topic id in order to construct the `TopicPartition`.
The `testPartialTopicIds` test was supposed to catch this, but it did not because the ignorable topic id field was still present in the response. This patch fixes the test as well.
Reviewers: Kvicii <42...@users.noreply.github.com>, Jason Gustafson <ja...@confluent.io>
---
.../scala/kafka/server/AlterPartitionManager.scala | 6 +++---
.../kafka/server/AlterPartitionManagerTest.scala | 20 +++++++++++++++-----
2 files changed, 18 insertions(+), 8 deletions(-)
diff --git a/core/src/main/scala/kafka/server/AlterPartitionManager.scala b/core/src/main/scala/kafka/server/AlterPartitionManager.scala
index d791e0a0dd..9f89f47e82 100644
--- a/core/src/main/scala/kafka/server/AlterPartitionManager.scala
+++ b/core/src/main/scala/kafka/server/AlterPartitionManager.scala
@@ -318,7 +318,7 @@ class DefaultAlterPartitionManager(
case Errors.NONE =>
// Collect partition-level responses to pass to the callbacks
- val partitionResponses = new mutable.HashMap[TopicIdPartition, Either[Errors, LeaderAndIsr]]()
+ val partitionResponses = new mutable.HashMap[TopicPartition, Either[Errors, LeaderAndIsr]]()
data.topics.forEach { topic =>
// Topic IDs are used since version 2 of the AlterPartition API.
val topicName = if (requestHeader.apiVersion > 1) topicNamesByIds.get(topic.topicId).orNull else topic.topicName
@@ -326,7 +326,7 @@ class DefaultAlterPartitionManager(
error(s"Received an unexpected topic $topic in the alter partition response, ignoring it.")
} else {
topic.partitions.forEach { partition =>
- val tp = new TopicIdPartition(topic.topicId, partition.partitionIndex, topicName)
+ val tp = new TopicPartition(topicName, partition.partitionIndex)
val apiError = Errors.forCode(partition.errorCode)
debug(s"Controller successfully handled AlterPartition request for $tp: $partition")
if (apiError == Errors.NONE) {
@@ -357,7 +357,7 @@ class DefaultAlterPartitionManager(
// partition was somehow erroneously excluded from the response. Note that these callbacks are run from
// the leaderIsrUpdateLock write lock in Partition#sendAlterPartitionRequest
inflightAlterPartitionItems.foreach { inflightAlterPartition =>
- partitionResponses.get(inflightAlterPartition.topicIdPartition) match {
+ partitionResponses.get(inflightAlterPartition.topicIdPartition.topicPartition) match {
case Some(leaderAndIsrOrError) =>
try {
leaderAndIsrOrError match {
diff --git a/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala b/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala
index c35bdc0193..af177961d3 100644
--- a/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AlterPartitionManagerTest.scala
@@ -28,8 +28,8 @@ import org.apache.kafka.common.Uuid
import org.apache.kafka.common.errors.{AuthenticationException, InvalidUpdateVersionException, OperationNotAttemptedException, UnknownServerException, UnsupportedVersionException}
import org.apache.kafka.common.message.AlterPartitionResponseData
import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.protocol.MessageUtil
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.requests.AbstractResponse
import org.apache.kafka.common.requests.RequestHeader
import org.apache.kafka.common.requests.{AbstractRequest, AlterPartitionRequest, AlterPartitionResponse}
import org.apache.kafka.metadata.LeaderRecoveryState
@@ -491,12 +491,22 @@ class AlterPartitionManagerTest {
}
private def makeClientResponse(
- response: AbstractResponse,
+ response: AlterPartitionResponse,
version: Short
): ClientResponse = {
- val requestHeader = new RequestHeader(response.apiKey, version, "", 0)
- new ClientResponse(requestHeader, null, "", 0L, 0L,
- false, null, null, response)
+ new ClientResponse(
+ new RequestHeader(response.apiKey, version, "", 0),
+ null,
+ "",
+ 0L,
+ 0L,
+ false,
+ null,
+ null,
+ // Response is serialized and deserialized to ensure that it does
+ // not contain ignorable fields used by other versions.
+ AlterPartitionResponse.parse(MessageUtil.toByteBuffer(response.data, version), version)
+ )
}
private def makeAlterPartition(