You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2022/03/09 06:33:24 UTC

[kafka] branch trunk updated: MINOR: Clean up AlterIsrManager code (#11832)

This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 69926b5  MINOR: Clean up AlterIsrManager code (#11832)
69926b5 is described below

commit 69926b5193d1b4837039c8a738f3dc9c5bbd267b
Author: David Jacot <dj...@confluent.io>
AuthorDate: Wed Mar 9 07:31:07 2022 +0100

    MINOR: Clean up AlterIsrManager code (#11832)
    
    Reviewers: Justine Olshan <jo...@confluent.io>, Jason Gustafson <ja...@confluent.io>
---
 .../main/scala/kafka/server/AlterIsrManager.scala  | 38 ++++++++++------------
 1 file changed, 18 insertions(+), 20 deletions(-)

diff --git a/core/src/main/scala/kafka/server/AlterIsrManager.scala b/core/src/main/scala/kafka/server/AlterIsrManager.scala
index b8507d0..64441ad 100644
--- a/core/src/main/scala/kafka/server/AlterIsrManager.scala
+++ b/core/src/main/scala/kafka/server/AlterIsrManager.scala
@@ -27,7 +27,7 @@ import kafka.zk.KafkaZkClient
 import org.apache.kafka.clients.ClientResponse
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.OperationNotAttemptedException
-import org.apache.kafka.common.message.{AlterIsrRequestData, AlterIsrResponseData}
+import org.apache.kafka.common.message.AlterIsrRequestData
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse}
@@ -154,7 +154,7 @@ class DefaultAlterIsrManager(
     if (!unsentIsrUpdates.isEmpty && inflightRequest.compareAndSet(false, true)) {
       // Copy current unsent ISRs but don't remove from the map, they get cleared in the response handler
       val inflightAlterIsrItems = new ListBuffer[AlterIsrItem]()
-      unsentIsrUpdates.values().forEach(item => inflightAlterIsrItems.append(item))
+      unsentIsrUpdates.values.forEach(item => inflightAlterIsrItems.append(item))
       sendRequest(inflightAlterIsrItems.toSeq)
     }
   }
@@ -215,44 +215,42 @@ class DefaultAlterIsrManager(
     val message = new AlterIsrRequestData()
       .setBrokerId(brokerId)
       .setBrokerEpoch(brokerEpochSupplier.apply())
-      .setTopics(new util.ArrayList())
-
-    inflightAlterIsrItems.groupBy(_.topicPartition.topic).foreach(entry => {
-      val topicPart = new AlterIsrRequestData.TopicData()
-        .setName(entry._1)
-        .setPartitions(new util.ArrayList())
-      message.topics().add(topicPart)
-      entry._2.foreach(item => {
-        topicPart.partitions().add(new AlterIsrRequestData.PartitionData()
+
+    inflightAlterIsrItems.groupBy(_.topicPartition.topic).foreach { case (topic, items) =>
+      val topicData = new AlterIsrRequestData.TopicData().setName(topic)
+      message.topics.add(topicData)
+      items.foreach { item =>
+        topicData.partitions.add(new AlterIsrRequestData.PartitionData()
           .setPartitionIndex(item.topicPartition.partition)
           .setLeaderEpoch(item.leaderAndIsr.leaderEpoch)
           .setNewIsr(item.leaderAndIsr.isr.map(Integer.valueOf).asJava)
           .setCurrentIsrVersion(item.leaderAndIsr.zkVersion)
         )
-      })
-    })
+      }
+    }
     message
   }
 
   def handleAlterIsrResponse(alterIsrResponse: AlterIsrResponse,
                              sentBrokerEpoch: Long,
                              inflightAlterIsrItems: Seq[AlterIsrItem]): Errors = {
-    val data: AlterIsrResponseData = alterIsrResponse.data
+    val data = alterIsrResponse.data
 
     Errors.forCode(data.errorCode) match {
       case Errors.STALE_BROKER_EPOCH =>
         warn(s"Broker had a stale broker epoch ($sentBrokerEpoch), retrying.")
+
       case Errors.CLUSTER_AUTHORIZATION_FAILED =>
         error(s"Broker is not authorized to send AlterIsr to controller",
           Errors.CLUSTER_AUTHORIZATION_FAILED.exception("Broker is not authorized to send AlterIsr to controller"))
+
       case Errors.NONE =>
         // Collect partition-level responses to pass to the callbacks
-        val partitionResponses: mutable.Map[TopicPartition, Either[Errors, LeaderAndIsr]] =
-          new mutable.HashMap[TopicPartition, Either[Errors, LeaderAndIsr]]()
+        val partitionResponses = new mutable.HashMap[TopicPartition, Either[Errors, LeaderAndIsr]]()
         data.topics.forEach { topic =>
-          topic.partitions().forEach(partition => {
+          topic.partitions.forEach { partition =>
             val tp = new TopicPartition(topic.name, partition.partitionIndex)
-            val error = Errors.forCode(partition.errorCode())
+            val error = Errors.forCode(partition.errorCode)
             debug(s"Controller successfully handled AlterIsr request for $tp: $partition")
             if (error == Errors.NONE) {
               val newLeaderAndIsr = new LeaderAndIsr(partition.leaderId, partition.leaderEpoch,
@@ -261,7 +259,7 @@ class DefaultAlterIsrManager(
             } else {
               partitionResponses(tp) = Left(error)
             }
-          })
+          }
         }
 
         // Iterate across the items we sent rather than what we received to ensure we run the callback even if a
@@ -285,7 +283,7 @@ class DefaultAlterIsrManager(
           }
         }
 
-      case e: Errors =>
+      case e =>
         warn(s"Controller returned an unexpected top-level error when handling AlterIsr request: $e")
     }