Posted to jira@kafka.apache.org by "David Jacot (Jira)" <ji...@apache.org> on 2021/11/24 08:29:00 UTC
[jira] [Updated] (KAFKA-13457) SocketChannel in Acceptor#accept is not closed upon IOException
[ https://issues.apache.org/jira/browse/KAFKA-13457?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
David Jacot updated KAFKA-13457:
--------------------------------
Summary: SocketChannel in Acceptor#accept is not closed upon IOException (was: socketChannel in Acceptor#accept is not closed upon IOException)
> SocketChannel in Acceptor#accept is not closed upon IOException
> ---------------------------------------------------------------
>
> Key: KAFKA-13457
> URL: https://issues.apache.org/jira/browse/KAFKA-13457
> Project: Kafka
> Issue Type: Bug
> Components: network
> Affects Versions: 2.8.0
> Reporter: Haoze Wu
> Priority: Major
>
> When the kafka.network.Acceptor in SocketServer.scala accepts a new connection in the `accept` function, it handles `TooManyConnectionsException` and `ConnectionThrottledException`. However, the socketChannel operations within the try block (lines 720, 721, and 722) may also throw an IOException, which is not handled.
>
> {code:java}
> //core/src/main/scala/kafka/network/SocketServer.scala
> // Acceptor class
> private def accept(key: SelectionKey): Option[SocketChannel] = {
> val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
> val socketChannel = serverSocketChannel.accept() // line 717
> try {
> connectionQuotas.inc(endPoint.listenerName, socketChannel.socket.getInetAddress, blockedPercentMeter)
> socketChannel.configureBlocking(false) // line 720
> socketChannel.socket().setTcpNoDelay(true) // line 721
> socketChannel.socket().setKeepAlive(true) // line 722
> if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
> socketChannel.socket().setSendBufferSize(sendBufferSize)
> Some(socketChannel)
> } catch {
> case e: TooManyConnectionsException =>
> info(s"Rejected connection from ${e.ip}, address already has the configured maximum of ${e.count} connections.")
> close(endPoint.listenerName, socketChannel)
> None
> case e: ConnectionThrottledException =>
> val ip = socketChannel.socket.getInetAddress
> debug(s"Delaying closing of connection from $ip for ${e.throttleTimeMs} ms")
> val endThrottleTimeMs = e.startThrottleTimeMs + e.throttleTimeMs
> throttledSockets += DelayedCloseSocket(socketChannel, endThrottleTimeMs)
> None
> }
> }
> {code}
> The IOException propagates to the caller, `acceptNewConnections`, where it is caught at line 706 and only logged as an error. The socketChannel that threw the IOException is never closed.
>
> {code:java}
> //core/src/main/scala/kafka/network/SocketServer.scala
> private def acceptNewConnections(): Unit = {
> val ready = nioSelector.select(500)
> if (ready > 0) {
> val keys = nioSelector.selectedKeys()
> val iter = keys.iterator()
> while (iter.hasNext && isRunning) {
> try {
> val key = iter.next
> iter.remove()
> if (key.isAcceptable) {
> accept(key).foreach { socketChannel =>
> ...
> } while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))
> }
> } else
> throw new IllegalStateException("Unrecognized key state for acceptor thread.")
> } catch {
> case e: Throwable => error("Error while accepting connection", e) // line 706
> }
> }
> }
> }
> {code}
> We found during testing that this causes our Kafka clients to experience errors (InvalidReplicationFactorException) for 40+ seconds when creating new topics. After 40 seconds, the clients are able to create new topics successfully.
> We verified that after adding socketChannel.close() upon IOException, the symptoms disappear, so the clients no longer need to wait 40 seconds before working again.
>
>
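The close-on-IOException idea described above can be sketched in plain Java NIO. This is only an illustration under stated assumptions, not the patch applied to Kafka: `AcceptorSketch`, `ChannelSetup`, and both check methods are hypothetical names, and the connection-quota bookkeeping done by the Acceptor's own `close(endPoint.listenerName, socketChannel)` helper is not modelled. The setup steps are passed in as a callback so that a failure can be simulated.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Optional;

// Hypothetical sketch; none of these names come from the Kafka code base.
class AcceptorSketch {

    /** Stand-in for the per-connection setup (configureBlocking, setTcpNoDelay, ...). */
    @FunctionalInterface
    interface ChannelSetup {
        void apply(SocketChannel channel) throws IOException;
    }

    /**
     * Accepts one connection and applies the setup. If the setup throws an
     * IOException, the accepted channel is closed before returning.
     */
    static Optional<SocketChannel> accept(ServerSocketChannel server, ChannelSetup setup)
            throws IOException {
        SocketChannel channel = server.accept(); // server is in blocking mode here
        try {
            setup.apply(channel);
            return Optional.of(channel);
        } catch (IOException e) {
            System.err.println("Setup failed, closing accepted channel: " + e);
            try {
                channel.close(); // the step the report says is missing
            } catch (IOException closeError) {
                e.addSuppressed(closeError);
            }
            return Optional.empty();
        }
    }

    /** Simulates a failing setup and reports whether the accepted channel was closed. */
    static boolean closedAfterFailedSetup() {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                SocketChannel[] seen = new SocketChannel[1];
                Optional<SocketChannel> result = accept(server, ch -> {
                    seen[0] = ch; // remember the channel so it can be inspected afterwards
                    throw new IOException("simulated setup failure");
                });
                return !result.isPresent() && !seen[0].isOpen();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Runs the same setup calls as the ticket's try block and reports whether the channel stays open. */
    static boolean openAfterSuccessfulSetup() {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                Optional<SocketChannel> result = accept(server, ch -> {
                    ch.configureBlocking(false);
                    ch.socket().setTcpNoDelay(true);
                    ch.socket().setKeepAlive(true);
                });
                boolean ok = result.isPresent() && result.get().isOpen();
                if (result.isPresent()) {
                    result.get().close(); // tidy up the demo connection
                }
                return ok;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("closed after failed setup: " + closedAfterFailedSetup());
        System.out.println("open after successful setup: " + openAfterSuccessfulSetup());
    }
}
```

In the Acceptor itself, the equivalent change would presumably be an extra `case` for IOException in the `accept` catch block. Whether that case should rethrow after closing, and whether it should go through the existing `close` helper, is a design decision this sketch does not settle.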