Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/11/16 01:21:24 UTC

[GitHub] [kafka] functioner opened a new pull request #11504: KAFKA-13457: socketChannel in Acceptor#accept is not closed upon IOException

functioner opened a new pull request #11504:
URL: https://github.com/apache/kafka/pull/11504


   A patch for [KAFKA-13457](https://issues.apache.org/jira/browse/KAFKA-13457)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #11504: KAFKA-13457: socketChannel in Acceptor#accept is not closed upon IOException

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #11504:
URL: https://github.com/apache/kafka/pull/11504#discussion_r753652583



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -734,6 +734,10 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
         val endThrottleTimeMs = e.startThrottleTimeMs + e.throttleTimeMs
         throttledSockets += DelayedCloseSocket(socketChannel, endThrottleTimeMs)
         None
+      case e: IOException =>
+        info(s"Encounter IOException", e)
+        closeSocket(socketChannel)

Review comment:
       You're right. I overlooked this. In this case, I wonder if we could test the Acceptor only. Would it work? That would simplify all of this. 







[GitHub] [kafka] dajac commented on a change in pull request #11504: KAFKA-13457: socketChannel in Acceptor#accept is not closed upon IOException

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #11504:
URL: https://github.com/apache/kafka/pull/11504#discussion_r754898485



##########
File path: core/src/test/scala/unit/kafka/network/SocketServerTest.scala
##########
@@ -867,6 +868,39 @@ class SocketServerTest {
     }
   }
 
+  @Test
+  def testExceptionInAcceptor(): Unit = {
+    val overrideNum = server.config.maxConnectionsPerIp + 1
+    val overrideProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0)
+    overrideProps.put(KafkaConfig.MaxConnectionsPerIpOverridesProp, s"localhost:$overrideNum")

Review comment:
       Sorry, I was not clear here. Yes, we do need `overrideProps`. However, I think that we don't need to set `MaxConnectionsPerIpOverridesProp`, do we? If not, we could remove it and also remove `overrideNum`.







[GitHub] [kafka] dajac commented on a change in pull request #11504: KAFKA-13457: socketChannel in Acceptor#accept is not closed upon IOException

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #11504:
URL: https://github.com/apache/kafka/pull/11504#discussion_r750266430



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -734,6 +734,10 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
         val endThrottleTimeMs = e.startThrottleTimeMs + e.throttleTimeMs
         throttledSockets += DelayedCloseSocket(socketChannel, endThrottleTimeMs)
         None
+      case e: IOException =>
+        info(s"Encounter IOException", e)
+        closeSocket(socketChannel)

Review comment:
       We might have to use `close(endPoint.listenerName, socketChannel)` here. Otherwise, we would not decrement the number of connections in the `connectionQuotas`. Could you double check?
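The difference between the two close paths can be illustrated with a minimal, self-contained sketch. `FakeChannel`, `SketchConnectionQuotas`, `closeSocket` and `closeWithQuota` are simplified hypothetical stand-ins written for this illustration, not Kafka's real `connectionQuotas` or `close(listenerName, channel)`. They only show that a close path which skips the quota bookkeeping leaves the connection counted even though the channel itself is closed.

```scala
import scala.collection.mutable

// Hypothetical stand-in for an accepted socket channel; it only records closure.
final class FakeChannel(val address: String) {
  var closed: Boolean = false
  def close(): Unit = closed = true
}

// Hypothetical, simplified per-address connection counter (not Kafka's ConnectionQuotas).
final class SketchConnectionQuotas {
  private val counts = mutable.Map.empty[String, Int].withDefaultValue(0)
  def inc(address: String): Unit = counts(address) += 1
  def dec(address: String): Unit = counts(address) -= 1
  def get(address: String): Int = counts(address)
}

object CloseSketch {
  val quotas = new SketchConnectionQuotas

  // Closes the channel only; the connection stays counted (the leak pointed out above).
  def closeSocket(ch: FakeChannel): Unit = ch.close()

  // Releases the quota slot for the channel's address, then closes the channel.
  def closeWithQuota(ch: FakeChannel): Unit = {
    quotas.dec(ch.address)
    ch.close()
  }
}
```

After `closeSocket` the counter for that address stays at 1 although the channel is closed; after `closeWithQuota` it returns to 0. With a per-IP connection limit in place, each leaked slot would presumably keep counting against that limit.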







[GitHub] [kafka] functioner commented on a change in pull request #11504: KAFKA-13457: socketChannel in Acceptor#accept is not closed upon IOException

Posted by GitBox <gi...@apache.org>.
functioner commented on a change in pull request #11504:
URL: https://github.com/apache/kafka/pull/11504#discussion_r750497406



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -734,6 +734,10 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
         val endThrottleTimeMs = e.startThrottleTimeMs + e.throttleTimeMs
         throttledSockets += DelayedCloseSocket(socketChannel, endThrottleTimeMs)
         None
+      case e: IOException =>
+        info(s"Encounter IOException", e)
+        closeSocket(socketChannel)

Review comment:
       @dajac You're right. I've updated the JIRA issue description, and I've pushed a new commit that uses `close(endPoint.listenerName, socketChannel)`.
   For the test, since it involves injecting an IOException, I'm planning to use Byteman or AspectJ, unless you have another suggestion.







[GitHub] [kafka] functioner commented on a change in pull request #11504: KAFKA-13457: socketChannel in Acceptor#accept is not closed upon IOException

Posted by GitBox <gi...@apache.org>.
functioner commented on a change in pull request #11504:
URL: https://github.com/apache/kafka/pull/11504#discussion_r751600464



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -734,6 +734,10 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
         val endThrottleTimeMs = e.startThrottleTimeMs + e.throttleTimeMs
         throttledSockets += DelayedCloseSocket(socketChannel, endThrottleTimeMs)
         None
+      case e: IOException =>
+        info(s"Encounter IOException", e)
+        closeSocket(socketChannel)

Review comment:
       It seems doable. I'm looking for an existing unit test in `core/src/test/scala/unit/kafka/network/SocketServerTest.scala` that I can adapt for this test. Do you have any suggestions?







[GitHub] [kafka] functioner commented on pull request #11504: KAFKA-13457: SocketChannel in Acceptor#accept is not closed upon IOException

Posted by GitBox <gi...@apache.org>.
functioner commented on pull request #11504:
URL: https://github.com/apache/kafka/pull/11504#issuecomment-978812709


   Thanks, everyone, for the suggestions and for merging the patch!





[GitHub] [kafka] dajac commented on pull request #11504: KAFKA-13457: socketChannel in Acceptor#accept is not closed upon IOException

Posted by GitBox <gi...@apache.org>.
dajac commented on pull request #11504:
URL: https://github.com/apache/kafka/pull/11504#issuecomment-975322642


   @functioner The code does not seem to compile. This appears to be due to https://github.com/apache/kafka/commit/30a9085d505ac301acee18e4cdcc98d8060e0388, which added a parameter to the `Acceptor`'s constructor. Could you rebase or merge trunk into your branch?







[GitHub] [kafka] functioner commented on a change in pull request #11504: KAFKA-13457: socketChannel in Acceptor#accept is not closed upon IOException

Posted by GitBox <gi...@apache.org>.
functioner commented on a change in pull request #11504:
URL: https://github.com/apache/kafka/pull/11504#discussion_r754482445



##########
File path: core/src/test/scala/unit/kafka/network/SocketServerTest.scala
##########
@@ -867,6 +868,39 @@ class SocketServerTest {
     }
   }
 
+  @Test
+  def testExceptionInAcceptor(): Unit = {
+    val overrideNum = server.config.maxConnectionsPerIp + 1
+    val overrideProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0)
+    overrideProps.put(KafkaConfig.MaxConnectionsPerIpOverridesProp, s"localhost:$overrideNum")
+    val serverMetrics = new Metrics()
+
+    val overrideServer = new SocketServer(KafkaConfig.fromProps(overrideProps), serverMetrics,
+      Time.SYSTEM, credentialProvider, apiVersionManager) {
+
+      // same as SocketServer.createAcceptor,
+      // except the Acceptor overriding a method to inject the exception
+      override protected def createAcceptor(endPoint: EndPoint, metricPrefix: String): Acceptor = {
+        val sendBufferSize = config.socketSendBufferBytes
+        val recvBufferSize = config.socketReceiveBufferBytes
+        new Acceptor(endPoint, sendBufferSize, recvBufferSize, nodeId, connectionQuotas, metricPrefix, time) {
+          override protected def configureAcceptedSocketChannel(socketChannel: SocketChannel): Unit = {
+            throw new IOException("test injected IOException")
+          }
+        }
+      }
+    }
+
+    try {
+      overrideServer.startup()
+      val conn = connect(overrideServer)
+      conn.setSoTimeout(3000)
+      assertEquals(-1, conn.getInputStream.read())

Review comment:
       Right. I've pushed a commit. The test now checks that the quota is 1 after the accept and 0 after the exception is thrown.
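Because the acceptor runs on its own thread, a check like this is easiest to express by recording the count at the moment of the injected failure and then polling until the count drops back to 0, rather than asserting immediately. The sketch below is not the code from the PR: it assumes a hypothetical `connectionCount` in place of the real quota object, a hypothetical `acceptOne` accept path, and a hypothetical `waitUntil` polling helper.

```scala
import java.io.IOException
import java.util.concurrent.atomic.AtomicInteger

object QuotaCheckSketch {
  // Hypothetical stand-in for the connection count tracked by the quota object.
  val connectionCount = new AtomicInteger(0)
  // Count observed inside the injected failure; -1 means "not reached yet".
  @volatile var countWhenInjected: Int = -1

  // Test override of the configure step: record the current count, then fail.
  def configureAcceptedChannel(): Unit = {
    countWhenInjected = connectionCount.get()
    throw new IOException("test injected IOException")
  }

  // Hypothetical accept path: take a slot, configure, release the slot on IOException.
  def acceptOne(): Unit = {
    connectionCount.incrementAndGet()
    try configureAcceptedChannel()
    catch {
      case _: IOException =>
        connectionCount.decrementAndGet()
        ()
    }
  }

  // Polls `condition` every `pauseMs` until it holds or `timeoutMs` elapses.
  def waitUntil(condition: () => Boolean, timeoutMs: Long = 3000L, pauseMs: Long = 10L): Boolean = {
    val deadline = System.currentTimeMillis() + timeoutMs
    while (System.currentTimeMillis() < deadline) {
      if (condition()) return true
      Thread.sleep(pauseMs)
    }
    condition()
  }
}
```

Recording the count inside the failing hook captures the "1 after the accept" state deterministically, while the polling step tolerates the delay before the acceptor thread releases the slot.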







[GitHub] [kafka] functioner commented on a change in pull request #11504: KAFKA-13457: socketChannel in Acceptor#accept is not closed upon IOException

Posted by GitBox <gi...@apache.org>.
functioner commented on a change in pull request #11504:
URL: https://github.com/apache/kafka/pull/11504#discussion_r754481036



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -734,9 +730,21 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
         val endThrottleTimeMs = e.startThrottleTimeMs + e.throttleTimeMs
         throttledSockets += DelayedCloseSocket(socketChannel, endThrottleTimeMs)
         None
+      case e: IOException =>
+        info(s"Encountered IOException, closing connection", e)

Review comment:
       Right. I've pushed a commit.







[GitHub] [kafka] functioner commented on a change in pull request #11504: KAFKA-13457: socketChannel in Acceptor#accept is not closed upon IOException

Posted by GitBox <gi...@apache.org>.
functioner commented on a change in pull request #11504:
URL: https://github.com/apache/kafka/pull/11504#discussion_r753548985



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -734,6 +734,10 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
         val endThrottleTimeMs = e.startThrottleTimeMs + e.throttleTimeMs
         throttledSockets += DelayedCloseSocket(socketChannel, endThrottleTimeMs)
         None
+      case e: IOException =>
+        info(s"Encounter IOException", e)
+        closeSocket(socketChannel)

Review comment:
       There are multiple classes defined in `core/src/main/scala/kafka/network/SocketServer.scala`. The `accept` method belongs to `Acceptor`, rather than `SocketServer`, so we can't override `accept` the way you show. We need to override `accept` in `Acceptor`, and then use this `Acceptor` when the `SocketServer` is initialized. Therefore, we need to first override `createAcceptor` in `SocketServer`, and then override `accept` in `Acceptor`. In addition, we need to change `createAcceptor` & `accept` from private to protected; we also need to change some fields from private to protected.
   
   I've pushed a new commit implementing a test based on this idea. I've confirmed that the test passes only when the IOException is handled as I pointed out.
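The override chain described above (subclass the server, override its acceptor factory method, and have that return an acceptor subclass that overrides a protected hook) is an instance of the factory-method pattern. Below is a stripped-down, self-contained sketch of it; `SketchServer`, `SketchAcceptor`, `FailingSketchServer` and their members are hypothetical simplifications, not Kafka's real classes or signatures.

```scala
import java.io.IOException

// Hypothetical, heavily simplified stand-in for Acceptor.
class SketchAcceptor(val listener: String) {
  // Protected hook: production code would configure the accepted channel here.
  protected def configureAcceptedChannel(channel: String): Unit = ()

  // Returns true if the connection is kept, false if it was closed after an IOException.
  def accept(channel: String): Boolean =
    try {
      configureAcceptedChannel(channel)
      true
    } catch {
      case _: IOException => false
    }
}

// Hypothetical stand-in for SocketServer exposing a protected factory method.
class SketchServer {
  protected def createAcceptor(listener: String): SketchAcceptor = new SketchAcceptor(listener)

  // Lazy, so the factory runs after construction and an override may safely use subclass state.
  lazy val acceptor: SketchAcceptor = createAcceptor("PLAINTEXT")
}

// Test-only server: its acceptor always fails while configuring a channel.
class FailingSketchServer extends SketchServer {
  override protected def createAcceptor(listener: String): SketchAcceptor =
    new SketchAcceptor(listener) {
      override protected def configureAcceptedChannel(channel: String): Unit =
        throw new IOException("test injected IOException")
    }
}
```

The price of this approach is the one noted in the comment: the hooks must be `protected` rather than `private` so that a test subclass can override them.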







[GitHub] [kafka] dajac merged pull request #11504: KAFKA-13457: SocketChannel in Acceptor#accept is not closed upon IOException

Posted by GitBox <gi...@apache.org>.
dajac merged pull request #11504:
URL: https://github.com/apache/kafka/pull/11504


   





[GitHub] [kafka] functioner commented on a change in pull request #11504: KAFKA-13457: socketChannel in Acceptor#accept is not closed upon IOException

Posted by GitBox <gi...@apache.org>.
functioner commented on a change in pull request #11504:
URL: https://github.com/apache/kafka/pull/11504#discussion_r750538124



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -734,6 +734,10 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
         val endThrottleTimeMs = e.startThrottleTimeMs + e.throttleTimeMs
         throttledSockets += DelayedCloseSocket(socketChannel, endThrottleTimeMs)
         None
+      case e: IOException =>
+        info(s"Encounter IOException", e)
+        closeSocket(socketChannel)

Review comment:
       I think Byteman is widely used in other communities because it's an elegant way to do fault-injection testing. For example, the ZooKeeper community has discussed it in https://github.com/apache/zookeeper/pull/123. I think adding a Byteman dependency won't take much time.







[GitHub] [kafka] functioner commented on a change in pull request #11504: KAFKA-13457: socketChannel in Acceptor#accept is not closed upon IOException

Posted by GitBox <gi...@apache.org>.
functioner commented on a change in pull request #11504:
URL: https://github.com/apache/kafka/pull/11504#discussion_r751600464



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -734,6 +734,10 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
         val endThrottleTimeMs = e.startThrottleTimeMs + e.throttleTimeMs
         throttledSockets += DelayedCloseSocket(socketChannel, endThrottleTimeMs)
         None
+      case e: IOException =>
+        info(s"Encounter IOException", e)
+        closeSocket(socketChannel)

Review comment:
       It seems doable. I'm looking for an existing unit test in `core/src/test/scala/unit/kafka/network/SocketServerTest.scala` that I can adapt for this test. Do you have any suggestions? It seems we would also need to modify a lot of code in `core/src/main/scala/kafka/network/SocketServer.scala` to enable this test.







[GitHub] [kafka] functioner commented on a change in pull request #11504: KAFKA-13457: socketChannel in Acceptor#accept is not closed upon IOException

Posted by GitBox <gi...@apache.org>.
functioner commented on a change in pull request #11504:
URL: https://github.com/apache/kafka/pull/11504#discussion_r753548985



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -734,6 +734,10 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
         val endThrottleTimeMs = e.startThrottleTimeMs + e.throttleTimeMs
         throttledSockets += DelayedCloseSocket(socketChannel, endThrottleTimeMs)
         None
+      case e: IOException =>
+        info(s"Encounter IOException", e)
+        closeSocket(socketChannel)

Review comment:
       There are multiple classes defined in `core/src/main/scala/kafka/network/SocketServer.scala`. The `accept` method belongs to `Acceptor`, rather than `SocketServer`, so we can't override `accept` the way you show. We need to override `accept` in `Acceptor`, and then use this `Acceptor` when the `SocketServer` is initialized. None of the tests in `core/src/test/scala/unit/kafka/network/SocketServerTest.scala` seem to do this. `Acceptor` is referenced in several places within `SocketServer`, so it seems we would have to modify some code in `SocketServer` to plug in our new `Acceptor`.







[GitHub] [kafka] functioner commented on a change in pull request #11504: KAFKA-13457: socketChannel in Acceptor#accept is not closed upon IOException

Posted by GitBox <gi...@apache.org>.
functioner commented on a change in pull request #11504:
URL: https://github.com/apache/kafka/pull/11504#discussion_r754464724



##########
File path: core/src/test/scala/unit/kafka/network/SocketServerTest.scala
##########
@@ -867,6 +868,39 @@ class SocketServerTest {
     }
   }
 
+  @Test
+  def testExceptionInAcceptor(): Unit = {
+    val overrideNum = server.config.maxConnectionsPerIp + 1
+    val overrideProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0)
+    overrideProps.put(KafkaConfig.MaxConnectionsPerIpOverridesProp, s"localhost:$overrideNum")

Review comment:
       We need the `overrideProps` to create the `overrideServer`. I didn't find a better configuration among the other tests, so I just used the configuration from `testMaxConnectionsPerIpOverrides`. Alternatively, we could try the configuration from `testConnectionRatePerIp`. Do you have any suggestions?







[GitHub] [kafka] functioner commented on a change in pull request #11504: KAFKA-13457: socketChannel in Acceptor#accept is not closed upon IOException

Posted by GitBox <gi...@apache.org>.
functioner commented on a change in pull request #11504:
URL: https://github.com/apache/kafka/pull/11504#discussion_r750498357



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -734,6 +734,10 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
         val endThrottleTimeMs = e.startThrottleTimeMs + e.throttleTimeMs
         throttledSockets += DelayedCloseSocket(socketChannel, endThrottleTimeMs)
         None
+      case e: IOException =>
+        info(s"Encounter IOException", e)

Review comment:
       @showuon Thanks for the comment. I've updated the info message in the new commit.







[GitHub] [kafka] functioner commented on a change in pull request #11504: KAFKA-13457: socketChannel in Acceptor#accept is not closed upon IOException

Posted by GitBox <gi...@apache.org>.
functioner commented on a change in pull request #11504:
URL: https://github.com/apache/kafka/pull/11504#discussion_r753687329



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -734,6 +734,10 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
         val endThrottleTimeMs = e.startThrottleTimeMs + e.throttleTimeMs
         throttledSockets += DelayedCloseSocket(socketChannel, endThrottleTimeMs)
         None
+      case e: IOException =>
+        info(s"Encounter IOException", e)
+        closeSocket(socketChannel)

Review comment:
       Since the existing test cases don't have scaffolding for testing `Acceptor`, we would need to prepare lots of parameters for the `Acceptor` constructor:
   https://github.com/apache/kafka/blob/074a03cca162f91ccdecc12eb84c6a45af75f6bf/core/src/main/scala/kafka/network/SocketServer.scala#L550-L558
   We also need to prepare lots of parameters for creating a `Processor` for the `Acceptor`
   https://github.com/apache/kafka/blob/074a03cca162f91ccdecc12eb84c6a45af75f6bf/core/src/main/scala/kafka/network/SocketServer.scala#L687-L703
   https://github.com/apache/kafka/blob/074a03cca162f91ccdecc12eb84c6a45af75f6bf/core/src/main/scala/kafka/network/SocketServer.scala#L574-L578
   https://github.com/apache/kafka/blob/074a03cca162f91ccdecc12eb84c6a45af75f6bf/core/src/main/scala/kafka/network/SocketServer.scala#L790-L806
   However, that logic is implemented in `SocketServer`:
   https://github.com/apache/kafka/blob/074a03cca162f91ccdecc12eb84c6a45af75f6bf/core/src/main/scala/kafka/network/SocketServer.scala#L268-L283
   https://github.com/apache/kafka/blob/074a03cca162f91ccdecc12eb84c6a45af75f6bf/core/src/main/scala/kafka/network/SocketServer.scala#L424-L445
   Therefore, I think testing `Acceptor` directly requires much more code; testing through `SocketServer` is simpler.







[GitHub] [kafka] dajac commented on a change in pull request #11504: KAFKA-13457: socketChannel in Acceptor#accept is not closed upon IOException

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #11504:
URL: https://github.com/apache/kafka/pull/11504#discussion_r751111771



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -734,6 +734,10 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
         val endThrottleTimeMs = e.startThrottleTimeMs + e.throttleTimeMs
         throttledSockets += DelayedCloseSocket(socketChannel, endThrottleTimeMs)
         None
+      case e: IOException =>
+        info(s"Encounter IOException", e)
+        closeSocket(socketChannel)

Review comment:
       Another way would be to extract `serverSocketChannel.accept()` into a method. Then we could override that method in a test and return a mocked socket which throws an exception.
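A minimal sketch of that idea follows. It assumes a hypothetical `Conn` trait in place of `java.nio.channels.SocketChannel` (which is awkward to subclass directly) and a hypothetical `acceptChannel()` seam standing in for the extracted `serverSocketChannel.accept()` call. The hand-written test double throws from `configureBlocking` and records whether `close()` ran, so no mocking library is needed.

```scala
import java.io.IOException

// Hypothetical minimal view of an accepted connection (not java.nio's SocketChannel).
trait Conn {
  def configureBlocking(block: Boolean): Unit
  def close(): Unit
}

// Hypothetical acceptor in which the accept call is extracted into an overridable seam.
abstract class SeamAcceptor {
  protected def acceptChannel(): Conn

  // Accepts one connection; on IOException the connection is closed instead of leaked.
  def acceptOnce(): Option[Conn] = {
    val conn = acceptChannel()
    try {
      conn.configureBlocking(false)
      Some(conn)
    } catch {
      case _: IOException =>
        conn.close()
        None
    }
  }
}

// Test double: throws from configureBlocking and records whether close() was called.
final class FailingConn extends Conn {
  var closed: Boolean = false
  def configureBlocking(block: Boolean): Unit = throw new IOException("injected")
  def close(): Unit = closed = true
}
```

In a test, an anonymous `SeamAcceptor` overrides `acceptChannel()` to return a `FailingConn`; after `acceptOnce()` returns `None`, asserting `closed` verifies that the error path released the connection.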







[GitHub] [kafka] dajac commented on a change in pull request #11504: KAFKA-13457: socketChannel in Acceptor#accept is not closed upon IOException

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #11504:
URL: https://github.com/apache/kafka/pull/11504#discussion_r750532169



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -734,6 +734,10 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
         val endThrottleTimeMs = e.startThrottleTimeMs + e.throttleTimeMs
         throttledSockets += DelayedCloseSocket(socketChannel, endThrottleTimeMs)
         None
+      case e: IOException =>
+        info(s"Encounter IOException", e)
+        closeSocket(socketChannel)

Review comment:
       Hmm... Good question. I don't think we use either of those libraries in the code base yet, so it would be great if we could do it with what we already use. I am not sure whether that is possible.







[GitHub] [kafka] showuon commented on a change in pull request #11504: KAFKA-13457: socketChannel in Acceptor#accept is not closed upon IOException

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11504:
URL: https://github.com/apache/kafka/pull/11504#discussion_r749936830



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -734,6 +734,10 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
         val endThrottleTimeMs = e.startThrottleTimeMs + e.throttleTimeMs
         throttledSockets += DelayedCloseSocket(socketChannel, endThrottleTimeMs)
         None
+      case e: IOException =>
+        info(s"Encounter IOException", e)

Review comment:
       nit: `Encountered IOException. Closing connection.`







[GitHub] [kafka] dajac commented on a change in pull request #11504: KAFKA-13457: socketChannel in Acceptor#accept is not closed upon IOException

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #11504:
URL: https://github.com/apache/kafka/pull/11504#discussion_r752997477



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -734,6 +734,10 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
         val endThrottleTimeMs = e.startThrottleTimeMs + e.throttleTimeMs
         throttledSockets += DelayedCloseSocket(socketChannel, endThrottleTimeMs)
         None
+      case e: IOException =>
+        info(s"Encounter IOException", e)
+        closeSocket(socketChannel)

Review comment:
       `testMaxConnectionsPerIpOverrides` might be a good starting point as it tests something in the same area. Why do you think that we need to modify a lot of code in `SocketServer`?
   
   If we extract `serverSocketChannel.accept()` into a method, we can create a `SocketServer` and override that method. In the override, we could return a mocked `SocketChannel` which throws an exception when `configureBlocking` is called. Then, we could try to establish a connection. Finally, we can verify that the `close` methods were called.
   
   ```scala
   val overrideServer = new SocketServer(...) {
     override def accept(): SocketChannel = {
       // return a mocked SocketChannel whose configureBlocking throws an IOException
       ???
     }
   }
   ```
   
   







[GitHub] [kafka] functioner commented on a change in pull request #11504: KAFKA-13457: socketChannel in Acceptor#accept is not closed upon IOException

Posted by GitBox <gi...@apache.org>.
functioner commented on a change in pull request #11504:
URL: https://github.com/apache/kafka/pull/11504#discussion_r753548985



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -734,6 +734,10 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
         val endThrottleTimeMs = e.startThrottleTimeMs + e.throttleTimeMs
         throttledSockets += DelayedCloseSocket(socketChannel, endThrottleTimeMs)
         None
+      case e: IOException =>
+        info(s"Encounter IOException", e)
+        closeSocket(socketChannel)

Review comment:
       There are multiple classes defined in `core/src/main/scala/kafka/network/SocketServer.scala`. The `accept` method belongs to `Acceptor`, rather than `SocketServer`, so we can't override `accept` the way you show. We need to override `accept` in `Acceptor`, and then use this `Acceptor` when the `SocketServer` is initialized. Therefore, we need to first override `createAcceptor` in `SocketServer`, and then override `accept` in `Acceptor`. In addition, we need to change `createAcceptor` & `accept` from private to protected; we also need to change some fields from private to protected.







[GitHub] [kafka] functioner commented on a change in pull request #11504: KAFKA-13457: socketChannel in Acceptor#accept is not closed upon IOException

Posted by GitBox <gi...@apache.org>.
functioner commented on a change in pull request #11504:
URL: https://github.com/apache/kafka/pull/11504#discussion_r753687329



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -734,6 +734,10 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
         val endThrottleTimeMs = e.startThrottleTimeMs + e.throttleTimeMs
         throttledSockets += DelayedCloseSocket(socketChannel, endThrottleTimeMs)
         None
+      case e: IOException =>
+        info(s"Encounter IOException", e)
+        closeSocket(socketChannel)

Review comment:
       Since the existing tests have no scaffolding for testing `Acceptor` directly, we would need to prepare many parameters for the `Acceptor` constructor:
   https://github.com/apache/kafka/blob/074a03cca162f91ccdecc12eb84c6a45af75f6bf/core/src/main/scala/kafka/network/SocketServer.scala#L550-L558
   We would also need to prepare a `Processor` for the `Acceptor`:
   https://github.com/apache/kafka/blob/074a03cca162f91ccdecc12eb84c6a45af75f6bf/core/src/main/scala/kafka/network/SocketServer.scala#L687-L703
   https://github.com/apache/kafka/blob/074a03cca162f91ccdecc12eb84c6a45af75f6bf/core/src/main/scala/kafka/network/SocketServer.scala#L574-L578
   https://github.com/apache/kafka/blob/074a03cca162f91ccdecc12eb84c6a45af75f6bf/core/src/main/scala/kafka/network/SocketServer.scala#L790-L806
   However, that logic is implemented in `SocketServer`:
   https://github.com/apache/kafka/blob/074a03cca162f91ccdecc12eb84c6a45af75f6bf/core/src/main/scala/kafka/network/SocketServer.scala#L268-L283
   https://github.com/apache/kafka/blob/074a03cca162f91ccdecc12eb84c6a45af75f6bf/core/src/main/scala/kafka/network/SocketServer.scala#L424-L445
   Therefore, I think testing `Acceptor` directly requires much more code; testing through `SocketServer` is simpler.







[GitHub] [kafka] showuon commented on a change in pull request #11504: KAFKA-13457: socketChannel in Acceptor#accept is not closed upon IOException

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11504:
URL: https://github.com/apache/kafka/pull/11504#discussion_r749936830



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -734,6 +734,10 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
         val endThrottleTimeMs = e.startThrottleTimeMs + e.throttleTimeMs
         throttledSockets += DelayedCloseSocket(socketChannel, endThrottleTimeMs)
         None
+      case e: IOException =>
+        info(s"Encounter IOException", e)

Review comment:
       nit: `Encountered IOException, closing connection.`







[GitHub] [kafka] dajac commented on a change in pull request #11504: KAFKA-13457: socketChannel in Acceptor#accept is not closed upon IOException

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #11504:
URL: https://github.com/apache/kafka/pull/11504#discussion_r754109304



##########
File path: core/src/test/scala/unit/kafka/network/SocketServerTest.scala
##########
@@ -867,6 +868,39 @@ class SocketServerTest {
     }
   }
 
+  @Test
+  def testExceptionInAcceptor(): Unit = {
+    val overrideNum = server.config.maxConnectionsPerIp + 1
+    val overrideProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0)
+    overrideProps.put(KafkaConfig.MaxConnectionsPerIpOverridesProp, s"localhost:$overrideNum")

Review comment:
       I guess that we don't need this in the context of this test, do we?

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -734,9 +730,21 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
         val endThrottleTimeMs = e.startThrottleTimeMs + e.throttleTimeMs
         throttledSockets += DelayedCloseSocket(socketChannel, endThrottleTimeMs)
         None
+      case e: IOException =>
+        info(s"Encountered IOException, closing connection", e)

Review comment:
       Should we log this as an error? Moreover, I would rephrase the log message a bit: `Encountered an error while configuring the connection, closing it.`

##########
File path: core/src/test/scala/unit/kafka/network/SocketServerTest.scala
##########
@@ -867,6 +868,39 @@ class SocketServerTest {
     }
   }
 
+  @Test
+  def testExceptionInAcceptor(): Unit = {
+    val overrideNum = server.config.maxConnectionsPerIp + 1
+    val overrideProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0)
+    overrideProps.put(KafkaConfig.MaxConnectionsPerIpOverridesProp, s"localhost:$overrideNum")
+    val serverMetrics = new Metrics()
+
+    val overrideServer = new SocketServer(KafkaConfig.fromProps(overrideProps), serverMetrics,
+      Time.SYSTEM, credentialProvider, apiVersionManager) {
+
+      // same as SocketServer.createAcceptor,
+      // except the Acceptor overriding a method to inject the exception
+      override protected def createAcceptor(endPoint: EndPoint, metricPrefix: String): Acceptor = {
+        val sendBufferSize = config.socketSendBufferBytes
+        val recvBufferSize = config.socketReceiveBufferBytes
+        new Acceptor(endPoint, sendBufferSize, recvBufferSize, nodeId, connectionQuotas, metricPrefix, time) {
+          override protected def configureAcceptedSocketChannel(socketChannel: SocketChannel): Unit = {
+            throw new IOException("test injected IOException")
+          }
+        }
+      }
+    }
+
+    try {
+      overrideServer.startup()
+      val conn = connect(overrideServer)
+      conn.setSoTimeout(3000)
+      assertEquals(-1, conn.getInputStream.read())

Review comment:
       Could we also verify that the connection quota is correct? We can use `connectionQuotas.get` for this, passing the IP address of the relevant listener.
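To illustrate why the quota check is worth adding, here is a minimal sketch assuming that the acceptor counts a connection against a per-IP quota before configuring it, so the error path must release that slot when it closes the channel. `ToyConnectionQuotas` and `acceptAndConfigure` are hypothetical stand-ins, not Kafka's `ConnectionQuotas` API; the final count plays the role of `connectionQuotas.get` in the suggested assertion.

```scala
import java.io.IOException
import java.net.InetAddress
import scala.collection.mutable

// Hypothetical per-IP connection counter; not Kafka's ConnectionQuotas.
class ToyConnectionQuotas {
  private val counts = mutable.Map.empty[InetAddress, Int]

  def get(address: InetAddress): Int = counts.getOrElse(address, 0)

  def inc(address: InetAddress): Unit = {
    counts(address) = get(address) + 1
  }

  def dec(address: InetAddress): Unit = {
    val remaining = get(address) - 1
    require(remaining >= 0, s"connection count for $address would go negative")
    if (remaining == 0) counts -= address
    else counts(address) = remaining
  }
}

// Models the accept path under the stated assumption: the slot is counted
// before configuration, so any error path that closes the channel must release it.
def acceptAndConfigure(quotas: ToyConnectionQuotas, address: InetAddress)(configure: () => Unit): Boolean = {
  quotas.inc(address)
  try {
    configure()
    true
  } catch {
    case _: IOException =>
      quotas.dec(address) // without this, each failed connection leaks one slot
      false
  }
}

val quotas = new ToyConnectionQuotas
val address = InetAddress.getLoopbackAddress
val firstKept = acceptAndConfigure(quotas, address)(() => ())
val secondKept = acceptAndConfigure(quotas, address)(() => throw new IOException("test injected IOException"))
// One connection is still open; the failed one must not be counted.
val openCount = quotas.get(address)
```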







[GitHub] [kafka] functioner commented on a change in pull request #11504: KAFKA-13457: socketChannel in Acceptor#accept is not closed upon IOException

Posted by GitBox <gi...@apache.org>.
functioner commented on a change in pull request #11504:
URL: https://github.com/apache/kafka/pull/11504#discussion_r753548985



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -734,6 +734,10 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
         val endThrottleTimeMs = e.startThrottleTimeMs + e.throttleTimeMs
         throttledSockets += DelayedCloseSocket(socketChannel, endThrottleTimeMs)
         None
+      case e: IOException =>
+        info(s"Encounter IOException", e)
+        closeSocket(socketChannel)

Review comment:
       There are multiple classes defined in `core/src/main/scala/kafka/network/SocketServer.scala`. The `accept` method belongs to `Acceptor`, rather than `SocketServer`, so we can't override `accept` the way you show. We need to override `accept` in `Acceptor`, and then use this `Acceptor` when the `SocketServer` is initialized. Therefore, we need to first override `createAcceptor` in `SocketServer`, and then override `accept` in `Acceptor`. In addition, we need to change `createAcceptor` & `accept` from private to protected; we also need to change some fields from private to protected.
   
   I've pushed a new commit implementing the test using this idea.







[GitHub] [kafka] functioner commented on pull request #11504: KAFKA-13457: socketChannel in Acceptor#accept is not closed upon IOException

Posted by GitBox <gi...@apache.org>.
functioner commented on pull request #11504:
URL: https://github.com/apache/kafka/pull/11504#issuecomment-974893291


   Are there any other issues or concerns with the current patch and test? If so, I will resolve them ASAP. Is there any chance this PR will be merged soon?





[GitHub] [kafka] functioner commented on a change in pull request #11504: KAFKA-13457: socketChannel in Acceptor#accept is not closed upon IOException

Posted by GitBox <gi...@apache.org>.
functioner commented on a change in pull request #11504:
URL: https://github.com/apache/kafka/pull/11504#discussion_r753687329



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -734,6 +734,10 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
         val endThrottleTimeMs = e.startThrottleTimeMs + e.throttleTimeMs
         throttledSockets += DelayedCloseSocket(socketChannel, endThrottleTimeMs)
         None
+      case e: IOException =>
+        info(s"Encounter IOException", e)
+        closeSocket(socketChannel)

Review comment:
       Since the existing tests have no scaffolding for testing `Acceptor` directly, we would need to prepare many parameters for the `Acceptor` constructor:
   https://github.com/apache/kafka/blob/074a03cca162f91ccdecc12eb84c6a45af75f6bf/core/src/main/scala/kafka/network/SocketServer.scala#L550-L558
   We would also need to prepare many parameters to create a `Processor` for the `Acceptor`:
   https://github.com/apache/kafka/blob/074a03cca162f91ccdecc12eb84c6a45af75f6bf/core/src/main/scala/kafka/network/SocketServer.scala#L687-L703
   https://github.com/apache/kafka/blob/074a03cca162f91ccdecc12eb84c6a45af75f6bf/core/src/main/scala/kafka/network/SocketServer.scala#L574-L578
   https://github.com/apache/kafka/blob/074a03cca162f91ccdecc12eb84c6a45af75f6bf/core/src/main/scala/kafka/network/SocketServer.scala#L790-L806
   However, that logic is implemented in `SocketServer`:
   https://github.com/apache/kafka/blob/074a03cca162f91ccdecc12eb84c6a45af75f6bf/core/src/main/scala/kafka/network/SocketServer.scala#L268-L283
   https://github.com/apache/kafka/blob/074a03cca162f91ccdecc12eb84c6a45af75f6bf/core/src/main/scala/kafka/network/SocketServer.scala#L424-L445
   Therefore, I think testing `Acceptor` directly requires much more code; testing through `SocketServer` is simpler.







[GitHub] [kafka] functioner commented on a change in pull request #11504: KAFKA-13457: socketChannel in Acceptor#accept is not closed upon IOException

Posted by GitBox <gi...@apache.org>.
functioner commented on a change in pull request #11504:
URL: https://github.com/apache/kafka/pull/11504#discussion_r755134052



##########
File path: core/src/test/scala/unit/kafka/network/SocketServerTest.scala
##########
@@ -867,6 +868,39 @@ class SocketServerTest {
     }
   }
 
+  @Test
+  def testExceptionInAcceptor(): Unit = {
+    val overrideNum = server.config.maxConnectionsPerIp + 1
+    val overrideProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0)
+    overrideProps.put(KafkaConfig.MaxConnectionsPerIpOverridesProp, s"localhost:$overrideNum")

Review comment:
       You're right. I've pushed a new commit.



