You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/10/08 23:56:12 UTC

[GitHub] [kafka] apovzner commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits

apovzner commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r502084924



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1203,14 +1262,27 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
   private val listenerCounts = mutable.Map[ListenerName, Int]()
   private[network] val maxConnectionsPerListener = mutable.Map[ListenerName, ListenerConnectionQuota]()
   @volatile private var totalCount = 0
-
+  @volatile private var defaultConnectionRatePerIp = DynamicConfig.Ip.DefaultConnectionCreationRate
+  private val inactiveSensorExpirationTimeSeconds = TimeUnit.HOURS.toSeconds(1);
+  private val connectionRatePerIp = new ConcurrentHashMap[InetAddress, Int]()
+  private val lock = new ReentrantReadWriteLock()
+  private val sensorAccessor = new SensorAccess(lock, metrics)
   // sensor that tracks broker-wide connection creation rate and limit (quota)
-  private val brokerConnectionRateSensor = createConnectionRateQuotaSensor(config.maxConnectionCreationRate)
+  private val brokerConnectionRateSensor = getOrCreateConnectionRateQuotaSensor(config.maxConnectionCreationRate, BrokerQuotaEntity)
   private val maxThrottleTimeMs = TimeUnit.SECONDS.toMillis(config.quotaWindowSizeSeconds.toLong)
 
+
   def inc(listenerName: ListenerName, address: InetAddress, acceptorBlockedPercentMeter: com.yammer.metrics.core.Meter): Unit = {
     counts.synchronized {
-      waitForConnectionSlot(listenerName, acceptorBlockedPercentMeter)
+      val startThrottleTimeMs = time.milliseconds
+
+      val ipThrottleTimeMs = recordIpConnectionMaybeThrottle(address, startThrottleTimeMs)

Review comment:
       It would be more efficient if we throttled IPs **after** we know that we can accept a connection based on broker-wide and per-listener limits, since reaching broker/listener limits block the acceptor thread while throttling IPs needs more processing. Otherwise, if you reach both broker and per IP limit, the broker will continue accepting and delaying connections where it is justified to block an acceptor thread based on reaching a broker rate limit. Basically, call `waitForConnectionSlot` first. Similar how we check per IP limit on number of connections after we know that we can accept a new connection based on broker/listener limits. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org