You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2022/03/09 06:33:24 UTC
[kafka] branch trunk updated: MINOR: Clean up AlterIsrManager code (#11832)
This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 69926b5 MINOR: Clean up AlterIsrManager code (#11832)
69926b5 is described below
commit 69926b5193d1b4837039c8a738f3dc9c5bbd267b
Author: David Jacot <dj...@confluent.io>
AuthorDate: Wed Mar 9 07:31:07 2022 +0100
MINOR: Clean up AlterIsrManager code (#11832)
Reviewers: Justine Olshan <jo...@confluent.io>, Jason Gustafson <ja...@confluent.io>
---
.../main/scala/kafka/server/AlterIsrManager.scala | 38 ++++++++++------------
1 file changed, 18 insertions(+), 20 deletions(-)
diff --git a/core/src/main/scala/kafka/server/AlterIsrManager.scala b/core/src/main/scala/kafka/server/AlterIsrManager.scala
index b8507d0..64441ad 100644
--- a/core/src/main/scala/kafka/server/AlterIsrManager.scala
+++ b/core/src/main/scala/kafka/server/AlterIsrManager.scala
@@ -27,7 +27,7 @@ import kafka.zk.KafkaZkClient
import org.apache.kafka.clients.ClientResponse
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.OperationNotAttemptedException
-import org.apache.kafka.common.message.{AlterIsrRequestData, AlterIsrResponseData}
+import org.apache.kafka.common.message.AlterIsrRequestData
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse}
@@ -154,7 +154,7 @@ class DefaultAlterIsrManager(
if (!unsentIsrUpdates.isEmpty && inflightRequest.compareAndSet(false, true)) {
// Copy current unsent ISRs but don't remove from the map, they get cleared in the response handler
val inflightAlterIsrItems = new ListBuffer[AlterIsrItem]()
- unsentIsrUpdates.values().forEach(item => inflightAlterIsrItems.append(item))
+ unsentIsrUpdates.values.forEach(item => inflightAlterIsrItems.append(item))
sendRequest(inflightAlterIsrItems.toSeq)
}
}
@@ -215,44 +215,42 @@ class DefaultAlterIsrManager(
val message = new AlterIsrRequestData()
.setBrokerId(brokerId)
.setBrokerEpoch(brokerEpochSupplier.apply())
- .setTopics(new util.ArrayList())
-
- inflightAlterIsrItems.groupBy(_.topicPartition.topic).foreach(entry => {
- val topicPart = new AlterIsrRequestData.TopicData()
- .setName(entry._1)
- .setPartitions(new util.ArrayList())
- message.topics().add(topicPart)
- entry._2.foreach(item => {
- topicPart.partitions().add(new AlterIsrRequestData.PartitionData()
+
+ inflightAlterIsrItems.groupBy(_.topicPartition.topic).foreach { case (topic, items) =>
+ val topicData = new AlterIsrRequestData.TopicData().setName(topic)
+ message.topics.add(topicData)
+ items.foreach { item =>
+ topicData.partitions.add(new AlterIsrRequestData.PartitionData()
.setPartitionIndex(item.topicPartition.partition)
.setLeaderEpoch(item.leaderAndIsr.leaderEpoch)
.setNewIsr(item.leaderAndIsr.isr.map(Integer.valueOf).asJava)
.setCurrentIsrVersion(item.leaderAndIsr.zkVersion)
)
- })
- })
+ }
+ }
message
}
def handleAlterIsrResponse(alterIsrResponse: AlterIsrResponse,
sentBrokerEpoch: Long,
inflightAlterIsrItems: Seq[AlterIsrItem]): Errors = {
- val data: AlterIsrResponseData = alterIsrResponse.data
+ val data = alterIsrResponse.data
Errors.forCode(data.errorCode) match {
case Errors.STALE_BROKER_EPOCH =>
warn(s"Broker had a stale broker epoch ($sentBrokerEpoch), retrying.")
+
case Errors.CLUSTER_AUTHORIZATION_FAILED =>
error(s"Broker is not authorized to send AlterIsr to controller",
Errors.CLUSTER_AUTHORIZATION_FAILED.exception("Broker is not authorized to send AlterIsr to controller"))
+
case Errors.NONE =>
// Collect partition-level responses to pass to the callbacks
- val partitionResponses: mutable.Map[TopicPartition, Either[Errors, LeaderAndIsr]] =
- new mutable.HashMap[TopicPartition, Either[Errors, LeaderAndIsr]]()
+ val partitionResponses = new mutable.HashMap[TopicPartition, Either[Errors, LeaderAndIsr]]()
data.topics.forEach { topic =>
- topic.partitions().forEach(partition => {
+ topic.partitions.forEach { partition =>
val tp = new TopicPartition(topic.name, partition.partitionIndex)
- val error = Errors.forCode(partition.errorCode())
+ val error = Errors.forCode(partition.errorCode)
debug(s"Controller successfully handled AlterIsr request for $tp: $partition")
if (error == Errors.NONE) {
val newLeaderAndIsr = new LeaderAndIsr(partition.leaderId, partition.leaderEpoch,
@@ -261,7 +259,7 @@ class DefaultAlterIsrManager(
} else {
partitionResponses(tp) = Left(error)
}
- })
+ }
}
// Iterate across the items we sent rather than what we received to ensure we run the callback even if a
@@ -285,7 +283,7 @@ class DefaultAlterIsrManager(
}
}
- case e: Errors =>
+ case e =>
warn(s"Controller returned an unexpected top-level error when handling AlterIsr request: $e")
}