Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/07/19 14:59:10 UTC

[GitHub] [kafka] splett2 commented on a diff in pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.

splett2 commented on code in PR #12416:
URL: https://github.com/apache/kafka/pull/12416#discussion_r923930734


##########
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##########
@@ -1878,6 +1878,97 @@ class SocketServerTest {
     }, false)
   }
 
+  /**
+   * Tests for KAFKA-13559 - Processing request got delayed by 300 ms in the following condition:
+   * 1. Client-Server communication uses SSL socket.
+   * 2. More than one requests are read from the socket into SslTransportLayer.netReadBuffer.
+   *
+   * This 300 ms delay occurs because the socket has no data but the buffer has data. And the sequence of events that
+   * leads to this situation is the following (from the server point of view):
+   *
+   * Step 1 - SslTransportLayer receives more than one requests in the socket and put it in the buffer
+   *          (SslTransportLayer.netReadBuffer).
+   * Step 2 - SslTransportLayer reads all of the bytes and stores it in SslTransportLayer.appReadBuffer.
+   * Step 3 - Process the first request, leaving the second request in SslTransportLayer.appReadBuffer.
+   * Step 4 - THIS IS WHERE THE DELAY IS. Process the second request. This request is read from
+   * SslTransportLayer.appReadBuffer, instead of the socket. Because of this, "select(timeout)" in Selector.poll()
+   * should not block for 300 ms.
+   *
+   * This test is implemented following "makeSocketWithBufferedRequests()" method by putting two requests directly
+   * into SslTransportLayer.netReadBuffer and manually trigger the processing.
+   *
+   */
+  @Test
+  def testLatencyWithBufferedDataAndNoSocketData(): Unit = {
+    shutdownServerAndMetrics(server)
+
+    // to ensure we only have 1 connection (channel)
+    val props = sslServerProps
+    val numConnections = 1
+    props.put("max.connections.per.ip", numConnections.toString)
+
+    // create server with SSL listener
+    val testableServer = new TestableSocketServer(KafkaConfig.fromProps(props))
+    testableServer.enableRequestProcessing(Map.empty)
+    //       dataPlaneAcceptors.get(endpoint).processors(0).selector.asInstanceOf[TestableSelector]
+    val testableSelector = testableServer.testableSelector
+    val proxyServer = new ProxyServer(testableServer)
+
+    try {
+      // trigger SSL handshake by sending the first request and receiving its response without buffering
+      val requestBytes = producerRequestBytes()
+      val sslSocket = sslClientSocket(proxyServer.localPort)
+
+      sendRequest(sslSocket, requestBytes)
+      val request1 = receiveRequest(testableServer.dataPlaneRequestChannel)
+      processRequest(testableServer.dataPlaneRequestChannel, request1)
+      receiveResponse(sslSocket)
+
+      // then put 2 requests in SslTransportLayer.netReadBuffer via the ProxyServer
+      val connectionId = request1.context.connectionId
+      val listener = testableServer.config.dataPlaneListeners.head.listenerName.value
+      val channel = testableServer.dataPlaneAcceptor(listener).get.processors(0).channel(connectionId).getOrElse(throw new IllegalStateException("Channel not found"))
+      val transportLayer: SslTransportLayer = JTestUtils.fieldValue(channel, classOf[KafkaChannel], "transportLayer")
+      val netReadBuffer: ByteBuffer = JTestUtils.fieldValue(transportLayer, classOf[SslTransportLayer], "netReadBuffer")
+
+      proxyServer.enableBuffering(netReadBuffer)
+      sendRequest(sslSocket, requestBytes)
+      sendRequest(sslSocket, requestBytes)
+
+      val keysWithBufferedRead: util.Set[SelectionKey] = JTestUtils.fieldValue(testableSelector, classOf[Selector], "keysWithBufferedRead")
+      keysWithBufferedRead.add(channel.selectionKey)
+      JTestUtils.setFieldValue(transportLayer, "hasBytesBuffered", true)
+
+      // process the first request in the server side
+      // this would move bytes from netReadBuffer to appReadBuffer, then process only the first request
+      processRequest(testableServer.dataPlaneRequestChannel)
+
+      // receive response in the client side
+      receiveResponse(sslSocket)
+
+      // process the second request in the server side
+      // this would process the second request in the appReadBuffer
+      // NOTE: this should not block because the data is already in the buffer, but without the fix for KAFKA-13559,
+      // this step will take more than 300 ms
+      val processTimeStart = System.currentTimeMillis()
+      processRequest(testableServer.dataPlaneRequestChannel)
+      val processTimeEnd = System.currentTimeMillis()
+
+      // receive response in the client side
+      receiveResponse(sslSocket)

Review Comment:
   I'm trying to think about how we can get stronger sequencing guarantees here.
   
   Maybe we could explicitly set the testable selector's pollTimeoutOverride to some large value and explicitly call wakeup once we have the right state set up.
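
   To illustrate the idea, here is a minimal sketch using a plain java.nio Selector rather than Kafka's TestableSelector. The object name, method name, and timeout values are hypothetical, and how pollTimeoutOverride is wired into the test selector is assumed, not shown. The point is only that a poller blocked in select() with a large timeout proceeds exactly when the test calls wakeup(), so the test decides the ordering instead of racing a 300 ms poll.

```scala
import java.nio.channels.Selector
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicLong

// Hypothetical sketch: not Kafka code, only the java.nio primitives involved.
object WakeupSequencingSketch {

  // Returns the time (ms) between calling wakeup() and select() returning.
  def measureWakeupLatencyMs(largeTimeoutMs: Long): Long = {
    val selector = Selector.open()
    val pollerStarted = new CountDownLatch(1)
    val pollerReturned = new CountDownLatch(1)
    val returnedAtNanos = new AtomicLong(0L)

    val poller = new Thread(new Runnable {
      override def run(): Unit = {
        pollerStarted.countDown()
        // Stands in for a poll with a large timeout override: with no
        // registered keys this returns only on wakeup() or timeout.
        selector.select(largeTimeoutMs)
        returnedAtNanos.set(System.nanoTime())
        pollerReturned.countDown()
      }
    }, "sketch-poller")
    poller.setDaemon(true)
    poller.start()

    try {
      pollerStarted.await()
      // A real test would set up the buffered-read state here.
      val wakeupAtNanos = System.nanoTime()
      // If wakeup() happens before select() starts, the next select()
      // returns immediately, so there is no lost-wakeup race.
      selector.wakeup()
      if (!pollerReturned.await(10, TimeUnit.SECONDS))
        throw new IllegalStateException("poller did not return after wakeup")
      TimeUnit.NANOSECONDS.toMillis(returnedAtNanos.get() - wakeupAtNanos)
    } finally {
      selector.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val latencyMs = measureWakeupLatencyMs(60000L)
    println(s"select() returned $latencyMs ms after wakeup()")
  }
}
```

   With this shape, a test could assert on the state after the wakeup rather than on wall-clock time, which should be less sensitive to scheduling jitter than comparing against 300 ms.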



##########
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##########
@@ -1878,6 +1878,97 @@ class SocketServerTest {
     }, false)
   }
 
+  /**
+   * Tests for KAFKA-13559 - Processing request got delayed by 300 ms in the following condition:
+   * 1. Client-Server communication uses SSL socket.
+   * 2. More than one requests are read from the socket into SslTransportLayer.netReadBuffer.
+   *
+   * This 300 ms delay occurs because the socket has no data but the buffer has data. And the sequence of events that
+   * leads to this situation is the following (from the server point of view):
+   *
+   * Step 1 - SslTransportLayer receives more than one requests in the socket and put it in the buffer
+   *          (SslTransportLayer.netReadBuffer).
+   * Step 2 - SslTransportLayer reads all of the bytes and stores it in SslTransportLayer.appReadBuffer.
+   * Step 3 - Process the first request, leaving the second request in SslTransportLayer.appReadBuffer.
+   * Step 4 - THIS IS WHERE THE DELAY IS. Process the second request. This request is read from
+   * SslTransportLayer.appReadBuffer, instead of the socket. Because of this, "select(timeout)" in Selector.poll()
+   * should not block for 300 ms.
+   *
+   * This test is implemented following "makeSocketWithBufferedRequests()" method by putting two requests directly
+   * into SslTransportLayer.netReadBuffer and manually trigger the processing.
+   *
+   */
+  @Test
+  def testLatencyWithBufferedDataAndNoSocketData(): Unit = {
+    shutdownServerAndMetrics(server)
+
+    // to ensure we only have 1 connection (channel)
+    val props = sslServerProps
+    val numConnections = 1
+    props.put("max.connections.per.ip", numConnections.toString)
+
+    // create server with SSL listener
+    val testableServer = new TestableSocketServer(KafkaConfig.fromProps(props))
+    testableServer.enableRequestProcessing(Map.empty)
+    //       dataPlaneAcceptors.get(endpoint).processors(0).selector.asInstanceOf[TestableSelector]

Review Comment:
   nit: remove this commented-out line.



##########
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##########
@@ -1878,6 +1878,97 @@ class SocketServerTest {
     }, false)
   }
 
+  /**
+   * Tests for KAFKA-13559 - Processing request got delayed by 300 ms in the following condition:
+   * 1. Client-Server communication uses SSL socket.
+   * 2. More than one requests are read from the socket into SslTransportLayer.netReadBuffer.
+   *
+   * This 300 ms delay occurs because the socket has no data but the buffer has data. And the sequence of events that
+   * leads to this situation is the following (from the server point of view):
+   *
+   * Step 1 - SslTransportLayer receives more than one requests in the socket and put it in the buffer
+   *          (SslTransportLayer.netReadBuffer).
+   * Step 2 - SslTransportLayer reads all of the bytes and stores it in SslTransportLayer.appReadBuffer.
+   * Step 3 - Process the first request, leaving the second request in SslTransportLayer.appReadBuffer.
+   * Step 4 - THIS IS WHERE THE DELAY IS. Process the second request. This request is read from
+   * SslTransportLayer.appReadBuffer, instead of the socket. Because of this, "select(timeout)" in Selector.poll()
+   * should not block for 300 ms.
+   *
+   * This test is implemented following "makeSocketWithBufferedRequests()" method by putting two requests directly
+   * into SslTransportLayer.netReadBuffer and manually trigger the processing.
+   *
+   */
+  @Test
+  def testLatencyWithBufferedDataAndNoSocketData(): Unit = {
+    shutdownServerAndMetrics(server)
+
+    // to ensure we only have 1 connection (channel)
+    val props = sslServerProps
+    val numConnections = 1
+    props.put("max.connections.per.ip", numConnections.toString)

Review Comment:
   nit: this seems extraneous - we can drop it.


