You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2019/03/22 08:32:55 UTC

[kafka] branch trunk updated: MINOR: Avoid unnecessary collection copy in MetadataCache (#6397)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1deb072   MINOR: Avoid unnecessary collection copy in MetadataCache (#6397)
1deb072 is described below

commit 1deb072f56ac07b3233089ccaab8271d75f483c5
Author: Radai Rosenblatt <ra...@gmail.com>
AuthorDate: Fri Mar 22 01:32:45 2019 -0700

     MINOR: Avoid unnecessary collection copy in MetadataCache (#6397)
    
    `map` was being used to convert `Iterable[Integer]` to `Iterable[Int]`. That
    operation represented 11% of total CPU time measured under load for us.
    We also expect a positive impact on GC.
    
    Reviewers: Joel Koshy <jj...@gmail.com>, Ismael Juma <is...@juma.me.uk>
---
 core/src/main/scala/kafka/server/MetadataCache.scala | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index ec5a2b9..647ccb1 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -51,9 +51,10 @@ class MetadataCache(brokerId: Int) extends Logging {
   private val stateChangeLogger = new StateChangeLogger(brokerId, inControllerContext = false, None)
 
   // This method is the main hotspot when it comes to the performance of metadata requests,
-  // we should be careful about adding additional logic here.
+  // we should be careful about adding additional logic here. Relatedly, `brokers` is
+  // `Iterable[Integer]` instead of `Iterable[Int]` to avoid a collection copy.
   // filterUnavailableEndpoints exists to support v0 MetadataResponses
-  private def getEndpoints(snapshot: MetadataSnapshot, brokers: Iterable[Int], listenerName: ListenerName, filterUnavailableEndpoints: Boolean): Seq[Node] = {
+  private def getEndpoints(snapshot: MetadataSnapshot, brokers: Iterable[java.lang.Integer], listenerName: ListenerName, filterUnavailableEndpoints: Boolean): Seq[Node] = {
     val result = new mutable.ArrayBuffer[Node](math.min(snapshot.aliveBrokers.size, brokers.size))
     brokers.foreach { brokerId =>
       val endpoint = getAliveEndpoint(snapshot, brokerId, listenerName) match {
@@ -76,9 +77,9 @@ class MetadataCache(brokerId: Int) extends Logging {
         val leaderBrokerId = partitionState.basePartitionState.leader
         val leaderEpoch = partitionState.basePartitionState.leaderEpoch
         val maybeLeader = getAliveEndpoint(snapshot, leaderBrokerId, listenerName)
-        val replicas = partitionState.basePartitionState.replicas.asScala.map(_.toInt)
+        val replicas = partitionState.basePartitionState.replicas.asScala
         val replicaInfo = getEndpoints(snapshot, replicas, listenerName, errorUnavailableEndpoints)
-        val offlineReplicaInfo = getEndpoints(snapshot, partitionState.offlineReplicas.asScala.map(_.toInt), listenerName, errorUnavailableEndpoints)
+        val offlineReplicaInfo = getEndpoints(snapshot, partitionState.offlineReplicas.asScala, listenerName, errorUnavailableEndpoints)
 
         maybeLeader match {
           case None =>
@@ -94,7 +95,7 @@ class MetadataCache(brokerId: Int) extends Logging {
               offlineReplicaInfo.asJava)
 
           case Some(leader) =>
-            val isr = partitionState.basePartitionState.isr.asScala.map(_.toInt)
+            val isr = partitionState.basePartitionState.isr.asScala
             val isrInfo = getEndpoints(snapshot, isr, listenerName, errorUnavailableEndpoints)
 
             if (replicaInfo.size < replicas.size) {