You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/02 08:44:47 UTC

[GitHub] [kafka] dajac commented on a change in pull request #8768: KAFKA-10023: Enforce broker-wide and per-listener connection creation…

dajac commented on a change in pull request #8768:
URL: https://github.com/apache/kafka/pull/8768#discussion_r433696374



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1287,15 +1309,97 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging {
   private def maxListenerConnections(listenerName: ListenerName): Int =
     maxConnectionsPerListener.get(listenerName).map(_.maxConnections).getOrElse(Int.MaxValue)
 
+  /**
+   * Calculates the delay needed to bring the observed connection creation rate to listener-level limit or to broker-wide
+   * limit, whichever the longest. The delay is capped to the quota window size defined by QuotaWindowSizeSecondsProp
+   *
+   * @param listenerName listener for which calculate the delay
+   * @param timeMs current time in milliseconds
+   * @return delay in milliseconds
+   */
+  private def recordConnectionAndGetThrottleTimeMs(listenerName: ListenerName, timeMs: Long): Long = {
+    val listenerThrottleTimeMs = maxConnectionsPerListener
+      .get(listenerName)
+      .map(listenerQuota => recordAndGetThrottleTimeMs(listenerQuota.connectionRateSensor, timeMs))
+      .getOrElse(0)
+
+    if (protectedListener(listenerName)) {
+      listenerThrottleTimeMs
+    } else {
+      val brokerThrottleTimeMs = recordAndGetThrottleTimeMs(brokerConnectionRateSensor, timeMs)
+      val throttleTimeMs = math.max(brokerThrottleTimeMs, listenerThrottleTimeMs)

Review comment:
       nit: This variable is not really needed.

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1306,18 +1410,26 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging {
       val value = maxConnections(configs)
       if (value <= 0)
         throw new ConfigException("Invalid max.connections $listenerMax")
+
+      val rate = maxConnectionCreationRate(configs)
+      if (rate <= 0)
+        throw new ConfigException(s"Invalid ${KafkaConfig.MaxConnectionCreationRateProp} $rate")
     }
 
     override def reconfigure(configs: util.Map[String, _]): Unit = {
       lock.synchronized {
         _maxConnections = maxConnections(configs)
+        updateConnectionRateQuota(maxConnectionCreationRate(configs), Some(listener.value()))

Review comment:
       nit: `()` can be removed after `value`.

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1256,11 +1272,17 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging {
   private def waitForConnectionSlot(listenerName: ListenerName,
                                     acceptorBlockedPercentMeter: com.yammer.metrics.core.Meter): Unit = {
     counts.synchronized {
-      if (!connectionSlotAvailable(listenerName)) {
+      val startTimeMs = time.milliseconds()
+      val throttleTimeMs = math.max(recordConnectionAndGetThrottleTimeMs(listenerName, startTimeMs), 0)
+
+      if (throttleTimeMs > 0 || !connectionSlotAvailable(listenerName)) {
         val startNs = time.nanoseconds
+        val endThrottleTimeMs = startTimeMs + throttleTimeMs
+        var remainingThrottleTimeMs = throttleTimeMs
         do {
-          counts.wait()
-        } while (!connectionSlotAvailable(listenerName))
+          counts.wait(remainingThrottleTimeMs)

Review comment:
       A thread waiting here will be notified when a connection is closed (when `dec` is called). As connections in AK are long lived, couldn't we end up in a case where a connection is throttled for a longer period than the computed `throttleTimeMs` if no connection is closed in between?

##########
File path: clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
##########
@@ -161,6 +162,9 @@ private KafkaMbean removeAttribute(KafkaMetric metric, String mBeanName) {
     private String addAttribute(KafkaMetric metric) {
         try {
             MetricName metricName = metric.metricName();
+            if (metricName.tags().containsKey(DO_NOT_REPORT_TAG)) {
+                return null;
+            }

Review comment:
       I am not convinced by this. The main issue is that other reporters would still report the metric. If we really want to not report a metric, I think that we need a solution which works for all reporters. Could you perhaps elaborate more on the need here?
   
   I can think of the following alternatives:
   * add a flag to the sensor to indicate if it must be reported or not.
   * don't rely on metrics to create/store the sensor but have a local reference.

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1306,18 +1410,26 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging {
       val value = maxConnections(configs)
       if (value <= 0)
         throw new ConfigException("Invalid max.connections $listenerMax")
+
+      val rate = maxConnectionCreationRate(configs)
+      if (rate <= 0)
+        throw new ConfigException(s"Invalid ${KafkaConfig.MaxConnectionCreationRateProp} $rate")
     }
 
     override def reconfigure(configs: util.Map[String, _]): Unit = {
       lock.synchronized {
         _maxConnections = maxConnections(configs)
+        updateConnectionRateQuota(maxConnectionCreationRate(configs), Some(listener.value()))
         lock.notifyAll()
       }
     }
 
     private def maxConnections(configs: util.Map[String, _]): Int = {
       Option(configs.get(KafkaConfig.MaxConnectionsProp)).map(_.toString.toInt).getOrElse(Int.MaxValue)
     }
+    private def maxConnectionCreationRate(configs: util.Map[String, _]): Int = {

Review comment:
       nit: I would put an empty line before declaring the method.

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1287,15 +1309,97 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging {
   private def maxListenerConnections(listenerName: ListenerName): Int =
     maxConnectionsPerListener.get(listenerName).map(_.maxConnections).getOrElse(Int.MaxValue)
 
+  /**
+   * Calculates the delay needed to bring the observed connection creation rate to listener-level limit or to broker-wide
+   * limit, whichever the longest. The delay is capped to the quota window size defined by QuotaWindowSizeSecondsProp
+   *
+   * @param listenerName listener for which calculate the delay
+   * @param timeMs current time in milliseconds
+   * @return delay in milliseconds
+   */
+  private def recordConnectionAndGetThrottleTimeMs(listenerName: ListenerName, timeMs: Long): Long = {
+    val listenerThrottleTimeMs = maxConnectionsPerListener
+      .get(listenerName)
+      .map(listenerQuota => recordAndGetThrottleTimeMs(listenerQuota.connectionRateSensor, timeMs))
+      .getOrElse(0)
+
+    if (protectedListener(listenerName)) {
+      listenerThrottleTimeMs
+    } else {
+      val brokerThrottleTimeMs = recordAndGetThrottleTimeMs(brokerConnectionRateSensor, timeMs)
+      val throttleTimeMs = math.max(brokerThrottleTimeMs, listenerThrottleTimeMs)
+      throttleTimeMs
+    }
+  }
+
+  private def recordAndGetThrottleTimeMs(sensor: Sensor, timeMs: Long): Int = {
+    try {
+      sensor.record(1.0, timeMs)
+      0
+    } catch {
+      case e: QuotaViolationException =>
+        val throttleTimeMs = QuotaUtils.boundedThrottleTime(
+          e.value, e.bound, QuotaUtils.rateMetricWindowSize(e.metric, timeMs), maxThrottleTimeMs).toInt
+        debug(s"Quota violated for sensor (${sensor.name}). Delay time: $throttleTimeMs ms")
+        throttleTimeMs
+    }
+  }
+
+  /**
+   * Creates sensor for tracking the connection creation rate and corresponding connection rate quota for a given
+   * listener or broker-wide, if listener is not provided.
+   * @param quotaLimit connection creation rate quota
+   * @param listenerOpt listener name if sensor is for a listener
+   */
+  private def createConnectionRateQuotaSensor(quotaLimit: Int, listenerOpt: Option[String] = None): Sensor = {
+    val quotaEntity = listenerOpt.getOrElse("broker")
+    val sensor = metrics.sensor(s"ConnectionCreationRate-$quotaEntity", rateQuotaMetricConfig(quotaLimit))
+    sensor.add(connectionRateMetricName(listenerOpt), new Rate)
+    info(s"Created ConnectionCreationRate-$quotaEntity sensor, quotaLimit=$quotaLimit")
+    sensor
+  }
+
+  private def updateConnectionRateQuota(quotaLimit: Int, listenerOpt: Option[String] = None): Unit = {
+    val metric = metrics.metric(connectionRateMetricName((listenerOpt)))
+    metric.config(rateQuotaMetricConfig(quotaLimit))
+    info(s"Updated ${listenerOpt.getOrElse("broker")} max connection creation rate to $quotaLimit")
+  }
+
+  private def connectionRateMetricName(listenerOpt: Option[String]): MetricName = {
+    val quotaEntity = listenerOpt.getOrElse("broker")
+    metrics.metricName(
+      s"connection-creation-rate-$quotaEntity",
+      "connection-quota-no-jmx",
+      s"Tracking $quotaEntity connection creation rate",
+      rateQuotaMetricTags(listenerOpt))

Review comment:
       Wouldn't it make sense to expose the metrics? I think that they could be useful to know if connections are throttled at the broker or at the listener level, no? Moreover, the number of metrics is small so it should not hurt.

##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -689,6 +691,11 @@ object KafkaConfig {
     "should be configured based on broker capacity while listener limits should be configured based on application requirements. " +
     "New connections are blocked if either the listener or broker limit is reached. Connections on the inter-broker listener are " +
     "permitted even if broker-wide limit is reached. The least recently used connection on another listener will be closed in this case."
+  val MaxConnectionCreationRateDoc = "The maximum connection creation rate we allow in the broker at any time. Listener-level limits " +
+    s"may also be configured by prefixing the config name with the listener prefix, for example, <code>listener.name.internal.$MaxConnectionCreationRateProp</code>." +
+    "Broker-wide connection rate limit should be configured based on broker capacity while listener limits should be configured based on " +
+    "application requirements. New connections will be throttled if either the listener or the broker limit is reached, with the exception " +
+    " of inter-broker listener. Connections on the inter-broker listener will be throttled only when the listener-level rate limit is reached. "

Review comment:
       nit: extra space after the `.`.

##########
File path: core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
##########
@@ -163,13 +167,65 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
     TestUtils.waitUntilTrue(() => initialConnectionCount == connectionCount, "Connections not closed")
   }
 
+  @Test
+  def testDynamicListenerConnectionCreationRateQuota(): Unit = {
+    // Create another listener. PLAINTEXT is an inter-broker listener
+    // keep default limits
+    val newListenerNames = Seq("PLAINTEXT", "EXTERNAL")
+    val newListeners = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"
+    val props = new Properties
+    props.put(KafkaConfig.ListenersProp, newListeners)
+    props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT")
+    reconfigureServers(props, perBrokerConfig = true, (KafkaConfig.ListenersProp, newListeners))
+    waitForListener("EXTERNAL")
+
+    // new broker-wide connection rate limit
+    val connRateLimit = 18
+
+    // before setting connection rate to 10, verify we can do at least double that by default (no limit)
+    verifyConnectionRate(2 * connRateLimit, Int.MaxValue, "PLAINTEXT")
+
+    // Reduce total broker connection rate limit to 10 at the cluster level and verify the limit is enforced
+    props.clear()  // so that we do not pass security protocol map which cannot be set at the cluster level
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, connRateLimit.toString)
+    reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.MaxConnectionCreationRateProp, connRateLimit.toString))
+    verifyConnectionRate(10, connRateLimit, "PLAINTEXT")
+
+    // Set 7 conn/sec rate limit for each listener and verify it gets enforced
+    val plaintextListenerProp = s"${listener.configPrefix}${KafkaConfig.MaxConnectionCreationRateProp}"
+    props.put(s"listener.name.external.${KafkaConfig.MaxConnectionCreationRateProp}", "7")

Review comment:
       nit: Could we define a val for `7` and reuse it below as well?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org