You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/07/15 08:58:30 UTC

[GitHub] [kafka] dajac commented on a change in pull request #8768: KAFKA-10023: Enforce broker-wide and per-listener connection creation…

dajac commented on a change in pull request #8768:
URL: https://github.com/apache/kafka/pull/8768#discussion_r454890281



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1289,15 +1311,89 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging {
   private def maxListenerConnections(listenerName: ListenerName): Int =
     maxConnectionsPerListener.get(listenerName).map(_.maxConnections).getOrElse(Int.MaxValue)
 
+  /**
+   * Calculates the delay needed to bring the observed connection creation rate to listener-level limit or to broker-wide
+   * limit, whichever the longest. The delay is capped to the quota window size defined by QuotaWindowSizeSecondsProp
+   *
+   * @param listenerName listener for which calculate the delay
+   * @param timeMs current time in milliseconds
+   * @return delay in milliseconds
+   */
+  private def recordConnectionAndGetThrottleTimeMs(listenerName: ListenerName, timeMs: Long): Long = {
+    val listenerThrottleTimeMs = maxConnectionsPerListener
+      .get(listenerName)
+      .map(listenerQuota => recordAndGetThrottleTimeMs(listenerQuota.connectionRateSensor, timeMs))
+      .getOrElse(0)
+
+    if (protectedListener(listenerName)) {
+      listenerThrottleTimeMs
+    } else {
+      val brokerThrottleTimeMs = recordAndGetThrottleTimeMs(brokerConnectionRateSensor, timeMs)
+      math.max(brokerThrottleTimeMs, listenerThrottleTimeMs)
+    }
+  }
+
+  private def recordAndGetThrottleTimeMs(sensor: Sensor, timeMs: Long): Int = {
+    try {
+      sensor.record(1.0, timeMs)
+      0
+    } catch {
+      case e: QuotaViolationException =>
+        val throttleTimeMs = QuotaUtils.boundedThrottleTime(
+          e.value, e.bound, QuotaUtils.rateMetricWindowSize(e.metric, timeMs), maxThrottleTimeMs).toInt
+        debug(s"Quota violated for sensor (${sensor.name}). Delay time: $throttleTimeMs ms")
+        throttleTimeMs
+    }
+  }
+
+  /**
+   * Creates sensor for tracking the connection creation rate and corresponding connection rate quota for a given
+   * listener or broker-wide, if listener is not provided.
+   * @param quotaLimit connection creation rate quota
+   * @param listenerOpt listener name if sensor is for a listener
+   */
+  private def createConnectionRateQuotaSensor(quotaLimit: Int, listenerOpt: Option[String] = None): Sensor = {
+    val quotaEntity = listenerOpt.getOrElse("broker")
+    val sensor = metrics.sensor(s"ConnectionCreationRate-$quotaEntity", rateQuotaMetricConfig(quotaLimit))
+    sensor.add(connectionRateMetricName(quotaEntity), new Rate, null)
+    info(s"Created ConnectionCreationRate-$quotaEntity sensor, quotaLimit=$quotaLimit")
+    sensor
+  }
+
+  private def updateConnectionRateQuota(quotaLimit: Int, listenerOpt: Option[String] = None): Unit = {

Review comment:
       nit: Shall we add a javadoc to this one as well?

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1289,15 +1311,89 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging {
   private def maxListenerConnections(listenerName: ListenerName): Int =
     maxConnectionsPerListener.get(listenerName).map(_.maxConnections).getOrElse(Int.MaxValue)
 
+  /**
+   * Calculates the delay needed to bring the observed connection creation rate to listener-level limit or to broker-wide
+   * limit, whichever the longest. The delay is capped to the quota window size defined by QuotaWindowSizeSecondsProp
+   *
+   * @param listenerName listener for which calculate the delay
+   * @param timeMs current time in milliseconds
+   * @return delay in milliseconds
+   */
+  private def recordConnectionAndGetThrottleTimeMs(listenerName: ListenerName, timeMs: Long): Long = {
+    val listenerThrottleTimeMs = maxConnectionsPerListener
+      .get(listenerName)
+      .map(listenerQuota => recordAndGetThrottleTimeMs(listenerQuota.connectionRateSensor, timeMs))
+      .getOrElse(0)
+
+    if (protectedListener(listenerName)) {
+      listenerThrottleTimeMs
+    } else {
+      val brokerThrottleTimeMs = recordAndGetThrottleTimeMs(brokerConnectionRateSensor, timeMs)
+      math.max(brokerThrottleTimeMs, listenerThrottleTimeMs)
+    }
+  }
+
+  private def recordAndGetThrottleTimeMs(sensor: Sensor, timeMs: Long): Int = {

Review comment:
       nit: Shall we add a javadoc to this one as well?

##########
File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
##########
@@ -302,21 +312,295 @@ class ConnectionQuotasTest {
       }
       // all connections should get added
       overLimitFutures.foreach(_.get(5, TimeUnit.SECONDS))
-      listeners.values.foreach { listener =>
-        assertEquals(s"Number of connections on $listener:",
-          listenerMaxConnections, connectionQuotas.get(listener.defaultIp))
+      verifyConnectionCountOnEveryListener(connectionQuotas, listenerMaxConnections)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testBrokerConnectionRateLimitWhenActualRateBelowLimit(): Unit = {
+    val brokerRateLimit = 125
+    val props = brokerPropsWithDefaultConnectionLimits
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, brokerRateLimit.toString)
+    val config = KafkaConfig.fromProps(props)
+    val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+    addListenersAndVerify(config, connectionQuotas)
+
+    val executor = Executors.newFixedThreadPool(listeners.size)
+    try {
+      // create connections with the total rate < broker-wide quota, and verify there is no throttling
+      val connCreateIntervalMs = 25 // connection creation rate = 40/sec per listener (3 * 40 = 120/sec total)
+      val connectionsPerListener = 200 // should take 5 seconds to create 200 connections with rate = 40/sec
+      val futures = listeners.values.map { listener =>
+        executor.submit((() => acceptConnections(connectionQuotas, listener, connectionsPerListener, connCreateIntervalMs)): Runnable)
       }
+      futures.foreach(_.get(10, TimeUnit.SECONDS))
+
+      // the blocked percent should still be 0, because no limits were reached
+      verifyNoBlockedPercentRecordedOnAllListeners()
+      verifyConnectionCountOnEveryListener(connectionQuotas, connectionsPerListener)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testBrokerConnectionRateLimitWhenActualRateAboveLimit(): Unit = {
+    val brokerRateLimit = 90
+    val props = brokerPropsWithDefaultConnectionLimits
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, brokerRateLimit.toString)
+    val config = KafkaConfig.fromProps(props)
+    val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+    addListenersAndVerify(config, connectionQuotas)
+
+    val executor = Executors.newFixedThreadPool(listeners.size)
+    try {
+      // each listener creates connections such that the total connection rate > broker-wide quota
+      val connCreateIntervalMs = 10      // connection creation rate = 100
+      val connectionsPerListener = 400
+      val futures = listeners.values.map { listener =>
+        executor.submit((() => acceptConnections(connectionQuotas, listener, connectionsPerListener, connCreateIntervalMs)): Runnable)
+      }
+      futures.foreach(_.get(20, TimeUnit.SECONDS))
+
+      // verify that connections on non-inter-broker listener are throttled
+      verifyOnlyNonInterBrokerListenersBlockedPercentRecorded()
+
+      // expect all connections to be created (no limit on the number of connections)
+      verifyConnectionCountOnEveryListener(connectionQuotas, connectionsPerListener)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testListenerConnectionRateLimitWhenActualRateBelowLimit(): Unit = {
+    val brokerRateLimit = 125
+    val listenerRateLimit = 50
+    val connCreateIntervalMs = 25 // connection creation rate = 40/sec per listener (3 * 40 = 120/sec total)
+    val props = brokerPropsWithDefaultConnectionLimits
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, brokerRateLimit.toString)
+    val config = KafkaConfig.fromProps(props)
+    val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+    val listenerConfig = Map(KafkaConfig.MaxConnectionCreationRateProp -> listenerRateLimit.toString).asJava
+    addListenersAndVerify(config, listenerConfig, connectionQuotas)
+
+    val executor = Executors.newFixedThreadPool(listeners.size)
+    try {
+      // create connections with the rate < listener quota on every listener, and verify there is no throttling
+      val connectionsPerListener = 200 // should take 5 seconds to create 200 connections with rate = 40/sec
+      val futures = listeners.values.map { listener =>
+        executor.submit((() => acceptConnections(connectionQuotas, listener, connectionsPerListener, connCreateIntervalMs)): Runnable)
+      }
+      futures.foreach(_.get(10, TimeUnit.SECONDS))
+
+      // the blocked percent should still be 0, because no limits were reached
+      verifyNoBlockedPercentRecordedOnAllListeners()
+
+      verifyConnectionCountOnEveryListener(connectionQuotas, connectionsPerListener)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testListenerConnectionRateLimitWhenActualRateAboveLimit(): Unit = {
+    val brokerRateLimit = 125
+    val listenerRateLimit = 30
+    val connCreateIntervalMs = 25 // connection creation rate = 40/sec per listener (3 * 40 = 120/sec total)
+    val props = brokerPropsWithDefaultConnectionLimits
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, brokerRateLimit.toString)
+    val config = KafkaConfig.fromProps(props)
+    val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+    val listenerConfig = Map(KafkaConfig.MaxConnectionCreationRateProp -> listenerRateLimit.toString).asJava
+    addListenersAndVerify(config, listenerConfig, connectionQuotas)
+
+    val executor = Executors.newFixedThreadPool(listeners.size)
+    try {
+      // create connections with the rate > listener quota on every listener
+      // run a bit longer (20 seconds) to also verify the throttle rate
+      val connectionsPerListener = 600 // should take 20 seconds to create 600 connections with rate = 30/sec
+      val futures = listeners.values.map { listener =>
+        executor.submit((() =>
+          // epsilon is set to account for the worst-case where the measurement is taken just before or after the quota window
+          acceptConnectionsAndVerifyRate(connectionQuotas, listener, connectionsPerListener, connCreateIntervalMs, listenerRateLimit, 5)): Runnable)
+      }
+      futures.foreach(_.get(30, TimeUnit.SECONDS))
+
+      // verify that every listener was throttled
+      blockedPercentMeters.foreach { case (name, meter) =>
+        assertTrue(s"Expected BlockedPercentMeter metric for $name listener to be recorded", meter.count() > 0)
+      }
+
+      // while the connection creation rate was throttled,
+      // expect all connections got created (not limit on the number of connections)
+      verifyConnectionCountOnEveryListener(connectionQuotas, connectionsPerListener)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testMaxListenerConnectionListenerMustBeAboveZero(): Unit = {
+    val config = KafkaConfig.fromProps(brokerPropsWithDefaultConnectionLimits)
+    val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+    connectionQuotas.addListener(config, listeners("EXTERNAL").listenerName)
+
+    val maxListenerConnectionRate = 0
+    val listenerConfig = Map(KafkaConfig.MaxConnectionCreationRateProp -> maxListenerConnectionRate.toString).asJava
+    assertThrows[ConfigException] {
+      connectionQuotas.maxConnectionsPerListener(listeners("EXTERNAL").listenerName).validateReconfiguration(listenerConfig)
+    }
+  }
+
+  @Test
+  def testMaxListenerConnectionRateReconfiguration(): Unit = {

Review comment:
       Fair enough.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org