Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/07/18 00:02:02 UTC

[GitHub] [kafka] badaiaqrandista opened a new pull request, #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.

badaiaqrandista opened a new pull request, #12416:
URL: https://github.com/apache/kafka/pull/12416

   KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.
   
   Request processing is delayed by 300 ms under the following conditions:
   1. Client-server communication uses an SSL socket.
   2. More than one request arrives in the same network packet.
   
   The delay occurs because the socket has no data left to read while the buffer still holds data. At a high level, the sequence of events that leads to this situation is:
   
   Step 1 - The client sends more than one request in the same network packet.
   Step 2 - The server processes the 1st request. While doing so, SslTransportLayer reads all of the bytes (containing multiple requests) from the socket and stores them in its buffer.
   Step 3 - The server sends the response for the 1st request.
   Step 4 - THIS IS WHERE THE DELAY IS. The server processes the 2nd request. This request is taken from the SslTransportLayer buffer rather than from the socket. Because the socket itself has nothing to read, "select(timeout)" blocks until the 300 ms poll timeout expires.
   
   To fix this, Selector sets "madeReadProgressLastPoll" to "true" after sending a response if the channel still has data in its buffer.
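   A minimal sketch of the decision involved, written as a simplified restatement rather than Kafka's actual `Selector` source: `select()` is skipped (timeout 0) only when the previous poll made read progress and some channel still has buffered bytes. The class `SelectTimeoutRule` and its method are hypothetical, and the sketch ignores any other conditions the real `poll()` may check.

   ```java
   // Hypothetical, simplified model of how Selector.poll() chooses its select() timeout.
   // Not Kafka's source; names mirror the fields discussed in this PR.
   final class SelectTimeoutRule {
       // The 300 ms poll timeout observed in KAFKA-13559.
       static final long PROCESSOR_POLL_TIMEOUT_MS = 300L;

       /**
        * @param madeReadProgressLastPoll flag captured at the start of this poll
        * @param dataInBuffers            true if some channel still has bytes buffered (e.g. in SslTransportLayer)
        * @param requestedTimeoutMs       timeout passed to poll()
        * @return the timeout that would be handed to select()
        */
       static long selectTimeout(boolean madeReadProgressLastPoll, boolean dataInBuffers, long requestedTimeoutMs) {
           // Skip blocking only if the last poll read something AND buffered bytes remain.
           return (madeReadProgressLastPoll && dataInBuffers) ? 0L : requestedTimeoutMs;
       }

       public static void main(String[] args) {
           // Before the fix: the 2nd request is buffered, but the flag was reset to false,
           // so select() waits for the whole timeout even though the data is already in memory.
           System.out.println(selectTimeout(false, true, PROCESSOR_POLL_TIMEOUT_MS)); // prints 300
           // After the fix: the flag is re-armed once the response is sent, so select() does not block.
           System.out.println(selectTimeout(true, true, PROCESSOR_POLL_TIMEOUT_MS)); // prints 0
       }
   }
   ```

   The change in this PR targets the first argument: after a response is written, the flag is set back to `true` when `channel.hasBytesBuffered()` returns true.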
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] badaiaqrandista commented on a diff in pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.

Posted by GitBox <gi...@apache.org>.
badaiaqrandista commented on code in PR #12416:
URL: https://github.com/apache/kafka/pull/12416#discussion_r943043296


##########
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##########
@@ -1878,6 +1878,40 @@ class SocketServerTest {
     }, false)
   }
 
+  /**
+   * Test to ensure "Selector.poll()" does not block at "select(timeout)" when there is no data in the socket but there
+   * is data in the buffer. This only happens when SSL protocol is used.
+   */
+  @Test
+  def testLatencyWithBufferedDataAndNoSocketData(): Unit = {
+    shutdownServerAndMetrics(server)
+
+    props ++= sslServerProps
+    val testableServer = new TestableSocketServer(KafkaConfig.fromProps(props))
+    testableServer.enableRequestProcessing(Map.empty)
+    val testableSelector = testableServer.testableSelector
+    val proxyServer = new ProxyServer(testableServer)
+    val selectTimeoutMs = 5000
+    // set pollTimeoutOverride to "selectTimeoutMs" to ensure poll() timeout is distinct and can be identified
+    testableSelector.pollTimeoutOverride = Some(selectTimeoutMs)
+
+    try {
+      val (sslSocket, req1) = makeSocketWithBufferedRequests(testableServer, testableSelector, proxyServer)
+
+      // process the request to trigger SSL handshake
+      processRequest(testableServer.dataPlaneRequestChannel, req1)

Review Comment:
   updated





[GitHub] [kafka] badaiaqrandista commented on a diff in pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.

Posted by GitBox <gi...@apache.org>.
badaiaqrandista commented on code in PR #12416:
URL: https://github.com/apache/kafka/pull/12416#discussion_r927148782


##########
clients/src/main/java/org/apache/kafka/common/network/Selector.java:
##########
@@ -591,6 +591,11 @@ void pollSelectionKeys(Set<SelectionKey> selectionKeys,
                 long nowNanos = channelStartTimeNanos != 0 ? channelStartTimeNanos : currentTimeNanos;
                 try {
                     attemptWrite(key, channel, nowNanos);
+
+                    // Following is to fix KAFKA-13559. This will prevent poll() from blocking for 300 ms when the
+                    // socket has no data but the buffer has data. Only happens when using SSL.
+                    if (channel.hasBytesBuffered())
+                        madeReadProgressLastPoll = true;

Review Comment:
   @splett2 
   
   The `Selector` method called after the channel is unmuted is actually `clearCompletedSends()` ([here](https://github.com/apache/kafka/blob/a6c036256504b2549b009c10c777927a40a4bc34/clients/src/main/java/org/apache/kafka/common/network/Selector.java#L825-L827)).
   
   A few alternatives I can think of:
   
   1. Move `madeReadProgressLastPoll=true` to this method (`Selector.clearCompletedSends()`).
   2. Create a new `Selector` method named `readyForNextRead()` (or similar) and call it from `SocketServer.processCompletedSends()`.
   
   However, both alternatives would require looping over all the channels, which is not efficient.
   
   What do you think?





[GitHub] [kafka] badaiaqrandista commented on a diff in pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.

Posted by GitBox <gi...@apache.org>.
badaiaqrandista commented on code in PR #12416:
URL: https://github.com/apache/kafka/pull/12416#discussion_r927098320


##########
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##########
@@ -1878,6 +1878,98 @@ class SocketServerTest {
     }, false)
   }
 
+  /**
+   * Tests for KAFKA-13559 - Processing request got delayed by 300 ms in the following condition:
+   * 1. Client-Server communication uses SSL socket.
+   * 2. More than one requests are read from the socket into SslTransportLayer.netReadBuffer.
+   *
+   * This 300 ms delay occurs because the socket has no data but the buffer has data. And the sequence of events that
+   * leads to this situation is the following (from the server point of view):
+   *
+   * Step 1 - SslTransportLayer receives more than one requests in the socket and put it in the buffer
+   *          (SslTransportLayer.netReadBuffer).
+   * Step 2 - SslTransportLayer reads all of the bytes and stores it in SslTransportLayer.appReadBuffer.
+   * Step 3 - Process the first request, leaving the second request in SslTransportLayer.appReadBuffer.
+   * Step 4 - THIS IS WHERE THE DELAY IS. Process the second request. This request is read from
+   * SslTransportLayer.appReadBuffer, instead of the socket. Because of this, "select(timeout)" in Selector.poll()
+   * should not block for 300 ms.
+   *
+   * This test is implemented following "makeSocketWithBufferedRequests()" method by putting two requests directly
+   * into SslTransportLayer.netReadBuffer and manually trigger the processing.

Review Comment:
   Got it. Changing "300 ms" to "poll timeout".





[GitHub] [kafka] badaiaqrandista commented on a diff in pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.

Posted by GitBox <gi...@apache.org>.
badaiaqrandista commented on code in PR #12416:
URL: https://github.com/apache/kafka/pull/12416#discussion_r927095680


##########
clients/src/main/java/org/apache/kafka/common/network/Selector.java:
##########
@@ -591,6 +591,11 @@ void pollSelectionKeys(Set<SelectionKey> selectionKeys,
                 long nowNanos = channelStartTimeNanos != 0 ? channelStartTimeNanos : currentTimeNanos;
                 try {
                     attemptWrite(key, channel, nowNanos);
+
+                    // Following is to fix KAFKA-13559. This will prevent poll() from blocking for 300 ms when the
+                    // socket has no data but the buffer has data. Only happens when using SSL.
+                    if (channel.hasBytesBuffered())
+                        madeReadProgressLastPoll = true;

Review Comment:
   @splett2 
   
   How about saying this in the comment:
   
   
   > After reading from the channel, "madeReadProgressLastPoll" was set to "true". However, because the channel is muted while the previous request is being processed, "madeReadProgressLastPoll" was set to "false" by "clear()". Now that a response has been sent and the channel is unmuted, we need to set "madeReadProgressLastPoll" back to "true" if there is additional data in the buffer, to ensure we process it immediately.
   
   





[GitHub] [kafka] splett2 commented on a diff in pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.

Posted by GitBox <gi...@apache.org>.
splett2 commented on code in PR #12416:
URL: https://github.com/apache/kafka/pull/12416#discussion_r942768273


##########
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##########
@@ -1878,6 +1878,40 @@ class SocketServerTest {
     }, false)
   }
 
+  /**
+   * Test to ensure "Selector.poll()" does not block at "select(timeout)" when there is no data in the socket but there
+   * is data in the buffer. This only happens when SSL protocol is used.
+   */
+  @Test
+  def testLatencyWithBufferedDataAndNoSocketData(): Unit = {
+    shutdownServerAndMetrics(server)
+
+    props ++= sslServerProps
+    val testableServer = new TestableSocketServer(KafkaConfig.fromProps(props))
+    testableServer.enableRequestProcessing(Map.empty)
+    val testableSelector = testableServer.testableSelector
+    val proxyServer = new ProxyServer(testableServer)
+    val selectTimeoutMs = 5000
+    // set pollTimeoutOverride to "selectTimeoutMs" to ensure poll() timeout is distinct and can be identified
+    testableSelector.pollTimeoutOverride = Some(selectTimeoutMs)
+
+    try {
+      val (sslSocket, req1) = makeSocketWithBufferedRequests(testableServer, testableSelector, proxyServer)
+
+      // process the request to trigger SSL handshake
+      processRequest(testableServer.dataPlaneRequestChannel, req1)

Review Comment:
   nit: this comment is incorrect - the SSL handshake is triggered as part of `makeSocketWithBufferedRequests`





[GitHub] [kafka] badaiaqrandista commented on a diff in pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.

Posted by GitBox <gi...@apache.org>.
badaiaqrandista commented on code in PR #12416:
URL: https://github.com/apache/kafka/pull/12416#discussion_r924908020


##########
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##########
@@ -1878,6 +1878,97 @@ class SocketServerTest {
     }, false)
   }
 
+  /**
+   * Tests for KAFKA-13559 - Processing request got delayed by 300 ms in the following condition:
+   * 1. Client-Server communication uses SSL socket.
+   * 2. More than one requests are read from the socket into SslTransportLayer.netReadBuffer.
+   *
+   * This 300 ms delay occurs because the socket has no data but the buffer has data. And the sequence of events that
+   * leads to this situation is the following (from the server point of view):
+   *
+   * Step 1 - SslTransportLayer receives more than one requests in the socket and put it in the buffer
+   *          (SslTransportLayer.netReadBuffer).
+   * Step 2 - SslTransportLayer reads all of the bytes and stores it in SslTransportLayer.appReadBuffer.
+   * Step 3 - Process the first request, leaving the second request in SslTransportLayer.appReadBuffer.
+   * Step 4 - THIS IS WHERE THE DELAY IS. Process the second request. This request is read from
+   * SslTransportLayer.appReadBuffer, instead of the socket. Because of this, "select(timeout)" in Selector.poll()
+   * should not block for 300 ms.
+   *
+   * This test is implemented following "makeSocketWithBufferedRequests()" method by putting two requests directly
+   * into SslTransportLayer.netReadBuffer and manually trigger the processing.
+   *
+   */
+  @Test
+  def testLatencyWithBufferedDataAndNoSocketData(): Unit = {
+    shutdownServerAndMetrics(server)
+
+    // to ensure we only have 1 connection (channel)
+    val props = sslServerProps
+    val numConnections = 1
+    props.put("max.connections.per.ip", numConnections.toString)
+
+    // create server with SSL listener
+    val testableServer = new TestableSocketServer(KafkaConfig.fromProps(props))
+    testableServer.enableRequestProcessing(Map.empty)
+    //       dataPlaneAcceptors.get(endpoint).processors(0).selector.asInstanceOf[TestableSelector]
+    val testableSelector = testableServer.testableSelector
+    val proxyServer = new ProxyServer(testableServer)
+
+    try {
+      // trigger SSL handshake by sending the first request and receiving its response without buffering
+      val requestBytes = producerRequestBytes()
+      val sslSocket = sslClientSocket(proxyServer.localPort)
+
+      sendRequest(sslSocket, requestBytes)
+      val request1 = receiveRequest(testableServer.dataPlaneRequestChannel)
+      processRequest(testableServer.dataPlaneRequestChannel, request1)
+      receiveResponse(sslSocket)
+
+      // then put 2 requests in SslTransportLayer.netReadBuffer via the ProxyServer
+      val connectionId = request1.context.connectionId
+      val listener = testableServer.config.dataPlaneListeners.head.listenerName.value
+      val channel = testableServer.dataPlaneAcceptor(listener).get.processors(0).channel(connectionId).getOrElse(throw new IllegalStateException("Channel not found"))
+      val transportLayer: SslTransportLayer = JTestUtils.fieldValue(channel, classOf[KafkaChannel], "transportLayer")
+      val netReadBuffer: ByteBuffer = JTestUtils.fieldValue(transportLayer, classOf[SslTransportLayer], "netReadBuffer")
+
+      proxyServer.enableBuffering(netReadBuffer)
+      sendRequest(sslSocket, requestBytes)
+      sendRequest(sslSocket, requestBytes)
+
+      val keysWithBufferedRead: util.Set[SelectionKey] = JTestUtils.fieldValue(testableSelector, classOf[Selector], "keysWithBufferedRead")
+      keysWithBufferedRead.add(channel.selectionKey)
+      JTestUtils.setFieldValue(transportLayer, "hasBytesBuffered", true)
+
+      // process the first request in the server side
+      // this would move bytes from netReadBuffer to appReadBuffer, then process only the first request
+      processRequest(testableServer.dataPlaneRequestChannel)
+
+      // receive response in the client side
+      receiveResponse(sslSocket)
+
+      // process the second request in the server side
+      // this would process the second request in the appReadBuffer
+      // NOTE: this should not block because the data is already in the buffer, but without the fix for KAFKA-13559,
+      // this step will take more than 300 ms
+      val processTimeStart = System.currentTimeMillis()
+      processRequest(testableServer.dataPlaneRequestChannel)
+      val processTimeEnd = System.currentTimeMillis()
+
+      // receive response in the client side
+      receiveResponse(sslSocket)

Review Comment:
   @splett2 Can you refresh the PR? I have added a poll timeout override and changed `currentTimeMillis` to `nanoTime`, so this comment applies to an older version of the PR.
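   For reference, a small sketch of why `nanoTime` suits this kind of measurement: `System.nanoTime()` is monotonic and intended for measuring elapsed time, while `System.currentTimeMillis()` follows the wall clock, which can be adjusted. The `ElapsedTime` helper is hypothetical, and the empty lambda is only a stand-in for `processRequest(...)`.

   ```java
   import java.util.concurrent.TimeUnit;

   // Hypothetical helper: measures elapsed time with the monotonic System.nanoTime().
   final class ElapsedTime {
       /** Runs the action and returns how long it took, truncated to whole milliseconds. */
       static long measureMillis(Runnable action) {
           long start = System.nanoTime();
           action.run();
           return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
       }

       public static void main(String[] args) {
           long pollTimeoutMs = 5000L; // mirrors selectTimeoutMs = 5000 in the updated test
           long elapsedMs = measureMillis(() -> { /* stand-in for processRequest(...) */ });
           // With the fix, handling an already-buffered request should finish well before the poll timeout.
           if (elapsedMs >= pollTimeoutMs)
               throw new AssertionError("took " + elapsedMs + " ms, expected < " + pollTimeoutMs + " ms");
           System.out.println("elapsed " + elapsedMs + " ms");
       }
   }
   ```

   Asserting against a deliberately large, distinct timeout makes a regression easy to tell apart from ordinary scheduling jitter.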





[GitHub] [kafka] rajinisivaram commented on pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on PR #12416:
URL: https://github.com/apache/kafka/pull/12416#issuecomment-1214907290

   Test failures are unrelated.




[GitHub] [kafka] splett2 commented on a diff in pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.

Posted by GitBox <gi...@apache.org>.
splett2 commented on code in PR #12416:
URL: https://github.com/apache/kafka/pull/12416#discussion_r923930734


##########
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##########
@@ -1878,6 +1878,97 @@ class SocketServerTest {
     }, false)
   }
 
+  /**
+   * Tests for KAFKA-13559 - Processing request got delayed by 300 ms in the following condition:
+   * 1. Client-Server communication uses SSL socket.
+   * 2. More than one requests are read from the socket into SslTransportLayer.netReadBuffer.
+   *
+   * This 300 ms delay occurs because the socket has no data but the buffer has data. And the sequence of events that
+   * leads to this situation is the following (from the server point of view):
+   *
+   * Step 1 - SslTransportLayer receives more than one requests in the socket and put it in the buffer
+   *          (SslTransportLayer.netReadBuffer).
+   * Step 2 - SslTransportLayer reads all of the bytes and stores it in SslTransportLayer.appReadBuffer.
+   * Step 3 - Process the first request, leaving the second request in SslTransportLayer.appReadBuffer.
+   * Step 4 - THIS IS WHERE THE DELAY IS. Process the second request. This request is read from
+   * SslTransportLayer.appReadBuffer, instead of the socket. Because of this, "select(timeout)" in Selector.poll()
+   * should not block for 300 ms.
+   *
+   * This test is implemented following "makeSocketWithBufferedRequests()" method by putting two requests directly
+   * into SslTransportLayer.netReadBuffer and manually trigger the processing.
+   *
+   */
+  @Test
+  def testLatencyWithBufferedDataAndNoSocketData(): Unit = {
+    shutdownServerAndMetrics(server)
+
+    // to ensure we only have 1 connection (channel)
+    val props = sslServerProps
+    val numConnections = 1
+    props.put("max.connections.per.ip", numConnections.toString)
+
+    // create server with SSL listener
+    val testableServer = new TestableSocketServer(KafkaConfig.fromProps(props))
+    testableServer.enableRequestProcessing(Map.empty)
+    //       dataPlaneAcceptors.get(endpoint).processors(0).selector.asInstanceOf[TestableSelector]
+    val testableSelector = testableServer.testableSelector
+    val proxyServer = new ProxyServer(testableServer)
+
+    try {
+      // trigger SSL handshake by sending the first request and receiving its response without buffering
+      val requestBytes = producerRequestBytes()
+      val sslSocket = sslClientSocket(proxyServer.localPort)
+
+      sendRequest(sslSocket, requestBytes)
+      val request1 = receiveRequest(testableServer.dataPlaneRequestChannel)
+      processRequest(testableServer.dataPlaneRequestChannel, request1)
+      receiveResponse(sslSocket)
+
+      // then put 2 requests in SslTransportLayer.netReadBuffer via the ProxyServer
+      val connectionId = request1.context.connectionId
+      val listener = testableServer.config.dataPlaneListeners.head.listenerName.value
+      val channel = testableServer.dataPlaneAcceptor(listener).get.processors(0).channel(connectionId).getOrElse(throw new IllegalStateException("Channel not found"))
+      val transportLayer: SslTransportLayer = JTestUtils.fieldValue(channel, classOf[KafkaChannel], "transportLayer")
+      val netReadBuffer: ByteBuffer = JTestUtils.fieldValue(transportLayer, classOf[SslTransportLayer], "netReadBuffer")
+
+      proxyServer.enableBuffering(netReadBuffer)
+      sendRequest(sslSocket, requestBytes)
+      sendRequest(sslSocket, requestBytes)
+
+      val keysWithBufferedRead: util.Set[SelectionKey] = JTestUtils.fieldValue(testableSelector, classOf[Selector], "keysWithBufferedRead")
+      keysWithBufferedRead.add(channel.selectionKey)
+      JTestUtils.setFieldValue(transportLayer, "hasBytesBuffered", true)
+
+      // process the first request in the server side
+      // this would move bytes from netReadBuffer to appReadBuffer, then process only the first request
+      processRequest(testableServer.dataPlaneRequestChannel)
+
+      // receive response in the client side
+      receiveResponse(sslSocket)
+
+      // process the second request in the server side
+      // this would process the second request in the appReadBuffer
+      // NOTE: this should not block because the data is already in the buffer, but without the fix for KAFKA-13559,
+      // this step will take more than 300 ms
+      val processTimeStart = System.currentTimeMillis()
+      processRequest(testableServer.dataPlaneRequestChannel)
+      val processTimeEnd = System.currentTimeMillis()
+
+      // receive response in the client side
+      receiveResponse(sslSocket)

Review Comment:
   I'm trying to think about how we can get stronger sequencing guarantees here.
   
   Maybe we can explicitly set the testable selector's `pollTimeoutOverride` to some large value and explicitly call `wakeup()` once the right state is set up.
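   A minimal sketch of this sequencing idea using plain `java.nio` rather than Kafka's `TestableSelector` (which is not reproduced here): a `select()` with a very large timeout returns as soon as `wakeup()` is called, so the test, not the timeout, decides when the poll loop proceeds. `WakeupSequencing` is a hypothetical name, and the sleeping thread stands in for "once the right state is set up".

   ```java
   import java.io.IOException;
   import java.io.UncheckedIOException;
   import java.nio.channels.Selector;
   import java.util.concurrent.TimeUnit;

   // Sketch: block in select() with a huge timeout and release it explicitly via wakeup().
   final class WakeupSequencing {
       /** Returns how long select(largeTimeoutMs) actually blocked when woken after wakeupDelayMs. */
       static long selectWithWakeupMillis(long largeTimeoutMs, long wakeupDelayMs) {
           try (Selector selector = Selector.open()) {
               Thread waker = new Thread(() -> {
                   try {
                       Thread.sleep(wakeupDelayMs); // stand-in for setting up the state under test
                   } catch (InterruptedException e) {
                       Thread.currentThread().interrupt();
                   }
                   // If select() has not started yet, the next select() returns immediately instead.
                   selector.wakeup();
               });
               long start = System.nanoTime();
               waker.start();
               selector.select(largeTimeoutMs); // returns on wakeup(), long before largeTimeoutMs
               long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
               waker.join();
               return elapsedMs;
           } catch (IOException e) {
               throw new UncheckedIOException(e);
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               throw new IllegalStateException("interrupted while joining the waker thread", e);
           }
       }

       public static void main(String[] args) {
           long elapsedMs = selectWithWakeupMillis(60_000L, 100L);
           System.out.println("select() returned after about " + elapsedMs + " ms");
       }
   }
   ```

   Because `wakeup()` is sticky when no selection is in progress, the ordering between setting up state and entering `select()` does not need to be exact for the poll to be released.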



##########
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##########
@@ -1878,6 +1878,97 @@ class SocketServerTest {
     }, false)
   }
 
+  /**
+   * Tests for KAFKA-13559 - Processing request got delayed by 300 ms in the following condition:
+   * 1. Client-Server communication uses SSL socket.
+   * 2. More than one requests are read from the socket into SslTransportLayer.netReadBuffer.
+   *
+   * This 300 ms delay occurs because the socket has no data but the buffer has data. And the sequence of events that
+   * leads to this situation is the following (from the server point of view):
+   *
+   * Step 1 - SslTransportLayer receives more than one requests in the socket and put it in the buffer
+   *          (SslTransportLayer.netReadBuffer).
+   * Step 2 - SslTransportLayer reads all of the bytes and stores it in SslTransportLayer.appReadBuffer.
+   * Step 3 - Process the first request, leaving the second request in SslTransportLayer.appReadBuffer.
+   * Step 4 - THIS IS WHERE THE DELAY IS. Process the second request. This request is read from
+   * SslTransportLayer.appReadBuffer, instead of the socket. Because of this, "select(timeout)" in Selector.poll()
+   * should not block for 300 ms.
+   *
+   * This test is implemented following "makeSocketWithBufferedRequests()" method by putting two requests directly
+   * into SslTransportLayer.netReadBuffer and manually trigger the processing.
+   *
+   */
+  @Test
+  def testLatencyWithBufferedDataAndNoSocketData(): Unit = {
+    shutdownServerAndMetrics(server)
+
+    // to ensure we only have 1 connection (channel)
+    val props = sslServerProps
+    val numConnections = 1
+    props.put("max.connections.per.ip", numConnections.toString)
+
+    // create server with SSL listener
+    val testableServer = new TestableSocketServer(KafkaConfig.fromProps(props))
+    testableServer.enableRequestProcessing(Map.empty)
+    //       dataPlaneAcceptors.get(endpoint).processors(0).selector.asInstanceOf[TestableSelector]

Review Comment:
   nit: remove this 



##########
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##########
@@ -1878,6 +1878,97 @@ class SocketServerTest {
     }, false)
   }
 
+  /**
+   * Tests for KAFKA-13559 - Processing request got delayed by 300 ms in the following condition:
+   * 1. Client-Server communication uses SSL socket.
+   * 2. More than one requests are read from the socket into SslTransportLayer.netReadBuffer.
+   *
+   * This 300 ms delay occurs because the socket has no data but the buffer has data. And the sequence of events that
+   * leads to this situation is the following (from the server point of view):
+   *
+   * Step 1 - SslTransportLayer receives more than one requests in the socket and put it in the buffer
+   *          (SslTransportLayer.netReadBuffer).
+   * Step 2 - SslTransportLayer reads all of the bytes and stores it in SslTransportLayer.appReadBuffer.
+   * Step 3 - Process the first request, leaving the second request in SslTransportLayer.appReadBuffer.
+   * Step 4 - THIS IS WHERE THE DELAY IS. Process the second request. This request is read from
+   * SslTransportLayer.appReadBuffer, instead of the socket. Because of this, "select(timeout)" in Selector.poll()
+   * should not block for 300 ms.
+   *
+   * This test is implemented following "makeSocketWithBufferedRequests()" method by putting two requests directly
+   * into SslTransportLayer.netReadBuffer and manually trigger the processing.
+   *
+   */
+  @Test
+  def testLatencyWithBufferedDataAndNoSocketData(): Unit = {
+    shutdownServerAndMetrics(server)
+
+    // to ensure we only have 1 connection (channel)
+    val props = sslServerProps
+    val numConnections = 1
+    props.put("max.connections.per.ip", numConnections.toString)

Review Comment:
   nit: this seems extraneous - we can drop it.





[GitHub] [kafka] badaiaqrandista commented on a diff in pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.

Posted by GitBox <gi...@apache.org>.
badaiaqrandista commented on code in PR #12416:
URL: https://github.com/apache/kafka/pull/12416#discussion_r923947288


##########
clients/src/main/java/org/apache/kafka/common/network/Selector.java:
##########
@@ -652,6 +652,11 @@ void write(KafkaChannel channel) throws IOException {
             if (send != null) {
                 this.completedSends.add(send);
                 this.sensors.recordCompletedSend(nodeId, send.size(), currentTimeMs);
+
+                // To fix KAFKA-13559, i.e. select(timeout) blocks for 300 ms when the socket has no data but the buffer
+                // has data. Only happens when using SSL.
+                if (channel.hasBytesBuffered())
+                    madeReadProgressLastPoll = true;

Review Comment:
   @ijuma Should I move this to `pollSelectionKeys` after the call to `attemptWrite` (see the link below)?
   
   https://github.com/apache/kafka/blob/f2242f44fdee9b5575c8f359bacb0e5f457ffd42/clients/src/main/java/org/apache/kafka/common/network/Selector.java#L593
   
   I have tested that this also works.






[GitHub] [kafka] ijuma commented on a diff in pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.

Posted by GitBox <gi...@apache.org>.
ijuma commented on code in PR #12416:
URL: https://github.com/apache/kafka/pull/12416#discussion_r922975981


##########
clients/src/main/java/org/apache/kafka/common/network/Selector.java:
##########
@@ -652,6 +652,11 @@ void write(KafkaChannel channel) throws IOException {
             if (send != null) {
                 this.completedSends.add(send);
                 this.sensors.recordCompletedSend(nodeId, send.size(), currentTimeMs);
+
+                // To fix KAFKA-13559, i.e. select(timeout) blocks for 300 ms when the socket has no data but the buffer
+                // has data. Only happens when using SSL.
+                if (channel.hasBytesBuffered())
+                    madeReadProgressLastPoll = true;

Review Comment:
   It's weird to change this read-related variable after we have done a write.





[GitHub] [kafka] badaiaqrandista commented on a diff in pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.

badaiaqrandista commented on code in PR #12416:
URL: https://github.com/apache/kafka/pull/12416#discussion_r927123903


##########
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##########
@@ -1878,6 +1878,98 @@ class SocketServerTest {
     }, false)
   }
 
+  /**
+   * Tests for KAFKA-13559 - Processing of a request is delayed by 300 ms under the following conditions:
+   * 1. Client-server communication uses an SSL socket.
+   * 2. More than one request is read from the socket into SslTransportLayer.netReadBuffer.
+   *
+   * The 300 ms delay occurs because the socket has no data but the buffer does. The sequence of events that
+   * leads to this situation is the following (from the server's point of view):
+   *
+   * Step 1 - SslTransportLayer receives more than one request from the socket and puts them in the buffer
+   *          (SslTransportLayer.netReadBuffer).
+   * Step 2 - SslTransportLayer reads all of the bytes and stores them in SslTransportLayer.appReadBuffer.
+   * Step 3 - Process the first request, leaving the second request in SslTransportLayer.appReadBuffer.
+   * Step 4 - THIS IS WHERE THE DELAY IS. Process the second request. This request is read from
+   * SslTransportLayer.appReadBuffer, instead of the socket. Because of this, "select(timeout)" in Selector.poll()
+   * should not block for 300 ms.
+   *
+   * This test follows the approach of the "makeSocketWithBufferedRequests()" method: it puts two requests directly
+   * into SslTransportLayer.netReadBuffer and triggers the processing manually.
+   *
+   */
+  @Test
+  def testLatencyWithBufferedDataAndNoSocketData(): Unit = {
+    shutdownServerAndMetrics(server)
+
+    // create server with SSL listener
+    val testableServer = new TestableSocketServer(KafkaConfig.fromProps(sslServerProps))
+    testableServer.enableRequestProcessing(Map.empty)
+    val testableSelector = testableServer.testableSelector
+    val proxyServer = new ProxyServer(testableServer)
+    val selectTimeout = 5000  // in ms
+    // set pollTimeoutOverride to "selectTimeout" to ensure poll() timeout is distinct and can be identified
+    testableSelector.pollTimeoutOverride = Some(selectTimeout)
+
+    try {
+      // trigger SSL handshake by sending the first request and receiving its response without buffering
+      val requestBytes = producerRequestBytes()
+      val sslSocket = sslClientSocket(proxyServer.localPort)
+
+      sendRequest(sslSocket, requestBytes)
+      val request1 = receiveRequest(testableServer.dataPlaneRequestChannel)
+      processRequest(testableServer.dataPlaneRequestChannel, request1)
+      receiveResponse(sslSocket)
+
+      // then put 2 requests in SslTransportLayer.netReadBuffer via the ProxyServer
+      val connectionId = request1.context.connectionId
+      val listener = testableServer.config.dataPlaneListeners.head.listenerName.value
+      val channel = testableServer.dataPlaneAcceptor(listener).get.processors(0).channel(connectionId).getOrElse(throw new IllegalStateException("Channel not found"))
+      val transportLayer: SslTransportLayer = JTestUtils.fieldValue(channel, classOf[KafkaChannel], "transportLayer")
+      val netReadBuffer: ByteBuffer = JTestUtils.fieldValue(transportLayer, classOf[SslTransportLayer], "netReadBuffer")
+
+      proxyServer.enableBuffering(netReadBuffer)
+      sendRequest(sslSocket, requestBytes)
+      sendRequest(sslSocket, requestBytes)
+
+      val keysWithBufferedRead: util.Set[SelectionKey] = JTestUtils.fieldValue(testableSelector, classOf[Selector], "keysWithBufferedRead")
+      keysWithBufferedRead.add(channel.selectionKey)
+      JTestUtils.setFieldValue(transportLayer, "hasBytesBuffered", true)
+
+      // process the first request in the server side
+      // this would move bytes from netReadBuffer to appReadBuffer, then process only the first request
+      // we call wakeup() so Selector.poll() does not block in this step (because we artificially add data into netReadBuffer)
+      testableSelector.wakeup()
+      val req1 = receiveRequest(testableServer.dataPlaneRequestChannel, selectTimeout + 1000)
+      processRequest(testableServer.dataPlaneRequestChannel, req1)
+
+      // receive response in the client side
+      receiveResponse(sslSocket)
+
+      // process the second request in the server side
+      // this would process the second request in the appReadBuffer
+      // NOTE 1: this should not block because the data is already in the buffer, but without the fix for KAFKA-13559,
+      // this step will block for 300 ms (in this test, we override poll() timeout to 5000 ms to make it distinct)
+      // NOTE 2: we do not call wakeup() here so Selector.poll() would block if the fix is not in place
+      val processTimeStart = System.nanoTime()  // using nanoTime() because it is meant to calculate elapsed time
+      val req2 = receiveRequest(testableServer.dataPlaneRequestChannel, selectTimeout + 1000)
+      processRequest(testableServer.dataPlaneRequestChannel, req2)
+      val processTimeEnd = System.nanoTime()
+
+      // receive response in the client side
+      receiveResponse(sslSocket)
+
+      // check the duration of processing the second request
+      val processTimeDuration = (processTimeEnd - processTimeStart) / 1000000.0  // in ms
+      assertTrue(processTimeDuration < selectTimeout,
+        "Time to process the second request (" + processTimeDuration + " ms) should be under " + selectTimeout + " ms")
+

Review Comment:
   I assume you are referring to the `receiveResponse()` call. I have removed it as well.
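The test above calls `wakeup()` so that the first `Selector.poll()` does not block, and deliberately skips it for the second request. The underlying `java.nio.channels.Selector` behaviour can be shown on its own: per its Javadoc, if `wakeup()` is invoked while no selection is in progress, the next `select(timeout)` returns immediately; otherwise, with nothing ready, it waits for the timeout. This is a standalone Java sketch and does not use Kafka's `Selector` or `TestableSelector`; `WakeupDemo` and `timedSelectMs` are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.concurrent.TimeUnit;

// Plain java.nio demo: a pending wakeup() makes the next select(timeout) return
// at once, while select(timeout) with nothing ready waits for about the full timeout.
public class WakeupDemo {

    static long timedSelectMs(boolean wakeUpFirst, long timeoutMs) {
        try (Selector selector = Selector.open()) {
            if (wakeUpFirst) {
                selector.wakeup(); // no select in progress, so the next one returns immediately
            }
            long startNanos = System.nanoTime();
            selector.select(timeoutMs);
            long endNanos = System.nanoTime();
            return TimeUnit.NANOSECONDS.toMillis(endNanos - startNanos);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("with wakeup():    " + timedSelectMs(true, 5000) + " ms");
        System.out.println("without wakeup(): " + timedSelectMs(false, 300) + " ms");
    }
}
```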





[GitHub] [kafka] badaiaqrandista commented on a diff in pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.

badaiaqrandista commented on code in PR #12416:
URL: https://github.com/apache/kafka/pull/12416#discussion_r925034851


##########
clients/src/main/java/org/apache/kafka/common/network/Selector.java:
##########
@@ -652,6 +652,11 @@ void write(KafkaChannel channel) throws IOException {
             if (send != null) {
                 this.completedSends.add(send);
                 this.sensors.recordCompletedSend(nodeId, send.size(), currentTimeMs);
+
+                // To fix KAFKA-13559, i.e. select(timeout) blocks for 300 ms when the socket has no data but the buffer
+                // has data. Only happens when using SSL.
+                if (channel.hasBytesBuffered())
+                    madeReadProgressLastPoll = true;

Review Comment:
   PR updated, so I'll resolve this comment.





[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.

rajinisivaram commented on code in PR #12416:
URL: https://github.com/apache/kafka/pull/12416#discussion_r928627267


##########
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##########
@@ -1878,6 +1878,93 @@ class SocketServerTest {
     }, false)
   }
 
+  /**
+   * Tests for KAFKA-13559 - Processing of a request is delayed by 300 ms under the following conditions:

Review Comment:
   We don't usually add references to tickets in comments. Can we rewrite this comment to say what it is testing and move the explanation of the bug into the PR description that will be included in the commit message?
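The bug explanation that this comment proposes moving into the PR description (several requests read from the socket at once, with only the first consumed) can be illustrated with a minimal, hypothetical Java sketch. It assumes Kafka's size-delimited framing, where each request is preceded by a 4-byte big-endian length. A plain `ByteBuffer` stands in for `SslTransportLayer.appReadBuffer`, and `BufferedRequestsDemo`, `frame` and `readOneRequest` are illustrative names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration: two size-prefixed requests land in one buffer,
// but only one complete request is consumed per read attempt.
public class BufferedRequestsDemo {

    // Frame a payload as [int32 length][payload bytes].
    static byte[] frame(String payload) {
        byte[] body = payload.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + body.length);
        buf.putInt(body.length).put(body);
        return buf.array();
    }

    // Consume at most one complete request from a buffer in read mode.
    // Returns null if a full request is not yet available.
    static String readOneRequest(ByteBuffer buffer) {
        if (buffer.remaining() < 4) return null;
        int size = buffer.getInt(buffer.position()); // peek at the length prefix
        if (buffer.remaining() < 4 + size) return null;
        buffer.getInt();                              // consume the length prefix
        byte[] body = new byte[size];
        buffer.get(body);
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] first = frame("request-1");
        byte[] second = frame("request-2");

        // Both requests arrive in a single network read.
        ByteBuffer appReadBuffer = ByteBuffer.allocate(first.length + second.length);
        appReadBuffer.put(first).put(second);
        appReadBuffer.flip();

        System.out.println("processed: " + readOneRequest(appReadBuffer));
        // The socket is now empty, yet a complete request is still buffered, so a
        // selector waiting only on socket readiness would not report this channel.
        System.out.println("bytes still buffered: " + appReadBuffer.remaining());
    }
}
```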



##########
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##########
@@ -1878,6 +1878,93 @@ class SocketServerTest {
     }, false)
   }
 
+  /**
+   * Tests for KAFKA-13559 - Processing of a request is delayed by 300 ms under the following conditions:
+   * 1. Client-server communication uses an SSL socket.
+   * 2. More than one request is read from the socket into SslTransportLayer.netReadBuffer.
+   *
+   * The 300 ms delay occurs because the socket has no data but the buffer does. The sequence of events that
+   * leads to this situation is the following (from the server's point of view):
+   *
+   * Step 1 - SslTransportLayer receives more than one request from the socket and puts them in the buffer
+   *          (SslTransportLayer.netReadBuffer).
+   * Step 2 - SslTransportLayer reads all of the bytes and stores them in SslTransportLayer.appReadBuffer.
+   * Step 3 - Process the first request, leaving the second request in SslTransportLayer.appReadBuffer.
+   * Step 4 - THIS IS WHERE THE DELAY IS. Process the second request. This request is read from
+   * SslTransportLayer.appReadBuffer, instead of the socket. Because of this, "select(timeout)" in "Selector.poll()"
+   * should not block for the "poll timeout" (hardcoded to 300 in Selector.java, but in this test it is set to 5000).
+   *
+   * This test follows the approach of the "makeSocketWithBufferedRequests()" method: it puts two requests directly
+   * into SslTransportLayer.netReadBuffer and triggers the processing manually.
+   *
+   */
+  @Test
+  def testLatencyWithBufferedDataAndNoSocketData(): Unit = {
+    shutdownServerAndMetrics(server)
+
+    // create server with SSL listener

Review Comment:
   Unnecessary comment



##########
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##########
@@ -1878,6 +1878,93 @@ class SocketServerTest {
     }, false)
   }
 
+  /**
+   * Tests for KAFKA-13559 - Processing of a request is delayed by 300 ms under the following conditions:
+   * 1. Client-server communication uses an SSL socket.
+   * 2. More than one request is read from the socket into SslTransportLayer.netReadBuffer.
+   *
+   * The 300 ms delay occurs because the socket has no data but the buffer does. The sequence of events that
+   * leads to this situation is the following (from the server's point of view):
+   *
+   * Step 1 - SslTransportLayer receives more than one request from the socket and puts them in the buffer
+   *          (SslTransportLayer.netReadBuffer).
+   * Step 2 - SslTransportLayer reads all of the bytes and stores them in SslTransportLayer.appReadBuffer.
+   * Step 3 - Process the first request, leaving the second request in SslTransportLayer.appReadBuffer.
+   * Step 4 - THIS IS WHERE THE DELAY IS. Process the second request. This request is read from
+   * SslTransportLayer.appReadBuffer, instead of the socket. Because of this, "select(timeout)" in "Selector.poll()"
+   * should not block for the "poll timeout" (hardcoded to 300 in Selector.java, but in this test it is set to 5000).
+   *
+   * This test follows the approach of the "makeSocketWithBufferedRequests()" method: it puts two requests directly
+   * into SslTransportLayer.netReadBuffer and triggers the processing manually.
+   *
+   */
+  @Test
+  def testLatencyWithBufferedDataAndNoSocketData(): Unit = {
+    shutdownServerAndMetrics(server)
+
+    // create server with SSL listener
+    val testableServer = new TestableSocketServer(KafkaConfig.fromProps(sslServerProps))
+    testableServer.enableRequestProcessing(Map.empty)
+    val testableSelector = testableServer.testableSelector
+    val proxyServer = new ProxyServer(testableServer)
+    val selectTimeout = 5000  // in ms

Review Comment:
   Call this `selectTimeoutMs` and remove the comment?



##########
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##########
@@ -1878,6 +1878,93 @@ class SocketServerTest {
     }, false)
   }
 
+  /**
+   * Tests for KAFKA-13559 - Processing of a request is delayed by 300 ms under the following conditions:
+   * 1. Client-server communication uses an SSL socket.
+   * 2. More than one request is read from the socket into SslTransportLayer.netReadBuffer.
+   *
+   * The 300 ms delay occurs because the socket has no data but the buffer does. The sequence of events that
+   * leads to this situation is the following (from the server's point of view):
+   *
+   * Step 1 - SslTransportLayer receives more than one request from the socket and puts them in the buffer
+   *          (SslTransportLayer.netReadBuffer).
+   * Step 2 - SslTransportLayer reads all of the bytes and stores them in SslTransportLayer.appReadBuffer.
+   * Step 3 - Process the first request, leaving the second request in SslTransportLayer.appReadBuffer.
+   * Step 4 - THIS IS WHERE THE DELAY IS. Process the second request. This request is read from
+   * SslTransportLayer.appReadBuffer, instead of the socket. Because of this, "select(timeout)" in "Selector.poll()"
+   * should not block for the "poll timeout" (hardcoded to 300 in Selector.java, but in this test it is set to 5000).
+   *
+   * This test follows the approach of the "makeSocketWithBufferedRequests()" method: it puts two requests directly
+   * into SslTransportLayer.netReadBuffer and triggers the processing manually.
+   *
+   */
+  @Test
+  def testLatencyWithBufferedDataAndNoSocketData(): Unit = {
+    shutdownServerAndMetrics(server)
+
+    // create server with SSL listener
+    val testableServer = new TestableSocketServer(KafkaConfig.fromProps(sslServerProps))
+    testableServer.enableRequestProcessing(Map.empty)
+    val testableSelector = testableServer.testableSelector
+    val proxyServer = new ProxyServer(testableServer)
+    val selectTimeout = 5000  // in ms
+    // set pollTimeoutOverride to "selectTimeout" to ensure poll() timeout is distinct and can be identified
+    testableSelector.pollTimeoutOverride = Some(selectTimeout)
+
+    try {
+      // trigger SSL handshake by sending the first request and receiving its response without buffering
+      val requestBytes = producerRequestBytes()
+      val sslSocket = sslClientSocket(proxyServer.localPort)
+
+      sendRequest(sslSocket, requestBytes)
+      val request1 = receiveRequest(testableServer.dataPlaneRequestChannel)
+      processRequest(testableServer.dataPlaneRequestChannel, request1)
+      receiveResponse(sslSocket)
+
+      // then put 2 requests in SslTransportLayer.netReadBuffer via the ProxyServer
+      val connectionId = request1.context.connectionId
+      val listener = testableServer.config.dataPlaneListeners.head.listenerName.value
+      val channel = testableServer.dataPlaneAcceptor(listener).get.processors(0).channel(connectionId).getOrElse(throw new IllegalStateException("Channel not found"))
+      val transportLayer: SslTransportLayer = JTestUtils.fieldValue(channel, classOf[KafkaChannel], "transportLayer")
+      val netReadBuffer: ByteBuffer = JTestUtils.fieldValue(transportLayer, classOf[SslTransportLayer], "netReadBuffer")
+
+      proxyServer.enableBuffering(netReadBuffer)
+      sendRequest(sslSocket, requestBytes)
+      sendRequest(sslSocket, requestBytes)
+
+      val keysWithBufferedRead: util.Set[SelectionKey] = JTestUtils.fieldValue(testableSelector, classOf[Selector], "keysWithBufferedRead")
+      keysWithBufferedRead.add(channel.selectionKey)
+      JTestUtils.setFieldValue(transportLayer, "hasBytesBuffered", true)
+
+      // process the first request in the server side
+      // this would move bytes from netReadBuffer to appReadBuffer, then process only the first request
+      // we call wakeup() so Selector.poll() does not block in this step (because we artificially add data into netReadBuffer)
+      testableSelector.wakeup()
+      val req1 = receiveRequest(testableServer.dataPlaneRequestChannel, 1000)
+      processRequest(testableServer.dataPlaneRequestChannel, req1)
+
+      // receive response in the client side
+      receiveResponse(sslSocket)
+
+      // process the second request in the server side
+      // this would process the second request in the appReadBuffer
+      // NOTE 1: this should not block because the data is already in the buffer
+      // NOTE 2: we do not call wakeup() here so Selector.poll() would block if the fix is not in place
+      val processTimeStart = System.nanoTime()  // using nanoTime() because it is meant to calculate elapsed time

Review Comment:
   Rename to `processTimeStartNanos` and `processTimeEndNanos`, since we convert them to milliseconds later.



##########
clients/src/main/java/org/apache/kafka/common/network/Selector.java:
##########
@@ -591,6 +591,11 @@ void pollSelectionKeys(Set<SelectionKey> selectionKeys,
                 long nowNanos = channelStartTimeNanos != 0 ? channelStartTimeNanos : currentTimeNanos;
                 try {
                     attemptWrite(key, channel, nowNanos);
+
+                    // Following is to fix KAFKA-13559. This will prevent poll() from blocking for 300 ms when the
+                    // socket has no data but the buffer has data. Only happens when using SSL.
+                    if (channel.hasBytesBuffered())
+                        madeReadProgressLastPoll = true;

Review Comment:
   @badaiaqrandista Couldn't we just set `madeReadProgressLastPoll = true` in `unmute(KafkaChannel channel)` when we add the channel to `keysWithBufferedRead`? That is the case we are trying to handle here, right? It would make the reason more obvious, without having to rely on comments.
   
   ```
   if (channel.hasBytesBuffered()) {
        keysWithBufferedRead.add(channel.selectionKey());
        madeReadProgressLastPoll = true;
   }
   ```
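A minimal, hypothetical Java sketch of this suggestion: the flag is set in the same place where the channel is put back into `keysWithBufferedRead`, so all of the read-side state changes together instead of after a write. This is a toy model, not the real `Selector`. `ToySelector`, `ToyChannel` and `shouldBlock()` are illustrative names, and the zero-timeout rule is an assumption about how `poll()` combines these two fields.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model of the suggested change: unmute() restores the read-progress flag
// whenever it re-registers a channel that still has buffered bytes.
public class ToySelector {

    static final class ToyChannel {
        final String id;
        final boolean hasBytesBuffered;

        ToyChannel(String id, boolean hasBytesBuffered) {
            this.id = id;
            this.hasBytesBuffered = hasBytesBuffered;
        }
    }

    final Set<String> keysWithBufferedRead = new HashSet<>();
    boolean madeReadProgressLastPoll = false;

    void mute(ToyChannel channel) {
        // A muted channel is not eligible for buffered reads.
        keysWithBufferedRead.remove(channel.id);
    }

    void unmute(ToyChannel channel) {
        if (channel.hasBytesBuffered) {
            keysWithBufferedRead.add(channel.id);
            madeReadProgressLastPoll = true; // the suggested one-line addition
        }
    }

    // Assumed rule: the next poll blocks unless it can make progress from buffers.
    boolean shouldBlock() {
        return !(madeReadProgressLastPoll && !keysWithBufferedRead.isEmpty());
    }

    public static void main(String[] args) {
        ToySelector selector = new ToySelector();
        ToyChannel channel = new ToyChannel("channel-0", true);
        selector.mute(channel);
        selector.madeReadProgressLastPoll = false; // reset while muted (e.g. by clear())
        selector.unmute(channel);
        System.out.println("blocks on next poll: " + selector.shouldBlock());
    }
}
```

Because the flag is tied to the buffered-read registration, a channel without buffered bytes still leaves the next poll free to block.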



##########
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##########
@@ -1878,6 +1878,93 @@ class SocketServerTest {
     }, false)
   }
 
+  /**
+   * Tests for KAFKA-13559 - Processing of a request is delayed by 300 ms under the following conditions:
+   * 1. Client-server communication uses an SSL socket.
+   * 2. More than one request is read from the socket into SslTransportLayer.netReadBuffer.
+   *
+   * The 300 ms delay occurs because the socket has no data but the buffer does. The sequence of events that
+   * leads to this situation is the following (from the server's point of view):
+   *
+   * Step 1 - SslTransportLayer receives more than one request from the socket and puts them in the buffer
+   *          (SslTransportLayer.netReadBuffer).
+   * Step 2 - SslTransportLayer reads all of the bytes and stores them in SslTransportLayer.appReadBuffer.
+   * Step 3 - Process the first request, leaving the second request in SslTransportLayer.appReadBuffer.
+   * Step 4 - THIS IS WHERE THE DELAY IS. Process the second request. This request is read from
+   * SslTransportLayer.appReadBuffer, instead of the socket. Because of this, "select(timeout)" in "Selector.poll()"
+   * should not block for the "poll timeout" (hardcoded to 300 in Selector.java, but in this test it is set to 5000).
+   *
+   * This test follows the approach of the "makeSocketWithBufferedRequests()" method: it puts two requests directly
+   * into SslTransportLayer.netReadBuffer and triggers the processing manually.
+   *
+   */
+  @Test
+  def testLatencyWithBufferedDataAndNoSocketData(): Unit = {
+    shutdownServerAndMetrics(server)
+
+    // create server with SSL listener
+    val testableServer = new TestableSocketServer(KafkaConfig.fromProps(sslServerProps))
+    testableServer.enableRequestProcessing(Map.empty)
+    val testableSelector = testableServer.testableSelector
+    val proxyServer = new ProxyServer(testableServer)
+    val selectTimeout = 5000  // in ms
+    // set pollTimeoutOverride to "selectTimeout" to ensure poll() timeout is distinct and can be identified
+    testableSelector.pollTimeoutOverride = Some(selectTimeout)
+
+    try {
+      // trigger SSL handshake by sending the first request and receiving its response without buffering
+      val requestBytes = producerRequestBytes()
+      val sslSocket = sslClientSocket(proxyServer.localPort)
+
+      sendRequest(sslSocket, requestBytes)
+      val request1 = receiveRequest(testableServer.dataPlaneRequestChannel)
+      processRequest(testableServer.dataPlaneRequestChannel, request1)
+      receiveResponse(sslSocket)
+
+      // then put 2 requests in SslTransportLayer.netReadBuffer via the ProxyServer
+      val connectionId = request1.context.connectionId
+      val listener = testableServer.config.dataPlaneListeners.head.listenerName.value
+      val channel = testableServer.dataPlaneAcceptor(listener).get.processors(0).channel(connectionId).getOrElse(throw new IllegalStateException("Channel not found"))
+      val transportLayer: SslTransportLayer = JTestUtils.fieldValue(channel, classOf[KafkaChannel], "transportLayer")
+      val netReadBuffer: ByteBuffer = JTestUtils.fieldValue(transportLayer, classOf[SslTransportLayer], "netReadBuffer")
+
+      proxyServer.enableBuffering(netReadBuffer)
+      sendRequest(sslSocket, requestBytes)
+      sendRequest(sslSocket, requestBytes)
+
+      val keysWithBufferedRead: util.Set[SelectionKey] = JTestUtils.fieldValue(testableSelector, classOf[Selector], "keysWithBufferedRead")
+      keysWithBufferedRead.add(channel.selectionKey)
+      JTestUtils.setFieldValue(transportLayer, "hasBytesBuffered", true)
+
+      // process the first request in the server side
+      // this would move bytes from netReadBuffer to appReadBuffer, then process only the first request
+      // we call wakeup() so Selector.poll() does not block in this step (because we artificially add data into netReadBuffer)
+      testableSelector.wakeup()
+      val req1 = receiveRequest(testableServer.dataPlaneRequestChannel, 1000)
+      processRequest(testableServer.dataPlaneRequestChannel, req1)
+
+      // receive response in the client side
+      receiveResponse(sslSocket)
+
+      // process the second request in the server side
+      // this would process the second request in the appReadBuffer
+      // NOTE 1: this should not block because the data is already in the buffer
+      // NOTE 2: we do not call wakeup() here so Selector.poll() would block if the fix is not in place
+      val processTimeStart = System.nanoTime()  // using nanoTime() because it is meant to calculate elapsed time
+      receiveRequest(testableServer.dataPlaneRequestChannel, selectTimeout + 1000)
+      val processTimeEnd = System.nanoTime()
+
+      // check the duration of processing the second request
+      val processTimeDuration = (processTimeEnd - processTimeStart) / 1000000.0  // in ms

Review Comment:
   Use `TimeUnit.NANOSECONDS.toMillis`; the comment is unnecessary.
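Following the naming and conversion suggestions in this review, the elapsed-time check can keep raw `System.nanoTime()` readings in `...Nanos` variables and convert once with `TimeUnit.NANOSECONDS.toMillis`. It is shown in Java for illustration; the test itself is Scala, where the same JDK calls are available. `timeMs` is a hypothetical helper. Note that `toMillis` truncates to a whole `long`, whereas the original `/ 1000000.0` produced a fractional `double`. For a `< selectTimeoutMs` assertion the truncation does not change the outcome.

```java
import java.util.concurrent.TimeUnit;

public class ElapsedTime {

    // Hypothetical helper: runs the action and returns its duration in whole milliseconds.
    static long timeMs(Runnable action) {
        long startNanos = System.nanoTime(); // monotonic clock, meant for measuring intervals
        action.run();
        long endNanos = System.nanoTime();
        return TimeUnit.NANOSECONDS.toMillis(endNanos - startNanos);
    }

    public static void main(String[] args) {
        long selectTimeoutMs = 5000;
        long elapsedMs = timeMs(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        if (elapsedMs >= selectTimeoutMs) {
            throw new AssertionError("Took " + elapsedMs + " ms, expected under " + selectTimeoutMs + " ms");
        }
        System.out.println("elapsed: " + elapsedMs + " ms");
    }
}
```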





[GitHub] [kafka] badaiaqrandista commented on a diff in pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.

badaiaqrandista commented on code in PR #12416:
URL: https://github.com/apache/kafka/pull/12416#discussion_r925034172


##########
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##########
@@ -1878,6 +1878,97 @@ class SocketServerTest {
     }, false)
   }
 
+  /**
+   * Tests for KAFKA-13559 - Processing of a request is delayed by 300 ms under the following conditions:
+   * 1. Client-server communication uses an SSL socket.
+   * 2. More than one request is read from the socket into SslTransportLayer.netReadBuffer.
+   *
+   * The 300 ms delay occurs because the socket has no data but the buffer does. The sequence of events that
+   * leads to this situation is the following (from the server's point of view):
+   *
+   * Step 1 - SslTransportLayer receives more than one request from the socket and puts them in the buffer
+   *          (SslTransportLayer.netReadBuffer).
+   * Step 2 - SslTransportLayer reads all of the bytes and stores them in SslTransportLayer.appReadBuffer.
+   * Step 3 - Process the first request, leaving the second request in SslTransportLayer.appReadBuffer.
+   * Step 4 - THIS IS WHERE THE DELAY IS. Process the second request. This request is read from
+   * SslTransportLayer.appReadBuffer, instead of the socket. Because of this, "select(timeout)" in Selector.poll()
+   * should not block for 300 ms.
+   *
+   * This test follows the approach of the "makeSocketWithBufferedRequests()" method: it puts two requests directly
+   * into SslTransportLayer.netReadBuffer and triggers the processing manually.
+   *
+   */
+  @Test
+  def testLatencyWithBufferedDataAndNoSocketData(): Unit = {
+    shutdownServerAndMetrics(server)
+
+    // to ensure we only have 1 connection (channel)
+    val props = sslServerProps
+    val numConnections = 1
+    props.put("max.connections.per.ip", numConnections.toString)
+
+    // create server with SSL listener
+    val testableServer = new TestableSocketServer(KafkaConfig.fromProps(props))
+    testableServer.enableRequestProcessing(Map.empty)
+    //       dataPlaneAcceptors.get(endpoint).processors(0).selector.asInstanceOf[TestableSelector]

Review Comment:
   Done. PR updated.



##########
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##########
@@ -1878,6 +1878,97 @@ class SocketServerTest {
     }, false)
   }
 
+  /**
+   * Tests for KAFKA-13559 - Processing of a request is delayed by 300 ms under the following conditions:
+   * 1. Client-server communication uses an SSL socket.
+   * 2. More than one request is read from the socket into SslTransportLayer.netReadBuffer.
+   *
+   * The 300 ms delay occurs because the socket has no data but the buffer does. The sequence of events that
+   * leads to this situation is the following (from the server's point of view):
+   *
+   * Step 1 - SslTransportLayer receives more than one request from the socket and puts them in the buffer
+   *          (SslTransportLayer.netReadBuffer).
+   * Step 2 - SslTransportLayer reads all of the bytes and stores them in SslTransportLayer.appReadBuffer.
+   * Step 3 - Process the first request, leaving the second request in SslTransportLayer.appReadBuffer.
+   * Step 4 - THIS IS WHERE THE DELAY IS. Process the second request. This request is read from
+   * SslTransportLayer.appReadBuffer, instead of the socket. Because of this, "select(timeout)" in Selector.poll()
+   * should not block for 300 ms.
+   *
+   * This test follows the approach of the "makeSocketWithBufferedRequests()" method: it puts two requests directly
+   * into SslTransportLayer.netReadBuffer and triggers the processing manually.
+   *
+   */
+  @Test
+  def testLatencyWithBufferedDataAndNoSocketData(): Unit = {
+    shutdownServerAndMetrics(server)
+
+    // to ensure we only have 1 connection (channel)
+    val props = sslServerProps
+    val numConnections = 1
+    props.put("max.connections.per.ip", numConnections.toString)

Review Comment:
   Done. PR updated.





[GitHub] [kafka] badaiaqrandista commented on a diff in pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.

badaiaqrandista commented on code in PR #12416:
URL: https://github.com/apache/kafka/pull/12416#discussion_r929317899


##########
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##########
@@ -1878,6 +1878,93 @@ class SocketServerTest {
     }, false)
   }
 
+  /**
+   * Tests for KAFKA-13559 - Processing of a request is delayed by 300 ms under the following conditions:
+   * 1. Client-server communication uses an SSL socket.
+   * 2. More than one request is read from the socket into SslTransportLayer.netReadBuffer.
+   *
+   * The 300 ms delay occurs because the socket has no data but the buffer does. The sequence of events that
+   * leads to this situation is the following (from the server's point of view):
+   *
+   * Step 1 - SslTransportLayer receives more than one request from the socket and puts them in the buffer
+   *          (SslTransportLayer.netReadBuffer).
+   * Step 2 - SslTransportLayer reads all of the bytes and stores them in SslTransportLayer.appReadBuffer.
+   * Step 3 - Process the first request, leaving the second request in SslTransportLayer.appReadBuffer.
+   * Step 4 - THIS IS WHERE THE DELAY IS. Process the second request. This request is read from
+   * SslTransportLayer.appReadBuffer, instead of the socket. Because of this, "select(timeout)" in "Selector.poll()"
+   * should not block for the "poll timeout" (hardcoded to 300 in Selector.java, but in this test it is set to 5000).
+   *
+   * This test follows the approach of the "makeSocketWithBufferedRequests()" method: it puts two requests directly
+   * into SslTransportLayer.netReadBuffer and triggers the processing manually.
+   *
+   */
+  @Test
+  def testLatencyWithBufferedDataAndNoSocketData(): Unit = {
+    shutdownServerAndMetrics(server)
+
+    // create server with SSL listener

Review Comment:
   done



##########
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##########
@@ -1878,6 +1878,93 @@ class SocketServerTest {
     }, false)
   }
 
+  /**
+   * Tests for KAFKA-13559 - Processing of a request is delayed by 300 ms under the following conditions:
+   * 1. Client-server communication uses an SSL socket.
+   * 2. More than one request is read from the socket into SslTransportLayer.netReadBuffer.
+   *
+   * The 300 ms delay occurs because the socket has no data but the buffer does. The sequence of events that
+   * leads to this situation is the following (from the server's point of view):
+   *
+   * Step 1 - SslTransportLayer receives more than one request from the socket and puts them in the buffer
+   *          (SslTransportLayer.netReadBuffer).
+   * Step 2 - SslTransportLayer reads all of the bytes and stores them in SslTransportLayer.appReadBuffer.
+   * Step 3 - Process the first request, leaving the second request in SslTransportLayer.appReadBuffer.
+   * Step 4 - THIS IS WHERE THE DELAY IS. Process the second request. This request is read from
+   * SslTransportLayer.appReadBuffer, instead of the socket. Because of this, "select(timeout)" in "Selector.poll()"
+   * should not block for the "poll timeout" (hardcoded to 300 in Selector.java, but in this test it is set to 5000).
+   *
+   * This test follows the approach of the "makeSocketWithBufferedRequests()" method: it puts two requests directly
+   * into SslTransportLayer.netReadBuffer and triggers the processing manually.
+   *
+   */
+  @Test
+  def testLatencyWithBufferedDataAndNoSocketData(): Unit = {
+    shutdownServerAndMetrics(server)
+
+    // create server with SSL listener
+    val testableServer = new TestableSocketServer(KafkaConfig.fromProps(sslServerProps))
+    testableServer.enableRequestProcessing(Map.empty)
+    val testableSelector = testableServer.testableSelector
+    val proxyServer = new ProxyServer(testableServer)
+    val selectTimeout = 5000  // in ms

Review Comment:
   done
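Step 4 in the Javadoc depends on how `Selector.poll()` picks the timeout it passes to `select()`. Below is a simplified, hypothetical model of that decision. It assumes, as this thread describes, that the timeout drops to 0 only when some key has buffered data and `madeReadProgressLastPoll` is set. `PollTimeoutModel` and `effectiveTimeout` are illustrative names, not the real `Selector` API:

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical, simplified model of how a Kafka-style selector chooses the
 * select() timeout. Names are illustrative; this is not Selector.java.
 */
final class PollTimeoutModel {
    /** Connection ids whose transport layer still holds already-read bytes (assumed). */
    final Set<String> keysWithBufferedRead = new HashSet<>();
    /** Assumed flag: did the previous poll make progress reading buffered data? */
    boolean madeReadProgressLastPoll = true;

    /** Returns how long (ms) the next select() call would be allowed to block. */
    long effectiveTimeout(long requestedTimeoutMs) {
        boolean dataInBuffers = !keysWithBufferedRead.isEmpty();
        // Only skip blocking when data is buffered AND the last poll made read progress.
        // Otherwise select() blocks even though a request is already in memory.
        if (madeReadProgressLastPoll && dataInBuffers)
            return 0;
        return requestedTimeoutMs;
    }
}
```

In the KAFKA-13559 scenario, the second request is buffered but the flag is false. `effectiveTimeout(300)` therefore returns 300, and the request waits out the whole poll timeout.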



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] badaiaqrandista commented on a diff in pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.

Posted by GitBox <gi...@apache.org>.
badaiaqrandista commented on code in PR #12416:
URL: https://github.com/apache/kafka/pull/12416#discussion_r929322796


##########
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##########
@@ -1878,6 +1878,93 @@ class SocketServerTest {
     }, false)
   }
 
+  /**
+   * Tests for KAFKA-13559 - Processing request got delayed by 300 ms in the following condition:

Review Comment:
   done



[GitHub] [kafka] badaiaqrandista commented on a diff in pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.

Posted by GitBox <gi...@apache.org>.
badaiaqrandista commented on code in PR #12416:
URL: https://github.com/apache/kafka/pull/12416#discussion_r927123562


##########
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##########
@@ -1878,6 +1878,98 @@ class SocketServerTest {
     }, false)
   }
 
+  /**
+   * Tests for KAFKA-13559 - Processing request got delayed by 300 ms in the following condition:
+   * 1. Client-Server communication uses SSL socket.
+   * 2. More than one requests are read from the socket into SslTransportLayer.netReadBuffer.
+   *
+   * This 300 ms delay occurs because the socket has no data but the buffer has data. And the sequence of events that
+   * leads to this situation is the following (from the server point of view):
+   *
+   * Step 1 - SslTransportLayer receives more than one requests in the socket and put it in the buffer
+   *          (SslTransportLayer.netReadBuffer).
+   * Step 2 - SslTransportLayer reads all of the bytes and stores it in SslTransportLayer.appReadBuffer.
+   * Step 3 - Process the first request, leaving the second request in SslTransportLayer.appReadBuffer.
+   * Step 4 - THIS IS WHERE THE DELAY IS. Process the second request. This request is read from
+   * SslTransportLayer.appReadBuffer, instead of the socket. Because of this, "select(timeout)" in Selector.poll()
+   * should not block for 300 ms.
+   *
+   * This test is implemented following "makeSocketWithBufferedRequests()" method by putting two requests directly
+   * into SslTransportLayer.netReadBuffer and manually trigger the processing.
+   *
+   */
+  @Test
+  def testLatencyWithBufferedDataAndNoSocketData(): Unit = {
+    shutdownServerAndMetrics(server)
+
+    // create server with SSL listener
+    val testableServer = new TestableSocketServer(KafkaConfig.fromProps(sslServerProps))
+    testableServer.enableRequestProcessing(Map.empty)
+    val testableSelector = testableServer.testableSelector
+    val proxyServer = new ProxyServer(testableServer)
+    val selectTimeout = 5000  // in ms
+    // set pollTimeoutOverride to "selectTimeout" to ensure poll() timeout is distinct and can be identified
+    testableSelector.pollTimeoutOverride = Some(selectTimeout)
+
+    try {
+      // trigger SSL handshake by sending the first request and receiving its response without buffering
+      val requestBytes = producerRequestBytes()
+      val sslSocket = sslClientSocket(proxyServer.localPort)
+
+      sendRequest(sslSocket, requestBytes)
+      val request1 = receiveRequest(testableServer.dataPlaneRequestChannel)
+      processRequest(testableServer.dataPlaneRequestChannel, request1)
+      receiveResponse(sslSocket)
+
+      // then put 2 requests in SslTransportLayer.netReadBuffer via the ProxyServer
+      val connectionId = request1.context.connectionId
+      val listener = testableServer.config.dataPlaneListeners.head.listenerName.value
+      val channel = testableServer.dataPlaneAcceptor(listener).get.processors(0).channel(connectionId).getOrElse(throw new IllegalStateException("Channel not found"))
+      val transportLayer: SslTransportLayer = JTestUtils.fieldValue(channel, classOf[KafkaChannel], "transportLayer")
+      val netReadBuffer: ByteBuffer = JTestUtils.fieldValue(transportLayer, classOf[SslTransportLayer], "netReadBuffer")
+
+      proxyServer.enableBuffering(netReadBuffer)
+      sendRequest(sslSocket, requestBytes)
+      sendRequest(sslSocket, requestBytes)
+
+      val keysWithBufferedRead: util.Set[SelectionKey] = JTestUtils.fieldValue(testableSelector, classOf[Selector], "keysWithBufferedRead")
+      keysWithBufferedRead.add(channel.selectionKey)
+      JTestUtils.setFieldValue(transportLayer, "hasBytesBuffered", true)
+
+      // process the first request in the server side
+      // this would move bytes from netReadBuffer to appReadBuffer, then process only the first request
+      // we call wakeup() so Selector.poll() does not block in this step (because we artificially add data into netReadBuffer)
+      testableSelector.wakeup()
+      val req1 = receiveRequest(testableServer.dataPlaneRequestChannel, selectTimeout + 1000)
+      processRequest(testableServer.dataPlaneRequestChannel, req1)
+
+      // receive response in the client side
+      receiveResponse(sslSocket)
+
+      // process the second request in the server side
+      // this would process the second request in the appReadBuffer
+      // NOTE 1: this should not block because the data is already in the buffer, but without the fix for KAFKA-13559,
+      // this step will block for 300 ms (in this test, we override poll() timeout to 5000 ms to make it distinct)
+      // NOTE 2: we do not call wakeup() here so Selector.poll() would block if the fix is not in place
+      val processTimeStart = System.nanoTime()  // using nanoTime() because it is meant to calculate elapsed time
+      val req2 = receiveRequest(testableServer.dataPlaneRequestChannel, selectTimeout + 1000)

Review Comment:
   `receiveRequest()` actually only calls `requestQueue.poll()` underneath. If I set the timeout of the second `receiveRequest()` to 1s, it always fails: a request only reaches `requestQueue` after `Selector.poll()` completes the read from the channel, and `Selector.poll()` is itself blocked in its `select(timeout)` call in `Selector.java`. So the timeout here has to be `selectTimeout + 1000`.
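The reasoning in this comment can be reproduced outside Kafka. If the only thread that can enqueue a request is stuck in a blocking wait, a consumer polling the queue with a shorter timeout gives up before the item arrives. This hypothetical sketch uses `java.util.concurrent.LinkedBlockingQueue` with scaled-down timings, where 200 ms stands in for the 5000 ms `selectTimeout`. It is not the test's actual `receiveRequest()` helper:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical model: a "processor" thread enqueues a request only after a blocking select(). */
final class BlockedProducerDemo {
    /**
     * Starts a thread that blocks for blockMs (standing in for select(timeout) without wakeup())
     * and then enqueues a request. Returns what poll(pollTimeoutMs) receives, or null on timeout.
     */
    static String pollAfterBlockedSelect(long blockMs, long pollTimeoutMs) {
        BlockingQueue<String> requestQueue = new LinkedBlockingQueue<>();
        Thread processor = new Thread(() -> {
            try {
                Thread.sleep(blockMs);          // simulated blocking select()
                requestQueue.put("request-2");  // the read completes only after select() returns
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        processor.start();
        try {
            return requestQueue.poll(pollTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            processor.interrupt(); // stop the simulated processor if it is still blocked
        }
    }
}
```

Polling for 50 ms returns `null`, while polling for `200 + 1000` ms returns the request. This mirrors the author's explanation of why the second `receiveRequest()` needs `selectTimeout + 1000`.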



[GitHub] [kafka] badaiaqrandista commented on a diff in pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.

Posted by GitBox <gi...@apache.org>.
badaiaqrandista commented on code in PR #12416:
URL: https://github.com/apache/kafka/pull/12416#discussion_r925034643


##########
clients/src/main/java/org/apache/kafka/common/network/Selector.java:
##########
@@ -652,6 +652,11 @@ void write(KafkaChannel channel) throws IOException {
             if (send != null) {
                 this.completedSends.add(send);
                 this.sensors.recordCompletedSend(nodeId, send.size(), currentTimeMs);
+
+                // To fix KAFKA-13559, i.e. select(timeout) blocks for 300 ms when the socket has no data but the buffer
+                // has data. Only happens when using SSL.
+                if (channel.hasBytesBuffered())
+                    madeReadProgressLastPoll = true;

Review Comment:
   @ijuma I have moved the fix to `pollSelectionKeys` as I mentioned before to make it less weird (hopefully).
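The effect of the change can be replayed as a hypothetical three-step trace. The sketch assumes that the flag is reset on every poll and only set again by a read, which cannot happen while the channel is muted. `BufferedReadSimulation` is an illustrative name, and 300 ms is the poll timeout mentioned in this PR:

```java
/**
 * Hypothetical, simplified replay of the KAFKA-13559 sequence for one SSL channel.
 * The flag name echoes Selector.java; the control flow is an assumption for illustration.
 */
final class BufferedReadSimulation {
    static final long POLL_TIMEOUT_MS = 300;

    /** Returns the select() timeout chosen by the poll that follows the response write. */
    static long timeoutAfterResponse(boolean applyFix) {
        // The second request has already been read off the socket and is buffered.
        boolean hasBytesBuffered = true;

        // While request 1 is in flight the channel is muted: no read happens, so the
        // assumed per-poll reset leaves the flag false.
        boolean madeReadProgressLastPoll = false;

        // The response to request 1 is written; this is where the PR adds its check.
        if (applyFix && hasBytesBuffered)
            madeReadProgressLastPoll = true;

        // Next poll: skip blocking only if data is buffered AND the last poll made progress.
        return (madeReadProgressLastPoll && hasBytesBuffered) ? 0 : POLL_TIMEOUT_MS;
    }
}
```

Under these assumptions, `timeoutAfterResponse(false)` yields 300 and `timeoutAfterResponse(true)` yields 0. The test measures this difference, with the timeout enlarged to 5000 ms.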



[GitHub] [kafka] badaiaqrandista commented on a diff in pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.

Posted by GitBox <gi...@apache.org>.
badaiaqrandista commented on code in PR #12416:
URL: https://github.com/apache/kafka/pull/12416#discussion_r927148782


##########
clients/src/main/java/org/apache/kafka/common/network/Selector.java:
##########
@@ -591,6 +591,11 @@ void pollSelectionKeys(Set<SelectionKey> selectionKeys,
                 long nowNanos = channelStartTimeNanos != 0 ? channelStartTimeNanos : currentTimeNanos;
                 try {
                     attemptWrite(key, channel, nowNanos);
+
+                    // Following is to fix KAFKA-13559. This will prevent poll() from blocking for 300 ms when the
+                    // socket has no data but the buffer has data. Only happens when using SSL.
+                    if (channel.hasBytesBuffered())
+                        madeReadProgressLastPoll = true;

Review Comment:
   @splett2 
   
   The `Selector` method that gets called after the channel is unmuted is actually `clearCompletedSends()` ([here](https://github.com/apache/kafka/blob/a6c036256504b2549b009c10c777927a40a4bc34/clients/src/main/java/org/apache/kafka/common/network/Selector.java#L825-L827)).
   
   A few alternatives I can think of:
   
   1. Move `madeReadProgressLastPoll=true` to this method (`Selector.clearCompletedSends()`).
   2. Create a new `Selector` method named `readyForNextRead()` (or something) and call it from `SocketServer.processCompletedSends()`.
   3. Do not set `madeReadProgressLastPoll=false` in `clear()` if `channel.hasBytesBuffered()` is true ([here](https://github.com/apache/kafka/blob/a6c036256504b2549b009c10c777927a40a4bc34/clients/src/main/java/org/apache/kafka/common/network/Selector.java#L862)).
   
   What do you think?
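Alternative 3 can be sketched as a change to the per-poll reset. In this hypothetical model, `BufferedChannel` stands in for `KafkaChannel` and `ClearSketch.clear()` is far simpler than the real `clear()`. The flag is left untouched while any channel still has buffered bytes:

```java
import java.util.List;

/** Hypothetical stand-in for KafkaChannel: only the buffered-bytes query matters here. */
interface BufferedChannel {
    boolean hasBytesBuffered();
}

/**
 * Hypothetical sketch of alternative 3: the per-poll reset skips clearing the
 * read-progress flag while any channel still has bytes buffered.
 */
final class ClearSketch {
    boolean madeReadProgressLastPoll = false;

    /** Simplified clear(); other per-poll state is omitted here. */
    void clear(List<BufferedChannel> channels) {
        for (BufferedChannel channel : channels) {
            if (channel.hasBytesBuffered())
                return;                     // keep the flag as it is
        }
        madeReadProgressLastPoll = false;   // the usual reset
    }
}
```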



[GitHub] [kafka] badaiaqrandista commented on a diff in pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.

Posted by GitBox <gi...@apache.org>.
badaiaqrandista commented on code in PR #12416:
URL: https://github.com/apache/kafka/pull/12416#discussion_r927148782


##########
clients/src/main/java/org/apache/kafka/common/network/Selector.java:
##########
@@ -591,6 +591,11 @@ void pollSelectionKeys(Set<SelectionKey> selectionKeys,
                 long nowNanos = channelStartTimeNanos != 0 ? channelStartTimeNanos : currentTimeNanos;
                 try {
                     attemptWrite(key, channel, nowNanos);
+
+                    // Following is to fix KAFKA-13559. This will prevent poll() from blocking for 300 ms when the
+                    // socket has no data but the buffer has data. Only happens when using SSL.
+                    if (channel.hasBytesBuffered())
+                        madeReadProgressLastPoll = true;

Review Comment:
   @splett2 
   
   The `Selector` method that gets called after the channel is unmuted is actually `clearCompletedSends()` ([here](https://github.com/apache/kafka/blob/a6c036256504b2549b009c10c777927a40a4bc34/clients/src/main/java/org/apache/kafka/common/network/Selector.java#L825-L827)).
   
   A few alternatives I can think of:
   
   1. Move `madeReadProgressLastPoll=true` to this method (`Selector.clearCompletedSends()`).
   2. Create a new `Selector` method named `readyForNextRead()` (or something) and call it from `SocketServer.processCompletedSends()`.
   
   What do you think?
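Alternative 2 splits the work between the two classes. The processor notifies the selector after each completed send, and the selector re-arms its flag if the channel still holds data. The sketch below is hypothetical: only the name `readyForNextRead()` comes from the comment. `SketchChannel`, `SelectorWithHook` and `ProcessorSketch` are illustrative stand-ins, not Kafka classes:

```java
import java.util.List;

/** Hypothetical stand-in for a channel whose send just completed. */
interface SketchChannel {
    boolean hasBytesBuffered();
}

/** Hypothetical Selector side of alternative 2. */
final class SelectorWithHook {
    boolean madeReadProgressLastPoll = false;

    /** Called once a channel may read again; re-arms the flag if data is already waiting. */
    void readyForNextRead(SketchChannel channel) {
        if (channel.hasBytesBuffered())
            madeReadProgressLastPoll = true;  // assumed to let the next poll() skip blocking
    }
}

/** Hypothetical SocketServer side: notify the selector after handling each completed send. */
final class ProcessorSketch {
    private final SelectorWithHook selector;

    ProcessorSketch(SelectorWithHook selector) {
        this.selector = selector;
    }

    void processCompletedSends(List<SketchChannel> channelsWithCompletedSends) {
        for (SketchChannel channel : channelsWithCompletedSends) {
            // Unmuting the channel would happen here in a real processor (omitted).
            selector.readyForNextRead(channel);
        }
    }
}
```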



[GitHub] [kafka] badaiaqrandista commented on a diff in pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.

Posted by GitBox <gi...@apache.org>.
badaiaqrandista commented on code in PR #12416:
URL: https://github.com/apache/kafka/pull/12416#discussion_r929319425


##########
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##########
@@ -1878,6 +1878,93 @@ class SocketServerTest {
     }, false)
   }
 
+  /**
+   * Tests for KAFKA-13559 - Processing request got delayed by 300 ms in the following condition:
+   * 1. Client-Server communication uses SSL socket.
+   * 2. More than one requests are read from the socket into SslTransportLayer.netReadBuffer.
+   *
+   * This 300 ms delay occurs because the socket has no data but the buffer has data. And the sequence of events that
+   * leads to this situation is the following (from the server point of view):
+   *
+   * Step 1 - SslTransportLayer receives more than one requests in the socket and put it in the buffer
+   *          (SslTransportLayer.netReadBuffer).
+   * Step 2 - SslTransportLayer reads all of the bytes and stores it in SslTransportLayer.appReadBuffer.
+   * Step 3 - Process the first request, leaving the second request in SslTransportLayer.appReadBuffer.
+   * Step 4 - THIS IS WHERE THE DELAY IS. Process the second request. This request is read from
+   * SslTransportLayer.appReadBuffer, instead of the socket. Because of this, "select(timeout)" in "Selector.poll()"
+   * should not block for the "poll timeout" (hardcoded to 300 in Selector.java, but in this test it is set to 5000).
+   *
+   * This test is implemented following "makeSocketWithBufferedRequests()" method by putting two requests directly
+   * into SslTransportLayer.netReadBuffer and manually trigger the processing.
+   *
+   */
+  @Test
+  def testLatencyWithBufferedDataAndNoSocketData(): Unit = {
+    shutdownServerAndMetrics(server)
+
+    // create server with SSL listener
+    val testableServer = new TestableSocketServer(KafkaConfig.fromProps(sslServerProps))
+    testableServer.enableRequestProcessing(Map.empty)
+    val testableSelector = testableServer.testableSelector
+    val proxyServer = new ProxyServer(testableServer)
+    val selectTimeout = 5000  // in ms
+    // set pollTimeoutOverride to "selectTimeout" to ensure poll() timeout is distinct and can be identified
+    testableSelector.pollTimeoutOverride = Some(selectTimeout)
+
+    try {
+      // trigger SSL handshake by sending the first request and receiving its response without buffering
+      val requestBytes = producerRequestBytes()
+      val sslSocket = sslClientSocket(proxyServer.localPort)
+
+      sendRequest(sslSocket, requestBytes)
+      val request1 = receiveRequest(testableServer.dataPlaneRequestChannel)
+      processRequest(testableServer.dataPlaneRequestChannel, request1)
+      receiveResponse(sslSocket)
+
+      // then put 2 requests in SslTransportLayer.netReadBuffer via the ProxyServer
+      val connectionId = request1.context.connectionId
+      val listener = testableServer.config.dataPlaneListeners.head.listenerName.value
+      val channel = testableServer.dataPlaneAcceptor(listener).get.processors(0).channel(connectionId).getOrElse(throw new IllegalStateException("Channel not found"))
+      val transportLayer: SslTransportLayer = JTestUtils.fieldValue(channel, classOf[KafkaChannel], "transportLayer")
+      val netReadBuffer: ByteBuffer = JTestUtils.fieldValue(transportLayer, classOf[SslTransportLayer], "netReadBuffer")
+
+      proxyServer.enableBuffering(netReadBuffer)
+      sendRequest(sslSocket, requestBytes)
+      sendRequest(sslSocket, requestBytes)
+
+      val keysWithBufferedRead: util.Set[SelectionKey] = JTestUtils.fieldValue(testableSelector, classOf[Selector], "keysWithBufferedRead")
+      keysWithBufferedRead.add(channel.selectionKey)
+      JTestUtils.setFieldValue(transportLayer, "hasBytesBuffered", true)
+
+      // process the first request in the server side
+      // this would move bytes from netReadBuffer to appReadBuffer, then process only the first request
+      // we call wakeup() so Selector.poll() does not block in this step (because we artificially add data into netReadBuffer)
+      testableSelector.wakeup()
+      val req1 = receiveRequest(testableServer.dataPlaneRequestChannel, 1000)
+      processRequest(testableServer.dataPlaneRequestChannel, req1)
+
+      // receive response in the client side
+      receiveResponse(sslSocket)
+
+      // process the second request in the server side
+      // this would process the second request in the appReadBuffer
+      // NOTE 1: this should not block because the data is already in the buffer
+      // NOTE 2: we do not call wakeup() here so Selector.poll() would block if the fix is not in place
+      val processTimeStart = System.nanoTime()  // using nanoTime() because it is meant to calculate elapsed time
+      receiveRequest(testableServer.dataPlaneRequestChannel, selectTimeout + 1000)
+      val processTimeEnd = System.nanoTime()
+
+      // check the duration of processing the second request
+      val processTimeDuration = (processTimeEnd - processTimeStart) / 1000000.0  // in ms

Review Comment:
   done
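The duration check in the hunk above converts a `System.nanoTime()` delta to fractional milliseconds by dividing by `1000000.0`. Below is a minimal Java sketch of the same measurement. The `ElapsedTime` class is hypothetical and not part of the test:

```java
/** Hypothetical helper mirroring the test's elapsed-time calculation. */
final class ElapsedTime {
    /** Converts a nanoTime() delta to fractional milliseconds, as the test does with / 1000000.0. */
    static double nanosToMillis(long startNanos, long endNanos) {
        return (endNanos - startNanos) / 1_000_000.0;
    }

    /** Runs the action and returns how long it took in ms, using the monotonic nanoTime() clock. */
    static double timeMillis(Runnable action) {
        long start = System.nanoTime();   // monotonic, unlike currentTimeMillis()
        action.run();
        return nanosToMillis(start, System.nanoTime());
    }
}
```

Comparing the result against `selectTimeout` separates two outcomes. Either the request was "returned promptly", or the processor "waited for the whole poll timeout".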



##########
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##########
@@ -1878,6 +1878,93 @@ class SocketServerTest {
     }, false)
   }
 
+  /**
+   * Tests for KAFKA-13559 - Processing request got delayed by 300 ms in the following condition:
+   * 1. Client-Server communication uses SSL socket.
+   * 2. More than one requests are read from the socket into SslTransportLayer.netReadBuffer.
+   *
+   * This 300 ms delay occurs because the socket has no data but the buffer has data. And the sequence of events that
+   * leads to this situation is the following (from the server point of view):
+   *
+   * Step 1 - SslTransportLayer receives more than one requests in the socket and put it in the buffer
+   *          (SslTransportLayer.netReadBuffer).
+   * Step 2 - SslTransportLayer reads all of the bytes and stores it in SslTransportLayer.appReadBuffer.
+   * Step 3 - Process the first request, leaving the second request in SslTransportLayer.appReadBuffer.
+   * Step 4 - THIS IS WHERE THE DELAY IS. Process the second request. This request is read from
+   * SslTransportLayer.appReadBuffer, instead of the socket. Because of this, "select(timeout)" in "Selector.poll()"
+   * should not block for the "poll timeout" (hardcoded to 300 in Selector.java, but in this test it is set to 5000).
+   *
+   * This test is implemented following "makeSocketWithBufferedRequests()" method by putting two requests directly
+   * into SslTransportLayer.netReadBuffer and manually trigger the processing.
+   *
+   */
+  @Test
+  def testLatencyWithBufferedDataAndNoSocketData(): Unit = {
+    shutdownServerAndMetrics(server)
+
+    // create server with SSL listener
+    val testableServer = new TestableSocketServer(KafkaConfig.fromProps(sslServerProps))
+    testableServer.enableRequestProcessing(Map.empty)
+    val testableSelector = testableServer.testableSelector
+    val proxyServer = new ProxyServer(testableServer)
+    val selectTimeout = 5000  // in ms
+    // set pollTimeoutOverride to "selectTimeout" to ensure poll() timeout is distinct and can be identified
+    testableSelector.pollTimeoutOverride = Some(selectTimeout)
+
+    try {
+      // trigger SSL handshake by sending the first request and receiving its response without buffering
+      val requestBytes = producerRequestBytes()
+      val sslSocket = sslClientSocket(proxyServer.localPort)
+
+      sendRequest(sslSocket, requestBytes)
+      val request1 = receiveRequest(testableServer.dataPlaneRequestChannel)
+      processRequest(testableServer.dataPlaneRequestChannel, request1)
+      receiveResponse(sslSocket)
+
+      // then put 2 requests in SslTransportLayer.netReadBuffer via the ProxyServer
+      val connectionId = request1.context.connectionId
+      val listener = testableServer.config.dataPlaneListeners.head.listenerName.value
+      val channel = testableServer.dataPlaneAcceptor(listener).get.processors(0).channel(connectionId).getOrElse(throw new IllegalStateException("Channel not found"))
+      val transportLayer: SslTransportLayer = JTestUtils.fieldValue(channel, classOf[KafkaChannel], "transportLayer")
+      val netReadBuffer: ByteBuffer = JTestUtils.fieldValue(transportLayer, classOf[SslTransportLayer], "netReadBuffer")
+
+      proxyServer.enableBuffering(netReadBuffer)
+      sendRequest(sslSocket, requestBytes)
+      sendRequest(sslSocket, requestBytes)
+
+      val keysWithBufferedRead: util.Set[SelectionKey] = JTestUtils.fieldValue(testableSelector, classOf[Selector], "keysWithBufferedRead")
+      keysWithBufferedRead.add(channel.selectionKey)
+      JTestUtils.setFieldValue(transportLayer, "hasBytesBuffered", true)
+
+      // process the first request in the server side
+      // this would move bytes from netReadBuffer to appReadBuffer, then process only the first request
+      // we call wakeup() so Selector.poll() does not block in this step (because we artificially add data into netReadBuffer)
+      testableSelector.wakeup()
+      val req1 = receiveRequest(testableServer.dataPlaneRequestChannel, 1000)
+      processRequest(testableServer.dataPlaneRequestChannel, req1)
+
+      // receive response in the client side
+      receiveResponse(sslSocket)
+
+      // process the second request in the server side
+      // this would process the second request in the appReadBuffer
+      // NOTE 1: this should not block because the data is already in the buffer
+      // NOTE 2: we do not call wakeup() here so Selector.poll() would block if the fix is not in place
+      val processTimeStart = System.nanoTime()  // using nanoTime() because it is meant to calculate elapsed time

Review Comment:
   done
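NOTE 2 relies on standard `java.nio.channels.Selector.wakeup()` semantics. If no selection is in progress when `wakeup()` is called, the next `select()` returns immediately. Without a wakeup, `select(timeout)` on an idle selector waits for the timeout. The JDK-only sketch below shows this in isolation. That Kafka's `Selector.wakeup()` delegates to the underlying NIO selector is an assumption here:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

/** Demonstrates java.nio Selector.wakeup() with no registered channels. */
final class WakeupDemo {
    /** Returns the elapsed ms of select(timeoutMs), optionally calling wakeup() first. */
    static long timeSelect(boolean wakeupFirst, long timeoutMs) {
        try (Selector selector = Selector.open()) {
            if (wakeupFirst)
                selector.wakeup();           // the next select() returns immediately
            long start = System.nanoTime();
            selector.select(timeoutMs);      // nothing registered: otherwise waits for the timeout
            return (System.nanoTime() - start) / 1_000_000L;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```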



[GitHub] [kafka] badaiaqrandista commented on a diff in pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.

Posted by GitBox <gi...@apache.org>.
badaiaqrandista commented on code in PR #12416:
URL: https://github.com/apache/kafka/pull/12416#discussion_r927123679


##########
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##########
@@ -1878,6 +1878,98 @@ class SocketServerTest {
     }, false)
   }
 
+  /**
+   * Tests for KAFKA-13559 - Processing request got delayed by 300 ms in the following condition:
+   * 1. Client-Server communication uses SSL socket.
+   * 2. More than one requests are read from the socket into SslTransportLayer.netReadBuffer.
+   *
+   * This 300 ms delay occurs because the socket has no data but the buffer has data. And the sequence of events that
+   * leads to this situation is the following (from the server point of view):
+   *
+   * Step 1 - SslTransportLayer receives more than one requests in the socket and put it in the buffer
+   *          (SslTransportLayer.netReadBuffer).
+   * Step 2 - SslTransportLayer reads all of the bytes and stores it in SslTransportLayer.appReadBuffer.
+   * Step 3 - Process the first request, leaving the second request in SslTransportLayer.appReadBuffer.
+   * Step 4 - THIS IS WHERE THE DELAY IS. Process the second request. This request is read from
+   * SslTransportLayer.appReadBuffer, instead of the socket. Because of this, "select(timeout)" in Selector.poll()
+   * should not block for 300 ms.
+   *
+   * This test is implemented following "makeSocketWithBufferedRequests()" method by putting two requests directly
+   * into SslTransportLayer.netReadBuffer and manually trigger the processing.
+   *
+   */
+  @Test
+  def testLatencyWithBufferedDataAndNoSocketData(): Unit = {
+    shutdownServerAndMetrics(server)
+
+    // create server with SSL listener
+    val testableServer = new TestableSocketServer(KafkaConfig.fromProps(sslServerProps))
+    testableServer.enableRequestProcessing(Map.empty)
+    val testableSelector = testableServer.testableSelector
+    val proxyServer = new ProxyServer(testableServer)
+    val selectTimeout = 5000  // in ms
+    // set pollTimeoutOverride to "selectTimeout" to ensure poll() timeout is distinct and can be identified
+    testableSelector.pollTimeoutOverride = Some(selectTimeout)
+
+    try {
+      // trigger SSL handshake by sending the first request and receiving its response without buffering
+      val requestBytes = producerRequestBytes()
+      val sslSocket = sslClientSocket(proxyServer.localPort)
+
+      sendRequest(sslSocket, requestBytes)
+      val request1 = receiveRequest(testableServer.dataPlaneRequestChannel)
+      processRequest(testableServer.dataPlaneRequestChannel, request1)
+      receiveResponse(sslSocket)
+
+      // then put 2 requests in SslTransportLayer.netReadBuffer via the ProxyServer
+      val connectionId = request1.context.connectionId
+      val listener = testableServer.config.dataPlaneListeners.head.listenerName.value
+      val channel = testableServer.dataPlaneAcceptor(listener).get.processors(0).channel(connectionId).getOrElse(throw new IllegalStateException("Channel not found"))
+      val transportLayer: SslTransportLayer = JTestUtils.fieldValue(channel, classOf[KafkaChannel], "transportLayer")
+      val netReadBuffer: ByteBuffer = JTestUtils.fieldValue(transportLayer, classOf[SslTransportLayer], "netReadBuffer")
+
+      proxyServer.enableBuffering(netReadBuffer)
+      sendRequest(sslSocket, requestBytes)
+      sendRequest(sslSocket, requestBytes)
+
+      val keysWithBufferedRead: util.Set[SelectionKey] = JTestUtils.fieldValue(testableSelector, classOf[Selector], "keysWithBufferedRead")
+      keysWithBufferedRead.add(channel.selectionKey)
+      JTestUtils.setFieldValue(transportLayer, "hasBytesBuffered", true)
+
+      // process the first request in the server side
+      // this would move bytes from netReadBuffer to appReadBuffer, then process only the first request
+      // we call wakeup() so Selector.poll() does not block in this step (because we artificially add data into netReadBuffer)
+      testableSelector.wakeup()
+      val req1 = receiveRequest(testableServer.dataPlaneRequestChannel, selectTimeout + 1000)
+      processRequest(testableServer.dataPlaneRequestChannel, req1)
+
+      // receive response in the client side
+      receiveResponse(sslSocket)
+
+      // process the second request in the server side
+      // this would process the second request in the appReadBuffer
+      // NOTE 1: this should not block because the data is already in the buffer, but without the fix for KAFKA-13559,
+      // this step will block for 300 ms (in this test, we override poll() timeout to 5000 ms to make it distinct)
+      // NOTE 2: we do not call wakeup() here so Selector.poll() would block if the fix is not in place
+      val processTimeStart = System.nanoTime()  // using nanoTime() because it is meant to calculate elapsed time
+      val req2 = receiveRequest(testableServer.dataPlaneRequestChannel, selectTimeout + 1000)
+      processRequest(testableServer.dataPlaneRequestChannel, req2)

Review Comment:
   Got it. Removing this line.
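The test in the hunk above reaches into private state with `JTestUtils.fieldValue(...)` and `JTestUtils.setFieldValue(...)`. The following hypothetical sketch shows how such helpers can be written with `java.lang.reflect.Field`. It is not Kafka's test-utility implementation, and `Holder` exists only to stand in for `SslTransportLayer` in the usage example:

```java
import java.lang.reflect.Field;

/** Hypothetical reflection helpers in the spirit of fieldValue / setFieldValue. */
final class ReflectionSketch {
    /** Reads a (possibly private) field declared on clazz from obj. */
    @SuppressWarnings("unchecked")
    static <T> T fieldValue(Object obj, Class<?> clazz, String fieldName) {
        try {
            Field field = clazz.getDeclaredField(fieldName);
            field.setAccessible(true);   // bypass private access for test purposes
            return (T) field.get(obj);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot read field " + fieldName, e);
        }
    }

    /** Writes a (possibly private) field declared on obj's runtime class. */
    static void setFieldValue(Object obj, String fieldName, Object value) {
        try {
            Field field = obj.getClass().getDeclaredField(fieldName);
            field.setAccessible(true);
            field.set(obj, value);       // unwraps Boolean for a primitive boolean field
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot write field " + fieldName, e);
        }
    }
}

/** Example target with private state, standing in for SslTransportLayer. */
final class Holder {
    private boolean hasBytesBuffered = false;
}
```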



[GitHub] [kafka] badaiaqrandista commented on pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.

Posted by GitBox <gi...@apache.org>.
badaiaqrandista commented on PR #12416:
URL: https://github.com/apache/kafka/pull/12416#issuecomment-1214447770

   @rajinisivaram With help from @splett2, the test no longer fails. Can you please have a look again when you're available? Thank you!!


[GitHub] [kafka] rajinisivaram merged pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.

Posted by GitBox <gi...@apache.org>.
rajinisivaram merged PR #12416:
URL: https://github.com/apache/kafka/pull/12416


[GitHub] [kafka] splett2 commented on a diff in pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.

Posted by GitBox <gi...@apache.org>.
splett2 commented on code in PR #12416:
URL: https://github.com/apache/kafka/pull/12416#discussion_r926147113


##########
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##########
@@ -1878,6 +1878,98 @@ class SocketServerTest {
     }, false)
   }
 
+  /**
+   * Tests for KAFKA-13559 - Processing request got delayed by 300 ms in the following condition:
+   * 1. Client-Server communication uses SSL socket.
+   * 2. More than one requests are read from the socket into SslTransportLayer.netReadBuffer.
+   *
+   * This 300 ms delay occurs because the socket has no data but the buffer has data. And the sequence of events that
+   * leads to this situation is the following (from the server point of view):
+   *
+   * Step 1 - SslTransportLayer receives more than one requests in the socket and put it in the buffer
+   *          (SslTransportLayer.netReadBuffer).
+   * Step 2 - SslTransportLayer reads all of the bytes and stores it in SslTransportLayer.appReadBuffer.
+   * Step 3 - Process the first request, leaving the second request in SslTransportLayer.appReadBuffer.
+   * Step 4 - THIS IS WHERE THE DELAY IS. Process the second request. This request is read from
+   * SslTransportLayer.appReadBuffer, instead of the socket. Because of this, "select(timeout)" in Selector.poll()
+   * should not block for 300 ms.
+   *
+   * This test is implemented following "makeSocketWithBufferedRequests()" method by putting two requests directly
+   * into SslTransportLayer.netReadBuffer and manually trigger the processing.

Review Comment:
   This comment is written largely in the context of the JIRA ticket. I think we can write it more in the context of the current code, e.g. replace references to `300 ms` with references to the `poll timeout`, so that the comment stays accurate if the poll timeout is changed.



##########
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##########
@@ -1878,6 +1878,98 @@ class SocketServerTest {
     }, false)
   }
 
+  /**
+   * Tests for KAFKA-13559 - Processing request got delayed by 300 ms in the following condition:
+   * 1. Client-Server communication uses SSL socket.
+   * 2. More than one requests are read from the socket into SslTransportLayer.netReadBuffer.
+   *
+   * This 300 ms delay occurs because the socket has no data but the buffer has data. And the sequence of events that
+   * leads to this situation is the following (from the server point of view):
+   *
+   * Step 1 - SslTransportLayer receives more than one requests in the socket and put it in the buffer
+   *          (SslTransportLayer.netReadBuffer).
+   * Step 2 - SslTransportLayer reads all of the bytes and stores it in SslTransportLayer.appReadBuffer.
+   * Step 3 - Process the first request, leaving the second request in SslTransportLayer.appReadBuffer.
+   * Step 4 - THIS IS WHERE THE DELAY IS. Process the second request. This request is read from
+   * SslTransportLayer.appReadBuffer, instead of the socket. Because of this, "select(timeout)" in Selector.poll()
+   * should not block for 300 ms.
+   *
+   * This test is implemented following "makeSocketWithBufferedRequests()" method by putting two requests directly
+   * into SslTransportLayer.netReadBuffer and manually trigger the processing.
+   *
+   */
+  @Test
+  def testLatencyWithBufferedDataAndNoSocketData(): Unit = {
+    shutdownServerAndMetrics(server)
+
+    // create server with SSL listener
+    val testableServer = new TestableSocketServer(KafkaConfig.fromProps(sslServerProps))
+    testableServer.enableRequestProcessing(Map.empty)
+    val testableSelector = testableServer.testableSelector
+    val proxyServer = new ProxyServer(testableServer)
+    val selectTimeout = 5000  // in ms
+    // set pollTimeoutOverride to "selectTimeout" to ensure poll() timeout is distinct and can be identified
+    testableSelector.pollTimeoutOverride = Some(selectTimeout)
+
+    try {
+      // trigger SSL handshake by sending the first request and receiving its response without buffering
+      val requestBytes = producerRequestBytes()
+      val sslSocket = sslClientSocket(proxyServer.localPort)
+
+      sendRequest(sslSocket, requestBytes)
+      val request1 = receiveRequest(testableServer.dataPlaneRequestChannel)
+      processRequest(testableServer.dataPlaneRequestChannel, request1)
+      receiveResponse(sslSocket)
+
+      // then put 2 requests in SslTransportLayer.netReadBuffer via the ProxyServer
+      val connectionId = request1.context.connectionId
+      val listener = testableServer.config.dataPlaneListeners.head.listenerName.value
+      val channel = testableServer.dataPlaneAcceptor(listener).get.processors(0).channel(connectionId).getOrElse(throw new IllegalStateException("Channel not found"))
+      val transportLayer: SslTransportLayer = JTestUtils.fieldValue(channel, classOf[KafkaChannel], "transportLayer")
+      val netReadBuffer: ByteBuffer = JTestUtils.fieldValue(transportLayer, classOf[SslTransportLayer], "netReadBuffer")
+
+      proxyServer.enableBuffering(netReadBuffer)
+      sendRequest(sslSocket, requestBytes)
+      sendRequest(sslSocket, requestBytes)
+
+      val keysWithBufferedRead: util.Set[SelectionKey] = JTestUtils.fieldValue(testableSelector, classOf[Selector], "keysWithBufferedRead")
+      keysWithBufferedRead.add(channel.selectionKey)
+      JTestUtils.setFieldValue(transportLayer, "hasBytesBuffered", true)
+
+      // process the first request in the server side
+      // this would move bytes from netReadBuffer to appReadBuffer, then process only the first request
+      // we call wakeup() so Selector.poll() does not block in this step (because we artificially add data into netReadBuffer)
+      testableSelector.wakeup()
+      val req1 = receiveRequest(testableServer.dataPlaneRequestChannel, selectTimeout + 1000)
+      processRequest(testableServer.dataPlaneRequestChannel, req1)
+
+      // receive response in the client side
+      receiveResponse(sslSocket)
+
+      // process the second request in the server side
+      // this would process the second request in the appReadBuffer
+      // NOTE 1: this should not block because the data is already in the buffer, but without the fix for KAFKA-13559,
+      // this step will block for 300 ms (in this test, we override poll() timeout to 5000 ms to make it distinct)
+      // NOTE 2: we do not call wakeup() here so Selector.poll() would block if the fix is not in place
+      val processTimeStart = System.nanoTime()  // using nanoTime() because it is meant to calculate elapsed time
+      val req2 = receiveRequest(testableServer.dataPlaneRequestChannel, selectTimeout + 1000)

Review Comment:
   We can just call `receiveRequest` with a relatively small timeoutOverride (something like 1 s). If we don't receive the request within 1 s, `receiveRequest` will throw an exception, which roughly captures what we're looking to test.
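   The suggestion boils down to a bounded wait that fails if the request does not arrive in time. A minimal Java sketch, assuming received requests can be read from something queue-like (the `ReceiveWithTimeout` class and its `receiveWithin` method are hypothetical illustrations, not the helper in `SocketServerTest`):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not Kafka's test utility: wait a bounded time for the next
// item and fail loudly if it does not arrive ("receive within ~1 s or throw").
final class ReceiveWithTimeout {

    static <T> T receiveWithin(BlockingQueue<T> queue, long timeoutMs) {
        try {
            // poll(timeout, unit) returns null if nothing arrives before the timeout expires.
            T item = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
            if (item == null)
                throw new AssertionError("No request received within " + timeoutMs + " ms");
            return item;
        } catch (InterruptedException e) {
            // Restore the interrupt flag and surface the interruption as a test failure.
            Thread.currentThread().interrupt();
            throw new AssertionError("Interrupted while waiting for a request", e);
        }
    }
}
```

   With a 1 s bound, a regression that makes the selector wait out the 5000 ms poll timeout installed by the test would fail at this call, so no separate elapsed-time assertion is needed.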



##########
clients/src/main/java/org/apache/kafka/common/network/Selector.java:
##########
@@ -591,6 +591,11 @@ void pollSelectionKeys(Set<SelectionKey> selectionKeys,
                 long nowNanos = channelStartTimeNanos != 0 ? channelStartTimeNanos : currentTimeNanos;
                 try {
                     attemptWrite(key, channel, nowNanos);
+
+                    // Following is to fix KAFKA-13559. This will prevent poll() from blocking for 300 ms when the
+                    // socket has no data but the buffer has data. Only happens when using SSL.
+                    if (channel.hasBytesBuffered())
+                        madeReadProgressLastPoll = true;

Review Comment:
   This is probably not the right place for this logic: we only want to set the flag if we completed a send and updated the mute state for a channel.
   
   @ijuma's comment was probably more about the naming being unintuitive: we were previously setting `madeReadProgressLastPoll` when completing a `Send`.
   
   We probably need a clarifying comment explaining why completing a `Send` counts as making read progress: there is now an eligible channel to read from that previously wasn't available.
   
   Alternatively, we can rename the variable to something that captures the scenario we're trying to cover (or add a new variable). Maybe something like `addedNewBufferedKeysLastPoll`. It's not very succinct, but I've always been terrible at variable naming.
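   The trade-off above can be illustrated with a small, simplified model. This is a hypothetical sketch (`PollTimeoutModel`, `ModelChannel` and their methods are invented for illustration and are not Kafka code); it assumes, as the discussion implies, that `select()` is given a zero timeout only when the read-progress flag is set and some key has buffered data. It shows why setting the flag at the point where a completed send unmutes a channel that already holds buffered bytes avoids blocking for the full poll timeout.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified model of the poll-timeout decision; NOT Kafka's Selector.
// Field names echo Selector (madeReadProgressLastPoll, keysWithBufferedRead) for readability only.
final class PollTimeoutModel {

    // Stand-in for a channel: muted while a response is in flight, and possibly
    // holding already-decrypted bytes in the SSL layer's buffers.
    static final class ModelChannel {
        final String id;
        boolean muted;
        final boolean hasBytesBuffered;

        ModelChannel(String id, boolean muted, boolean hasBytesBuffered) {
            this.id = id;
            this.muted = muted;
            this.hasBytesBuffered = hasBytesBuffered;
        }
    }

    final Set<String> keysWithBufferedRead = new HashSet<>();
    boolean madeReadProgressLastPoll = false;

    // Timeout the next select() would use: zero when buffered data is known to be readable,
    // otherwise the caller's timeout, during which select() may block on an idle socket.
    long effectiveTimeoutMs(long requestedTimeoutMs) {
        boolean dataInBuffers = !keysWithBufferedRead.isEmpty();
        return (madeReadProgressLastPoll && dataInBuffers) ? 0L : requestedTimeoutMs;
    }

    // Called once a send completes and the channel's mute state is updated.
    // setFlag=false models the delayed behaviour; setFlag=true models the suggested placement.
    void unmuteAfterSend(ModelChannel channel, boolean setFlag) {
        if (!channel.muted)
            return;
        channel.muted = false;
        if (channel.hasBytesBuffered) {
            keysWithBufferedRead.add(channel.id);
            if (setFlag)
                madeReadProgressLastPoll = true;
        }
    }
}
```

   Tying the flag to the unmute of a channel with buffered bytes keeps it from being set on every write, which is the concern raised above about the original placement after `attemptWrite`.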



##########
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##########
@@ -1878,6 +1878,98 @@ class SocketServerTest {
     }, false)
   }
 
+  /**
+   * Tests for KAFKA-13559 - Processing request got delayed by 300 ms in the following condition:
+   * 1. Client-Server communication uses SSL socket.
+   * 2. More than one requests are read from the socket into SslTransportLayer.netReadBuffer.
+   *
+   * This 300 ms delay occurs because the socket has no data but the buffer has data. And the sequence of events that
+   * leads to this situation is the following (from the server point of view):
+   *
+   * Step 1 - SslTransportLayer receives more than one requests in the socket and put it in the buffer
+   *          (SslTransportLayer.netReadBuffer).
+   * Step 2 - SslTransportLayer reads all of the bytes and stores it in SslTransportLayer.appReadBuffer.
+   * Step 3 - Process the first request, leaving the second request in SslTransportLayer.appReadBuffer.
+   * Step 4 - THIS IS WHERE THE DELAY IS. Process the second request. This request is read from
+   * SslTransportLayer.appReadBuffer, instead of the socket. Because of this, "select(timeout)" in Selector.poll()
+   * should not block for 300 ms.
+   *
+   * This test is implemented following "makeSocketWithBufferedRequests()" method by putting two requests directly
+   * into SslTransportLayer.netReadBuffer and manually trigger the processing.
+   *
+   */
+  @Test
+  def testLatencyWithBufferedDataAndNoSocketData(): Unit = {
+    shutdownServerAndMetrics(server)
+
+    // create server with SSL listener
+    val testableServer = new TestableSocketServer(KafkaConfig.fromProps(sslServerProps))
+    testableServer.enableRequestProcessing(Map.empty)
+    val testableSelector = testableServer.testableSelector
+    val proxyServer = new ProxyServer(testableServer)
+    val selectTimeout = 5000  // in ms
+    // set pollTimeoutOverride to "selectTimeout" to ensure poll() timeout is distinct and can be identified
+    testableSelector.pollTimeoutOverride = Some(selectTimeout)
+
+    try {
+      // trigger SSL handshake by sending the first request and receiving its response without buffering
+      val requestBytes = producerRequestBytes()
+      val sslSocket = sslClientSocket(proxyServer.localPort)
+
+      sendRequest(sslSocket, requestBytes)
+      val request1 = receiveRequest(testableServer.dataPlaneRequestChannel)
+      processRequest(testableServer.dataPlaneRequestChannel, request1)
+      receiveResponse(sslSocket)
+
+      // then put 2 requests in SslTransportLayer.netReadBuffer via the ProxyServer
+      val connectionId = request1.context.connectionId
+      val listener = testableServer.config.dataPlaneListeners.head.listenerName.value
+      val channel = testableServer.dataPlaneAcceptor(listener).get.processors(0).channel(connectionId).getOrElse(throw new IllegalStateException("Channel not found"))
+      val transportLayer: SslTransportLayer = JTestUtils.fieldValue(channel, classOf[KafkaChannel], "transportLayer")
+      val netReadBuffer: ByteBuffer = JTestUtils.fieldValue(transportLayer, classOf[SslTransportLayer], "netReadBuffer")
+
+      proxyServer.enableBuffering(netReadBuffer)
+      sendRequest(sslSocket, requestBytes)
+      sendRequest(sslSocket, requestBytes)
+
+      val keysWithBufferedRead: util.Set[SelectionKey] = JTestUtils.fieldValue(testableSelector, classOf[Selector], "keysWithBufferedRead")
+      keysWithBufferedRead.add(channel.selectionKey)
+      JTestUtils.setFieldValue(transportLayer, "hasBytesBuffered", true)
+
+      // process the first request in the server side
+      // this would move bytes from netReadBuffer to appReadBuffer, then process only the first request
+      // we call wakeup() so Selector.poll() does not block in this step (because we artificially add data into netReadBuffer)
+      testableSelector.wakeup()
+      val req1 = receiveRequest(testableServer.dataPlaneRequestChannel, selectTimeout + 1000)
+      processRequest(testableServer.dataPlaneRequestChannel, req1)
+
+      // receive response in the client side
+      receiveResponse(sslSocket)
+
+      // process the second request in the server side
+      // this would process the second request in the appReadBuffer
+      // NOTE 1: this should not block because the data is already in the buffer, but without the fix for KAFKA-13559,
+      // this step will block for 300 ms (in this test, we override poll() timeout to 5000 ms to make it distinct)
+      // NOTE 2: we do not call wakeup() here so Selector.poll() would block if the fix is not in place
+      val processTimeStart = System.nanoTime()  // using nanoTime() because it is meant to calculate elapsed time
+      val req2 = receiveRequest(testableServer.dataPlaneRequestChannel, selectTimeout + 1000)
+      processRequest(testableServer.dataPlaneRequestChannel, req2)

Review Comment:
   There's not really any need to process the request: as long as the request layer receives it within a reasonable amount of time, we are good.



##########
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##########
@@ -1878,6 +1878,98 @@ class SocketServerTest {
     }, false)
   }
 
+  /**
+   * Tests for KAFKA-13559 - Processing request got delayed by 300 ms in the following condition:
+   * 1. Client-Server communication uses SSL socket.
+   * 2. More than one requests are read from the socket into SslTransportLayer.netReadBuffer.
+   *
+   * This 300 ms delay occurs because the socket has no data but the buffer has data. And the sequence of events that
+   * leads to this situation is the following (from the server point of view):
+   *
+   * Step 1 - SslTransportLayer receives more than one requests in the socket and put it in the buffer
+   *          (SslTransportLayer.netReadBuffer).
+   * Step 2 - SslTransportLayer reads all of the bytes and stores it in SslTransportLayer.appReadBuffer.
+   * Step 3 - Process the first request, leaving the second request in SslTransportLayer.appReadBuffer.
+   * Step 4 - THIS IS WHERE THE DELAY IS. Process the second request. This request is read from
+   * SslTransportLayer.appReadBuffer, instead of the socket. Because of this, "select(timeout)" in Selector.poll()
+   * should not block for 300 ms.
+   *
+   * This test is implemented following "makeSocketWithBufferedRequests()" method by putting two requests directly
+   * into SslTransportLayer.netReadBuffer and manually trigger the processing.
+   *
+   */
+  @Test
+  def testLatencyWithBufferedDataAndNoSocketData(): Unit = {
+    shutdownServerAndMetrics(server)
+
+    // create server with SSL listener
+    val testableServer = new TestableSocketServer(KafkaConfig.fromProps(sslServerProps))
+    testableServer.enableRequestProcessing(Map.empty)
+    val testableSelector = testableServer.testableSelector
+    val proxyServer = new ProxyServer(testableServer)
+    val selectTimeout = 5000  // in ms
+    // set pollTimeoutOverride to "selectTimeout" to ensure poll() timeout is distinct and can be identified
+    testableSelector.pollTimeoutOverride = Some(selectTimeout)
+
+    try {
+      // trigger SSL handshake by sending the first request and receiving its response without buffering
+      val requestBytes = producerRequestBytes()
+      val sslSocket = sslClientSocket(proxyServer.localPort)
+
+      sendRequest(sslSocket, requestBytes)
+      val request1 = receiveRequest(testableServer.dataPlaneRequestChannel)
+      processRequest(testableServer.dataPlaneRequestChannel, request1)
+      receiveResponse(sslSocket)
+
+      // then put 2 requests in SslTransportLayer.netReadBuffer via the ProxyServer
+      val connectionId = request1.context.connectionId
+      val listener = testableServer.config.dataPlaneListeners.head.listenerName.value
+      val channel = testableServer.dataPlaneAcceptor(listener).get.processors(0).channel(connectionId).getOrElse(throw new IllegalStateException("Channel not found"))
+      val transportLayer: SslTransportLayer = JTestUtils.fieldValue(channel, classOf[KafkaChannel], "transportLayer")
+      val netReadBuffer: ByteBuffer = JTestUtils.fieldValue(transportLayer, classOf[SslTransportLayer], "netReadBuffer")
+
+      proxyServer.enableBuffering(netReadBuffer)
+      sendRequest(sslSocket, requestBytes)
+      sendRequest(sslSocket, requestBytes)
+
+      val keysWithBufferedRead: util.Set[SelectionKey] = JTestUtils.fieldValue(testableSelector, classOf[Selector], "keysWithBufferedRead")
+      keysWithBufferedRead.add(channel.selectionKey)
+      JTestUtils.setFieldValue(transportLayer, "hasBytesBuffered", true)
+
+      // process the first request in the server side
+      // this would move bytes from netReadBuffer to appReadBuffer, then process only the first request
+      // we call wakeup() so Selector.poll() does not block in this step (because we artificially add data into netReadBuffer)
+      testableSelector.wakeup()
+      val req1 = receiveRequest(testableServer.dataPlaneRequestChannel, selectTimeout + 1000)
+      processRequest(testableServer.dataPlaneRequestChannel, req1)
+
+      // receive response in the client side
+      receiveResponse(sslSocket)
+
+      // process the second request in the server side
+      // this would process the second request in the appReadBuffer
+      // NOTE 1: this should not block because the data is already in the buffer, but without the fix for KAFKA-13559,
+      // this step will block for 300 ms (in this test, we override poll() timeout to 5000 ms to make it distinct)
+      // NOTE 2: we do not call wakeup() here so Selector.poll() would block if the fix is not in place
+      val processTimeStart = System.nanoTime()  // using nanoTime() because it is meant to calculate elapsed time
+      val req2 = receiveRequest(testableServer.dataPlaneRequestChannel, selectTimeout + 1000)
+      processRequest(testableServer.dataPlaneRequestChannel, req2)
+      val processTimeEnd = System.nanoTime()
+
+      // receive response in the client side
+      receiveResponse(sslSocket)
+
+      // check the duration of processing the second request
+      val processTimeDuration = (processTimeEnd - processTimeStart) / 1000000.0  // in ms
+      assertTrue(processTimeDuration < selectTimeout,
+        "Time to process the second request (" + processTimeDuration + " ms) should be under " + selectTimeout + " ms")
+

Review Comment:
   Likewise, we can probably remove this code.





[GitHub] [kafka] badaiaqrandista commented on a diff in pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.

Posted by GitBox <gi...@apache.org>.
badaiaqrandista commented on code in PR #12416:
URL: https://github.com/apache/kafka/pull/12416#discussion_r929297029


##########
clients/src/main/java/org/apache/kafka/common/network/Selector.java:
##########
@@ -591,6 +591,11 @@ void pollSelectionKeys(Set<SelectionKey> selectionKeys,
                 long nowNanos = channelStartTimeNanos != 0 ? channelStartTimeNanos : currentTimeNanos;
                 try {
                     attemptWrite(key, channel, nowNanos);
+
+                    // Following is to fix KAFKA-13559. This will prevent poll() from blocking for 300 ms when the
+                    // socket has no data but the buffer has data. Only happens when using SSL.
+                    if (channel.hasBytesBuffered())
+                        madeReadProgressLastPoll = true;

Review Comment:
   Thank you. You are right. And the test passed. 





[GitHub] [kafka] rajinisivaram commented on pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on PR #12416:
URL: https://github.com/apache/kafka/pull/12416#issuecomment-1195238184

   @badaiaqrandista Thanks for the update, it looks good. However, there seems to be a timing issue in the test, since it failed in the JDK 8 PR build. Can you take a look? I remember seeing the failure yesterday before these changes as well, so it may be a timing issue with the test itself.

