Posted to common-issues@hadoop.apache.org by "Haoze Wu (Jira)" <ji...@apache.org> on 2021/11/25 05:20:00 UTC

[jira] [Created] (HADOOP-18024) SocketChannel is not closed when IOException happens in Server$Listener.doAccept

Haoze Wu created HADOOP-18024:
---------------------------------

             Summary: SocketChannel is not closed when IOException happens in Server$Listener.doAccept
                 Key: HADOOP-18024
                 URL: https://issues.apache.org/jira/browse/HADOOP-18024
             Project: Hadoop Common
          Issue Type: Bug
          Components: ipc
    Affects Versions: 3.2.2
            Reporter: Haoze Wu


This is a follow-up of https://issues.apache.org/jira/browse/HADOOP-17552 .

When the symptom described in https://issues.apache.org/jira/browse/HADOOP-17552 occurs, the client may time out after 2 minutes, following the default RPC timeout configuration specified in https://issues.apache.org/jira/browse/HADOOP-17552. Until this timeout fires, the client simply waits and has no indication that the problem has occurred.

However, we recently found that the client does not need to waste these 2 minutes, and that the server’s availability can also be improved. If an IOException is thrown at line 1402, 1403, or 1404, we can simply close the problematic `SocketChannel` and continue accepting new socket connections. The client then learns of the closed socket immediately, instead of waiting 2 minutes.

The old implementation:

 
{code:java}
//hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

   public void run() {
      while (running) {
        // ...
        try {
          // ...
          while (iter.hasNext()) {
            // ...
            try {
              if (key.isValid()) {
                if (key.isAcceptable())
                  doAccept(key);                              // line 1348
              }
            } catch (IOException e) {                         // line 1350
            }
            // ...
          }
        } catch (OutOfMemoryError e) {
          // ...
        } catch (Exception e) {
          // ...
        }
      }
    } {code}
 

 
{code:java}
//hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

    void doAccept(SelectionKey key) throws InterruptedException, IOException, 
        OutOfMemoryError {
      ServerSocketChannel server = (ServerSocketChannel) key.channel();
      SocketChannel channel;
      while ((channel = server.accept()) != null) {           // line 1400

        channel.configureBlocking(false);                     // line 1402
        channel.socket().setTcpNoDelay(tcpNoDelay);           // line 1403
        channel.socket().setKeepAlive(true);                  // line 1404

        Reader reader = getReader();
        Connection c = connectionManager.register(channel,
            this.listenPort, this.isOnAuxiliaryPort);
        // If the connectionManager can't take it, close the connection.
        if (c == null) {
          if (channel.isOpen()) {
            IOUtils.cleanup(null, channel);
          }
          connectionManager.droppedConnections.getAndIncrement();
          continue;
        }
        key.attach(c);  // so closeCurrentConnection can get the object
        reader.addConnection(c);
      }
    } {code}
We propose the following implementation instead:
{code:java}
    void doAccept(SelectionKey key) throws InterruptedException, IOException, 
        OutOfMemoryError {
      ServerSocketChannel server = (ServerSocketChannel) key.channel();
      SocketChannel channel;
      while ((channel = server.accept()) != null) {           // line 1400

        try {
          channel.configureBlocking(false);                   // line 1402
          channel.socket().setTcpNoDelay(tcpNoDelay);         // line 1403
          channel.socket().setKeepAlive(true);                // line 1404
        } catch (IOException e) {
          LOG.warn(...);
          try {
            channel.socket().close();
            channel.close();
          } catch (IOException ignored) { }
          continue;
        }

        // ...
      }
    }{code}
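To make the control flow concrete, here is a minimal, self-contained sketch of the same per-connection try-catch outside of Hadoop. It is not the actual patch: `AcceptLoopSketch`, its `Configurer` hook, and the injected failure on the second connection are hypothetical stand-ins for the three socket calls at lines 1402 to 1404, which are hard to make fail on demand.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical class, for illustration only; not part of Hadoop.
public class AcceptLoopSketch {
  /** Hypothetical hook standing in for configureBlocking/setTcpNoDelay/setKeepAlive. */
  interface Configurer {
    void configure(SocketChannel channel) throws IOException;
  }

  final List<SocketChannel> accepted = new ArrayList<>();
  int dropped = 0;

  /** Drains the accept queue; a channel that fails configuration is closed and skipped. */
  void doAccept(SelectionKey key, Configurer configurer) throws IOException {
    ServerSocketChannel server = (ServerSocketChannel) key.channel();
    SocketChannel channel;
    while ((channel = server.accept()) != null) {
      try {
        configurer.configure(channel);
      } catch (IOException e) {
        try {
          channel.close();
        } catch (IOException ignored) {
          // Nothing more can be done for this channel.
        }
        dropped++;
        continue; // keep accepting the remaining connections
      }
      accepted.add(channel);
    }
  }

  /** Connects the given number of clients and injects a failure for the second accepted one. */
  static AcceptLoopSketch run(int clients) throws IOException {
    AcceptLoopSketch sketch = new AcceptLoopSketch();
    int[] seen = {0};
    Configurer failSecond = ch -> {
      if (++seen[0] == 2) {
        throw new IOException("injected failure"); // assumption: simulated error
      }
      ch.configureBlocking(false);
      ch.socket().setTcpNoDelay(true);
      ch.socket().setKeepAlive(true);
    };
    List<SocketChannel> clientChannels = new ArrayList<>();
    try (Selector selector = Selector.open();
         ServerSocketChannel server = ServerSocketChannel.open()) {
      server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
      server.configureBlocking(false);
      server.register(selector, SelectionKey.OP_ACCEPT);
      for (int i = 0; i < clients; i++) {
        clientChannels.add(SocketChannel.open(server.getLocalAddress()));
      }
      long deadline = System.currentTimeMillis() + 5000;
      while (sketch.accepted.size() + sketch.dropped < clients
          && System.currentTimeMillis() < deadline) {
        selector.select(500);
        Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
        while (iter.hasNext()) {
          SelectionKey key = iter.next();
          iter.remove();
          if (key.isValid() && key.isAcceptable()) {
            sketch.doAccept(key, failSecond);
          }
        }
      }
    } finally {
      for (SocketChannel c : clientChannels) {
        c.close();
      }
      for (SocketChannel c : sketch.accepted) {
        c.close();
      }
    }
    return sketch;
  }

  public static void main(String[] args) throws IOException {
    AcceptLoopSketch result = run(3);
    System.out.println("accepted=" + result.accepted.size() + " dropped=" + result.dropped);
  }
}
```

With three clients, the sketch should report two accepted connections and one dropped connection: the failure on one channel does not stop the loop from handling the others.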
The advantages of the proposed implementation include:
 # {*}In the old implementation{*}, the accept loop on the `ServerSocketChannel` is abandoned because of a single exception in a single `SocketChannel`, since the exception handler is at line 1350. {*}In the new implementation{*}, a try-catch handles the exception from line 1402, 1403, or 1404, so the `ServerSocketChannel` can continue to accept new connections and does not need to go back to line 1348 on the next iteration of the while loop in the run method.
 # {*}In the old implementation{*}, the client (the other endpoint of this `SocketChannel`) is not aware of the issue, because the `SocketChannel` has been accepted but is never closed. {*}In the new implementation{*}, we close the `SocketChannel` when the IOException occurs, so the client immediately gets EOF from the socket. The client can then choose to retry or throw an exception, at its own discretion.
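
The second point can be checked with plain NIO. The following is a small sketch, not Hadoop code: `EofOnCloseDemo` and `readAfterServerClose` are hypothetical names, and the server closes the accepted channel unconditionally to stand in for the new catch block.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical class, for illustration only; not part of Hadoop.
public class EofOnCloseDemo {
  /**
   * Accepts one loopback connection, closes it on the server side,
   * and returns what the client's next read reports.
   */
  static int readAfterServerClose() throws IOException {
    try (ServerSocketChannel server = ServerSocketChannel.open()) {
      server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
      try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
        SocketChannel accepted = server.accept(); // blocking accept
        accepted.close();                         // what the new catch block does
        // The client read returns as soon as the close is observed,
        // rather than blocking until an RPC timeout.
        return client.read(ByteBuffer.allocate(16));
      }
    }
  }

  public static void main(String[] args) throws IOException {
    System.out.println("client read returned " + readAfterServerClose());
  }
}
```

A return value of -1 from `SocketChannel.read` signals end-of-stream, which is the EOF the client would see. If the server instead kept the accepted channel open without using it, as in the old implementation, the same read would block.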

 

This code pattern has been adopted by other projects. For example, in Kafka [https://github.com/apache/kafka/blob/23e9818e625976c22fe6d4297a5ab76b01f92ef6/core/src/main/scala/kafka/network/SocketServer.scala#L714-L740]:
{code:scala}
   /**
   * Accept a new connection
   */
  private def accept(key: SelectionKey): Option[SocketChannel] = {
    val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
    val socketChannel = serverSocketChannel.accept()
    try {
      connectionQuotas.inc(endPoint.listenerName, socketChannel.socket.getInetAddress, blockedPercentMeter)
      configureAcceptedSocketChannel(socketChannel)
      Some(socketChannel)
    } catch {
      case e: TooManyConnectionsException =>
        info(...)
        close(endPoint.listenerName, socketChannel)
        None
      case e: ConnectionThrottledException =>
        // ...
        None
      case e: IOException =>
        error(...)
        close(endPoint.listenerName, socketChannel)
        None
    }
  }

  /**
   * Close `channel` and decrement the connection count.
   */
  def close(listenerName: ListenerName, channel: SocketChannel): Unit = {
    if (channel != null) {
      // ...
      closeSocket(channel)
    }
  }

  protected def closeSocket(channel: SocketChannel): Unit = {
    CoreUtils.swallow(channel.socket().close(), this, Level.ERROR)
    CoreUtils.swallow(channel.close(), this, Level.ERROR)
  }
{code}
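
Note that Kafka's `closeSocket` wraps each close call separately, so a failure in the first close does not skip the second. A rough Java equivalent could look like the sketch below. `QuietCloser` and `closeAllQuietly` are hypothetical names that exist in neither Hadoop nor Kafka, and the logging is reduced to `System.err` for brevity.

```java
import java.io.Closeable;
import java.io.IOException;

// Hypothetical helper, for illustration only; not part of Hadoop or Kafka.
public class QuietCloser {
  /** Closes each resource independently and returns how many close() calls threw. */
  static int closeAllQuietly(Closeable... resources) {
    int failures = 0;
    for (Closeable resource : resources) {
      if (resource == null) {
        continue;
      }
      try {
        resource.close();
      } catch (IOException e) {
        failures++;
        System.err.println("Ignoring failure while closing: " + e);
      }
    }
    return failures;
  }

  public static void main(String[] args) {
    boolean[] secondClosed = {false};
    Closeable failing = () -> {
      throw new IOException("injected close failure"); // simulated error
    };
    Closeable recording = () -> secondClosed[0] = true;
    int failures = closeAllQuietly(failing, recording);
    System.out.println("failures=" + failures + " secondClosed=" + secondClosed[0]);
  }
}
```

Since both `java.net.Socket` and `SocketChannel` implement `Closeable`, a call such as `closeAllQuietly(channel.socket(), channel)` would attempt both closes even if the first one throws.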


