You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/10/12 14:05:57 UTC

[GitHub] [kafka] dajac commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits

dajac commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r503249872



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1203,14 +1261,28 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
   private val listenerCounts = mutable.Map[ListenerName, Int]()
   private[network] val maxConnectionsPerListener = mutable.Map[ListenerName, ListenerConnectionQuota]()
   @volatile private var totalCount = 0
-
+  @volatile private var defaultConnectionRatePerIp = DynamicConfig.Ip.DefaultConnectionCreationRate
+  private val inactiveSensorExpirationTimeSeconds = TimeUnit.HOURS.toSeconds(1);

Review comment:
       Could we define this constant in a companion object?

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1371,6 +1492,45 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
     }
   }
 
+  /**
+   * To avoid over-recording listener/broker connection rate, we unrecord a listener or broker connection
+   * if the IP gets throttled later.
+   *
+   * @param listenerName listener to unrecord connection
+   * @param timeMs current time in milliseconds
+   */
+  private def unrecordListenerConnection(listenerName: ListenerName, timeMs: Long): Unit = {
+    if (!protectedListener(listenerName)) {
+      brokerConnectionRateSensor.record(-1.0, timeMs, false)
+    }
+    maxConnectionsPerListener
+      .get(listenerName)
+      .foreach(_.connectionRateSensor.record(-1.0, timeMs, false))
+  }
+
+  /**
+   * Calculates the delay needed to bring the observed connection creation rate to the IP limit.
+   * If the connection would cause an IP quota violation, un-record the connection
+   *
+   * @param address
+   * @param timeMs
+   * @return
+   */
+  private def recordIpConnectionMaybeThrottle(address: InetAddress, timeMs: Long): Long = {
+    val connectionRateQuota = connectionRateForIp(address)
+    val quotaEnabled = connectionRateQuota != DynamicConfig.Ip.UnlimitedConnectionCreationRate
+    if (!quotaEnabled) {
+      return 0
+    }
+    val sensor = getOrCreateConnectionRateQuotaSensor(connectionRateForIp(address), IpQuotaEntity(address))

Review comment:
       nit: Can't we reuse `connectionRateQuota` directly? 

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1394,28 +1554,38 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
    * Creates sensor for tracking the connection creation rate and corresponding connection rate quota for a given
    * listener or broker-wide, if listener is not provided.
    * @param quotaLimit connection creation rate quota
-   * @param listenerOpt listener name if sensor is for a listener
+   * @param connectionQuotaEntity entity to create the sensor for
    */
-  private def createConnectionRateQuotaSensor(quotaLimit: Int, listenerOpt: Option[String] = None): Sensor = {
-    val sensorName = listenerOpt.map(listener => s"ConnectionAcceptRate-$listener").getOrElse("ConnectionAcceptRate")
-    val sensor = metrics.sensor(sensorName, rateQuotaMetricConfig(quotaLimit))
-    sensor.add(connectionRateMetricName(listenerOpt), new Rate, null)
-    info(s"Created $sensorName sensor, quotaLimit=$quotaLimit")
-    sensor
+  private def getOrCreateConnectionRateQuotaSensor(quotaLimit: Int, connectionQuotaEntity: ConnectionQuotaEntity): Sensor = {
+    val (sensorName, sensorExpiration) = connectionQuotaEntity match {
+      case BrokerQuotaEntity => ("ConnectionAcceptRate", Long.MaxValue)
+      case listenerEntity: ListenerQuotaEntity => (s"ConnectionAcceptRate-${listenerEntity.entityName}", Long.MaxValue)
+      case ipEntity: IpQuotaEntity => (s"ConnectionAcceptRate-${ipEntity.entityName}", inactiveSensorExpirationTimeSeconds)
+    }

Review comment:
       Couldn't we rely on the `ConnectionQuotaEntity` trait to provide this information? That would avoid having to pattern match on the entity to get it.

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -697,6 +714,31 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
         info(s"Rejected connection from ${e.ip}, address already has the configured maximum of ${e.count} connections.")
         close(endPoint.listenerName, socketChannel)
         None
+      case e: ConnectionThrottledException =>
+        val ip = socketChannel.socket.getInetAddress
+        debug(s"Delaying closing of connection from $ip for ${e.throttleTimeMs} ms")
+        val delayQueue = throttledSockets.computeIfAbsent(ip, _ => new mutable.Queue[DelayedCloseSocket])

Review comment:
       - Do we really need to store delayed sockets per IP? It seems that we just close all the sockets in `throttledSockets` later on, so it is not really necessary. Or have I missed something?
   - Similarly, we use a `DelayQueue` for the other quotas. Is there any reason why we don't use the same data structure here?

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -697,6 +714,31 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
         info(s"Rejected connection from ${e.ip}, address already has the configured maximum of ${e.count} connections.")
         close(endPoint.listenerName, socketChannel)
         None
+      case e: ConnectionThrottledException =>
+        val ip = socketChannel.socket.getInetAddress
+        debug(s"Delaying closing of connection from $ip for ${e.throttleTimeMs} ms")
+        val delayQueue = throttledSockets.computeIfAbsent(ip, _ => new mutable.Queue[DelayedCloseSocket])
+        val endThrottleTimeMs = e.startThrottleTimeMs + e.throttleTimeMs
+        delayQueue += DelayedCloseSocket(socketChannel, endThrottleTimeMs)
+        None
+    }
+  }
+
+  /**
+   * Close sockets for any connections that have been throttled
+   */
+  private def closeThrottledConnections(): Unit = {
+    val timeMs = time.milliseconds
+    val iter = throttledSockets.values.iterator
+    while (iter.hasNext) {

Review comment:
       If we were using a shared `DelayQueue` for all IPs, iterating over all the IPs wouldn't be necessary when there are no throttled connections ready to be closed yet.

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1242,7 +1314,56 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
   private[network] def updateBrokerMaxConnectionRate(maxConnectionRate: Int): Unit = {
     // if there is a connection waiting on the rate throttle delay, we will let it wait the original delay even if
     // the rate limit increases, because it is just one connection per listener and the code is simpler that way
-    updateConnectionRateQuota(maxConnectionRate)
+    updateConnectionRateQuota(maxConnectionRate, BrokerQuotaEntity)
+  }
+
+  /**
+   * Update the connection rate quota for a given IP and updates quota configs for updated IPs.
+   * If an IP is given, metric config will be updated only for the given IP, otherwise
+   * all metric configs will be checked and updated if required
+   *
+   * @param ip ip to update or default if None
+   * @param maxConnectionRate new connection rate, or resets entity to default if None
+   */
+  def updateIpConnectionRate(ip: Option[String], maxConnectionRate: Option[Int]): Unit = {
+    def isIpConnectionRateMetric(metricName: MetricName) = {
+      metricName.name == "connection-accept-rate" &&

Review comment:
       Could we use a constant for the name and use it everywhere? Same for the tag.

##########
File path: core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
##########
@@ -240,6 +256,16 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
       s"Admin client connection not closed (initial = $initialConnectionCount, current = $connectionCount)")
   }
 
+  private def updateIpConnectionRate(ip: Option[String], updatedRate: Int): Unit = {
+    adminZkClient.changeIpConfig(ip.getOrElse(ConfigEntityName.Default),

Review comment:
       nit: Could we use the `Admin` client directly here? We don't use the ZK admin client anywhere else in the suite, so it would be better not to use it here. I am not sure whether this depends on the pending implementation that you mentioned.

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1394,28 +1554,38 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
    * Creates sensor for tracking the connection creation rate and corresponding connection rate quota for a given
    * listener or broker-wide, if listener is not provided.
    * @param quotaLimit connection creation rate quota
-   * @param listenerOpt listener name if sensor is for a listener
+   * @param connectionQuotaEntity entity to create the sensor for
    */
-  private def createConnectionRateQuotaSensor(quotaLimit: Int, listenerOpt: Option[String] = None): Sensor = {
-    val sensorName = listenerOpt.map(listener => s"ConnectionAcceptRate-$listener").getOrElse("ConnectionAcceptRate")
-    val sensor = metrics.sensor(sensorName, rateQuotaMetricConfig(quotaLimit))
-    sensor.add(connectionRateMetricName(listenerOpt), new Rate, null)
-    info(s"Created $sensorName sensor, quotaLimit=$quotaLimit")
-    sensor
+  private def getOrCreateConnectionRateQuotaSensor(quotaLimit: Int, connectionQuotaEntity: ConnectionQuotaEntity): Sensor = {
+    val (sensorName, sensorExpiration) = connectionQuotaEntity match {
+      case BrokerQuotaEntity => ("ConnectionAcceptRate", Long.MaxValue)
+      case listenerEntity: ListenerQuotaEntity => (s"ConnectionAcceptRate-${listenerEntity.entityName}", Long.MaxValue)
+      case ipEntity: IpQuotaEntity => (s"ConnectionAcceptRate-${ipEntity.entityName}", inactiveSensorExpirationTimeSeconds)
+    }
+    sensorAccessor.getOrCreate(
+      sensorName,
+      sensorExpiration,
+      sensor => sensor.add(connectionRateMetricName(connectionQuotaEntity), new Rate, rateQuotaMetricConfig(quotaLimit))
+    )
   }
 
   /**
-   * Updates quota configuration for a given listener or broker-wide (if 'listenerOpt' is None)
+   * Updates quota configuration for a given connection quota entity
    */
-  private def updateConnectionRateQuota(quotaLimit: Int, listenerOpt: Option[String] = None): Unit = {
-    val metric = metrics.metric(connectionRateMetricName(listenerOpt))
-    metric.config(rateQuotaMetricConfig(quotaLimit))
-    info(s"Updated ${listenerOpt.getOrElse("broker-wide")} max connection creation rate to $quotaLimit")
+  private def updateConnectionRateQuota(quotaLimit: Int, connectionQuotaEntity: ConnectionQuotaEntity): Unit = {
+    val metricOpt = Option(metrics.metric(connectionRateMetricName(connectionQuotaEntity)))
+    metricOpt.foreach { metric =>
+      metric.config(rateQuotaMetricConfig(quotaLimit))
+      info(s"Updated ${connectionQuotaEntity.entityName} max connection creation rate to $quotaLimit")
+    }
   }
 
-  private def connectionRateMetricName(listenerOpt: Option[String]): MetricName = {
-    val tags = listenerOpt.map(listener => Map("listener" -> listener)).getOrElse(Map())
-    val namePrefix = listenerOpt.map(_ => "").getOrElse("broker-")
+  private def connectionRateMetricName(connectionQuotaEntity: ConnectionQuotaEntity): MetricName = {
+    val (namePrefix, tags) = connectionQuotaEntity match {
+      case BrokerQuotaEntity => ("broker-", Map.empty[String, String])
+      case listener: ListenerQuotaEntity => ("", Map("listener" -> listener.entityName))
+      case ip: IpQuotaEntity => ("", Map("ip" -> ip.entityName))
+    }

Review comment:
       Same comment as before. Couldn't we rely on the `ConnectionQuotaEntity` trait here as well?

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1371,6 +1492,45 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
     }
   }
 
+  /**
+   * To avoid over-recording listener/broker connection rate, we unrecord a listener or broker connection
+   * if the IP gets throttled later.
+   *
+   * @param listenerName listener to unrecord connection
+   * @param timeMs current time in milliseconds
+   */
+  private def unrecordListenerConnection(listenerName: ListenerName, timeMs: Long): Unit = {
+    if (!protectedListener(listenerName)) {
+      brokerConnectionRateSensor.record(-1.0, timeMs, false)
+    }
+    maxConnectionsPerListener
+      .get(listenerName)
+      .foreach(_.connectionRateSensor.record(-1.0, timeMs, false))
+  }
+
+  /**
+   * Calculates the delay needed to bring the observed connection creation rate to the IP limit.
+   * If the connection would cause an IP quota violation, un-record the connection
+   *
+   * @param address
+   * @param timeMs
+   * @return
+   */
+  private def recordIpConnectionMaybeThrottle(address: InetAddress, timeMs: Long): Long = {
+    val connectionRateQuota = connectionRateForIp(address)
+    val quotaEnabled = connectionRateQuota != DynamicConfig.Ip.UnlimitedConnectionCreationRate
+    if (!quotaEnabled) {
+      return 0
+    }
+    val sensor = getOrCreateConnectionRateQuotaSensor(connectionRateForIp(address), IpQuotaEntity(address))
+    val throttleMs = recordAndGetThrottleTimeMs(sensor, timeMs)
+    if (throttleMs > 0) {
+      // unrecord the connection since we won't accept the connection
+      sensor.record(-1.0, timeMs, false)
+    }

Review comment:
       Instead of recording, checking the quota, and unrecording if the quota is violated, we may be able to check the quota and record only if the quota is not violated yet. Your implementation suggests that we want to enforce a strict quota here. We did so for the controller mutations with the following implementation:
   
   ```
   try {
         quotaSensor synchronized {
           quotaSensor.checkQuotas(timeMs)
           quotaSensor.record(permits, timeMs, false)
         }
       } catch {
         case e: QuotaViolationException =>
            ...
       }
   ```
   
   It seems that this should work here as well. Let me know what you think about this.

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -600,43 +607,10 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
     serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
     startupComplete()
     try {
-      var currentProcessorIndex = 0
       while (isRunning) {
         try {
-          val ready = nioSelector.select(500)
-          if (ready > 0) {
-            val keys = nioSelector.selectedKeys()
-            val iter = keys.iterator()
-            while (iter.hasNext && isRunning) {
-              try {
-                val key = iter.next
-                iter.remove()
-
-                if (key.isAcceptable) {
-                  accept(key).foreach { socketChannel =>
-                    // Assign the channel to the next processor (using round-robin) to which the
-                    // channel can be added without blocking. If newConnections queue is full on
-                    // all processors, block until the last one is able to accept a connection.
-                    var retriesLeft = synchronized(processors.length)
-                    var processor: Processor = null
-                    do {
-                      retriesLeft -= 1
-                      processor = synchronized {
-                        // adjust the index (if necessary) and retrieve the processor atomically for
-                        // correct behaviour in case the number of processors is reduced dynamically
-                        currentProcessorIndex = currentProcessorIndex % processors.length
-                        processors(currentProcessorIndex)
-                      }
-                      currentProcessorIndex += 1
-                    } while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))
-                  }
-                } else
-                  throw new IllegalStateException("Unrecognized key state for acceptor thread.")
-              } catch {
-                case e: Throwable => error("Error while accepting connection", e)
-              }
-            }
-          }
+          acceptNewConnections()
+          closeThrottledConnections()

Review comment:
       Out of curiosity, have we considered using a "reaper" running in a separate thread to drain the queue and close throttled connections?

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1242,7 +1314,56 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
   private[network] def updateBrokerMaxConnectionRate(maxConnectionRate: Int): Unit = {
     // if there is a connection waiting on the rate throttle delay, we will let it wait the original delay even if
     // the rate limit increases, because it is just one connection per listener and the code is simpler that way
-    updateConnectionRateQuota(maxConnectionRate)
+    updateConnectionRateQuota(maxConnectionRate, BrokerQuotaEntity)
+  }
+
+  /**
+   * Update the connection rate quota for a given IP and updates quota configs for updated IPs.
+   * If an IP is given, metric config will be updated only for the given IP, otherwise
+   * all metric configs will be checked and updated if required
+   *
+   * @param ip ip to update or default if None
+   * @param maxConnectionRate new connection rate, or resets entity to default if None
+   */
+  def updateIpConnectionRate(ip: Option[String], maxConnectionRate: Option[Int]): Unit = {
+    def isIpConnectionRateMetric(metricName: MetricName) = {
+      metricName.name == "connection-accept-rate" &&
+      metricName.group == MetricsGroup &&
+      metricName.tags.containsKey("ip")
+    }
+
+    def shouldUpdateQuota(metric: KafkaMetric, quotaLimit: Int) = {
+      quotaLimit != metric.config.quota.bound
+    }
+
+    ip match {
+      case Some(addr) =>
+        val address = InetAddress.getByName(addr)
+        if (maxConnectionRate.isDefined) {

Review comment:
       nit: We could use pattern matching here to handle both cases.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org