Posted to dev@kafka.apache.org by "David Jacot (Jira)" <ji...@apache.org> on 2021/11/24 08:32:00 UTC

[jira] [Resolved] (KAFKA-13457) SocketChannel in Acceptor#accept is not closed upon IOException

     [ https://issues.apache.org/jira/browse/KAFKA-13457?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

David Jacot resolved KAFKA-13457.
---------------------------------
    Fix Version/s: 3.2.0
         Reviewer: David Jacot
       Resolution: Fixed

> SocketChannel in Acceptor#accept is not closed upon IOException
> ---------------------------------------------------------------
>
>                 Key: KAFKA-13457
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13457
>             Project: Kafka
>          Issue Type: Bug
>          Components: network
>    Affects Versions: 2.8.0
>            Reporter: Haoze Wu
>            Priority: Major
>             Fix For: 3.2.0
>
>
> When the kafka.network.Acceptor in SocketServer.scala accepts a new connection in the `accept` function, it handles `TooManyConnectionsException` and `ConnectionThrottledException`. However, the socketChannel operations on line 720, 721, or 722 within the try block may also throw an IOException, which is not handled.
>  
> {code:java}
> //core/src/main/scala/kafka/network/SocketServer.scala
> // Acceptor class
>   private def accept(key: SelectionKey): Option[SocketChannel] = {
>     val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
>     val socketChannel = serverSocketChannel.accept()     // line 717
>     try {
>       connectionQuotas.inc(endPoint.listenerName, socketChannel.socket.getInetAddress, blockedPercentMeter)
>       socketChannel.configureBlocking(false)             // line 720
>       socketChannel.socket().setTcpNoDelay(true)         // line 721
>       socketChannel.socket().setKeepAlive(true)          // line 722
>       if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
>         socketChannel.socket().setSendBufferSize(sendBufferSize)
>       Some(socketChannel)
>     } catch {
>       case e: TooManyConnectionsException =>       
>         info(s"Rejected connection from ${e.ip}, address already has the configured maximum of ${e.count} connections.")
>         close(endPoint.listenerName, socketChannel)
>         None
>       case e: ConnectionThrottledException => 
>         val ip = socketChannel.socket.getInetAddress
>         debug(s"Delaying closing of connection from $ip for ${e.throttleTimeMs} ms")
>         val endThrottleTimeMs = e.startThrottleTimeMs + e.throttleTimeMs
>         throttledSockets += DelayedCloseSocket(socketChannel, endThrottleTimeMs)
>         None
>     }
>   }
> {code}
> This IOException propagates to the caller `acceptNewConnections` and is caught at line 706, which only logs an error message. The socketChannel that threw the IOException is never closed.
>  
> {code:java}
> //core/src/main/scala/kafka/network/SocketServer.scala
>   private def acceptNewConnections(): Unit = {
>     val ready = nioSelector.select(500)
>     if (ready > 0) {
>       val keys = nioSelector.selectedKeys()
>       val iter = keys.iterator()
>       while (iter.hasNext && isRunning) {
>         try {
>           val key = iter.next
>           iter.remove()
>           if (key.isAcceptable) {
>             accept(key).foreach { socketChannel => 
>                 ...
>               } while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))
>             }
>           } else
>             throw new IllegalStateException("Unrecognized key state for acceptor thread.")
>         } catch {
>           case e: Throwable => error("Error while accepting connection", e)   // line 706
>         }
>       }
>     }
>   }
> {code}
> During testing, we found that this caused our Kafka clients to get errors (InvalidReplicationFactorException) for more than 40 seconds when creating new topics. After those 40 seconds, the clients were able to create new topics successfully.
> We verified that after adding socketChannel.close() on IOException, the symptoms disappear, so clients no longer have to wait 40 seconds before working again.
>  
>  
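The fix described in the report (closing the socketChannel when an IOException is thrown during configuration) can be sketched as the pattern below. This is a minimal, self-contained illustration under stated assumptions, not the change that was merged for KAFKA-13457: `CloseOnIOException`, `configureOrClose`, and `FakeChannel` are hypothetical names, and the generic `Closeable` stands in for the `SocketChannel` returned by `serverSocketChannel.accept()`.

```scala
import java.io.{Closeable, IOException}

// Hypothetical helper, not part of Kafka: apply `configure` to a freshly
// accepted resource (in the Acceptor this would be the SocketChannel).
// If configuration throws an IOException, close the resource so it is
// not leaked, and return None instead of letting the exception escape
// to a caller that would only log it.
object CloseOnIOException {
  def configureOrClose[A <: Closeable](resource: A)(configure: A => Unit): Option[A] =
    try {
      configure(resource)
      Some(resource)
    } catch {
      case _: IOException =>
        // Best-effort close; a failure while closing is ignored in this sketch.
        try resource.close()
        catch { case _: IOException => () }
        None
    }
}

// Hypothetical test double standing in for a SocketChannel whose
// configureBlocking / setTcpNoDelay / setKeepAlive call fails.
final class FakeChannel(failOnConfigure: Boolean) extends Closeable {
  var closed: Boolean = false
  def configure(): Unit =
    if (failOnConfigure) throw new IOException("simulated setTcpNoDelay failure")
  override def close(): Unit = closed = true
}
```

For example, `CloseOnIOException.configureOrClose(new FakeChannel(failOnConfigure = true))(_.configure())` returns `None` and leaves the fake channel closed. Inside the Acceptor itself, reusing the existing `close(endPoint.listenerName, socketChannel)` call from the `TooManyConnectionsException` branch would presumably be preferable to a bare `close()`, since `connectionQuotas.inc` has already run by the time lines 720 to 722 execute.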



--
This message was sent by Atlassian Jira
(v8.20.1#820001)