You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2020/11/06 08:29:34 UTC

[kafka] branch trunk updated: KAFKA-10673: Cache inter broker listener name used in connection quotas (#9555)

This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c2cc8a6  KAFKA-10673: Cache inter broker listener name used in connection quotas (#9555)
c2cc8a6 is described below

commit c2cc8a6b4316338075d163f277fe0d6825172a36
Author: David Mao <47...@users.noreply.github.com>
AuthorDate: Fri Nov 6 02:28:31 2020 -0600

    KAFKA-10673: Cache inter broker listener name used in connection quotas (#9555)
    
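    `config.interBrokerListenerName` and `config.listeners` were evaluated several times each time a connection was accepted. As these calls are expensive, `ConnectionQuotas` now relies on cached values instead: the inter-broker listener name is stored in a field at construction, and the number of listeners is read from `listenerCounts`.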
    
    Reviewers: Anna Povzner <an...@confluent.io>, David Jacot <dj...@confluent.io>
---
 core/src/main/scala/kafka/network/SocketServer.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 17b6eae..dcfc08d 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -1278,6 +1278,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
   @volatile private var defaultMaxConnectionsPerIp: Int = config.maxConnectionsPerIp
   @volatile private var maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides.map { case (host, count) => (InetAddress.getByName(host), count) }
   @volatile private var brokerMaxConnections = config.maxConnections
+  private val interBrokerListenerName = config.interBrokerListenerName
   private val counts = mutable.Map[InetAddress, Int]()
 
   // Listener counts and configs are synchronized on `counts`
@@ -1415,7 +1416,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
   }
 
   private def protectedListener(listenerName: ListenerName): Boolean =
-    config.interBrokerListenerName == listenerName && config.listeners.size > 1
+    interBrokerListenerName == listenerName && listenerCounts.size > 1
 
   private def maxListenerConnections(listenerName: ListenerName): Int =
     maxConnectionsPerListener.get(listenerName).map(_.maxConnections).getOrElse(Int.MaxValue)