Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/01 00:02:35 UTC

[GitHub] [kafka] apovzner opened a new pull request #8768: KAFKA-10023: Enforce broker-wide and per-listener connection creation…

apovzner opened a new pull request #8768:
URL: https://github.com/apache/kafka/pull/8768


   This PR implements the part of KIP-612 that adds broker configurations for broker-wide and per-listener connection creation rate limits and enforces these limits.
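   
   For context, the tests quoted later in this thread configure the new limits roughly as follows (a sketch assembled from the `ConnectionQuotasTest` excerpts below; `brokerPropsWithDefaultConnectionLimits`, `addListenersAndVerify` and `metrics` are helpers/fields of that test, and the numeric values are illustrative):
   ```scala
   // Broker-wide connection creation rate limit, set as a broker config property
   val props = brokerPropsWithDefaultConnectionLimits
   props.put(KafkaConfig.MaxConnectionCreationRateProp, "100")
   val config = KafkaConfig.fromProps(props)
   val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)

   // Per-listener rate limit, supplied as listener-level configuration
   val listenerConfig = Map(KafkaConfig.MaxConnectionCreationRateProp -> "50").asJava
   addListenersAndVerify(config, listenerConfig, connectionQuotas)
   ```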
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] apovzner commented on a change in pull request #8768: KAFKA-10023: Enforce broker-wide and per-listener connection creation…

Posted by GitBox <gi...@apache.org>.
apovzner commented on a change in pull request #8768:
URL: https://github.com/apache/kafka/pull/8768#discussion_r453962326



##########
File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
##########
@@ -302,21 +312,295 @@ class ConnectionQuotasTest {
       }
       // all connections should get added
       overLimitFutures.foreach(_.get(5, TimeUnit.SECONDS))
-      listeners.values.foreach { listener =>
-        assertEquals(s"Number of connections on $listener:",
-          listenerMaxConnections, connectionQuotas.get(listener.defaultIp))
+      verifyConnectionCountOnEveryListener(connectionQuotas, listenerMaxConnections)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testBrokerConnectionRateLimitWhenActualRateBelowLimit(): Unit = {
+    val brokerRateLimit = 125
+    val props = brokerPropsWithDefaultConnectionLimits
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, brokerRateLimit.toString)
+    val config = KafkaConfig.fromProps(props)
+    val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+    addListenersAndVerify(config, connectionQuotas)
+
+    val executor = Executors.newFixedThreadPool(listeners.size)
+    try {
+      // create connections with the total rate < broker-wide quota, and verify there is no throttling
+      val connCreateIntervalMs = 25 // connection creation rate = 40/sec per listener (3 * 40 = 120/sec total)
+      val connectionsPerListener = 200 // should take 5 seconds to create 200 connections with rate = 40/sec
+      val futures = listeners.values.map { listener =>
+        executor.submit((() => acceptConnections(connectionQuotas, listener, connectionsPerListener, connCreateIntervalMs)): Runnable)
       }
+      futures.foreach(_.get(10, TimeUnit.SECONDS))
+
+      // the blocked percent should still be 0, because no limits were reached
+      verifyNoBlockedPercentRecordedOnAllListeners()
+      verifyConnectionCountOnEveryListener(connectionQuotas, connectionsPerListener)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testBrokerConnectionRateLimitWhenActualRateAboveLimit(): Unit = {
+    val brokerRateLimit = 90
+    val props = brokerPropsWithDefaultConnectionLimits
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, brokerRateLimit.toString)
+    val config = KafkaConfig.fromProps(props)
+    val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+    addListenersAndVerify(config, connectionQuotas)
+
+    val executor = Executors.newFixedThreadPool(listeners.size)

Review comment:
       The other tests use a single thread, but yes, it makes sense to create the thread pool upfront using `listeners.size`. Will do that.







[GitHub] [kafka] rajinisivaram commented on pull request #8768: KAFKA-10023: Enforce broker-wide and per-listener connection creation…

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on pull request #8768:
URL: https://github.com/apache/kafka/pull/8768#issuecomment-674554549


   @apovzner DynamicBrokerReconfigurationTest.testAddRemoveSaslListeners failed in all three PR builds, so it is probably related?
   ```
   15:55:45 kafka.server.DynamicBrokerReconfigurationTest > testAddRemoveSaslListeners FAILED
   15:55:45     org.scalatest.exceptions.TestFailedException: Processors not shutdown for removed listener
   15:55:45         at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530)
   15:55:45         at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529)
   15:55:45         at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389)
   15:55:45         at org.scalatest.Assertions.fail(Assertions.scala:1091)
   15:55:45         at org.scalatest.Assertions.fail$(Assertions.scala:1087)
   15:55:45         at org.scalatest.Assertions$.fail(Assertions.scala:1389)
   15:55:45         at kafka.server.DynamicBrokerReconfigurationTest.verifyRemoveListener(DynamicBrokerReconfigurationTest.scala:1178)
   15:55:45         at kafka.server.DynamicBrokerReconfigurationTest.testAddRemoveSaslListeners(DynamicBrokerReconfigurationTest.scala:1057)
   ```





[GitHub] [kafka] apovzner commented on a change in pull request #8768: KAFKA-10023: Enforce broker-wide and per-listener connection creation…

Posted by GitBox <gi...@apache.org>.
apovzner commented on a change in pull request #8768:
URL: https://github.com/apache/kafka/pull/8768#discussion_r452507990



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1308,18 +1410,27 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging {
       val value = maxConnections(configs)
       if (value <= 0)
         throw new ConfigException("Invalid max.connections $listenerMax")

Review comment:
       yes, good catch. Fixed.







[GitHub] [kafka] apovzner commented on a change in pull request #8768: KAFKA-10023: Enforce broker-wide and per-listener connection creation…

Posted by GitBox <gi...@apache.org>.
apovzner commented on a change in pull request #8768:
URL: https://github.com/apache/kafka/pull/8768#discussion_r453965142



##########
File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
##########
@@ -302,21 +312,295 @@ class ConnectionQuotasTest {
       }
       // all connections should get added
       overLimitFutures.foreach(_.get(5, TimeUnit.SECONDS))
-      listeners.values.foreach { listener =>
-        assertEquals(s"Number of connections on $listener:",
-          listenerMaxConnections, connectionQuotas.get(listener.defaultIp))
+      verifyConnectionCountOnEveryListener(connectionQuotas, listenerMaxConnections)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testBrokerConnectionRateLimitWhenActualRateBelowLimit(): Unit = {
+    val brokerRateLimit = 125
+    val props = brokerPropsWithDefaultConnectionLimits
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, brokerRateLimit.toString)
+    val config = KafkaConfig.fromProps(props)
+    val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+    addListenersAndVerify(config, connectionQuotas)
+
+    val executor = Executors.newFixedThreadPool(listeners.size)
+    try {
+      // create connections with the total rate < broker-wide quota, and verify there is no throttling
+      val connCreateIntervalMs = 25 // connection creation rate = 40/sec per listener (3 * 40 = 120/sec total)
+      val connectionsPerListener = 200 // should take 5 seconds to create 200 connections with rate = 40/sec
+      val futures = listeners.values.map { listener =>
+        executor.submit((() => acceptConnections(connectionQuotas, listener, connectionsPerListener, connCreateIntervalMs)): Runnable)
       }
+      futures.foreach(_.get(10, TimeUnit.SECONDS))
+
+      // the blocked percent should still be 0, because no limits were reached
+      verifyNoBlockedPercentRecordedOnAllListeners()
+      verifyConnectionCountOnEveryListener(connectionQuotas, connectionsPerListener)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testBrokerConnectionRateLimitWhenActualRateAboveLimit(): Unit = {
+    val brokerRateLimit = 90
+    val props = brokerPropsWithDefaultConnectionLimits
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, brokerRateLimit.toString)
+    val config = KafkaConfig.fromProps(props)

Review comment:
       the test code has just one extra prop in addition to `brokerPropsWithDefaultConnectionLimits`, and it is easier to read the current way, so I will keep it.







[GitHub] [kafka] rajinisivaram commented on pull request #8768: KAFKA-10023: Enforce broker-wide and per-listener connection creation…

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on pull request #8768:
URL: https://github.com/apache/kafka/pull/8768#issuecomment-674767765


   ok to test





[GitHub] [kafka] apovzner commented on a change in pull request #8768: KAFKA-10023: Enforce broker-wide and per-listener connection creation…

Posted by GitBox <gi...@apache.org>.
apovzner commented on a change in pull request #8768:
URL: https://github.com/apache/kafka/pull/8768#discussion_r439786379



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1287,15 +1309,97 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging {
   private def maxListenerConnections(listenerName: ListenerName): Int =
     maxConnectionsPerListener.get(listenerName).map(_.maxConnections).getOrElse(Int.MaxValue)
 
+  /**
+   * Calculates the delay needed to bring the observed connection creation rate to listener-level limit or to broker-wide
+   * limit, whichever the longest. The delay is capped to the quota window size defined by QuotaWindowSizeSecondsProp
+   *
+   * @param listenerName listener for which calculate the delay
+   * @param timeMs current time in milliseconds
+   * @return delay in milliseconds
+   */
+  private def recordConnectionAndGetThrottleTimeMs(listenerName: ListenerName, timeMs: Long): Long = {
+    val listenerThrottleTimeMs = maxConnectionsPerListener
+      .get(listenerName)
+      .map(listenerQuota => recordAndGetThrottleTimeMs(listenerQuota.connectionRateSensor, timeMs))
+      .getOrElse(0)
+
+    if (protectedListener(listenerName)) {
+      listenerThrottleTimeMs
+    } else {
+      val brokerThrottleTimeMs = recordAndGetThrottleTimeMs(brokerConnectionRateSensor, timeMs)
+      val throttleTimeMs = math.max(brokerThrottleTimeMs, listenerThrottleTimeMs)
+      throttleTimeMs
+    }
+  }
+
+  private def recordAndGetThrottleTimeMs(sensor: Sensor, timeMs: Long): Int = {
+    try {
+      sensor.record(1.0, timeMs)
+      0
+    } catch {
+      case e: QuotaViolationException =>
+        val throttleTimeMs = QuotaUtils.boundedThrottleTime(
+          e.value, e.bound, QuotaUtils.rateMetricWindowSize(e.metric, timeMs), maxThrottleTimeMs).toInt
+        debug(s"Quota violated for sensor (${sensor.name}). Delay time: $throttleTimeMs ms")
+        throttleTimeMs
+    }
+  }
+
+  /**
+   * Creates sensor for tracking the connection creation rate and corresponding connection rate quota for a given
+   * listener or broker-wide, if listener is not provided.
+   * @param quotaLimit connection creation rate quota
+   * @param listenerOpt listener name if sensor is for a listener
+   */
+  private def createConnectionRateQuotaSensor(quotaLimit: Int, listenerOpt: Option[String] = None): Sensor = {
+    val quotaEntity = listenerOpt.getOrElse("broker")
+    val sensor = metrics.sensor(s"ConnectionCreationRate-$quotaEntity", rateQuotaMetricConfig(quotaLimit))
+    sensor.add(connectionRateMetricName(listenerOpt), new Rate)
+    info(s"Created ConnectionCreationRate-$quotaEntity sensor, quotaLimit=$quotaLimit")
+    sensor
+  }
+
+  private def updateConnectionRateQuota(quotaLimit: Int, listenerOpt: Option[String] = None): Unit = {
+    val metric = metrics.metric(connectionRateMetricName((listenerOpt)))
+    metric.config(rateQuotaMetricConfig(quotaLimit))
+    info(s"Updated ${listenerOpt.getOrElse("broker")} max connection creation rate to $quotaLimit")
+  }
+
+  private def connectionRateMetricName(listenerOpt: Option[String]): MetricName = {
+    val quotaEntity = listenerOpt.getOrElse("broker")
+    metrics.metricName(
+      s"connection-creation-rate-$quotaEntity",
+      "connection-quota-no-jmx",
+      s"Tracking $quotaEntity connection creation rate",
+      rateQuotaMetricTags(listenerOpt))

Review comment:
       We already have a yammer Meter that reports the connection creation rate, also tagged with listener and processor. That same meter tracks both the connection creation rate and the total. Here is the connection creation rate metric:
   kafka.server:type=socket-server-metrics,listener={listener_name},networkProcessor={#},name=connection-creation-rate
   
   I had to add a Rate metric here because it tracks both the rate and the quota, and it allows us to calculate the throttle time the same way we do for client (request & bandwidth) quotas.
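   
   To illustrate the mechanism described above in isolation (a `Rate` metric whose `MetricConfig` carries a `Quota`, so that `Sensor.record` throws `QuotaViolationException` once the measured rate exceeds the bound), here is a minimal, self-contained sketch; this is not the PR code, and all names and values are illustrative:
   ```scala
   import java.util.concurrent.TimeUnit
   import org.apache.kafka.common.metrics.{MetricConfig, Metrics, Quota, QuotaViolationException}
   import org.apache.kafka.common.metrics.stats.Rate

   object RateQuotaSketch extends App {
     val metrics = new Metrics()
     // Quota of 5 events/sec over 10 one-second samples, as an upper bound
     val config = new MetricConfig()
       .timeWindow(1, TimeUnit.SECONDS)
       .samples(10)
       .quota(new Quota(5, true))
     val sensor = metrics.sensor("connection-rate-sketch", config)
     sensor.add(metrics.metricName("rate-sketch", "sketch-group"), new Rate)
     try {
       // Recording much faster than the quota eventually makes record() throw
       (1 to 100).foreach(_ => sensor.record(1.0))
     } catch {
       case e: QuotaViolationException =>
         println(s"quota violated: value=${e.value}, bound=${e.bound}")
     } finally {
       metrics.close()
     }
   }
   ```
   In the diff above, the throttle time is then derived from that exception via `QuotaUtils.boundedThrottleTime`.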







[GitHub] [kafka] dajac commented on a change in pull request #8768: KAFKA-10023: Enforce broker-wide and per-listener connection creation…

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #8768:
URL: https://github.com/apache/kafka/pull/8768#discussion_r457390034



##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java
##########
@@ -97,5 +99,13 @@ public MetricConfig recordLevel(Sensor.RecordingLevel recordingLevel) {
         return this;
     }
 
+    public boolean skipReporting() {

Review comment:
       I asked the very same question: https://github.com/apache/kafka/pull/8768#discussion_r433705077. Basically, we already have the same metrics defined as part of the `SelectorMetrics`, so the ones added here collide.
   
   Thinking a bit more about this, I wonder if we could use Sensors without adding them to the registry at all.







[GitHub] [kafka] rajinisivaram commented on pull request #8768: KAFKA-10023: Enforce broker-wide and per-listener connection creation…

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on pull request #8768:
URL: https://github.com/apache/kafka/pull/8768#issuecomment-661935552


   There is one test failure in `kafka.network.DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota` that is related to the PR.





[GitHub] [kafka] rajinisivaram commented on pull request #8768: KAFKA-10023: Enforce broker-wide and per-listener connection creation…

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on pull request #8768:
URL: https://github.com/apache/kafka/pull/8768#issuecomment-661729947


   ok to test





[GitHub] [kafka] rajinisivaram merged pull request #8768: KAFKA-10023: Enforce broker-wide and per-listener connection creation…

Posted by GitBox <gi...@apache.org>.
rajinisivaram merged pull request #8768:
URL: https://github.com/apache/kafka/pull/8768


   





[GitHub] [kafka] apovzner commented on pull request #8768: KAFKA-10023: Enforce broker-wide and per-listener connection creation…

Posted by GitBox <gi...@apache.org>.
apovzner commented on pull request #8768:
URL: https://github.com/apache/kafka/pull/8768#issuecomment-671684203


   @rajinisivaram Thanks for your review. I updated this PR to expose 'connection-accept-rate' metrics, addressed the remaining comments, and the test should also be fixed.
   
   It also seems like it would make sense to add and expose throttle-time metrics. What do you think? If so, I will open a separate PR to add throttle-time metrics and to use the Token Bucket quota calculation that David added as part of KIP-599, which works better with bursty workloads. I will update the KIP accordingly and notify the voting thread.





[GitHub] [kafka] rajinisivaram commented on pull request #8768: KAFKA-10023: Enforce broker-wide and per-listener connection creation…

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on pull request #8768:
URL: https://github.com/apache/kafka/pull/8768#issuecomment-674769912


   ok to test





[GitHub] [kafka] rajinisivaram commented on pull request #8768: KAFKA-10023: Enforce broker-wide and per-listener connection creation…

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on pull request #8768:
URL: https://github.com/apache/kafka/pull/8768#issuecomment-674533627


   ok to test





[GitHub] [kafka] apovzner commented on a change in pull request #8768: KAFKA-10023: Enforce broker-wide and per-listener connection creation…

Posted by GitBox <gi...@apache.org>.
apovzner commented on a change in pull request #8768:
URL: https://github.com/apache/kafka/pull/8768#discussion_r453130277



##########
File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
##########
@@ -329,15 +613,46 @@ class ConnectionQuotasTest {
   }
 
   // this method must be called on a separate thread, because connectionQuotas.inc() may block
+  private def acceptConnectionsAndVerifyRate(connectionQuotas: ConnectionQuotas,
+                                             listenerDesc: ListenerDesc,
+                                             numConnections: Long,
+                                             timeIntervalMs: Long,
+                                             expectedRate: Int,
+                                             epsilon: Int) : Unit = {
+    val startTimeMs = System.currentTimeMillis
+    acceptConnections(connectionQuotas, listenerDesc.listenerName, listenerDesc.defaultIp, numConnections, timeIntervalMs)
+    val elapsedSeconds = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis - startTimeMs)
+    val actualRate = (numConnections.toDouble / elapsedSeconds).toInt
+    if (actualRate - epsilon > expectedRate || actualRate + epsilon < expectedRate)
+      throw new TestFailedException(
+        (e: StackDepthException) =>
+          Some(s"Expected rate $expectedRate, but got $actualRate ($numConnections connections / $elapsedSeconds sec)"),
+        None,
+        Position.here)

Review comment:
       Yes, I remembered that asserting from another thread would not do the right thing, but that must be a very old memory. I checked and it works well. Replaced with `assertTrue`.
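   
   For reference, the `assertTrue` form mentioned here could look roughly like this (a sketch; the condition is the negation of the quoted check, and the message keeps the quoted wording):
   ```scala
   // Sketch: fail the test when the measured rate is outside expectedRate +/- epsilon
   assertTrue(s"Expected rate $expectedRate, but got $actualRate ($numConnections connections / $elapsedSeconds sec)",
     actualRate - epsilon <= expectedRate && actualRate + epsilon >= expectedRate)
   ```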







[GitHub] [kafka] apovzner commented on a change in pull request #8768: KAFKA-10023: Enforce broker-wide and per-listener connection creation…

Posted by GitBox <gi...@apache.org>.
apovzner commented on a change in pull request #8768:
URL: https://github.com/apache/kafka/pull/8768#discussion_r454010650



##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
##########
@@ -572,16 +572,18 @@ public synchronized void removeReporter(MetricsReporter reporter) {
         }
     }
 
-    synchronized void registerMetric(KafkaMetric metric) {
+    synchronized void registerMetric(KafkaMetric metric, boolean report) {

Review comment:
       That's a good idea I hadn't thought about. I tried it and it does look better. I will update the PR with this approach.







[GitHub] [kafka] apovzner commented on a change in pull request #8768: KAFKA-10023: Enforce broker-wide and per-listener connection creation…

Posted by GitBox <gi...@apache.org>.
apovzner commented on a change in pull request #8768:
URL: https://github.com/apache/kafka/pull/8768#discussion_r452502811



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1289,15 +1311,95 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging {
   private def maxListenerConnections(listenerName: ListenerName): Int =
     maxConnectionsPerListener.get(listenerName).map(_.maxConnections).getOrElse(Int.MaxValue)
 
+  /**
+   * Calculates the delay needed to bring the observed connection creation rate to listener-level limit or to broker-wide
+   * limit, whichever the longest. The delay is capped to the quota window size defined by QuotaWindowSizeSecondsProp
+   *
+   * @param listenerName listener for which calculate the delay
+   * @param timeMs current time in milliseconds
+   * @return delay in milliseconds
+   */
+  private def recordConnectionAndGetThrottleTimeMs(listenerName: ListenerName, timeMs: Long): Long = {
+    val listenerThrottleTimeMs = maxConnectionsPerListener
+      .get(listenerName)
+      .map(listenerQuota => recordAndGetThrottleTimeMs(listenerQuota.connectionRateSensor, timeMs))
+      .getOrElse(0)
+
+    if (protectedListener(listenerName)) {
+      listenerThrottleTimeMs
+    } else {
+      val brokerThrottleTimeMs = recordAndGetThrottleTimeMs(brokerConnectionRateSensor, timeMs)
+      math.max(brokerThrottleTimeMs, listenerThrottleTimeMs)
+    }
+  }
+
+  private def recordAndGetThrottleTimeMs(sensor: Sensor, timeMs: Long): Int = {
+    try {
+      sensor.record(1.0, timeMs)
+      0
+    } catch {
+      case e: QuotaViolationException =>
+        val throttleTimeMs = QuotaUtils.boundedThrottleTime(
+          e.value, e.bound, QuotaUtils.rateMetricWindowSize(e.metric, timeMs), maxThrottleTimeMs).toInt
+        debug(s"Quota violated for sensor (${sensor.name}). Delay time: $throttleTimeMs ms")
+        throttleTimeMs
+    }
+  }
+
+  /**
+   * Creates sensor for tracking the connection creation rate and corresponding connection rate quota for a given
+   * listener or broker-wide, if listener is not provided.
+   * @param quotaLimit connection creation rate quota
+   * @param listenerOpt listener name if sensor is for a listener
+   */
+  private def createConnectionRateQuotaSensor(quotaLimit: Int, listenerOpt: Option[String] = None): Sensor = {
+    val quotaEntity = listenerOpt.getOrElse("broker")
+    val sensor = metrics.sensor(s"ConnectionCreationRate-$quotaEntity", rateQuotaMetricConfig(quotaLimit))
+    sensor.add(connectionRateMetricName(listenerOpt), new Rate, null, false)
+    info(s"Created ConnectionCreationRate-$quotaEntity sensor, quotaLimit=$quotaLimit")
+    sensor
+  }
+
+  private def updateConnectionRateQuota(quotaLimit: Int, listenerOpt: Option[String] = None): Unit = {
+    val metric = metrics.metric(connectionRateMetricName((listenerOpt)))
+    metric.config(rateQuotaMetricConfig(quotaLimit))
+    info(s"Updated ${listenerOpt.getOrElse("broker")} max connection creation rate to $quotaLimit")
+  }
+
+  private def connectionRateMetricName(listenerOpt: Option[String]): MetricName = {
+    val quotaEntity = listenerOpt.getOrElse("broker")
+    metrics.metricName(
+      s"connection-creation-rate-$quotaEntity",
+      "connection-quota-no-jmx",
+      s"Tracking $quotaEntity connection creation rate",
+      rateQuotaMetricTags(listenerOpt))
+  }
+
+  private def rateQuotaMetricConfig(quotaLimit: Int): MetricConfig = {
+    new MetricConfig()
+      .timeWindow(config.quotaWindowSizeSeconds.toLong, TimeUnit.SECONDS)
+      .samples(config.numQuotaSamples)
+      .quota(new Quota(quotaLimit, true))
+  }
+
+  private def rateQuotaMetricTags(listenerOpt: Option[String]): util.Map[String, String] = {
+    val tags = new util.LinkedHashMap[String, String]
+    listenerOpt.foreach(listener => tags.put("listener", listener))
+    tags

Review comment:
       I realized that I don't need tags here anymore, since the name of the metric contains the name of the listener, so the metrics are already distinct per listener (and broker-wide). I removed that method altogether.







[GitHub] [kafka] apovzner commented on pull request #8768: KAFKA-10023: Enforce broker-wide and per-listener connection creation…

Posted by GitBox <gi...@apache.org>.
apovzner commented on pull request #8768:
URL: https://github.com/apache/kafka/pull/8768#issuecomment-674639202


   Hi @rajinisivaram, the test failure turned out to be a bug where I did not remove connection rate sensors on listener removal. I fixed the code and the tests now pass.
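   
   As a rough illustration of the kind of cleanup this refers to (a sketch only; the sensor name follows the `ConnectionCreationRate-<entity>` scheme visible in the quoted SocketServer diffs, and the actual fix in this PR may differ):
   ```scala
   import org.apache.kafka.common.metrics.Metrics

   // Remove the per-listener connection rate sensor when the listener is removed;
   // Metrics.removeSensor also removes the metrics registered through that sensor.
   def removeListenerConnectionRateSensor(metrics: Metrics, listenerName: String): Unit =
     metrics.removeSensor(s"ConnectionCreationRate-$listenerName")
   ```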





[GitHub] [kafka] rajinisivaram commented on pull request #8768: KAFKA-10023: Enforce broker-wide and per-listener connection creation…

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on pull request #8768:
URL: https://github.com/apache/kafka/pull/8768#issuecomment-674884255


   @apovzner Thanks for the updates, LGTM. Merging to trunk.





[GitHub] [kafka] apovzner commented on a change in pull request #8768: KAFKA-10023: Enforce broker-wide and per-listener connection creation…

Posted by GitBox <gi...@apache.org>.
apovzner commented on a change in pull request #8768:
URL: https://github.com/apache/kafka/pull/8768#discussion_r439786166



##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
##########
@@ -161,6 +162,9 @@ private KafkaMbean removeAttribute(KafkaMetric metric, String mBeanName) {
     private String addAttribute(KafkaMetric metric) {
         try {
             MetricName metricName = metric.metricName();
+            if (metricName.tags().containsKey(DO_NOT_REPORT_TAG)) {
+                return null;
+            }

Review comment:
       You are right, I replaced this with your first alternative.







[GitHub] [kafka] apovzner commented on a change in pull request #8768: KAFKA-10023: Enforce broker-wide and per-listener connection creation…

Posted by GitBox <gi...@apache.org>.
apovzner commented on a change in pull request #8768:
URL: https://github.com/apache/kafka/pull/8768#discussion_r439143850



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1256,11 +1272,17 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging {
   private def waitForConnectionSlot(listenerName: ListenerName,
                                     acceptorBlockedPercentMeter: com.yammer.metrics.core.Meter): Unit = {
     counts.synchronized {
-      if (!connectionSlotAvailable(listenerName)) {
+      val startTimeMs = time.milliseconds()
+      val throttleTimeMs = math.max(recordConnectionAndGetThrottleTimeMs(listenerName, startTimeMs), 0)
+
+      if (throttleTimeMs > 0 || !connectionSlotAvailable(listenerName)) {
         val startNs = time.nanoseconds
+        val endThrottleTimeMs = startTimeMs + throttleTimeMs
+        var remainingThrottleTimeMs = throttleTimeMs
         do {
-          counts.wait()
-        } while (!connectionSlotAvailable(listenerName))
+          counts.wait(remainingThrottleTimeMs)

Review comment:
       This is exactly the behavior proposed in the KIP: if we reach any limit (number of connections or connection rate), we need to wait. So, if there is no space for a new connection and the delay due to the rate limit has passed, we still have to wait for a connection slot. However, remember that if we are waiting on an inter-broker connection slot, the broker finds and closes a connection on another listener to accommodate the inter-broker connection. See KIP-402.
   







[GitHub] [kafka] dajac commented on a change in pull request #8768: KAFKA-10023: Enforce broker-wide and per-listener connection creation…

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #8768:
URL: https://github.com/apache/kafka/pull/8768#discussion_r454890281



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1289,15 +1311,89 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging {
   private def maxListenerConnections(listenerName: ListenerName): Int =
     maxConnectionsPerListener.get(listenerName).map(_.maxConnections).getOrElse(Int.MaxValue)
 
+  /**
+   * Calculates the delay needed to bring the observed connection creation rate to listener-level limit or to broker-wide
+   * limit, whichever the longest. The delay is capped to the quota window size defined by QuotaWindowSizeSecondsProp
+   *
+   * @param listenerName listener for which calculate the delay
+   * @param timeMs current time in milliseconds
+   * @return delay in milliseconds
+   */
+  private def recordConnectionAndGetThrottleTimeMs(listenerName: ListenerName, timeMs: Long): Long = {
+    val listenerThrottleTimeMs = maxConnectionsPerListener
+      .get(listenerName)
+      .map(listenerQuota => recordAndGetThrottleTimeMs(listenerQuota.connectionRateSensor, timeMs))
+      .getOrElse(0)
+
+    if (protectedListener(listenerName)) {
+      listenerThrottleTimeMs
+    } else {
+      val brokerThrottleTimeMs = recordAndGetThrottleTimeMs(brokerConnectionRateSensor, timeMs)
+      math.max(brokerThrottleTimeMs, listenerThrottleTimeMs)
+    }
+  }
+
+  private def recordAndGetThrottleTimeMs(sensor: Sensor, timeMs: Long): Int = {
+    try {
+      sensor.record(1.0, timeMs)
+      0
+    } catch {
+      case e: QuotaViolationException =>
+        val throttleTimeMs = QuotaUtils.boundedThrottleTime(
+          e.value, e.bound, QuotaUtils.rateMetricWindowSize(e.metric, timeMs), maxThrottleTimeMs).toInt
+        debug(s"Quota violated for sensor (${sensor.name}). Delay time: $throttleTimeMs ms")
+        throttleTimeMs
+    }
+  }
+
+  /**
+   * Creates sensor for tracking the connection creation rate and corresponding connection rate quota for a given
+   * listener or broker-wide, if listener is not provided.
+   * @param quotaLimit connection creation rate quota
+   * @param listenerOpt listener name if sensor is for a listener
+   */
+  private def createConnectionRateQuotaSensor(quotaLimit: Int, listenerOpt: Option[String] = None): Sensor = {
+    val quotaEntity = listenerOpt.getOrElse("broker")
+    val sensor = metrics.sensor(s"ConnectionCreationRate-$quotaEntity", rateQuotaMetricConfig(quotaLimit))
+    sensor.add(connectionRateMetricName(quotaEntity), new Rate, null)
+    info(s"Created ConnectionCreationRate-$quotaEntity sensor, quotaLimit=$quotaLimit")
+    sensor
+  }
+
+  private def updateConnectionRateQuota(quotaLimit: Int, listenerOpt: Option[String] = None): Unit = {

Review comment:
       nit: Shall we add a javadoc to this one as well?

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1289,15 +1311,89 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging {
   private def maxListenerConnections(listenerName: ListenerName): Int =
     maxConnectionsPerListener.get(listenerName).map(_.maxConnections).getOrElse(Int.MaxValue)
 
+  /**
+   * Calculates the delay needed to bring the observed connection creation rate to listener-level limit or to broker-wide
+   * limit, whichever the longest. The delay is capped to the quota window size defined by QuotaWindowSizeSecondsProp
+   *
+   * @param listenerName listener for which calculate the delay
+   * @param timeMs current time in milliseconds
+   * @return delay in milliseconds
+   */
+  private def recordConnectionAndGetThrottleTimeMs(listenerName: ListenerName, timeMs: Long): Long = {
+    val listenerThrottleTimeMs = maxConnectionsPerListener
+      .get(listenerName)
+      .map(listenerQuota => recordAndGetThrottleTimeMs(listenerQuota.connectionRateSensor, timeMs))
+      .getOrElse(0)
+
+    if (protectedListener(listenerName)) {
+      listenerThrottleTimeMs
+    } else {
+      val brokerThrottleTimeMs = recordAndGetThrottleTimeMs(brokerConnectionRateSensor, timeMs)
+      math.max(brokerThrottleTimeMs, listenerThrottleTimeMs)
+    }
+  }
+
+  private def recordAndGetThrottleTimeMs(sensor: Sensor, timeMs: Long): Int = {

Review comment:
       nit: Shall we add a javadoc to this one as well?

##########
File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
##########
@@ -302,21 +312,295 @@ class ConnectionQuotasTest {
       }
       // all connections should get added
       overLimitFutures.foreach(_.get(5, TimeUnit.SECONDS))
-      listeners.values.foreach { listener =>
-        assertEquals(s"Number of connections on $listener:",
-          listenerMaxConnections, connectionQuotas.get(listener.defaultIp))
+      verifyConnectionCountOnEveryListener(connectionQuotas, listenerMaxConnections)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testBrokerConnectionRateLimitWhenActualRateBelowLimit(): Unit = {
+    val brokerRateLimit = 125
+    val props = brokerPropsWithDefaultConnectionLimits
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, brokerRateLimit.toString)
+    val config = KafkaConfig.fromProps(props)
+    val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+    addListenersAndVerify(config, connectionQuotas)
+
+    val executor = Executors.newFixedThreadPool(listeners.size)
+    try {
+      // create connections with the total rate < broker-wide quota, and verify there is no throttling
+      val connCreateIntervalMs = 25 // connection creation rate = 40/sec per listener (3 * 40 = 120/sec total)
+      val connectionsPerListener = 200 // should take 5 seconds to create 200 connections with rate = 40/sec
+      val futures = listeners.values.map { listener =>
+        executor.submit((() => acceptConnections(connectionQuotas, listener, connectionsPerListener, connCreateIntervalMs)): Runnable)
       }
+      futures.foreach(_.get(10, TimeUnit.SECONDS))
+
+      // the blocked percent should still be 0, because no limits were reached
+      verifyNoBlockedPercentRecordedOnAllListeners()
+      verifyConnectionCountOnEveryListener(connectionQuotas, connectionsPerListener)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testBrokerConnectionRateLimitWhenActualRateAboveLimit(): Unit = {
+    val brokerRateLimit = 90
+    val props = brokerPropsWithDefaultConnectionLimits
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, brokerRateLimit.toString)
+    val config = KafkaConfig.fromProps(props)
+    val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+    addListenersAndVerify(config, connectionQuotas)
+
+    val executor = Executors.newFixedThreadPool(listeners.size)
+    try {
+      // each listener creates connections such that the total connection rate > broker-wide quota
+      val connCreateIntervalMs = 10      // connection creation rate = 100
+      val connectionsPerListener = 400
+      val futures = listeners.values.map { listener =>
+        executor.submit((() => acceptConnections(connectionQuotas, listener, connectionsPerListener, connCreateIntervalMs)): Runnable)
+      }
+      futures.foreach(_.get(20, TimeUnit.SECONDS))
+
+      // verify that connections on non-inter-broker listener are throttled
+      verifyOnlyNonInterBrokerListenersBlockedPercentRecorded()
+
+      // expect all connections to be created (no limit on the number of connections)
+      verifyConnectionCountOnEveryListener(connectionQuotas, connectionsPerListener)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testListenerConnectionRateLimitWhenActualRateBelowLimit(): Unit = {
+    val brokerRateLimit = 125
+    val listenerRateLimit = 50
+    val connCreateIntervalMs = 25 // connection creation rate = 40/sec per listener (3 * 40 = 120/sec total)
+    val props = brokerPropsWithDefaultConnectionLimits
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, brokerRateLimit.toString)
+    val config = KafkaConfig.fromProps(props)
+    val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+    val listenerConfig = Map(KafkaConfig.MaxConnectionCreationRateProp -> listenerRateLimit.toString).asJava
+    addListenersAndVerify(config, listenerConfig, connectionQuotas)
+
+    val executor = Executors.newFixedThreadPool(listeners.size)
+    try {
+      // create connections with the rate < listener quota on every listener, and verify there is no throttling
+      val connectionsPerListener = 200 // should take 5 seconds to create 200 connections with rate = 40/sec
+      val futures = listeners.values.map { listener =>
+        executor.submit((() => acceptConnections(connectionQuotas, listener, connectionsPerListener, connCreateIntervalMs)): Runnable)
+      }
+      futures.foreach(_.get(10, TimeUnit.SECONDS))
+
+      // the blocked percent should still be 0, because no limits were reached
+      verifyNoBlockedPercentRecordedOnAllListeners()
+
+      verifyConnectionCountOnEveryListener(connectionQuotas, connectionsPerListener)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testListenerConnectionRateLimitWhenActualRateAboveLimit(): Unit = {
+    val brokerRateLimit = 125
+    val listenerRateLimit = 30
+    val connCreateIntervalMs = 25 // connection creation rate = 40/sec per listener (3 * 40 = 120/sec total)
+    val props = brokerPropsWithDefaultConnectionLimits
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, brokerRateLimit.toString)
+    val config = KafkaConfig.fromProps(props)
+    val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+    val listenerConfig = Map(KafkaConfig.MaxConnectionCreationRateProp -> listenerRateLimit.toString).asJava
+    addListenersAndVerify(config, listenerConfig, connectionQuotas)
+
+    val executor = Executors.newFixedThreadPool(listeners.size)
+    try {
+      // create connections with the rate > listener quota on every listener
+      // run a bit longer (20 seconds) to also verify the throttle rate
+      val connectionsPerListener = 600 // should take 20 seconds to create 600 connections with rate = 30/sec
+      val futures = listeners.values.map { listener =>
+        executor.submit((() =>
+          // epsilon is set to account for the worst-case where the measurement is taken just before or after the quota window
+          acceptConnectionsAndVerifyRate(connectionQuotas, listener, connectionsPerListener, connCreateIntervalMs, listenerRateLimit, 5)): Runnable)
+      }
+      futures.foreach(_.get(30, TimeUnit.SECONDS))
+
+      // verify that every listener was throttled
+      blockedPercentMeters.foreach { case (name, meter) =>
+        assertTrue(s"Expected BlockedPercentMeter metric for $name listener to be recorded", meter.count() > 0)
+      }
+
+      // while the connection creation rate was throttled,
+      // expect all connections got created (not limit on the number of connections)
+      verifyConnectionCountOnEveryListener(connectionQuotas, connectionsPerListener)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testMaxListenerConnectionListenerMustBeAboveZero(): Unit = {
+    val config = KafkaConfig.fromProps(brokerPropsWithDefaultConnectionLimits)
+    val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+    connectionQuotas.addListener(config, listeners("EXTERNAL").listenerName)
+
+    val maxListenerConnectionRate = 0
+    val listenerConfig = Map(KafkaConfig.MaxConnectionCreationRateProp -> maxListenerConnectionRate.toString).asJava
+    assertThrows[ConfigException] {
+      connectionQuotas.maxConnectionsPerListener(listeners("EXTERNAL").listenerName).validateReconfiguration(listenerConfig)
+    }
+  }
+
+  @Test
+  def testMaxListenerConnectionRateReconfiguration(): Unit = {

Review comment:
       Fair enough.







[GitHub] [kafka] dajac commented on a change in pull request #8768: KAFKA-10023: Enforce broker-wide and per-listener connection creation…

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #8768:
URL: https://github.com/apache/kafka/pull/8768#discussion_r440142597



##########
File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
##########
@@ -302,21 +312,295 @@ class ConnectionQuotasTest {
       }
       // all connections should get added
       overLimitFutures.foreach(_.get(5, TimeUnit.SECONDS))
-      listeners.values.foreach { listener =>
-        assertEquals(s"Number of connections on $listener:",
-          listenerMaxConnections, connectionQuotas.get(listener.defaultIp))
+      verifyConnectionCountOnEveryListener(connectionQuotas, listenerMaxConnections)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testBrokerConnectionRateLimitWhenActualRateBelowLimit(): Unit = {
+    val brokerRateLimit = 125
+    val props = brokerPropsWithDefaultConnectionLimits
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, brokerRateLimit.toString)
+    val config = KafkaConfig.fromProps(props)
+    val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+    addListenersAndVerify(config, connectionQuotas)
+
+    val executor = Executors.newFixedThreadPool(listeners.size)
+    try {
+      // create connections with the total rate < broker-wide quota, and verify there is no throttling
+      val connCreateIntervalMs = 25 // connection creation rate = 40/sec per listener (3 * 40 = 120/sec total)
+      val connectionsPerListener = 200 // should take 5 seconds to create 200 connections with rate = 40/sec
+      val futures = listeners.values.map { listener =>
+        executor.submit((() => acceptConnections(connectionQuotas, listener, connectionsPerListener, connCreateIntervalMs)): Runnable)
       }
+      futures.foreach(_.get(10, TimeUnit.SECONDS))
+
+      // the blocked percent should still be 0, because no limits were reached
+      verifyNoBlockedPercentRecordedOnAllListeners()
+      verifyConnectionCountOnEveryListener(connectionQuotas, connectionsPerListener)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testBrokerConnectionRateLimitWhenActualRateAboveLimit(): Unit = {
+    val brokerRateLimit = 90
+    val props = brokerPropsWithDefaultConnectionLimits
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, brokerRateLimit.toString)
+    val config = KafkaConfig.fromProps(props)
+    val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+    addListenersAndVerify(config, connectionQuotas)
+
+    val executor = Executors.newFixedThreadPool(listeners.size)
+    try {
+      // each listener creates connections such that the total connection rate > broker-wide quota
+      val connCreateIntervalMs = 10      // connection creation rate = 100
+      val connectionsPerListener = 400
+      val futures = listeners.values.map { listener =>
+        executor.submit((() => acceptConnections(connectionQuotas, listener, connectionsPerListener, connCreateIntervalMs)): Runnable)
+      }
+      futures.foreach(_.get(20, TimeUnit.SECONDS))
+
+      // verify that connections on non-inter-broker listener are throttled
+      verifyOnlyNonInterBrokerListenersBlockedPercentRecorded()
+
+      // expect all connections to be created (no limit on the number of connections)
+      verifyConnectionCountOnEveryListener(connectionQuotas, connectionsPerListener)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testListenerConnectionRateLimitWhenActualRateBelowLimit(): Unit = {
+    val brokerRateLimit = 125
+    val listenerRateLimit = 50
+    val connCreateIntervalMs = 25 // connection creation rate = 40/sec per listener (3 * 40 = 120/sec total)
+    val props = brokerPropsWithDefaultConnectionLimits
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, brokerRateLimit.toString)
+    val config = KafkaConfig.fromProps(props)
+    val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+    val listenerConfig = Map(KafkaConfig.MaxConnectionCreationRateProp -> listenerRateLimit.toString).asJava
+    addListenersAndVerify(config, listenerConfig, connectionQuotas)
+
+    val executor = Executors.newFixedThreadPool(listeners.size)
+    try {
+      // create connections with the rate < listener quota on every listener, and verify there is no throttling
+      val connectionsPerListener = 200 // should take 5 seconds to create 200 connections with rate = 40/sec
+      val futures = listeners.values.map { listener =>
+        executor.submit((() => acceptConnections(connectionQuotas, listener, connectionsPerListener, connCreateIntervalMs)): Runnable)
+      }
+      futures.foreach(_.get(10, TimeUnit.SECONDS))
+
+      // the blocked percent should still be 0, because no limits were reached
+      verifyNoBlockedPercentRecordedOnAllListeners()
+
+      verifyConnectionCountOnEveryListener(connectionQuotas, connectionsPerListener)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testListenerConnectionRateLimitWhenActualRateAboveLimit(): Unit = {
+    val brokerRateLimit = 125
+    val listenerRateLimit = 30
+    val connCreateIntervalMs = 25 // connection creation rate = 40/sec per listener (3 * 40 = 120/sec total)
+    val props = brokerPropsWithDefaultConnectionLimits
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, brokerRateLimit.toString)
+    val config = KafkaConfig.fromProps(props)
+    val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+    val listenerConfig = Map(KafkaConfig.MaxConnectionCreationRateProp -> listenerRateLimit.toString).asJava
+    addListenersAndVerify(config, listenerConfig, connectionQuotas)
+
+    val executor = Executors.newFixedThreadPool(listeners.size)
+    try {
+      // create connections with the rate > listener quota on every listener
+      // run a bit longer (20 seconds) to also verify the throttle rate
+      val connectionsPerListener = 600 // should take 20 seconds to create 600 connections with rate = 30/sec
+      val futures = listeners.values.map { listener =>
+        executor.submit((() =>
+          // epsilon is set to account for the worst-case where the measurement is taken just before or after the quota window
+          acceptConnectionsAndVerifyRate(connectionQuotas, listener, connectionsPerListener, connCreateIntervalMs, listenerRateLimit, 5)): Runnable)
+      }
+      futures.foreach(_.get(30, TimeUnit.SECONDS))
+
+      // verify that every listener was throttled
+      blockedPercentMeters.foreach { case (name, meter) =>
+        assertTrue(s"Expected BlockedPercentMeter metric for $name listener to be recorded", meter.count() > 0)
+      }

Review comment:
       nit: Would it make sense to add the counterpart of `verifyNoBlockedPercentRecordedOnAllListeners` for this block? Something like `verifyNonZeroBlockedPercentRecordedOnAllListeners`?
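   
   A sketch of what such a helper could look like in this test class, reusing the inline check quoted above (the name comes from this suggestion, and the final code may differ):
   ```scala
   private def verifyNonZeroBlockedPercentRecordedOnAllListeners(): Unit = {
     blockedPercentMeters.foreach { case (name, meter) =>
       assertTrue(s"Expected BlockedPercentMeter metric for $name listener to be recorded", meter.count() > 0)
     }
   }
   ```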







[GitHub] [kafka] dajac commented on a change in pull request #8768: KAFKA-10023: Enforce broker-wide and per-listener connection creation…

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #8768:
URL: https://github.com/apache/kafka/pull/8768#discussion_r440028429



##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
##########
@@ -572,16 +572,18 @@ public synchronized void removeReporter(MetricsReporter reporter) {
         }
     }
 
-    synchronized void registerMetric(KafkaMetric metric) {
+    synchronized void registerMetric(KafkaMetric metric, boolean report) {

Review comment:
       Thinking a bit more about this, did you consider adding the flag to `MetricConfig`? It may be a bit simpler and cleaner as it avoids having to add the flag to all the methods. What do you think?

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1256,11 +1272,17 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging {
   private def waitForConnectionSlot(listenerName: ListenerName,
                                     acceptorBlockedPercentMeter: com.yammer.metrics.core.Meter): Unit = {
     counts.synchronized {
-      if (!connectionSlotAvailable(listenerName)) {
+      val startTimeMs = time.milliseconds()
+      val throttleTimeMs = math.max(recordConnectionAndGetThrottleTimeMs(listenerName, startTimeMs), 0)
+
+      if (throttleTimeMs > 0 || !connectionSlotAvailable(listenerName)) {
         val startNs = time.nanoseconds
+        val endThrottleTimeMs = startTimeMs + throttleTimeMs
+        var remainingThrottleTimeMs = throttleTimeMs
         do {
-          counts.wait()
-        } while (!connectionSlotAvailable(listenerName))
+          counts.wait(remainingThrottleTimeMs)

Review comment:
       Thanks for the clarification. I am sorry but I misread the code the first time.

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1289,15 +1311,95 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging {
   private def maxListenerConnections(listenerName: ListenerName): Int =
     maxConnectionsPerListener.get(listenerName).map(_.maxConnections).getOrElse(Int.MaxValue)
 
+  /**
+   * Calculates the delay needed to bring the observed connection creation rate to listener-level limit or to broker-wide
+   * limit, whichever the longest. The delay is capped to the quota window size defined by QuotaWindowSizeSecondsProp
+   *
+   * @param listenerName listener for which calculate the delay
+   * @param timeMs current time in milliseconds
+   * @return delay in milliseconds
+   */
+  private def recordConnectionAndGetThrottleTimeMs(listenerName: ListenerName, timeMs: Long): Long = {
+    val listenerThrottleTimeMs = maxConnectionsPerListener
+      .get(listenerName)
+      .map(listenerQuota => recordAndGetThrottleTimeMs(listenerQuota.connectionRateSensor, timeMs))
+      .getOrElse(0)
+
+    if (protectedListener(listenerName)) {
+      listenerThrottleTimeMs
+    } else {
+      val brokerThrottleTimeMs = recordAndGetThrottleTimeMs(brokerConnectionRateSensor, timeMs)
+      math.max(brokerThrottleTimeMs, listenerThrottleTimeMs)
+    }
+  }
+
+  private def recordAndGetThrottleTimeMs(sensor: Sensor, timeMs: Long): Int = {
+    try {
+      sensor.record(1.0, timeMs)
+      0
+    } catch {
+      case e: QuotaViolationException =>
+        val throttleTimeMs = QuotaUtils.boundedThrottleTime(
+          e.value, e.bound, QuotaUtils.rateMetricWindowSize(e.metric, timeMs), maxThrottleTimeMs).toInt
+        debug(s"Quota violated for sensor (${sensor.name}). Delay time: $throttleTimeMs ms")
+        throttleTimeMs
+    }
+  }
+
+  /**
+   * Creates sensor for tracking the connection creation rate and corresponding connection rate quota for a given
+   * listener or broker-wide, if listener is not provided.
+   * @param quotaLimit connection creation rate quota
+   * @param listenerOpt listener name if sensor is for a listener
+   */
+  private def createConnectionRateQuotaSensor(quotaLimit: Int, listenerOpt: Option[String] = None): Sensor = {
+    val quotaEntity = listenerOpt.getOrElse("broker")
+    val sensor = metrics.sensor(s"ConnectionCreationRate-$quotaEntity", rateQuotaMetricConfig(quotaLimit))
+    sensor.add(connectionRateMetricName(listenerOpt), new Rate, null, false)
+    info(s"Created ConnectionCreationRate-$quotaEntity sensor, quotaLimit=$quotaLimit")
+    sensor
+  }
+
+  private def updateConnectionRateQuota(quotaLimit: Int, listenerOpt: Option[String] = None): Unit = {
+    val metric = metrics.metric(connectionRateMetricName((listenerOpt)))
+    metric.config(rateQuotaMetricConfig(quotaLimit))
+    info(s"Updated ${listenerOpt.getOrElse("broker")} max connection creation rate to $quotaLimit")
+  }
+
+  private def connectionRateMetricName(listenerOpt: Option[String]): MetricName = {
+    val quotaEntity = listenerOpt.getOrElse("broker")
+    metrics.metricName(
+      s"connection-creation-rate-$quotaEntity",
+      "connection-quota-no-jmx",
+      s"Tracking $quotaEntity connection creation rate",
+      rateQuotaMetricTags(listenerOpt))
+  }
+
+  private def rateQuotaMetricConfig(quotaLimit: Int): MetricConfig = {
+    new MetricConfig()
+      .timeWindow(config.quotaWindowSizeSeconds.toLong, TimeUnit.SECONDS)
+      .samples(config.numQuotaSamples)
+      .quota(new Quota(quotaLimit, true))
+  }
+
+  private def rateQuotaMetricTags(listenerOpt: Option[String]): util.Map[String, String] = {
+    val tags = new util.LinkedHashMap[String, String]
+    listenerOpt.foreach(listener => tags.put("listener", listener))
+    tags

Review comment:
       What about returning `Collections.emptyMap` when `listenerOpt` is not defined and using `Collections.singletonMap` when it is?
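
    For reference, a minimal sketch of what that could look like (an assumption about the final shape, not the PR's code):

        import java.util
        import java.util.Collections

        // Sketch: return an immutable map rather than building a LinkedHashMap.
        private def rateQuotaMetricTags(listenerOpt: Option[String]): util.Map[String, String] =
          listenerOpt
            .map(listener => Collections.singletonMap("listener", listener))
            .getOrElse(Collections.emptyMap[String, String]())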

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1258,11 +1274,17 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging {
   private def waitForConnectionSlot(listenerName: ListenerName,
                                     acceptorBlockedPercentMeter: com.yammer.metrics.core.Meter): Unit = {
     counts.synchronized {
-      if (!connectionSlotAvailable(listenerName)) {
+      val startTimeMs = time.milliseconds()

Review comment:
       nit: `()` can be omitted here.

##########
File path: core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
##########
@@ -163,13 +167,66 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
     TestUtils.waitUntilTrue(() => initialConnectionCount == connectionCount, "Connections not closed")
   }
 
+  @Test
+  def testDynamicListenerConnectionCreationRateQuota(): Unit = {
+    // Create another listener. PLAINTEXT is an inter-broker listener
+    // keep default limits
+    val newListenerNames = Seq("PLAINTEXT", "EXTERNAL")
+    val newListeners = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"
+    val props = new Properties
+    props.put(KafkaConfig.ListenersProp, newListeners)
+    props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT")
+    reconfigureServers(props, perBrokerConfig = true, (KafkaConfig.ListenersProp, newListeners))
+    waitForListener("EXTERNAL")
+
+    // new broker-wide connection rate limit
+    val connRateLimit = 18
+
+    // before setting connection rate to 10, verify we can do at least double that by default (no limit)
+    verifyConnectionRate(2 * connRateLimit, Int.MaxValue, "PLAINTEXT")
+
+    // Reduce total broker connection rate limit to 10 at the cluster level and verify the limit is enforced

Review comment:
       `to 18`?

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1258,11 +1274,17 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging {
   private def waitForConnectionSlot(listenerName: ListenerName,
                                     acceptorBlockedPercentMeter: com.yammer.metrics.core.Meter): Unit = {
     counts.synchronized {
-      if (!connectionSlotAvailable(listenerName)) {
+      val startTimeMs = time.milliseconds()

Review comment:
       It is a bit confusing to have `startTimeMs` and `startNs` defined a few lines apart. Is it worth renaming `startTimeMs` to `startThrottleTimeMs` to make it clear that it is used as part of the throttle time computation? It would also be consistent with `endThrottleTimeMs`.

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1289,15 +1311,95 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging {
   private def maxListenerConnections(listenerName: ListenerName): Int =
     maxConnectionsPerListener.get(listenerName).map(_.maxConnections).getOrElse(Int.MaxValue)
 
+  /**
+   * Calculates the delay needed to bring the observed connection creation rate to listener-level limit or to broker-wide
+   * limit, whichever the longest. The delay is capped to the quota window size defined by QuotaWindowSizeSecondsProp
+   *
+   * @param listenerName listener for which calculate the delay
+   * @param timeMs current time in milliseconds
+   * @return delay in milliseconds
+   */
+  private def recordConnectionAndGetThrottleTimeMs(listenerName: ListenerName, timeMs: Long): Long = {
+    val listenerThrottleTimeMs = maxConnectionsPerListener
+      .get(listenerName)
+      .map(listenerQuota => recordAndGetThrottleTimeMs(listenerQuota.connectionRateSensor, timeMs))
+      .getOrElse(0)
+
+    if (protectedListener(listenerName)) {
+      listenerThrottleTimeMs
+    } else {
+      val brokerThrottleTimeMs = recordAndGetThrottleTimeMs(brokerConnectionRateSensor, timeMs)
+      math.max(brokerThrottleTimeMs, listenerThrottleTimeMs)
+    }
+  }
+
+  private def recordAndGetThrottleTimeMs(sensor: Sensor, timeMs: Long): Int = {
+    try {
+      sensor.record(1.0, timeMs)
+      0
+    } catch {
+      case e: QuotaViolationException =>
+        val throttleTimeMs = QuotaUtils.boundedThrottleTime(
+          e.value, e.bound, QuotaUtils.rateMetricWindowSize(e.metric, timeMs), maxThrottleTimeMs).toInt
+        debug(s"Quota violated for sensor (${sensor.name}). Delay time: $throttleTimeMs ms")
+        throttleTimeMs
+    }
+  }
+
+  /**
+   * Creates sensor for tracking the connection creation rate and corresponding connection rate quota for a given
+   * listener or broker-wide, if listener is not provided.
+   * @param quotaLimit connection creation rate quota
+   * @param listenerOpt listener name if sensor is for a listener
+   */
+  private def createConnectionRateQuotaSensor(quotaLimit: Int, listenerOpt: Option[String] = None): Sensor = {
+    val quotaEntity = listenerOpt.getOrElse("broker")
+    val sensor = metrics.sensor(s"ConnectionCreationRate-$quotaEntity", rateQuotaMetricConfig(quotaLimit))
+    sensor.add(connectionRateMetricName(listenerOpt), new Rate, null, false)
+    info(s"Created ConnectionCreationRate-$quotaEntity sensor, quotaLimit=$quotaLimit")
+    sensor
+  }
+
+  private def updateConnectionRateQuota(quotaLimit: Int, listenerOpt: Option[String] = None): Unit = {
+    val metric = metrics.metric(connectionRateMetricName((listenerOpt)))

Review comment:
       nit: Are the parentheses around `listenerOpt` really necessary?

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1308,18 +1410,27 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging {
       val value = maxConnections(configs)
       if (value <= 0)
         throw new ConfigException("Invalid max.connections $listenerMax")

Review comment:
       Not related to your PR, but it seems that `listenerMax` is never defined. I think that `value` should be used here instead. Could you fix this? Could you also use `MaxConnectionsProp` instead of `max.connections`, as you already did for the other config?
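
    In other words, something like the following (a sketch of the fix; only part of `validateReconfiguration` is visible in the diff):

        // Sketch: interpolate the actual value and reference the config constants by name.
        val value = maxConnections(configs)
        if (value <= 0)
          throw new ConfigException(s"Invalid ${KafkaConfig.MaxConnectionsProp} $value")

        val rate = maxConnectionCreationRate(configs)
        if (rate <= 0)
          throw new ConfigException(s"Invalid ${KafkaConfig.MaxConnectionCreationRateProp} $rate")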

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1289,15 +1311,95 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging {
   private def maxListenerConnections(listenerName: ListenerName): Int =
     maxConnectionsPerListener.get(listenerName).map(_.maxConnections).getOrElse(Int.MaxValue)
 
+  /**
+   * Calculates the delay needed to bring the observed connection creation rate to listener-level limit or to broker-wide
+   * limit, whichever the longest. The delay is capped to the quota window size defined by QuotaWindowSizeSecondsProp
+   *
+   * @param listenerName listener for which calculate the delay
+   * @param timeMs current time in milliseconds
+   * @return delay in milliseconds
+   */
+  private def recordConnectionAndGetThrottleTimeMs(listenerName: ListenerName, timeMs: Long): Long = {
+    val listenerThrottleTimeMs = maxConnectionsPerListener
+      .get(listenerName)
+      .map(listenerQuota => recordAndGetThrottleTimeMs(listenerQuota.connectionRateSensor, timeMs))
+      .getOrElse(0)
+
+    if (protectedListener(listenerName)) {
+      listenerThrottleTimeMs
+    } else {
+      val brokerThrottleTimeMs = recordAndGetThrottleTimeMs(brokerConnectionRateSensor, timeMs)
+      math.max(brokerThrottleTimeMs, listenerThrottleTimeMs)
+    }
+  }
+
+  private def recordAndGetThrottleTimeMs(sensor: Sensor, timeMs: Long): Int = {
+    try {
+      sensor.record(1.0, timeMs)
+      0
+    } catch {
+      case e: QuotaViolationException =>
+        val throttleTimeMs = QuotaUtils.boundedThrottleTime(
+          e.value, e.bound, QuotaUtils.rateMetricWindowSize(e.metric, timeMs), maxThrottleTimeMs).toInt
+        debug(s"Quota violated for sensor (${sensor.name}). Delay time: $throttleTimeMs ms")
+        throttleTimeMs
+    }
+  }
+
+  /**
+   * Creates sensor for tracking the connection creation rate and corresponding connection rate quota for a given
+   * listener or broker-wide, if listener is not provided.
+   * @param quotaLimit connection creation rate quota
+   * @param listenerOpt listener name if sensor is for a listener
+   */
+  private def createConnectionRateQuotaSensor(quotaLimit: Int, listenerOpt: Option[String] = None): Sensor = {
+    val quotaEntity = listenerOpt.getOrElse("broker")
+    val sensor = metrics.sensor(s"ConnectionCreationRate-$quotaEntity", rateQuotaMetricConfig(quotaLimit))
+    sensor.add(connectionRateMetricName(listenerOpt), new Rate, null, false)
+    info(s"Created ConnectionCreationRate-$quotaEntity sensor, quotaLimit=$quotaLimit")
+    sensor
+  }
+
+  private def updateConnectionRateQuota(quotaLimit: Int, listenerOpt: Option[String] = None): Unit = {
+    val metric = metrics.metric(connectionRateMetricName((listenerOpt)))
+    metric.config(rateQuotaMetricConfig(quotaLimit))
+    info(s"Updated ${listenerOpt.getOrElse("broker")} max connection creation rate to $quotaLimit")
+  }
+
+  private def connectionRateMetricName(listenerOpt: Option[String]): MetricName = {
+    val quotaEntity = listenerOpt.getOrElse("broker")
+    metrics.metricName(
+      s"connection-creation-rate-$quotaEntity",
+      "connection-quota-no-jmx",

Review comment:
       nit: I would rename this one now.

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1289,15 +1311,95 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging {
   private def maxListenerConnections(listenerName: ListenerName): Int =
     maxConnectionsPerListener.get(listenerName).map(_.maxConnections).getOrElse(Int.MaxValue)
 
+  /**
+   * Calculates the delay needed to bring the observed connection creation rate to listener-level limit or to broker-wide
+   * limit, whichever the longest. The delay is capped to the quota window size defined by QuotaWindowSizeSecondsProp
+   *
+   * @param listenerName listener for which calculate the delay
+   * @param timeMs current time in milliseconds
+   * @return delay in milliseconds
+   */
+  private def recordConnectionAndGetThrottleTimeMs(listenerName: ListenerName, timeMs: Long): Long = {
+    val listenerThrottleTimeMs = maxConnectionsPerListener
+      .get(listenerName)
+      .map(listenerQuota => recordAndGetThrottleTimeMs(listenerQuota.connectionRateSensor, timeMs))
+      .getOrElse(0)
+
+    if (protectedListener(listenerName)) {
+      listenerThrottleTimeMs
+    } else {
+      val brokerThrottleTimeMs = recordAndGetThrottleTimeMs(brokerConnectionRateSensor, timeMs)
+      math.max(brokerThrottleTimeMs, listenerThrottleTimeMs)
+    }
+  }
+
+  private def recordAndGetThrottleTimeMs(sensor: Sensor, timeMs: Long): Int = {
+    try {
+      sensor.record(1.0, timeMs)
+      0
+    } catch {
+      case e: QuotaViolationException =>
+        val throttleTimeMs = QuotaUtils.boundedThrottleTime(
+          e.value, e.bound, QuotaUtils.rateMetricWindowSize(e.metric, timeMs), maxThrottleTimeMs).toInt
+        debug(s"Quota violated for sensor (${sensor.name}). Delay time: $throttleTimeMs ms")
+        throttleTimeMs
+    }
+  }
+
+  /**
+   * Creates sensor for tracking the connection creation rate and corresponding connection rate quota for a given
+   * listener or broker-wide, if listener is not provided.
+   * @param quotaLimit connection creation rate quota
+   * @param listenerOpt listener name if sensor is for a listener
+   */
+  private def createConnectionRateQuotaSensor(quotaLimit: Int, listenerOpt: Option[String] = None): Sensor = {
+    val quotaEntity = listenerOpt.getOrElse("broker")
+    val sensor = metrics.sensor(s"ConnectionCreationRate-$quotaEntity", rateQuotaMetricConfig(quotaLimit))
+    sensor.add(connectionRateMetricName(listenerOpt), new Rate, null, false)
+    info(s"Created ConnectionCreationRate-$quotaEntity sensor, quotaLimit=$quotaLimit")
+    sensor
+  }
+
+  private def updateConnectionRateQuota(quotaLimit: Int, listenerOpt: Option[String] = None): Unit = {
+    val metric = metrics.metric(connectionRateMetricName((listenerOpt)))
+    metric.config(rateQuotaMetricConfig(quotaLimit))
+    info(s"Updated ${listenerOpt.getOrElse("broker")} max connection creation rate to $quotaLimit")
+  }
+
+  private def connectionRateMetricName(listenerOpt: Option[String]): MetricName = {
+    val quotaEntity = listenerOpt.getOrElse("broker")

Review comment:
       nit: As the `quotaEntity` is also computed by the caller methods, would it make sense to pass it as an argument to `connectionRateMetricName`?
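
    i.e. something like this (sketch only):

        // Sketch: accept the already-computed quota entity instead of recomputing it from listenerOpt.
        private def connectionRateMetricName(quotaEntity: String, listenerOpt: Option[String]): MetricName =
          metrics.metricName(
            s"connection-creation-rate-$quotaEntity",
            "connection-quota-no-jmx",
            s"Tracking $quotaEntity connection creation rate",
            rateQuotaMetricTags(listenerOpt))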

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1287,15 +1309,97 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging {
   private def maxListenerConnections(listenerName: ListenerName): Int =
     maxConnectionsPerListener.get(listenerName).map(_.maxConnections).getOrElse(Int.MaxValue)
 
+  /**
+   * Calculates the delay needed to bring the observed connection creation rate to listener-level limit or to broker-wide
+   * limit, whichever the longest. The delay is capped to the quota window size defined by QuotaWindowSizeSecondsProp
+   *
+   * @param listenerName listener for which calculate the delay
+   * @param timeMs current time in milliseconds
+   * @return delay in milliseconds
+   */
+  private def recordConnectionAndGetThrottleTimeMs(listenerName: ListenerName, timeMs: Long): Long = {
+    val listenerThrottleTimeMs = maxConnectionsPerListener
+      .get(listenerName)
+      .map(listenerQuota => recordAndGetThrottleTimeMs(listenerQuota.connectionRateSensor, timeMs))
+      .getOrElse(0)
+
+    if (protectedListener(listenerName)) {
+      listenerThrottleTimeMs
+    } else {
+      val brokerThrottleTimeMs = recordAndGetThrottleTimeMs(brokerConnectionRateSensor, timeMs)
+      val throttleTimeMs = math.max(brokerThrottleTimeMs, listenerThrottleTimeMs)
+      throttleTimeMs
+    }
+  }
+
+  private def recordAndGetThrottleTimeMs(sensor: Sensor, timeMs: Long): Int = {
+    try {
+      sensor.record(1.0, timeMs)
+      0
+    } catch {
+      case e: QuotaViolationException =>
+        val throttleTimeMs = QuotaUtils.boundedThrottleTime(
+          e.value, e.bound, QuotaUtils.rateMetricWindowSize(e.metric, timeMs), maxThrottleTimeMs).toInt
+        debug(s"Quota violated for sensor (${sensor.name}). Delay time: $throttleTimeMs ms")
+        throttleTimeMs
+    }
+  }
+
+  /**
+   * Creates sensor for tracking the connection creation rate and corresponding connection rate quota for a given
+   * listener or broker-wide, if listener is not provided.
+   * @param quotaLimit connection creation rate quota
+   * @param listenerOpt listener name if sensor is for a listener
+   */
+  private def createConnectionRateQuotaSensor(quotaLimit: Int, listenerOpt: Option[String] = None): Sensor = {
+    val quotaEntity = listenerOpt.getOrElse("broker")
+    val sensor = metrics.sensor(s"ConnectionCreationRate-$quotaEntity", rateQuotaMetricConfig(quotaLimit))
+    sensor.add(connectionRateMetricName(listenerOpt), new Rate)
+    info(s"Created ConnectionCreationRate-$quotaEntity sensor, quotaLimit=$quotaLimit")
+    sensor
+  }
+
+  private def updateConnectionRateQuota(quotaLimit: Int, listenerOpt: Option[String] = None): Unit = {
+    val metric = metrics.metric(connectionRateMetricName((listenerOpt)))
+    metric.config(rateQuotaMetricConfig(quotaLimit))
+    info(s"Updated ${listenerOpt.getOrElse("broker")} max connection creation rate to $quotaLimit")
+  }
+
+  private def connectionRateMetricName(listenerOpt: Option[String]): MetricName = {
+    val quotaEntity = listenerOpt.getOrElse("broker")
+    metrics.metricName(
+      s"connection-creation-rate-$quotaEntity",
+      "connection-quota-no-jmx",
+      s"Tracking $quotaEntity connection creation rate",
+      rateQuotaMetricTags(listenerOpt))

Review comment:
       Ack. I did not know this.

##########
File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
##########
@@ -329,15 +613,46 @@ class ConnectionQuotasTest {
   }
 
   // this method must be called on a separate thread, because connectionQuotas.inc() may block
+  private def acceptConnectionsAndVerifyRate(connectionQuotas: ConnectionQuotas,
+                                             listenerDesc: ListenerDesc,
+                                             numConnections: Long,
+                                             timeIntervalMs: Long,
+                                             expectedRate: Int,
+                                             epsilon: Int) : Unit = {
+    val startTimeMs = System.currentTimeMillis
+    acceptConnections(connectionQuotas, listenerDesc.listenerName, listenerDesc.defaultIp, numConnections, timeIntervalMs)
+    val elapsedSeconds = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis - startTimeMs)
+    val actualRate = (numConnections.toDouble / elapsedSeconds).toInt
+    if (actualRate - epsilon > expectedRate || actualRate + epsilon < expectedRate)
+      throw new TestFailedException(
+        (e: StackDepthException) =>
+          Some(s"Expected rate $expectedRate, but got $actualRate ($numConnections connections / $elapsedSeconds sec)"),
+        None,
+        Position.here)

Review comment:
       Can't we use `assertTrue` here to simplify?
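
    For example (a sketch reusing the variables from the diff above):

        // Sketch: a single assertTrue instead of constructing a TestFailedException by hand.
        assertTrue(
          s"Expected rate $expectedRate, but got $actualRate ($numConnections connections / $elapsedSeconds sec)",
          math.abs(actualRate - expectedRate) <= epsilon)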

##########
File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
##########
@@ -302,21 +312,295 @@ class ConnectionQuotasTest {
       }
       // all connections should get added
       overLimitFutures.foreach(_.get(5, TimeUnit.SECONDS))
-      listeners.values.foreach { listener =>
-        assertEquals(s"Number of connections on $listener:",
-          listenerMaxConnections, connectionQuotas.get(listener.defaultIp))
+      verifyConnectionCountOnEveryListener(connectionQuotas, listenerMaxConnections)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testBrokerConnectionRateLimitWhenActualRateBelowLimit(): Unit = {
+    val brokerRateLimit = 125
+    val props = brokerPropsWithDefaultConnectionLimits
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, brokerRateLimit.toString)
+    val config = KafkaConfig.fromProps(props)
+    val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+    addListenersAndVerify(config, connectionQuotas)
+
+    val executor = Executors.newFixedThreadPool(listeners.size)
+    try {
+      // create connections with the total rate < broker-wide quota, and verify there is no throttling
+      val connCreateIntervalMs = 25 // connection creation rate = 40/sec per listener (3 * 40 = 120/sec total)
+      val connectionsPerListener = 200 // should take 5 seconds to create 200 connections with rate = 40/sec
+      val futures = listeners.values.map { listener =>
+        executor.submit((() => acceptConnections(connectionQuotas, listener, connectionsPerListener, connCreateIntervalMs)): Runnable)
       }
+      futures.foreach(_.get(10, TimeUnit.SECONDS))
+
+      // the blocked percent should still be 0, because no limits were reached
+      verifyNoBlockedPercentRecordedOnAllListeners()
+      verifyConnectionCountOnEveryListener(connectionQuotas, connectionsPerListener)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testBrokerConnectionRateLimitWhenActualRateAboveLimit(): Unit = {
+    val brokerRateLimit = 90
+    val props = brokerPropsWithDefaultConnectionLimits
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, brokerRateLimit.toString)
+    val config = KafkaConfig.fromProps(props)

Review comment:
       nit: We could perhaps create a small helper method like `brokerProps` that accepts a Map of custom configuration pairs and returns a `KafkaConfig`. That would reduce the boilerplate code.
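
    A possible shape for such a helper (a sketch only; the body and the default parameter are assumptions):

        // Sketch: build a KafkaConfig from the default connection-limit props plus per-test overrides.
        private def brokerProps(overrides: Map[String, String] = Map.empty): KafkaConfig = {
          val props = brokerPropsWithDefaultConnectionLimits
          overrides.foreach { case (k, v) => props.put(k, v) }
          KafkaConfig.fromProps(props)
        }

        // Usage in a test:
        // val config = brokerProps(Map(KafkaConfig.MaxConnectionCreationRateProp -> brokerRateLimit.toString))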

##########
File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
##########
@@ -302,21 +312,295 @@ class ConnectionQuotasTest {
       }
       // all connections should get added
       overLimitFutures.foreach(_.get(5, TimeUnit.SECONDS))
-      listeners.values.foreach { listener =>
-        assertEquals(s"Number of connections on $listener:",
-          listenerMaxConnections, connectionQuotas.get(listener.defaultIp))
+      verifyConnectionCountOnEveryListener(connectionQuotas, listenerMaxConnections)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testBrokerConnectionRateLimitWhenActualRateBelowLimit(): Unit = {
+    val brokerRateLimit = 125
+    val props = brokerPropsWithDefaultConnectionLimits
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, brokerRateLimit.toString)
+    val config = KafkaConfig.fromProps(props)
+    val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+    addListenersAndVerify(config, connectionQuotas)
+
+    val executor = Executors.newFixedThreadPool(listeners.size)
+    try {
+      // create connections with the total rate < broker-wide quota, and verify there is no throttling
+      val connCreateIntervalMs = 25 // connection creation rate = 40/sec per listener (3 * 40 = 120/sec total)
+      val connectionsPerListener = 200 // should take 5 seconds to create 200 connections with rate = 40/sec
+      val futures = listeners.values.map { listener =>
+        executor.submit((() => acceptConnections(connectionQuotas, listener, connectionsPerListener, connCreateIntervalMs)): Runnable)
       }
+      futures.foreach(_.get(10, TimeUnit.SECONDS))
+
+      // the blocked percent should still be 0, because no limits were reached
+      verifyNoBlockedPercentRecordedOnAllListeners()
+      verifyConnectionCountOnEveryListener(connectionQuotas, connectionsPerListener)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testBrokerConnectionRateLimitWhenActualRateAboveLimit(): Unit = {
+    val brokerRateLimit = 90
+    val props = brokerPropsWithDefaultConnectionLimits
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, brokerRateLimit.toString)
+    val config = KafkaConfig.fromProps(props)
+    val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+    addListenersAndVerify(config, connectionQuotas)
+
+    val executor = Executors.newFixedThreadPool(listeners.size)
+    try {
+      // each listener creates connections such that the total connection rate > broker-wide quota
+      val connCreateIntervalMs = 10      // connection creation rate = 100
+      val connectionsPerListener = 400
+      val futures = listeners.values.map { listener =>
+        executor.submit((() => acceptConnections(connectionQuotas, listener, connectionsPerListener, connCreateIntervalMs)): Runnable)
+      }
+      futures.foreach(_.get(20, TimeUnit.SECONDS))
+
+      // verify that connections on non-inter-broker listener are throttled
+      verifyOnlyNonInterBrokerListenersBlockedPercentRecorded()
+
+      // expect all connections to be created (no limit on the number of connections)
+      verifyConnectionCountOnEveryListener(connectionQuotas, connectionsPerListener)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testListenerConnectionRateLimitWhenActualRateBelowLimit(): Unit = {
+    val brokerRateLimit = 125
+    val listenerRateLimit = 50
+    val connCreateIntervalMs = 25 // connection creation rate = 40/sec per listener (3 * 40 = 120/sec total)
+    val props = brokerPropsWithDefaultConnectionLimits
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, brokerRateLimit.toString)
+    val config = KafkaConfig.fromProps(props)
+    val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+    val listenerConfig = Map(KafkaConfig.MaxConnectionCreationRateProp -> listenerRateLimit.toString).asJava
+    addListenersAndVerify(config, listenerConfig, connectionQuotas)
+
+    val executor = Executors.newFixedThreadPool(listeners.size)
+    try {
+      // create connections with the rate < listener quota on every listener, and verify there is no throttling
+      val connectionsPerListener = 200 // should take 5 seconds to create 200 connections with rate = 40/sec
+      val futures = listeners.values.map { listener =>
+        executor.submit((() => acceptConnections(connectionQuotas, listener, connectionsPerListener, connCreateIntervalMs)): Runnable)
+      }
+      futures.foreach(_.get(10, TimeUnit.SECONDS))
+
+      // the blocked percent should still be 0, because no limits were reached
+      verifyNoBlockedPercentRecordedOnAllListeners()
+
+      verifyConnectionCountOnEveryListener(connectionQuotas, connectionsPerListener)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testListenerConnectionRateLimitWhenActualRateAboveLimit(): Unit = {
+    val brokerRateLimit = 125
+    val listenerRateLimit = 30
+    val connCreateIntervalMs = 25 // connection creation rate = 40/sec per listener (3 * 40 = 120/sec total)
+    val props = brokerPropsWithDefaultConnectionLimits
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, brokerRateLimit.toString)
+    val config = KafkaConfig.fromProps(props)
+    val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+    val listenerConfig = Map(KafkaConfig.MaxConnectionCreationRateProp -> listenerRateLimit.toString).asJava
+    addListenersAndVerify(config, listenerConfig, connectionQuotas)
+
+    val executor = Executors.newFixedThreadPool(listeners.size)
+    try {
+      // create connections with the rate > listener quota on every listener
+      // run a bit longer (20 seconds) to also verify the throttle rate
+      val connectionsPerListener = 600 // should take 20 seconds to create 600 connections with rate = 30/sec
+      val futures = listeners.values.map { listener =>
+        executor.submit((() =>
+          // epsilon is set to account for the worst-case where the measurement is taken just before or after the quota window
+          acceptConnectionsAndVerifyRate(connectionQuotas, listener, connectionsPerListener, connCreateIntervalMs, listenerRateLimit, 5)): Runnable)
+      }
+      futures.foreach(_.get(30, TimeUnit.SECONDS))
+
+      // verify that every listener was throttled
+      blockedPercentMeters.foreach { case (name, meter) =>
+        assertTrue(s"Expected BlockedPercentMeter metric for $name listener to be recorded", meter.count() > 0)
+      }
+
+      // while the connection creation rate was throttled,
+      // expect all connections got created (not limit on the number of connections)
+      verifyConnectionCountOnEveryListener(connectionQuotas, connectionsPerListener)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testMaxListenerConnectionListenerMustBeAboveZero(): Unit = {
+    val config = KafkaConfig.fromProps(brokerPropsWithDefaultConnectionLimits)
+    val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+    connectionQuotas.addListener(config, listeners("EXTERNAL").listenerName)
+
+    val maxListenerConnectionRate = 0
+    val listenerConfig = Map(KafkaConfig.MaxConnectionCreationRateProp -> maxListenerConnectionRate.toString).asJava
+    assertThrows[ConfigException] {
+      connectionQuotas.maxConnectionsPerListener(listeners("EXTERNAL").listenerName).validateReconfiguration(listenerConfig)
+    }
+  }
+
+  @Test
+  def testMaxListenerConnectionRateReconfiguration(): Unit = {

Review comment:
       I wonder if we could just check that the metric's config is correctly re-configured instead of testing the number of connections accepted. The goal of the test is not really to verify that the quota works but rather to ensure that the metric is correctly re-configured. Have you considered this? The same would apply to `testMaxBrokerConnectionRateReconfiguration`.
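
    A sketch of that kind of assertion (the metric name and group are taken from the diff; `newRateLimit` and the exact lookup path are assumptions):

        import java.util.Collections

        // Sketch: after reconfiguring the listener quota, assert the metric's quota bound was updated.
        val metricName = metrics.metricName(
          "connection-creation-rate-EXTERNAL",
          "connection-quota-no-jmx",
          Collections.singletonMap("listener", "EXTERNAL"))
        assertEquals(newRateLimit.toDouble, metrics.metric(metricName).config.quota.bound, 0.0)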

##########
File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
##########
@@ -302,21 +312,295 @@ class ConnectionQuotasTest {
       }
       // all connections should get added
       overLimitFutures.foreach(_.get(5, TimeUnit.SECONDS))
-      listeners.values.foreach { listener =>
-        assertEquals(s"Number of connections on $listener:",
-          listenerMaxConnections, connectionQuotas.get(listener.defaultIp))
+      verifyConnectionCountOnEveryListener(connectionQuotas, listenerMaxConnections)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testBrokerConnectionRateLimitWhenActualRateBelowLimit(): Unit = {
+    val brokerRateLimit = 125
+    val props = brokerPropsWithDefaultConnectionLimits
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, brokerRateLimit.toString)
+    val config = KafkaConfig.fromProps(props)
+    val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+    addListenersAndVerify(config, connectionQuotas)
+
+    val executor = Executors.newFixedThreadPool(listeners.size)
+    try {
+      // create connections with the total rate < broker-wide quota, and verify there is no throttling
+      val connCreateIntervalMs = 25 // connection creation rate = 40/sec per listener (3 * 40 = 120/sec total)
+      val connectionsPerListener = 200 // should take 5 seconds to create 200 connections with rate = 40/sec
+      val futures = listeners.values.map { listener =>
+        executor.submit((() => acceptConnections(connectionQuotas, listener, connectionsPerListener, connCreateIntervalMs)): Runnable)
       }
+      futures.foreach(_.get(10, TimeUnit.SECONDS))
+
+      // the blocked percent should still be 0, because no limits were reached
+      verifyNoBlockedPercentRecordedOnAllListeners()
+      verifyConnectionCountOnEveryListener(connectionQuotas, connectionsPerListener)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testBrokerConnectionRateLimitWhenActualRateAboveLimit(): Unit = {
+    val brokerRateLimit = 90
+    val props = brokerPropsWithDefaultConnectionLimits
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, brokerRateLimit.toString)
+    val config = KafkaConfig.fromProps(props)
+    val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+    addListenersAndVerify(config, connectionQuotas)
+
+    val executor = Executors.newFixedThreadPool(listeners.size)

Review comment:
       It seems that almost all the test cases instantiate an executor with `listeners.size`. Have you considered moving this to the `setUp` method and moving the `shutdownNow` to the `tearDown`?
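
    Something along these lines (a sketch of moving the executor into the test lifecycle; the existing setUp/tearDown bodies are elided):

        import java.util.concurrent.{ExecutorService, Executors}
        import org.junit.{After, Before}

        private var executor: ExecutorService = _

        @Before
        def setUp(): Unit = {
          executor = Executors.newFixedThreadPool(listeners.size)
          // ... existing setUp code ...
        }

        @After
        def tearDown(): Unit = {
          executor.shutdownNow()
          // ... existing tearDown code ...
        }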

##########
File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
##########
@@ -302,21 +312,295 @@ class ConnectionQuotasTest {
       }
       // all connections should get added
       overLimitFutures.foreach(_.get(5, TimeUnit.SECONDS))
-      listeners.values.foreach { listener =>
-        assertEquals(s"Number of connections on $listener:",
-          listenerMaxConnections, connectionQuotas.get(listener.defaultIp))
+      verifyConnectionCountOnEveryListener(connectionQuotas, listenerMaxConnections)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testBrokerConnectionRateLimitWhenActualRateBelowLimit(): Unit = {
+    val brokerRateLimit = 125
+    val props = brokerPropsWithDefaultConnectionLimits
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, brokerRateLimit.toString)
+    val config = KafkaConfig.fromProps(props)
+    val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+    addListenersAndVerify(config, connectionQuotas)
+
+    val executor = Executors.newFixedThreadPool(listeners.size)
+    try {
+      // create connections with the total rate < broker-wide quota, and verify there is no throttling
+      val connCreateIntervalMs = 25 // connection creation rate = 40/sec per listener (3 * 40 = 120/sec total)
+      val connectionsPerListener = 200 // should take 5 seconds to create 200 connections with rate = 40/sec

Review comment:
       I really like the explanation next to the constants! I would recommend grouping all the constants at the beginning of the test case. That would make it easier to get a quick overview of the test case.

##########
File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
##########
@@ -302,21 +312,295 @@ class ConnectionQuotasTest {
       }
       // all connections should get added
       overLimitFutures.foreach(_.get(5, TimeUnit.SECONDS))
-      listeners.values.foreach { listener =>
-        assertEquals(s"Number of connections on $listener:",
-          listenerMaxConnections, connectionQuotas.get(listener.defaultIp))
+      verifyConnectionCountOnEveryListener(connectionQuotas, listenerMaxConnections)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testBrokerConnectionRateLimitWhenActualRateBelowLimit(): Unit = {
+    val brokerRateLimit = 125
+    val props = brokerPropsWithDefaultConnectionLimits
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, brokerRateLimit.toString)
+    val config = KafkaConfig.fromProps(props)
+    val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+    addListenersAndVerify(config, connectionQuotas)
+
+    val executor = Executors.newFixedThreadPool(listeners.size)
+    try {
+      // create connections with the total rate < broker-wide quota, and verify there is no throttling
+      val connCreateIntervalMs = 25 // connection creation rate = 40/sec per listener (3 * 40 = 120/sec total)
+      val connectionsPerListener = 200 // should take 5 seconds to create 200 connections with rate = 40/sec
+      val futures = listeners.values.map { listener =>
+        executor.submit((() => acceptConnections(connectionQuotas, listener, connectionsPerListener, connCreateIntervalMs)): Runnable)
       }
+      futures.foreach(_.get(10, TimeUnit.SECONDS))
+
+      // the blocked percent should still be 0, because no limits were reached
+      verifyNoBlockedPercentRecordedOnAllListeners()
+      verifyConnectionCountOnEveryListener(connectionQuotas, connectionsPerListener)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testBrokerConnectionRateLimitWhenActualRateAboveLimit(): Unit = {
+    val brokerRateLimit = 90
+    val props = brokerPropsWithDefaultConnectionLimits
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, brokerRateLimit.toString)
+    val config = KafkaConfig.fromProps(props)
+    val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+    addListenersAndVerify(config, connectionQuotas)
+
+    val executor = Executors.newFixedThreadPool(listeners.size)
+    try {
+      // each listener creates connections such that the total connection rate > broker-wide quota
+      val connCreateIntervalMs = 10      // connection creation rate = 100
+      val connectionsPerListener = 400
+      val futures = listeners.values.map { listener =>
+        executor.submit((() => acceptConnections(connectionQuotas, listener, connectionsPerListener, connCreateIntervalMs)): Runnable)
+      }
+      futures.foreach(_.get(20, TimeUnit.SECONDS))
+
+      // verify that connections on non-inter-broker listener are throttled
+      verifyOnlyNonInterBrokerListenersBlockedPercentRecorded()
+
+      // expect all connections to be created (no limit on the number of connections)
+      verifyConnectionCountOnEveryListener(connectionQuotas, connectionsPerListener)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testListenerConnectionRateLimitWhenActualRateBelowLimit(): Unit = {
+    val brokerRateLimit = 125
+    val listenerRateLimit = 50
+    val connCreateIntervalMs = 25 // connection creation rate = 40/sec per listener (3 * 40 = 120/sec total)
+    val props = brokerPropsWithDefaultConnectionLimits
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, brokerRateLimit.toString)
+    val config = KafkaConfig.fromProps(props)
+    val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+    val listenerConfig = Map(KafkaConfig.MaxConnectionCreationRateProp -> listenerRateLimit.toString).asJava
+    addListenersAndVerify(config, listenerConfig, connectionQuotas)
+
+    val executor = Executors.newFixedThreadPool(listeners.size)
+    try {
+      // create connections with the rate < listener quota on every listener, and verify there is no throttling
+      val connectionsPerListener = 200 // should take 5 seconds to create 200 connections with rate = 40/sec
+      val futures = listeners.values.map { listener =>
+        executor.submit((() => acceptConnections(connectionQuotas, listener, connectionsPerListener, connCreateIntervalMs)): Runnable)
+      }
+      futures.foreach(_.get(10, TimeUnit.SECONDS))
+
+      // the blocked percent should still be 0, because no limits were reached
+      verifyNoBlockedPercentRecordedOnAllListeners()
+
+      verifyConnectionCountOnEveryListener(connectionQuotas, connectionsPerListener)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testListenerConnectionRateLimitWhenActualRateAboveLimit(): Unit = {
+    val brokerRateLimit = 125
+    val listenerRateLimit = 30
+    val connCreateIntervalMs = 25 // connection creation rate = 40/sec per listener (3 * 40 = 120/sec total)
+    val props = brokerPropsWithDefaultConnectionLimits
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, brokerRateLimit.toString)
+    val config = KafkaConfig.fromProps(props)
+    val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+    val listenerConfig = Map(KafkaConfig.MaxConnectionCreationRateProp -> listenerRateLimit.toString).asJava
+    addListenersAndVerify(config, listenerConfig, connectionQuotas)
+
+    val executor = Executors.newFixedThreadPool(listeners.size)
+    try {
+      // create connections with the rate > listener quota on every listener
+      // run a bit longer (20 seconds) to also verify the throttle rate
+      val connectionsPerListener = 600 // should take 20 seconds to create 600 connections with rate = 30/sec
+      val futures = listeners.values.map { listener =>
+        executor.submit((() =>
+          // epsilon is set to account for the worst-case where the measurement is taken just before or after the quota window
+          acceptConnectionsAndVerifyRate(connectionQuotas, listener, connectionsPerListener, connCreateIntervalMs, listenerRateLimit, 5)): Runnable)
+      }
+      futures.foreach(_.get(30, TimeUnit.SECONDS))
+
+      // verify that every listener was throttled
+      blockedPercentMeters.foreach { case (name, meter) =>
+        assertTrue(s"Expected BlockedPercentMeter metric for $name listener to be recorded", meter.count() > 0)
+      }

Review comment:
       nit: Would it make sense to add the counterpart of `verifyNoBlockedPercentRecordedOnAllListeners` for this block? Something like `verifyNonZeroBlockedPercentRecordedOnAllListeners`?
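
    i.e. something like this (sketch, mirroring the assertion used in the diff above):

        private def verifyNonZeroBlockedPercentRecordedOnAllListeners(): Unit = {
          blockedPercentMeters.foreach { case (name, meter) =>
            assertTrue(s"Expected BlockedPercentMeter metric for $name listener to be recorded",
              meter.count() > 0)
          }
        }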




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] apovzner commented on a change in pull request #8768: KAFKA-10023: Enforce broker-wide and per-listener connection creation…

Posted by GitBox <gi...@apache.org>.
apovzner commented on a change in pull request #8768:
URL: https://github.com/apache/kafka/pull/8768#discussion_r453131010



##########
File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
##########
@@ -302,21 +312,295 @@ class ConnectionQuotasTest {
       }
       // all connections should get added
       overLimitFutures.foreach(_.get(5, TimeUnit.SECONDS))
-      listeners.values.foreach { listener =>
-        assertEquals(s"Number of connections on $listener:",
-          listenerMaxConnections, connectionQuotas.get(listener.defaultIp))
+      verifyConnectionCountOnEveryListener(connectionQuotas, listenerMaxConnections)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testBrokerConnectionRateLimitWhenActualRateBelowLimit(): Unit = {
+    val brokerRateLimit = 125
+    val props = brokerPropsWithDefaultConnectionLimits
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, brokerRateLimit.toString)
+    val config = KafkaConfig.fromProps(props)
+    val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+    addListenersAndVerify(config, connectionQuotas)
+
+    val executor = Executors.newFixedThreadPool(listeners.size)
+    try {
+      // create connections with the total rate < broker-wide quota, and verify there is no throttling
+      val connCreateIntervalMs = 25 // connection creation rate = 40/sec per listener (3 * 40 = 120/sec total)
+      val connectionsPerListener = 200 // should take 5 seconds to create 200 connections with rate = 40/sec
+      val futures = listeners.values.map { listener =>
+        executor.submit((() => acceptConnections(connectionQuotas, listener, connectionsPerListener, connCreateIntervalMs)): Runnable)
       }
+      futures.foreach(_.get(10, TimeUnit.SECONDS))
+
+      // the blocked percent should still be 0, because no limits were reached
+      verifyNoBlockedPercentRecordedOnAllListeners()
+      verifyConnectionCountOnEveryListener(connectionQuotas, connectionsPerListener)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testBrokerConnectionRateLimitWhenActualRateAboveLimit(): Unit = {
+    val brokerRateLimit = 90
+    val props = brokerPropsWithDefaultConnectionLimits
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, brokerRateLimit.toString)
+    val config = KafkaConfig.fromProps(props)
+    val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+    addListenersAndVerify(config, connectionQuotas)
+
+    val executor = Executors.newFixedThreadPool(listeners.size)
+    try {
+      // each listener creates connections such that the total connection rate > broker-wide quota
+      val connCreateIntervalMs = 10      // connection creation rate = 100
+      val connectionsPerListener = 400
+      val futures = listeners.values.map { listener =>
+        executor.submit((() => acceptConnections(connectionQuotas, listener, connectionsPerListener, connCreateIntervalMs)): Runnable)
+      }
+      futures.foreach(_.get(20, TimeUnit.SECONDS))
+
+      // verify that connections on non-inter-broker listener are throttled
+      verifyOnlyNonInterBrokerListenersBlockedPercentRecorded()
+
+      // expect all connections to be created (no limit on the number of connections)
+      verifyConnectionCountOnEveryListener(connectionQuotas, connectionsPerListener)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testListenerConnectionRateLimitWhenActualRateBelowLimit(): Unit = {
+    val brokerRateLimit = 125
+    val listenerRateLimit = 50
+    val connCreateIntervalMs = 25 // connection creation rate = 40/sec per listener (3 * 40 = 120/sec total)
+    val props = brokerPropsWithDefaultConnectionLimits
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, brokerRateLimit.toString)
+    val config = KafkaConfig.fromProps(props)
+    val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+    val listenerConfig = Map(KafkaConfig.MaxConnectionCreationRateProp -> listenerRateLimit.toString).asJava
+    addListenersAndVerify(config, listenerConfig, connectionQuotas)
+
+    val executor = Executors.newFixedThreadPool(listeners.size)
+    try {
+      // create connections with the rate < listener quota on every listener, and verify there is no throttling
+      val connectionsPerListener = 200 // should take 5 seconds to create 200 connections with rate = 40/sec
+      val futures = listeners.values.map { listener =>
+        executor.submit((() => acceptConnections(connectionQuotas, listener, connectionsPerListener, connCreateIntervalMs)): Runnable)
+      }
+      futures.foreach(_.get(10, TimeUnit.SECONDS))
+
+      // the blocked percent should still be 0, because no limits were reached
+      verifyNoBlockedPercentRecordedOnAllListeners()
+
+      verifyConnectionCountOnEveryListener(connectionQuotas, connectionsPerListener)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testListenerConnectionRateLimitWhenActualRateAboveLimit(): Unit = {
+    val brokerRateLimit = 125
+    val listenerRateLimit = 30
+    val connCreateIntervalMs = 25 // connection creation rate = 40/sec per listener (3 * 40 = 120/sec total)
+    val props = brokerPropsWithDefaultConnectionLimits
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, brokerRateLimit.toString)
+    val config = KafkaConfig.fromProps(props)
+    val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+    val listenerConfig = Map(KafkaConfig.MaxConnectionCreationRateProp -> listenerRateLimit.toString).asJava
+    addListenersAndVerify(config, listenerConfig, connectionQuotas)
+
+    val executor = Executors.newFixedThreadPool(listeners.size)
+    try {
+      // create connections with the rate > listener quota on every listener
+      // run a bit longer (20 seconds) to also verify the throttle rate
+      val connectionsPerListener = 600 // should take 20 seconds to create 600 connections with rate = 30/sec
+      val futures = listeners.values.map { listener =>
+        executor.submit((() =>
+          // epsilon is set to account for the worst-case where the measurement is taken just before or after the quota window
+          acceptConnectionsAndVerifyRate(connectionQuotas, listener, connectionsPerListener, connCreateIntervalMs, listenerRateLimit, 5)): Runnable)
+      }
+      futures.foreach(_.get(30, TimeUnit.SECONDS))
+
+      // verify that every listener was throttled
+      blockedPercentMeters.foreach { case (name, meter) =>
+        assertTrue(s"Expected BlockedPercentMeter metric for $name listener to be recorded", meter.count() > 0)
+      }
+
+      // while the connection creation rate was throttled,
+      // expect all connections got created (not limit on the number of connections)
+      verifyConnectionCountOnEveryListener(connectionQuotas, connectionsPerListener)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testMaxListenerConnectionListenerMustBeAboveZero(): Unit = {
+    val config = KafkaConfig.fromProps(brokerPropsWithDefaultConnectionLimits)
+    val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+    connectionQuotas.addListener(config, listeners("EXTERNAL").listenerName)
+
+    val maxListenerConnectionRate = 0
+    val listenerConfig = Map(KafkaConfig.MaxConnectionCreationRateProp -> maxListenerConnectionRate.toString).asJava
+    assertThrows[ConfigException] {
+      connectionQuotas.maxConnectionsPerListener(listeners("EXTERNAL").listenerName).validateReconfiguration(listenerConfig)
+    }
+  }
+
+  @Test
+  def testMaxListenerConnectionRateReconfiguration(): Unit = {

Review comment:
       I still think it's important to test, end to end, that it actually throttles to the correct value. We had a bug pretty recently, KAFKA-9658, where adding and then removing bandwidth quotas via a dynamic config changed the config and even some of the data structures, but not all of the ones needed to actually change the throttling behavior. So, I feel more confident keeping the test verifying the throttling behavior itself.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #8768: KAFKA-10023: Enforce broker-wide and per-listener connection creation…

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #8768:
URL: https://github.com/apache/kafka/pull/8768#discussion_r433696374



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1287,15 +1309,97 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging {
   private def maxListenerConnections(listenerName: ListenerName): Int =
     maxConnectionsPerListener.get(listenerName).map(_.maxConnections).getOrElse(Int.MaxValue)
 
+  /**
+   * Calculates the delay needed to bring the observed connection creation rate to listener-level limit or to broker-wide
+   * limit, whichever the longest. The delay is capped to the quota window size defined by QuotaWindowSizeSecondsProp
+   *
+   * @param listenerName listener for which calculate the delay
+   * @param timeMs current time in milliseconds
+   * @return delay in milliseconds
+   */
+  private def recordConnectionAndGetThrottleTimeMs(listenerName: ListenerName, timeMs: Long): Long = {
+    val listenerThrottleTimeMs = maxConnectionsPerListener
+      .get(listenerName)
+      .map(listenerQuota => recordAndGetThrottleTimeMs(listenerQuota.connectionRateSensor, timeMs))
+      .getOrElse(0)
+
+    if (protectedListener(listenerName)) {
+      listenerThrottleTimeMs
+    } else {
+      val brokerThrottleTimeMs = recordAndGetThrottleTimeMs(brokerConnectionRateSensor, timeMs)
+      val throttleTimeMs = math.max(brokerThrottleTimeMs, listenerThrottleTimeMs)

Review comment:
       nit: This variable is not really needed.

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1306,18 +1410,26 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging {
       val value = maxConnections(configs)
       if (value <= 0)
         throw new ConfigException("Invalid max.connections $listenerMax")
+
+      val rate = maxConnectionCreationRate(configs)
+      if (rate <= 0)
+        throw new ConfigException(s"Invalid ${KafkaConfig.MaxConnectionCreationRateProp} $rate")
     }
 
     override def reconfigure(configs: util.Map[String, _]): Unit = {
       lock.synchronized {
         _maxConnections = maxConnections(configs)
+        updateConnectionRateQuota(maxConnectionCreationRate(configs), Some(listener.value()))

Review comment:
       nit: `()` can be removed after `value`.

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1256,11 +1272,17 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging {
   private def waitForConnectionSlot(listenerName: ListenerName,
                                     acceptorBlockedPercentMeter: com.yammer.metrics.core.Meter): Unit = {
     counts.synchronized {
-      if (!connectionSlotAvailable(listenerName)) {
+      val startTimeMs = time.milliseconds()
+      val throttleTimeMs = math.max(recordConnectionAndGetThrottleTimeMs(listenerName, startTimeMs), 0)
+
+      if (throttleTimeMs > 0 || !connectionSlotAvailable(listenerName)) {
         val startNs = time.nanoseconds
+        val endThrottleTimeMs = startTimeMs + throttleTimeMs
+        var remainingThrottleTimeMs = throttleTimeMs
         do {
-          counts.wait()
-        } while (!connectionSlotAvailable(listenerName))
+          counts.wait(remainingThrottleTimeMs)

Review comment:
       A thread waiting here will be notified when a connection is closed (when `dec` is called). As connections in AK are long-lived, couldn't we end up in a case where a connection is throttled for a longer period than the computed `throttleTimeMs` if no connection is closed in between?
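
    For reference, a sketch of how the wait loop presumably recomputes the remaining throttle time on each wake-up (the diff above is truncated, so part of this is inferred):

        // Sketch: wait() takes a timeout, so the thread resumes when either a connection
        // is closed (dec() notifies the waiters) or the remaining throttle time elapses.
        do {
          counts.wait(remainingThrottleTimeMs)
          remainingThrottleTimeMs = math.max(endThrottleTimeMs - time.milliseconds, 0)
        } while (remainingThrottleTimeMs > 0 || !connectionSlotAvailable(listenerName))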

##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
##########
@@ -161,6 +162,9 @@ private KafkaMbean removeAttribute(KafkaMetric metric, String mBeanName) {
     private String addAttribute(KafkaMetric metric) {
         try {
             MetricName metricName = metric.metricName();
+            if (metricName.tags().containsKey(DO_NOT_REPORT_TAG)) {
+                return null;
+            }

Review comment:
       I am not convinced by this. The main issue is that other reporters will still report the metric. If we really don't want to report a metric, I think we need a solution which works for all reporters. Could you perhaps elaborate more on the need here?
   
   I can think of the following alternatives:
   * add a flag to the sensor to indicate if it must be reported or not.
   * don't rely on metrics to create/store the sensor but have a local reference.

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1306,18 +1410,26 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging {
       val value = maxConnections(configs)
       if (value <= 0)
         throw new ConfigException("Invalid max.connections $listenerMax")
+
+      val rate = maxConnectionCreationRate(configs)
+      if (rate <= 0)
+        throw new ConfigException(s"Invalid ${KafkaConfig.MaxConnectionCreationRateProp} $rate")
     }
 
     override def reconfigure(configs: util.Map[String, _]): Unit = {
       lock.synchronized {
         _maxConnections = maxConnections(configs)
+        updateConnectionRateQuota(maxConnectionCreationRate(configs), Some(listener.value()))
         lock.notifyAll()
       }
     }
 
     private def maxConnections(configs: util.Map[String, _]): Int = {
       Option(configs.get(KafkaConfig.MaxConnectionsProp)).map(_.toString.toInt).getOrElse(Int.MaxValue)
     }
+    private def maxConnectionCreationRate(configs: util.Map[String, _]): Int = {

Review comment:
       nit: I would put an empty line before declaring the method.

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1287,15 +1309,97 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging {
   private def maxListenerConnections(listenerName: ListenerName): Int =
     maxConnectionsPerListener.get(listenerName).map(_.maxConnections).getOrElse(Int.MaxValue)
 
+  /**
+   * Calculates the delay needed to bring the observed connection creation rate to listener-level limit or to broker-wide
+   * limit, whichever the longest. The delay is capped to the quota window size defined by QuotaWindowSizeSecondsProp
+   *
+   * @param listenerName listener for which calculate the delay
+   * @param timeMs current time in milliseconds
+   * @return delay in milliseconds
+   */
+  private def recordConnectionAndGetThrottleTimeMs(listenerName: ListenerName, timeMs: Long): Long = {
+    val listenerThrottleTimeMs = maxConnectionsPerListener
+      .get(listenerName)
+      .map(listenerQuota => recordAndGetThrottleTimeMs(listenerQuota.connectionRateSensor, timeMs))
+      .getOrElse(0)
+
+    if (protectedListener(listenerName)) {
+      listenerThrottleTimeMs
+    } else {
+      val brokerThrottleTimeMs = recordAndGetThrottleTimeMs(brokerConnectionRateSensor, timeMs)
+      val throttleTimeMs = math.max(brokerThrottleTimeMs, listenerThrottleTimeMs)
+      throttleTimeMs
+    }
+  }
+
+  private def recordAndGetThrottleTimeMs(sensor: Sensor, timeMs: Long): Int = {
+    try {
+      sensor.record(1.0, timeMs)
+      0
+    } catch {
+      case e: QuotaViolationException =>
+        val throttleTimeMs = QuotaUtils.boundedThrottleTime(
+          e.value, e.bound, QuotaUtils.rateMetricWindowSize(e.metric, timeMs), maxThrottleTimeMs).toInt
+        debug(s"Quota violated for sensor (${sensor.name}). Delay time: $throttleTimeMs ms")
+        throttleTimeMs
+    }
+  }
+
+  /**
+   * Creates sensor for tracking the connection creation rate and corresponding connection rate quota for a given
+   * listener or broker-wide, if listener is not provided.
+   * @param quotaLimit connection creation rate quota
+   * @param listenerOpt listener name if sensor is for a listener
+   */
+  private def createConnectionRateQuotaSensor(quotaLimit: Int, listenerOpt: Option[String] = None): Sensor = {
+    val quotaEntity = listenerOpt.getOrElse("broker")
+    val sensor = metrics.sensor(s"ConnectionCreationRate-$quotaEntity", rateQuotaMetricConfig(quotaLimit))
+    sensor.add(connectionRateMetricName(listenerOpt), new Rate)
+    info(s"Created ConnectionCreationRate-$quotaEntity sensor, quotaLimit=$quotaLimit")
+    sensor
+  }
+
+  private def updateConnectionRateQuota(quotaLimit: Int, listenerOpt: Option[String] = None): Unit = {
+    val metric = metrics.metric(connectionRateMetricName((listenerOpt)))
+    metric.config(rateQuotaMetricConfig(quotaLimit))
+    info(s"Updated ${listenerOpt.getOrElse("broker")} max connection creation rate to $quotaLimit")
+  }
+
+  private def connectionRateMetricName(listenerOpt: Option[String]): MetricName = {
+    val quotaEntity = listenerOpt.getOrElse("broker")
+    metrics.metricName(
+      s"connection-creation-rate-$quotaEntity",
+      "connection-quota-no-jmx",
+      s"Tracking $quotaEntity connection creation rate",
+      rateQuotaMetricTags(listenerOpt))

Review comment:
       Wouldn't it make sense to expose these metrics? I think they could be useful for knowing whether connections are being throttled at the broker level or at the listener level. Moreover, the number of metrics is small, so exposing them should not hurt.
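For illustration, exposing the rate mostly amounts to registering it under a regular metric group with no special tag, along these lines (the sensor name, group, metric name, and the 100/sec limit are all hypothetical, not the PR's final choices):

```scala
import java.util.concurrent.TimeUnit
import org.apache.kafka.common.metrics.{MetricConfig, Metrics, Quota}
import org.apache.kafka.common.metrics.stats.Rate

object ConnectionRateMetricSketch extends App {
  val metrics = new Metrics()
  val config = new MetricConfig()
    .timeWindow(1, TimeUnit.SECONDS)
    .samples(11)
    .quota(Quota.upperBound(100)) // hypothetical broker-wide limit: 100 connections/sec
  val sensor = metrics.sensor("ConnectionAcceptRate-broker", config)
  sensor.add(
    metrics.metricName("connection-accept-rate", "socket-server-metrics",
      "Connections accepted per second, broker-wide"),
    new Rate())
  sensor.record() // one accepted connection; throws QuotaViolationException once over quota
  metrics.close()
}
```

With a registration like that, JMX and any other configured reporters would pick the metric up like any other socket-server metric.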

##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -689,6 +691,11 @@ object KafkaConfig {
     "should be configured based on broker capacity while listener limits should be configured based on application requirements. " +
     "New connections are blocked if either the listener or broker limit is reached. Connections on the inter-broker listener are " +
     "permitted even if broker-wide limit is reached. The least recently used connection on another listener will be closed in this case."
+  val MaxConnectionCreationRateDoc = "The maximum connection creation rate we allow in the broker at any time. Listener-level limits " +
+    s"may also be configured by prefixing the config name with the listener prefix, for example, <code>listener.name.internal.$MaxConnectionCreationRateProp</code>." +
+    "Broker-wide connection rate limit should be configured based on broker capacity while listener limits should be configured based on " +
+    "application requirements. New connections will be throttled if either the listener or the broker limit is reached, with the exception " +
+    " of inter-broker listener. Connections on the inter-broker listener will be throttled only when the listener-level rate limit is reached. "

Review comment:
       nit: extra space after the `.`.

##########
File path: core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
##########
@@ -163,13 +167,65 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
     TestUtils.waitUntilTrue(() => initialConnectionCount == connectionCount, "Connections not closed")
   }
 
+  @Test
+  def testDynamicListenerConnectionCreationRateQuota(): Unit = {
+    // Create another listener. PLAINTEXT is an inter-broker listener
+    // keep default limits
+    val newListenerNames = Seq("PLAINTEXT", "EXTERNAL")
+    val newListeners = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"
+    val props = new Properties
+    props.put(KafkaConfig.ListenersProp, newListeners)
+    props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT")
+    reconfigureServers(props, perBrokerConfig = true, (KafkaConfig.ListenersProp, newListeners))
+    waitForListener("EXTERNAL")
+
+    // new broker-wide connection rate limit
+    val connRateLimit = 18
+
+    // before setting connection rate to 10, verify we can do at least double that by default (no limit)
+    verifyConnectionRate(2 * connRateLimit, Int.MaxValue, "PLAINTEXT")
+
+    // Reduce total broker connection rate limit to 10 at the cluster level and verify the limit is enforced
+    props.clear()  // so that we do not pass security protocol map which cannot be set at the cluster level
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, connRateLimit.toString)
+    reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.MaxConnectionCreationRateProp, connRateLimit.toString))
+    verifyConnectionRate(10, connRateLimit, "PLAINTEXT")
+
+    // Set 7 conn/sec rate limit for each listener and verify it gets enforced
+    val plaintextListenerProp = s"${listener.configPrefix}${KafkaConfig.MaxConnectionCreationRateProp}"
+    props.put(s"listener.name.external.${KafkaConfig.MaxConnectionCreationRateProp}", "7")

Review comment:
       nit: Could we define a val for `7` and reuse it below as well?
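For example, a fragment in the context of the quoted test (the name `listenerRateLimit` is illustrative):

```scala
val listenerRateLimit = 7 // illustrative: per-listener connection creation rate limit
props.put(s"listener.name.external.${KafkaConfig.MaxConnectionCreationRateProp}", listenerRateLimit.toString)
// ...and reuse listenerRateLimit wherever the literal 7 appears later in the test
```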




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rajinisivaram commented on pull request #8768: KAFKA-10023: Enforce broker-wide and per-listener connection creation…

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on pull request #8768:
URL: https://github.com/apache/kafka/pull/8768#issuecomment-674768235


   retest this please


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rajinisivaram commented on a change in pull request #8768: KAFKA-10023: Enforce broker-wide and per-listener connection creation…

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on a change in pull request #8768:
URL: https://github.com/apache/kafka/pull/8768#discussion_r457308585



##########
File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
##########
@@ -70,37 +80,38 @@ class ConnectionQuotasTest {
         blockedPercentMeters.put(name, KafkaMetricsGroup.newMeter(
           s"${name}BlockedPercent", "blocked time", TimeUnit.NANOSECONDS, Map(ListenerMetricTag -> name)))
     }
+    // use system time, because ConnectionQuota causes the current thread to wait with timeout, which waits based on
+    // system time; so using mock time will likely result in test flakiness due to a mixed use of mock and system time
+    metrics = new Metrics(new MetricConfig(), Collections.emptyList(), Time.SYSTEM)

Review comment:
       Is this required for all tests or only the new connection rate tests?

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1289,15 +1311,89 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging {
   private def maxListenerConnections(listenerName: ListenerName): Int =
     maxConnectionsPerListener.get(listenerName).map(_.maxConnections).getOrElse(Int.MaxValue)
 
+  /**
+   * Calculates the delay needed to bring the observed connection creation rate to listener-level limit or to broker-wide
+   * limit, whichever the longest. The delay is capped to the quota window size defined by QuotaWindowSizeSecondsProp
+   *
+   * @param listenerName listener for which calculate the delay
+   * @param timeMs current time in milliseconds
+   * @return delay in milliseconds
+   */
+  private def recordConnectionAndGetThrottleTimeMs(listenerName: ListenerName, timeMs: Long): Long = {
+    val listenerThrottleTimeMs = maxConnectionsPerListener
+      .get(listenerName)
+      .map(listenerQuota => recordAndGetThrottleTimeMs(listenerQuota.connectionRateSensor, timeMs))
+      .getOrElse(0)
+
+    if (protectedListener(listenerName)) {
+      listenerThrottleTimeMs
+    } else {
+      val brokerThrottleTimeMs = recordAndGetThrottleTimeMs(brokerConnectionRateSensor, timeMs)
+      math.max(brokerThrottleTimeMs, listenerThrottleTimeMs)
+    }
+  }
+
+  private def recordAndGetThrottleTimeMs(sensor: Sensor, timeMs: Long): Int = {
+    try {
+      sensor.record(1.0, timeMs)
+      0
+    } catch {
+      case e: QuotaViolationException =>
+        val throttleTimeMs = QuotaUtils.boundedThrottleTime(
+          e.value, e.bound, QuotaUtils.rateMetricWindowSize(e.metric, timeMs), maxThrottleTimeMs).toInt
+        debug(s"Quota violated for sensor (${sensor.name}). Delay time: $throttleTimeMs ms")
+        throttleTimeMs
+    }
+  }
+
+  /**
+   * Creates sensor for tracking the connection creation rate and corresponding connection rate quota for a given
+   * listener or broker-wide, if listener is not provided.
+   * @param quotaLimit connection creation rate quota
+   * @param listenerOpt listener name if sensor is for a listener
+   */
+  private def createConnectionRateQuotaSensor(quotaLimit: Int, listenerOpt: Option[String] = None): Sensor = {
+    val quotaEntity = listenerOpt.getOrElse("broker")
+    val sensor = metrics.sensor(s"ConnectionCreationRate-$quotaEntity", rateQuotaMetricConfig(quotaLimit))
+    sensor.add(connectionRateMetricName(quotaEntity), new Rate, null)
+    info(s"Created ConnectionCreationRate-$quotaEntity sensor, quotaLimit=$quotaLimit")
+    sensor
+  }
+
+  private def updateConnectionRateQuota(quotaLimit: Int, listenerOpt: Option[String] = None): Unit = {
+    val quotaEntity = listenerOpt.getOrElse("broker")

Review comment:
       `broker` is a valid listener name, and perhaps a reasonable name for the inter-broker listener, so using it as the broker-wide quota entity could clash with a listener that actually has that name.

##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java
##########
@@ -97,5 +99,13 @@ public MetricConfig recordLevel(Sensor.RecordingLevel recordingLevel) {
         return this;
     }
 
+    public boolean skipReporting() {

Review comment:
       We don't talk about this in the KIP and this is a public class. Why are we skipping reporting anyway?

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1289,15 +1311,89 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging {
   private def maxListenerConnections(listenerName: ListenerName): Int =
     maxConnectionsPerListener.get(listenerName).map(_.maxConnections).getOrElse(Int.MaxValue)
 
+  /**
+   * Calculates the delay needed to bring the observed connection creation rate to listener-level limit or to broker-wide
+   * limit, whichever the longest. The delay is capped to the quota window size defined by QuotaWindowSizeSecondsProp
+   *
+   * @param listenerName listener for which calculate the delay
+   * @param timeMs current time in milliseconds
+   * @return delay in milliseconds
+   */
+  private def recordConnectionAndGetThrottleTimeMs(listenerName: ListenerName, timeMs: Long): Long = {
+    val listenerThrottleTimeMs = maxConnectionsPerListener
+      .get(listenerName)
+      .map(listenerQuota => recordAndGetThrottleTimeMs(listenerQuota.connectionRateSensor, timeMs))
+      .getOrElse(0)
+
+    if (protectedListener(listenerName)) {
+      listenerThrottleTimeMs
+    } else {
+      val brokerThrottleTimeMs = recordAndGetThrottleTimeMs(brokerConnectionRateSensor, timeMs)
+      math.max(brokerThrottleTimeMs, listenerThrottleTimeMs)
+    }
+  }
+
+  private def recordAndGetThrottleTimeMs(sensor: Sensor, timeMs: Long): Int = {
+    try {
+      sensor.record(1.0, timeMs)
+      0
+    } catch {
+      case e: QuotaViolationException =>
+        val throttleTimeMs = QuotaUtils.boundedThrottleTime(
+          e.value, e.bound, QuotaUtils.rateMetricWindowSize(e.metric, timeMs), maxThrottleTimeMs).toInt
+        debug(s"Quota violated for sensor (${sensor.name}). Delay time: $throttleTimeMs ms")
+        throttleTimeMs
+    }
+  }
+
+  /**
+   * Creates sensor for tracking the connection creation rate and corresponding connection rate quota for a given
+   * listener or broker-wide, if listener is not provided.
+   * @param quotaLimit connection creation rate quota
+   * @param listenerOpt listener name if sensor is for a listener
+   */
+  private def createConnectionRateQuotaSensor(quotaLimit: Int, listenerOpt: Option[String] = None): Sensor = {
+    val quotaEntity = listenerOpt.getOrElse("broker")
+    val sensor = metrics.sensor(s"ConnectionCreationRate-$quotaEntity", rateQuotaMetricConfig(quotaLimit))
+    sensor.add(connectionRateMetricName(quotaEntity), new Rate, null)
+    info(s"Created ConnectionCreationRate-$quotaEntity sensor, quotaLimit=$quotaLimit")
+    sensor
+  }
+
+  private def updateConnectionRateQuota(quotaLimit: Int, listenerOpt: Option[String] = None): Unit = {
+    val quotaEntity = listenerOpt.getOrElse("broker")
+    val metric = metrics.metric(connectionRateMetricName(quotaEntity))
+    metric.config(rateQuotaMetricConfig(quotaLimit))
+    info(s"Updated $quotaEntity max connection creation rate to $quotaLimit")
+  }
+
+  private def connectionRateMetricName(quotaEntity: String): MetricName = {
+    metrics.metricName(
+      s"connection-creation-rate-$quotaEntity",
+      "connection-quota",
+      s"Tracking $quotaEntity connection creation rate")
+  }
+
+  private def rateQuotaMetricConfig(quotaLimit: Int): MetricConfig = {
+    new MetricConfig()
+      .timeWindow(config.quotaWindowSizeSeconds.toLong, TimeUnit.SECONDS)
+      .samples(config.numQuotaSamples)
+      .quota(new Quota(quotaLimit, true))
+      .skipReporting(true)
+  }
+
   class ListenerConnectionQuota(lock: Object, listener: ListenerName) extends ListenerReconfigurable {
     @volatile private var _maxConnections = Int.MaxValue
+    private val _connectionRateSensor = createConnectionRateQuotaSensor(Int.MaxValue, Some(listener.value))
 
     def maxConnections: Int = _maxConnections
+    def connectionRateSensor: Sensor = _connectionRateSensor

Review comment:
       We could just make `connectionRateSensor` a `val`.
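I.e., roughly the following, as a sketch of the suggestion against the quoted class (not the final diff):

```scala
class ListenerConnectionQuota(lock: Object, listener: ListenerName) extends ListenerReconfigurable {
  @volatile private var _maxConnections = Int.MaxValue
  // val instead of a private var plus an accessor method
  val connectionRateSensor: Sensor = createConnectionRateQuotaSensor(Int.MaxValue, Some(listener.value))

  def maxConnections: Int = _maxConnections
  // ... rest unchanged
}
```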




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] apovzner commented on a change in pull request #8768: KAFKA-10023: Enforce broker-wide and per-listener connection creation…

Posted by GitBox <gi...@apache.org>.
apovzner commented on a change in pull request #8768:
URL: https://github.com/apache/kafka/pull/8768#discussion_r468175154



##########
File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
##########
@@ -70,37 +80,38 @@ class ConnectionQuotasTest {
         blockedPercentMeters.put(name, KafkaMetricsGroup.newMeter(
           s"${name}BlockedPercent", "blocked time", TimeUnit.NANOSECONDS, Map(ListenerMetricTag -> name)))
     }
+    // use system time, because ConnectionQuota causes the current thread to wait with timeout, which waits based on
+    // system time; so using mock time will likely result in test flakiness due to a mixed use of mock and system time
+    metrics = new Metrics(new MetricConfig(), Collections.emptyList(), Time.SYSTEM)

Review comment:
       This is required for all tests. Even for tests that are not supposed to trigger throttling on the connection rate quota, we want system time, because if the code incorrectly throttled (i.e. called `wait()` with a timeout), the existing tests could start failing in ways that are hard to debug: the timeout would fire too early or too late, and not where we expect.
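A minimal illustration of the mixed-clock problem described above (not code from the PR; `MockTime` is the clients test utility):

```scala
import org.apache.kafka.common.utils.MockTime

object MixedClockSketch extends App {
  val mockTime = new MockTime() // advances only when the test calls mockTime.sleep(...)
  val lock = new Object

  val before = mockTime.milliseconds()
  lock.synchronized {
    lock.wait(100) // Object.wait measures its timeout on the real wall clock (~100 ms here)
  }
  assert(mockTime.milliseconds() == before) // mock time did not move, so quota windows never advance
}
```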




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rajinisivaram commented on a change in pull request #8768: KAFKA-10023: Enforce broker-wide and per-listener connection creation…

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on a change in pull request #8768:
URL: https://github.com/apache/kafka/pull/8768#discussion_r457944837



##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java
##########
@@ -97,5 +99,13 @@ public MetricConfig recordLevel(Sensor.RecordingLevel recordingLevel) {
         return this;
     }
 
+    public boolean skipReporting() {

Review comment:
       If we are using a sensor to determine the throttle time that is different from the one in `Selector`, we might want to expose it as a metric anyway. In case of a bug, we want to look at this metric, not the one in `Selector`. Perhaps we could use `connections-accepted` instead of `connections-created`, or something like that. In any case, `skipReporting` seems odd, so as @dajac said, using a Sensor that is not added to the metrics registry may be an option too.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org