You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2020/11/06 08:29:34 UTC
[kafka] branch trunk updated: KAFKA-10673: Cache inter broker
listener name used in connection quotas (#9555)
This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c2cc8a6 KAFKA-10673: Cache inter broker listener name used in connection quotas (#9555)
c2cc8a6 is described below
commit c2cc8a6b4316338075d163f277fe0d6825172a36
Author: David Mao <47...@users.noreply.github.com>
AuthorDate: Fri Nov 6 02:28:31 2020 -0600
KAFKA-10673: Cache inter broker listener name used in connection quotas (#9555)
`config.interBrokerListenerName` and `config.listeners` were called several times per connection accept. As these calls are expensive, it is preferable to rely on cached values.
Reviewers: Anna Povzner <an...@confluent.io>, David Jacot <dj...@confluent.io>
---
core/src/main/scala/kafka/network/SocketServer.scala | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 17b6eae..dcfc08d 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -1278,6 +1278,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
@volatile private var defaultMaxConnectionsPerIp: Int = config.maxConnectionsPerIp
@volatile private var maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides.map { case (host, count) => (InetAddress.getByName(host), count) }
@volatile private var brokerMaxConnections = config.maxConnections
+ private val interBrokerListenerName = config.interBrokerListenerName
private val counts = mutable.Map[InetAddress, Int]()
// Listener counts and configs are synchronized on `counts`
@@ -1415,7 +1416,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
}
private def protectedListener(listenerName: ListenerName): Boolean =
- config.interBrokerListenerName == listenerName && config.listeners.size > 1
+ interBrokerListenerName == listenerName && listenerCounts.size > 1
private def maxListenerConnections(listenerName: ListenerName): Int =
maxConnectionsPerListener.get(listenerName).map(_.maxConnections).getOrElse(Int.MaxValue)