You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2019/03/22 08:32:55 UTC
[kafka] branch trunk updated: MINOR: Avoid unnecessary collection
copy in MetadataCache (#6397)
This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1deb072 MINOR: Avoid unnecessary collection copy in MetadataCache (#6397)
1deb072 is described below
commit 1deb072f56ac07b3233089ccaab8271d75f483c5
Author: Radai Rosenblatt <ra...@gmail.com>
AuthorDate: Fri Mar 22 01:32:45 2019 -0700
MINOR: Avoid unnecessary collection copy in MetadataCache (#6397)
`map` was being used to convert `Iterable[Integer]` to `Iterable[Int]`. That
operation represented 11% of total CPU time measured under load for us.
We also expect a positive impact on GC.
Reviewers: Joel Koshy <jj...@gmail.com>, Ismael Juma <is...@juma.me.uk>
---
core/src/main/scala/kafka/server/MetadataCache.scala | 11 ++++++-----
1 file changed, 6 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index ec5a2b9..647ccb1 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -51,9 +51,10 @@ class MetadataCache(brokerId: Int) extends Logging {
private val stateChangeLogger = new StateChangeLogger(brokerId, inControllerContext = false, None)
// This method is the main hotspot when it comes to the performance of metadata requests,
- // we should be careful about adding additional logic here.
+ // we should be careful about adding additional logic here. Relatedly, `brokers` is
+ // `Iterable[Integer]` instead of `Iterable[Int]` to avoid a collection copy.
// filterUnavailableEndpoints exists to support v0 MetadataResponses
- private def getEndpoints(snapshot: MetadataSnapshot, brokers: Iterable[Int], listenerName: ListenerName, filterUnavailableEndpoints: Boolean): Seq[Node] = {
+ private def getEndpoints(snapshot: MetadataSnapshot, brokers: Iterable[java.lang.Integer], listenerName: ListenerName, filterUnavailableEndpoints: Boolean): Seq[Node] = {
val result = new mutable.ArrayBuffer[Node](math.min(snapshot.aliveBrokers.size, brokers.size))
brokers.foreach { brokerId =>
val endpoint = getAliveEndpoint(snapshot, brokerId, listenerName) match {
@@ -76,9 +77,9 @@ class MetadataCache(brokerId: Int) extends Logging {
val leaderBrokerId = partitionState.basePartitionState.leader
val leaderEpoch = partitionState.basePartitionState.leaderEpoch
val maybeLeader = getAliveEndpoint(snapshot, leaderBrokerId, listenerName)
- val replicas = partitionState.basePartitionState.replicas.asScala.map(_.toInt)
+ val replicas = partitionState.basePartitionState.replicas.asScala
val replicaInfo = getEndpoints(snapshot, replicas, listenerName, errorUnavailableEndpoints)
- val offlineReplicaInfo = getEndpoints(snapshot, partitionState.offlineReplicas.asScala.map(_.toInt), listenerName, errorUnavailableEndpoints)
+ val offlineReplicaInfo = getEndpoints(snapshot, partitionState.offlineReplicas.asScala, listenerName, errorUnavailableEndpoints)
maybeLeader match {
case None =>
@@ -94,7 +95,7 @@ class MetadataCache(brokerId: Int) extends Logging {
offlineReplicaInfo.asJava)
case Some(leader) =>
- val isr = partitionState.basePartitionState.isr.asScala.map(_.toInt)
+ val isr = partitionState.basePartitionState.isr.asScala
val isrInfo = getEndpoints(snapshot, isr, listenerName, errorUnavailableEndpoints)
if (replicaInfo.size < replicas.size) {