You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2022/08/15 11:34:26 UTC

[kafka] branch trunk updated: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle. (#12416)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d529d86aa4b KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle. (#12416)
d529d86aa4b is described below

commit d529d86aa4be533d1251cfc0b4c0fb57c69ace72
Author: Badai Aqrandista <ba...@yahoo.com>
AuthorDate: Mon Aug 15 21:34:03 2022 +1000

    KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle. (#12416)
    
    Ensures that SSL buffered data is processed by the server immediately on the next poll when a channel is unmuted after processing the previous request. The poll timeout is reset to zero in this case to avoid a 300 ms delay in poll() if no new data arrives on the sockets.
    
    Reviewers: David Mao <dm...@confluent.io>, Ismael Juma <is...@juma.me.uk>, Rajini Sivaram <ra...@googlemail.com>
---
 .../org/apache/kafka/common/network/Selector.java  |  1 +
 .../unit/kafka/network/SocketServerTest.scala      | 44 +++++++++++++++++++++-
 2 files changed, 43 insertions(+), 2 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index bd1175a8ee0..2e581187625 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -757,6 +757,7 @@ public class Selector implements Selectable, AutoCloseable {
             explicitlyMutedChannels.remove(channel);
             if (channel.hasBytesBuffered()) {
                 keysWithBufferedRead.add(channel.selectionKey());
+                madeReadProgressLastPoll = true;
             }
         }
     }
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 98f92d61ff2..801a2d83cab 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -1878,6 +1878,44 @@ class SocketServerTest {
     }, false)
   }
 
+  /**
+   * Test to ensure "Selector.poll()" does not block at "select(timeout)" when there is no data in the socket but there
+   * is data in the buffer. This only happens when SSL protocol is used.
+   */
+  @Test
+  def testLatencyWithBufferedDataAndNoSocketData(): Unit = {
+    shutdownServerAndMetrics(server)
+
+    props ++= sslServerProps
+    val testableServer = new TestableSocketServer(KafkaConfig.fromProps(props))
+    testableServer.enableRequestProcessing(Map.empty)
+    val testableSelector = testableServer.testableSelector
+    val proxyServer = new ProxyServer(testableServer)
+    val selectTimeoutMs = 5000
+    // set pollTimeoutOverride to "selectTimeoutMs" to ensure poll() timeout is distinct and can be identified
+    testableSelector.pollTimeoutOverride = Some(selectTimeoutMs)
+
+    try {
+      // initiate SSL connection by sending 1 request via socket, then send 2 requests directly into the netReadBuffer
+      val (sslSocket, req1) = makeSocketWithBufferedRequests(testableServer, testableSelector, proxyServer)
+
+      // force all data to be transferred to the kafka broker by closing the client connection to proxy server
+      sslSocket.close()
+      TestUtils.waitUntilTrue(() => proxyServer.clientConnSocket.isClosed, "proxyServer.clientConnSocket is still not closed after 60000 ms", 60000)
+
+      // process the request and send the response
+      processRequest(testableServer.dataPlaneRequestChannel, req1)
+
+      // process the requests in the netReadBuffer, this should not block
+      val req2 = receiveRequest(testableServer.dataPlaneRequestChannel)
+      processRequest(testableServer.dataPlaneRequestChannel, req2)
+
+    } finally {
+      proxyServer.close()
+      shutdownServerAndMetrics(testableServer)
+    }
+  }
+
   private def sslServerProps: Properties = {
     val trustStoreFile = File.createTempFile("truststore", ".jks")
     val sslProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, interBrokerSecurityProtocol = Some(SecurityProtocol.SSL),
@@ -2044,10 +2082,12 @@ class SocketServerTest {
     }
 
     def testableSelector: TestableSelector =
-      dataPlaneAcceptors.get(endpoint).processors(0).selector.asInstanceOf[TestableSelector]
+      testableProcessor.selector.asInstanceOf[TestableSelector]
 
-    def testableProcessor: TestableProcessor =
+    def testableProcessor: TestableProcessor = {
+      val endpoint = this.config.dataPlaneListeners.head
       dataPlaneAcceptors.get(endpoint).processors(0).asInstanceOf[TestableProcessor]
+    }
 
     def waitForChannelClose(connectionId: String, locallyClosed: Boolean): Unit = {
       val selector = testableSelector