You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/02/09 15:13:16 UTC

[kafka] branch 0.11.0 updated: KAFKA-6529: Stop file descriptor leak when client disconnects with staged receives (#4517)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 0.11.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/0.11.0 by this push:
     new 73c646c  KAFKA-6529: Stop file descriptor leak when client disconnects with staged receives (#4517)
73c646c is described below

commit 73c646c442fd17e3f9919eb2fd50fdac75e32917
Author: parafiend <da...@hotmail.com>
AuthorDate: Fri Feb 9 04:59:18 2018 -0800

    KAFKA-6529: Stop file descriptor leak when client disconnects with staged receives (#4517)
    
    If an exception is encountered while sending data to a client connection, that connection is disconnected. If there are staged receives for that connection, the connection is tracked so that those staged receives can still be processed. However, if the exception was encountered while processing a `RequestChannel.Request`, the `KafkaChannel` for that connection is muted, so its staged receives are never processed.
    
    Disable processing of outstanding staged receives if a send fails. This stops the leak of memory held by pending requests and of the TCP socket's file descriptor.
    
    Test that a channel is closed when an exception is raised while writing to a socket that has been closed by the client. Since sending a response requires acks != 0, allow specifying the required acks for test requests in SocketServerTest.scala.
    
    Author: Graham Campbell <gr...@salesforce.com>
    
    Reviewers: Jason Gustafson <ja...@confluent.io>, Rajini Sivaram <ra...@googlemail.com>, Ismael Juma <is...@juma.me.uk>, Ted Yu <yu...@gmail.com>
---
 checkstyle/suppressions.xml                        |  2 +-
 .../org/apache/kafka/common/network/Selector.java  | 44 ++++++++++----
 .../main/scala/kafka/network/SocketServer.scala    |  8 +++
 .../unit/kafka/network/SocketServerTest.scala      | 68 ++++++++++++++++++----
 4 files changed, 99 insertions(+), 23 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 9825671..c35c77c 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -54,7 +54,7 @@
               files="AbstractRequest.java"/>
 
     <suppress checks="NPathComplexity"
-              files="(BufferPool|MetricName|Node|ConfigDef|SslTransportLayer|MetadataResponse|KerberosLogin|SslTransportLayer|Sender|Serdes|PluginUtils).java"/>
+              files="(BufferPool|MetricName|Node|ConfigDef|SslTransportLayer|MetadataResponse|KerberosLogin|SslTransportLayer|Selector|Sender|Serdes|PluginUtils).java"/>
 
     <!-- clients tests -->
     <suppress checks="ClassDataAbstractionCoupling"
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 00392d0..06d0f96 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -269,9 +269,17 @@ public class Selector implements Selectable, AutoCloseable {
             KafkaChannel channel = channelOrFail(connectionId, false);
             try {
                 channel.setSend(send);
-            } catch (CancelledKeyException e) {
+            } catch (Exception e) {
+                // update the state for consistency, the channel will be discarded after `close`
+                channel.state(ChannelState.FAILED_SEND);
+                // ensure notification via `disconnected` when `failedSends` are processed in the next poll
                 this.failedSends.add(connectionId);
-                close(channel, false);
+                close(channel, false, false);
+                if (!(e instanceof CancelledKeyException)) {
+                    log.error("Unexpected exception during send, closing connection {} and rethrowing exception {}",
+                            connectionId, e);
+                    throw e;
+                }
             }
         }
     }
@@ -354,6 +362,7 @@ public class Selector implements Selectable, AutoCloseable {
             if (idleExpiryManager != null)
                 idleExpiryManager.update(channel.id(), currentTimeNanos);
 
+            boolean sendFailed = false;
             try {
 
                 /* complete any connections that have finished their handshake (either normally or immediately) */
@@ -384,7 +393,13 @@ public class Selector implements Selectable, AutoCloseable {
 
                 /* if channel is ready write to any sockets that have space in their buffer and for which we have data */
                 if (channel.ready() && key.isWritable()) {
-                    Send send = channel.write();
+                    Send send = null;
+                    try {
+                        send = channel.write();
+                    } catch (Exception e) {
+                        sendFailed = true;
+                        throw e;
+                    }
                     if (send != null) {
                         this.completedSends.add(send);
                         this.sensors.recordBytesSent(channel.id(), send.size());
@@ -393,7 +408,7 @@ public class Selector implements Selectable, AutoCloseable {
 
                 /* cancel any defunct sockets */
                 if (!key.isValid())
-                    close(channel, true);
+                    close(channel, true, true);
 
             } catch (Exception e) {
                 String desc = channel.socketDescription();
@@ -401,7 +416,7 @@ public class Selector implements Selectable, AutoCloseable {
                     log.debug("Connection with {} disconnected", desc, e);
                 else
                     log.warn("Unexpected error from {}; closing connection", desc, e);
-                close(channel, true);
+                close(channel, !sendFailed, true);
             } finally {
                 maybeRecordTimePerConnection(channel, channelStartTimeNanos);
             }
@@ -479,7 +494,7 @@ public class Selector implements Selectable, AutoCloseable {
                     log.trace("About to close the idle connection from {} due to being idle for {} millis",
                             connectionId, (currentTimeNanos - expiredConnection.getValue()) / 1000 / 1000);
                 channel.state(ChannelState.EXPIRED);
-                close(channel, true);
+                close(channel, true, true);
             }
         }
     }
@@ -538,7 +553,12 @@ public class Selector implements Selectable, AutoCloseable {
             // There is no disconnect notification for local close, but updating
             // channel state here anyway to avoid confusion.
             channel.state(ChannelState.LOCAL_CLOSE);
-            close(channel, false);
+            close(channel, false, false);
+        } else {
+            KafkaChannel closingChannel = this.closingChannels.remove(id);
+            // Close any closing channel, leave the channel in the state in which closing was triggered
+            if (closingChannel != null)
+                doClose(closingChannel, false);
         }
     }
 
@@ -553,7 +573,10 @@ public class Selector implements Selectable, AutoCloseable {
      * closed immediately. The channel will not be added to disconnected list and it is the
      * responsibility of the caller to handle disconnect notifications.
      */
-    private void close(KafkaChannel channel, boolean processOutstanding) {
+    private void close(KafkaChannel channel, boolean processOutstanding, boolean notifyDisconnect) {
+
+        if (processOutstanding && !notifyDisconnect)
+            throw new IllegalStateException("Disconnect notification required for remote disconnect after processing outstanding requests");
 
         channel.disconnect();
 
@@ -571,8 +594,9 @@ public class Selector implements Selectable, AutoCloseable {
         if (processOutstanding && deque != null && !deque.isEmpty()) {
             // stagedReceives will be moved to completedReceives later along with receives from other channels
             closingChannels.put(channel.id(), channel);
+            log.debug("Tracking closing connection {} to process outstanding requests", channel.id());
         } else
-            doClose(channel, processOutstanding);
+            doClose(channel, notifyDisconnect);
         this.channels.remove(channel.id());
 
         if (idleExpiryManager != null)
@@ -700,7 +724,7 @@ public class Selector implements Selectable, AutoCloseable {
     }
 
     // only for testing
-    int numStagedReceives(KafkaChannel channel) {
+    public int numStagedReceives(KafkaChannel channel) {
         Deque<NetworkReceive> deque = stagedReceives.get(channel);
         return deque == null ? 0 : deque.size();
     }
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 6db70cf..af94231 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -605,6 +605,14 @@ private[kafka] class Processor(val id: Int,
   private[network] def channel(connectionId: String): Option[KafkaChannel] =
     Option(selector.channel(connectionId))
 
+  /* For test usage */
+  private[network] def openOrClosingChannel(connectionId: String): Option[KafkaChannel] =
+    channel(connectionId).orElse(Option(selector.closingChannel(connectionId)))
+
+  // Visible for testing
+  private[network] def numStagedReceives(connectionId: String): Int =
+    openOrClosingChannel(connectionId).map(c => selector.numStagedReceives(c)).getOrElse(0)
+
   /**
    * Wakeup the thread for selection.
    */
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index acf96e8..a3897c0 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -65,7 +65,7 @@ class SocketServerTest extends JUnitSuite {
   server.startup()
   val sockets = new ArrayBuffer[Socket]
 
-  def sendRequest(socket: Socket, request: Array[Byte], id: Option[Short] = None) {
+  def sendRequest(socket: Socket, request: Array[Byte], id: Option[Short] = None, flush: Boolean = true) {
     val outgoing = new DataOutputStream(socket.getOutputStream)
     id match {
       case Some(id) =>
@@ -75,7 +75,8 @@ class SocketServerTest extends JUnitSuite {
         outgoing.writeInt(request.length)
     }
     outgoing.write(request)
-    outgoing.flush()
+    if (flush)
+      outgoing.flush()
   }
 
   def receiveResponse(socket: Socket): Array[Byte] = {
@@ -86,10 +87,15 @@ class SocketServerTest extends JUnitSuite {
     response
   }
 
+  private def receiveRequest(channel: RequestChannel, timeout: Long = 2000L): RequestChannel.Request = {
+    val request = channel.receiveRequest(timeout)
+    assertNotNull("receiveRequest timed out", request)
+    request
+  }
+
   /* A simple request handler that just echos back the response */
   def processRequest(channel: RequestChannel) {
-    val request = channel.receiveRequest(2000)
-    assertNotNull("receiveRequest timed out", request)
+    val request = receiveRequest(channel)
     processRequest(channel, request)
   }
 
@@ -115,12 +121,11 @@ class SocketServerTest extends JUnitSuite {
     sockets.clear()
   }
 
-  private def producerRequestBytes: Array[Byte] = {
+  private def producerRequestBytes(ack: Short = 0): Array[Byte] = {
     val apiKey: Short = 0
     val correlationId = -1
     val clientId = ""
     val ackTimeoutMs = 10000
-    val ack = 0: Short
 
     val emptyRequest = new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, ack, ackTimeoutMs,
       new HashMap[TopicPartition, MemoryRecords]()).build()
@@ -133,11 +138,30 @@ class SocketServerTest extends JUnitSuite {
     serializedBytes
   }
 
+  private def sendRequestsUntilStagedReceive(server: SocketServer, socket: Socket, requestBytes: Array[Byte]): RequestChannel.Request = {
+    def sendTwoRequestsReceiveOne(): RequestChannel.Request = {
+      sendRequest(socket, requestBytes, flush = false)
+      sendRequest(socket, requestBytes, flush = true)
+      receiveRequest(server.requestChannel)
+    }
+    val (request, hasStagedReceives) = TestUtils.computeUntilTrue(sendTwoRequestsReceiveOne()) { req =>
+      val connectionId = req.connectionId
+      val hasStagedReceives = server.processor(0).numStagedReceives(connectionId) > 0
+      if (!hasStagedReceives) {
+        processRequest(server.requestChannel, req)
+        processRequest(server.requestChannel)
+      }
+      hasStagedReceives
+    }
+    assertTrue(s"Receives not staged for ${org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS} ms", hasStagedReceives)
+    request
+  }
+
   @Test
   def simpleRequest() {
     val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT)
     val traceSocket = connect(protocol = SecurityProtocol.TRACE)
-    val serializedBytes = producerRequestBytes
+    val serializedBytes = producerRequestBytes()
 
     // Test PLAINTEXT socket
     sendRequest(plainSocket, serializedBytes)
@@ -171,7 +195,7 @@ class SocketServerTest extends JUnitSuite {
   @Test
   def testGracefulClose() {
     val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT)
-    val serializedBytes = producerRequestBytes
+    val serializedBytes = producerRequestBytes()
 
     for (_ <- 0 until 10)
       sendRequest(plainSocket, serializedBytes)
@@ -236,7 +260,7 @@ class SocketServerTest extends JUnitSuite {
     TestUtils.waitUntilTrue(() => server.connectionCount(address) < conns.length,
       "Failed to decrement connection count after close")
     val conn2 = connect()
-    val serializedBytes = producerRequestBytes
+    val serializedBytes = producerRequestBytes()
     sendRequest(conn2, serializedBytes)
     val request = server.requestChannel.receiveRequest(2000)
     assertNotNull(request)
@@ -255,7 +279,7 @@ class SocketServerTest extends JUnitSuite {
       val conns = (0 until overrideNum).map(_ => connect(overrideServer))
 
       // it should succeed
-      val serializedBytes = producerRequestBytes
+      val serializedBytes = producerRequestBytes()
       sendRequest(conns.last, serializedBytes)
       val request = overrideServer.requestChannel.receiveRequest(2000)
       assertNotNull(request)
@@ -341,7 +365,7 @@ class SocketServerTest extends JUnitSuite {
     try {
       overrideServer.startup()
       conn = connect(overrideServer)
-      val serializedBytes = producerRequestBytes
+      val serializedBytes = producerRequestBytes()
       sendRequest(conn, serializedBytes)
 
       val channel = overrideServer.requestChannel
@@ -367,6 +391,26 @@ class SocketServerTest extends JUnitSuite {
     }
   }
 
+  @Test
+  def testClientDisconnectionWithStagedReceivesFullyProcessed() {
+    val socket = connect(server)
+
+    // Setup channel to client with staged receives so when client disconnects
+    // it will be stored in Selector.closingChannels
+    val serializedBytes = producerRequestBytes(1)
+    val request = sendRequestsUntilStagedReceive(server, socket, serializedBytes)
+    val connectionId = request.connectionId
+
+    // Set SoLinger to 0 to force a hard disconnect via TCP RST
+    socket.setSoLinger(true, 0)
+    socket.close()
+
+    // Complete request with socket exception so that the channel is removed from Selector.closingChannels
+    processRequest(server.requestChannel, request)
+    TestUtils.waitUntilTrue(() => server.processor(0).openOrClosingChannel(connectionId).isEmpty,
+      "Channel not closed after failed send")
+  }
+
   /*
    * Test that we update request metrics if the channel has been removed from the selector when the broker calls
    * `selector.send` (selector closes old connections, for example).
@@ -381,7 +425,7 @@ class SocketServerTest extends JUnitSuite {
     try {
       overrideServer.startup()
       conn = connect(overrideServer)
-      val serializedBytes = producerRequestBytes
+      val serializedBytes = producerRequestBytes()
       sendRequest(conn, serializedBytes)
       val channel = overrideServer.requestChannel
       val request = channel.receiveRequest(2000)

-- 
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.