Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/11/04 04:46:12 UTC

[GitHub] [kafka] splett2 opened a new pull request #9555: KAFKA-10673: Cache inter broker listener name used in connection quotas

splett2 opened a new pull request #9555:
URL: https://github.com/apache/kafka/pull/9555


   A profile from a moderately busy cluster shows that calls to `protectedListener` can account for more than 1% of the allocations in the cluster.
   
   `config.interBrokerListenerName` is an expensive call: it copies the `KafkaConfig`'s backing map and performs string/regex parsing. Because we call `protectedListener()` multiple times per call to `ConnectionQuotas.inc()`, we end up performing many unnecessary allocations, especially since the inter-broker listener changes very infrequently. We can instead cache the inter-broker listener name and update it only when listeners are added or removed.
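
The caching idea described above can be sketched roughly as follows. This is a minimal illustration, not the code in this pull request: `ExpensiveConfig`, `QuotaSketch`, `refreshListeners`, and `count` are hypothetical names, and the "expensive" lookup only imitates the map copy and string parsing mentioned in the description.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable

// Hypothetical stand-in for KafkaConfig: each lookup copies the backing map,
// re-parses the value, and counts how often that work is done.
final class ExpensiveConfig(props: Map[String, String]) {
  val lookups = new AtomicInteger(0)

  def interBrokerListenerName: String = {
    lookups.incrementAndGet()
    val copy = mutable.HashMap.empty[String, String] ++= props // new map per call
    copy.getOrElse("inter.broker.listener.name", "PLAINTEXT").trim.toUpperCase
  }
}

// Hypothetical, simplified quota tracker (not Kafka's ConnectionQuotas).
final class QuotaSketch(config: ExpensiveConfig) {
  // Read once at construction; @volatile so other threads observe a refresh.
  @volatile private var interBrokerListenerName: String = config.interBrokerListenerName

  private val counts = mutable.Map.empty[String, Int]

  // Served from the cached field, so the hot path never calls into the config.
  private def protectedListener(listener: String): Boolean =
    listener == interBrokerListenerName

  def inc(listener: String): Unit = synchronized {
    val key = if (protectedListener(listener)) "protected" else "other"
    counts(key) = counts.getOrElse(key, 0) + 1
  }

  def count(key: String): Int = synchronized { counts.getOrElse(key, 0) }

  // Assumed hook: invoked only when listeners are added or removed.
  def refreshListeners(): Unit = {
    interBrokerListenerName = config.interBrokerListenerName
  }
}

object CachingDemo {
  def main(args: Array[String]): Unit = {
    val config = new ExpensiveConfig(Map("inter.broker.listener.name" -> "internal"))
    val quotas = new QuotaSketch(config)
    (1 to 1000).foreach(_ => quotas.inc("INTERNAL"))
    println(s"config lookups after 1000 connections: ${config.lookups.get}")
  }
}
```

The trade-off in this sketch is that the cached value can go stale if the underlying setting changes without `refreshListeners()` being called, which is why the question of whether the setting is dynamically updatable matters.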
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac merged pull request #9555: KAFKA-10673: Cache inter broker listener name used in connection quotas

Posted by GitBox <gi...@apache.org>.
dajac merged pull request #9555:
URL: https://github.com/apache/kafka/pull/9555


   





[GitHub] [kafka] splett2 commented on a change in pull request #9555: KAFKA-10673: Cache inter broker listener name used in connection quotas

Posted by GitBox <gi...@apache.org>.
splett2 commented on a change in pull request #9555:
URL: https://github.com/apache/kafka/pull/9555#discussion_r518283600



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1189,6 +1189,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
   @volatile private var defaultMaxConnectionsPerIp: Int = config.maxConnectionsPerIp
   @volatile private var maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides.map { case (host, count) => (InetAddress.getByName(host), count) }
   @volatile private var brokerMaxConnections = config.maxConnections
+  @volatile private var interBrokerListenerName = config.interBrokerListenerName

Review comment:
       `interBrokerListenerName` is apparently not a dynamic config.
   
   See `DynamicBrokerReconfigurationTest.testAdvertisedListenerUpdate`:
   ...
   ```
       // Verify updating inter-broker listener
       val props = new Properties
       props.put(KafkaConfig.InterBrokerListenerNameProp, SecureExternal)
       try {
         reconfigureServers(props, perBrokerConfig = true, (KafkaConfig.InterBrokerListenerNameProp, SecureExternal))
         fail("Inter-broker listener cannot be dynamically updated")
       }
   ```
   I don't think we allow updating the inter-broker listener at all, so I think we can remove the test I added. I wasn't sure whether we allowed it, but the code suggests that we don't.








[GitHub] [kafka] apovzner commented on a change in pull request #9555: KAFKA-10673: Cache inter broker listener name used in connection quotas

Posted by GitBox <gi...@apache.org>.
apovzner commented on a change in pull request #9555:
URL: https://github.com/apache/kafka/pull/9555#discussion_r518269240



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1189,6 +1189,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
   @volatile private var defaultMaxConnectionsPerIp: Int = config.maxConnectionsPerIp
   @volatile private var maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides.map { case (host, count) => (InetAddress.getByName(host), count) }
   @volatile private var brokerMaxConnections = config.maxConnections
+  @volatile private var interBrokerListenerName = config.interBrokerListenerName

Review comment:
       Good point, and I definitely agree that `config.interBrokerListenerName` could be expensive, since we call it several times each time a new connection is accepted.
   
   The issue here is that `interBrokerListenerName` is a dynamic config. So you will need to update the cached value when that config changes, similar to how we notify `ConnectionQuotas` about config changes from `SocketServer.reconfigure()`.







[GitHub] [kafka] splett2 commented on a change in pull request #9555: KAFKA-10673: Cache inter broker listener name used in connection quotas

Posted by GitBox <gi...@apache.org>.
splett2 commented on a change in pull request #9555:
URL: https://github.com/apache/kafka/pull/9555#discussion_r518283600



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1189,6 +1189,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
   @volatile private var defaultMaxConnectionsPerIp: Int = config.maxConnectionsPerIp
   @volatile private var maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides.map { case (host, count) => (InetAddress.getByName(host), count) }
   @volatile private var brokerMaxConnections = config.maxConnections
+  @volatile private var interBrokerListenerName = config.interBrokerListenerName

Review comment:
       `interBrokerListenerName` is apparently not a dynamic config.
   
   See `DynamicBrokerReconfigurationTest.testAdvertisedListenerUpdate`:
   ...
   ```
       // Verify updating inter-broker listener
       val props = new Properties
       props.put(KafkaConfig.InterBrokerListenerNameProp, SecureExternal)
       try {
         reconfigureServers(props, perBrokerConfig = true, (KafkaConfig.InterBrokerListenerNameProp, SecureExternal))
         fail("Inter-broker listener cannot be dynamically updated")
       }
   ```
   It does seem that we allow updating the inter-broker listener when adding/removing listeners, but that case is covered.
   I will try to verify whether we allow updating the inter-broker listener name through the inter-broker security protocol, since apparently that's another way to configure the listener.








[GitHub] [kafka] apovzner commented on a change in pull request #9555: KAFKA-10673: Cache inter broker listener name used in connection quotas

Posted by GitBox <gi...@apache.org>.
apovzner commented on a change in pull request #9555:
URL: https://github.com/apache/kafka/pull/9555#discussion_r518327928



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1189,6 +1189,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
   @volatile private var defaultMaxConnectionsPerIp: Int = config.maxConnectionsPerIp
   @volatile private var maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides.map { case (host, count) => (InetAddress.getByName(host), count) }
   @volatile private var brokerMaxConnections = config.maxConnections
+  @volatile private var interBrokerListenerName = config.interBrokerListenerName

Review comment:
       You are right. I saw that test and thought it was a dynamic config, but the test was actually verifying that it cannot be updated. I see now that KIP-226 lists that as future work, cool.



