Posted to dev@kafka.apache.org by "Haoze Wu (Jira)" <ji...@apache.org> on 2021/11/16 01:15:00 UTC
[jira] [Created] (KAFKA-13457) socketChannel in Acceptor#accept is not closed upon IOException
Haoze Wu created KAFKA-13457:
--------------------------------
Summary: socketChannel in Acceptor#accept is not closed upon IOException
Key: KAFKA-13457
URL: https://issues.apache.org/jira/browse/KAFKA-13457
Project: Kafka
Issue Type: Bug
Components: network
Affects Versions: 2.8.0
Reporter: Haoze Wu
When the kafka.network.Acceptor in SocketServer.scala accepts a new connection in its `accept` function, it handles `TooManyConnectionsException` and `ConnectionThrottledException`. However, the call at line 717, or the socketChannel operations inside the try block, can also throw an IOException, and that case is not handled.
{code:java}
//core/src/main/scala/kafka/network/SocketServer.scala
// Acceptor class
private def accept(key: SelectionKey): Option[SocketChannel] = {
  val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
  val socketChannel = serverSocketChannel.accept() // line 717
  try {
    connectionQuotas.inc(endPoint.listenerName, socketChannel.socket.getInetAddress, blockedPercentMeter)
    socketChannel.configureBlocking(false)
    socketChannel.socket().setTcpNoDelay(true)
    socketChannel.socket().setKeepAlive(true)
    if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
      socketChannel.socket().setSendBufferSize(sendBufferSize)
    Some(socketChannel)
  } catch {
    case e: TooManyConnectionsException =>
      info(s"Rejected connection from ${e.ip}, address already has the configured maximum of ${e.count} connections.")
      close(endPoint.listenerName, socketChannel)
      None
    case e: ConnectionThrottledException =>
      val ip = socketChannel.socket.getInetAddress
      debug(s"Delaying closing of connection from $ip for ${e.throttleTimeMs} ms")
      val endThrottleTimeMs = e.startThrottleTimeMs + e.throttleTimeMs
      throttledSockets += DelayedCloseSocket(socketChannel, endThrottleTimeMs)
      None
  }
}
{code}
The IOException is caught by the caller, `acceptNewConnections`, at line 706, which only logs an error message. The socketChannel whose setup threw the IOException is never closed.
{code:java}
//core/src/main/scala/kafka/network/SocketServer.scala
private def acceptNewConnections(): Unit = {
  val ready = nioSelector.select(500)
  if (ready > 0) {
    val keys = nioSelector.selectedKeys()
    val iter = keys.iterator()
    while (iter.hasNext && isRunning) {
      try {
        val key = iter.next
        iter.remove()
        if (key.isAcceptable) {
          accept(key).foreach { socketChannel =>
            ...
            } while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))
          }
        } else
          throw new IllegalStateException("Unrecognized key state for acceptor thread.")
      } catch {
        case e: Throwable => error("Error while accepting connection", e) // line 706
      }
    }
  }
}
{code}
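The control flow above can be reproduced outside Kafka with plain java.nio. This is a hypothetical sketch, not Kafka code: `LeakRepro`, `acceptWithoutCleanup`, `acceptOnce`, `demo`, and the `configure` callback are illustrative names, the callback stands in for the quota check and socket-option calls inside the original try block, and the `IllegalStateException` branch stands in for the exceptions `accept` does handle.

{code:scala}
import java.io.IOException
import java.net.{InetAddress, InetSocketAddress}
import java.nio.channels.{ServerSocketChannel, SocketChannel}

// Hypothetical reproduction of the leak; plain java.nio, not Kafka code.
object LeakRepro {
  // Same shape as Acceptor#accept: accept first, then configure. Only a
  // non-IO failure is handled, so an IOException thrown by `configure`
  // escapes without the accepted channel being closed.
  def acceptWithoutCleanup(server: ServerSocketChannel,
                           configure: SocketChannel => Unit): Option[SocketChannel] = {
    val socketChannel = server.accept()
    try {
      configure(socketChannel)
      Some(socketChannel)
    } catch {
      // Stand-in for the TooManyConnectionsException branch (hypothetical).
      case _: IllegalStateException =>
        socketChannel.close()
        None
    }
  }

  // Same shape as acceptNewConnections: every Throwable is caught and only
  // logged, so nothing closes the channel whose configuration failed.
  def acceptOnce(server: ServerSocketChannel,
                 configure: SocketChannel => Unit): Unit =
    try acceptWithoutCleanup(server, configure).foreach(_ => ())
    catch {
      case e: Throwable => System.err.println(s"Error while accepting connection: $e")
    }

  // Connects a loopback client, simulates an IOException during setup, and
  // returns true if the accepted channel is still open (i.e. leaked).
  def demo(): Boolean = {
    val server = ServerSocketChannel.open()
    server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress, 0))
    val client = SocketChannel.open(server.getLocalAddress)
    var accepted: SocketChannel = null
    try {
      acceptOnce(server, ch => { accepted = ch; throw new IOException("simulated setup failure") })
      accepted != null && accepted.isOpen
    } finally {
      if (accepted != null) accepted.close()
      client.close()
      server.close()
    }
  }
}
{code}

`LeakRepro.demo()` returning true shows that after the outer catch logs the error, the server-side channel is still open and holds its file descriptor until something else closes it.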
During testing, we found that this caused our Kafka clients to fail with InvalidReplicationFactorException for 40+ seconds when creating new topics. After that, the clients could create new topics successfully.
We verified that after adding socketChannel.close() upon IOException, the symptoms disappear, so the clients no longer need to wait 40 seconds before working again.
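The fix described above can be sketched as an extra `case e: IOException` branch that closes the channel before returning `None`. This is a hedged sketch under stated assumptions, not the patch merged into Kafka: `AcceptWithCleanup`, `closeQuietly`, `defaultConfigure`, `demo`, and the `configure` callback are hypothetical, and `closeQuietly` stands in for SocketServer's own `close(listenerName, channel)` used in the `TooManyConnectionsException` branch.

{code:scala}
import java.io.IOException
import java.net.{InetAddress, InetSocketAddress}
import java.nio.channels.{ServerSocketChannel, SocketChannel}

// Sketch of the fix under stated assumptions; not the merged Kafka patch.
object AcceptWithCleanup {
  // Hypothetical stand-in for SocketServer's close(listenerName, channel):
  // close the channel and ignore a secondary failure while closing.
  private def closeQuietly(ch: SocketChannel): Unit =
    try ch.close()
    catch { case _: IOException => () }

  // Socket options from the original accept(); the quota check and the
  // send-buffer sizing are omitted in this sketch.
  def defaultConfigure(ch: SocketChannel): Unit = {
    ch.configureBlocking(false)
    ch.socket().setTcpNoDelay(true)
    ch.socket().setKeepAlive(true)
  }

  def accept(server: ServerSocketChannel,
             configure: SocketChannel => Unit): Option[SocketChannel] = {
    val socketChannel = server.accept() // null if non-blocking and nothing is pending
    if (socketChannel == null) None
    else
      try {
        configure(socketChannel)
        Some(socketChannel)
      } catch {
        // The missing branch: close the channel instead of leaking it.
        case e: IOException =>
          System.err.println(s"Closing connection after setup failure: $e")
          closeQuietly(socketChannel)
          None
      }
  }

  // Accepts one connection normally and one whose setup fails.
  // Returns (first channel usable, second channel closed).
  def demo(): (Boolean, Boolean) = {
    val server = ServerSocketChannel.open()
    server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress, 0))
    val c1 = SocketChannel.open(server.getLocalAddress)
    val c2 = SocketChannel.open(server.getLocalAddress)
    var failed: SocketChannel = null
    try {
      val ok = accept(server, defaultConfigure)
      val usable = ok.exists(ch => ch.isOpen && !ch.isBlocking)
      ok.foreach(closeQuietly)
      val rejected = accept(server, ch => { failed = ch; throw new IOException("simulated setup failure") })
      (usable, rejected.isEmpty && failed != null && !failed.isOpen)
    } finally {
      c1.close(); c2.close(); server.close()
    }
  }
}
{code}

Only IOException is caught here so that other failures keep propagating as before. An IOException thrown by `serverSocketChannel.accept()` itself is raised before any channel exists, so the cleanup only matters for failures inside the try block.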
--
This message was sent by Atlassian Jira
(v8.20.1#820001)