Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/10/27 17:29:24 UTC

[GitHub] [kafka] apovzner commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)

apovzner commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r512887499



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -697,6 +714,31 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
         info(s"Rejected connection from ${e.ip}, address already has the configured maximum of ${e.count} connections.")
         close(endPoint.listenerName, socketChannel)
         None
+      case e: ConnectionThrottledException =>
+        val ip = socketChannel.socket.getInetAddress
+        debug(s"Delaying closing of connection from $ip for ${e.throttleTimeMs} ms")
+        val delayQueue = throttledSockets.computeIfAbsent(ip, _ => new mutable.Queue[DelayedCloseSocket])

Review comment:
       I agree that it is better to optimize for checking connections rather than for the overhead of adding to and removing from the queue, because `closeThrottledConnections` runs pretty often (in a loop). I think that also means it will commonly find no connections ready to unthrottle yet, or only very few. So, after reading all your evaluations above, I am also leaning towards using a delay queue here.
   
   Not sure if question #1, about why we need to delay closing a connection, got answered, so answering just in case. Since we want to throttle accepting connections from an IP, closing a connection right away when the IP quota is reached would not help with throttling, because that IP is going to reconnect right away.
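
   To make the delay-queue idea concrete, here is a minimal sketch built on `java.util.concurrent.DelayQueue`. It is not the code in this PR: `DelayedClose`, `ThrottleSketch`, `throttle`, and `closeExpired` are hypothetical names, a string label stands in for the real socket channel, and a single queue is assumed instead of one queue per IP.

```scala
import java.util.concurrent.{DelayQueue, Delayed, TimeUnit}

// Hypothetical stand-in for the PR's DelayedCloseSocket: it carries a label
// instead of a real socket channel so the sketch stays self-contained.
final class DelayedClose(val label: String, val endTimeNanos: Long) extends Delayed {
  // Remaining delay; zero or negative means the element may be polled.
  override def getDelay(unit: TimeUnit): Long =
    unit.convert(endTimeNanos - System.nanoTime(), TimeUnit.NANOSECONDS)

  // Order by expiry so the head of the queue is the next connection to close.
  override def compareTo(other: Delayed): Int = other match {
    case that: DelayedClose => java.lang.Long.signum(endTimeNanos - that.endTimeNanos)
    case _ =>
      java.lang.Long.signum(getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS))
  }
}

object ThrottleSketch {
  // Assumption: one queue for all throttled connections.
  val throttled = new DelayQueue[DelayedClose]()

  // Called where the acceptor would handle a throttled connection.
  def throttle(label: String, throttleTimeMs: Long): Unit = {
    val endTime = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(throttleTimeMs)
    throttled.add(new DelayedClose(label, endTime))
  }

  // Non-blocking check: poll() returns null immediately when nothing has
  // expired, so the common "nothing to unthrottle yet" case is cheap.
  def closeExpired(): List[String] = {
    val closed = List.newBuilder[String]
    var next = throttled.poll()
    while (next != null) {
      closed += next.label // real code would close the socket channel here
      next = throttled.poll()
    }
    closed.result()
  }
}
```

   The point of the sketch is the cost profile discussed above: the check that runs on every loop iteration only looks at the head of the queue, while the ordering cost is paid when a connection is throttled.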




