Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/21 10:57:36 UTC

[GitHub] [kafka] rajinisivaram opened a new pull request #8705: KAFKA-10029; Don't update completedReceives when channels are closed to avoid ConcurrentModificationException

rajinisivaram opened a new pull request #8705:
URL: https://github.com/apache/kafka/pull/8705


   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rajinisivaram commented on a change in pull request #8705: KAFKA-10029; Don't update completedReceives when channels are closed to avoid ConcurrentModificationException

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on a change in pull request #8705:
URL: https://github.com/apache/kafka/pull/8705#discussion_r431862931



##########
File path: core/src/test/scala/unit/kafka/network/SocketServerTest.scala
##########
@@ -1861,20 +1872,33 @@ class SocketServerTest {
 
     override def poll(timeout: Long): Unit = {
       try {
+        assertEquals(0, super.completedReceives().size)
+        assertEquals(0, super.completedSends().size)
+
         pollCallback.apply()
         while (!pendingClosingChannels.isEmpty) {
           makeClosing(pendingClosingChannels.poll())
         }
-        allCachedPollData.foreach(_.reset)
         runOp(SelectorOperation.Poll, None) {
           super.poll(pollTimeoutOverride.getOrElse(timeout))
         }
       } finally {
         super.channels.forEach(allChannels += _.id)
         allDisconnectedChannels ++= super.disconnected.asScala.keys
-        cachedCompletedReceives.update(super.completedReceives.asScala.toBuffer)
-        cachedCompletedSends.update(super.completedSends.asScala)
-        cachedDisconnected.update(super.disconnected.asScala.toBuffer)
+
+        val map: util.Map[String, NetworkReceive] = JTestUtils.fieldValue(this, classOf[Selector], "completedReceives")

Review comment:
       @ijuma Thanks for the review, updated.







[GitHub] [kafka] rajinisivaram commented on pull request #8705: KAFKA-10029; Don't update completedReceives when channels are closed to avoid ConcurrentModificationException

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on pull request #8705:
URL: https://github.com/apache/kafka/pull/8705#issuecomment-632241769


   @chia7712 I was tempted to do that initially, but that is not the pattern we use for everything else in Selector and it has always been this way (for several years), so adding tests to make sure we don't break it made more sense.





[GitHub] [kafka] ijuma commented on pull request #8705: KAFKA-10029; Don't update completedReceives when channels are closed to avoid ConcurrentModificationException

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #8705:
URL: https://github.com/apache/kafka/pull/8705#issuecomment-634696095


   This change is probably OK, but the way we call `close` while iterating over `completedReceives` seems a bit fragile. It would probably be safer to collect the items we need to close and close them in a separate iteration. What do you think @rajinisivaram?
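
The fragility described above can be illustrated with a minimal sketch. It is not Kafka's `Selector`: the class `DeferredCloseSketch`, its `close` method, and the "empty payload means close" rule are all hypothetical, and the sketch assumes that closing removes the entry from the map being iterated.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for selector state; not Kafka's Selector.
final class DeferredCloseSketch {
    // channel id -> payload of the completed receive
    private final Map<String, String> completedReceives = new LinkedHashMap<>();

    void addReceive(String channelId, String payload) {
        completedReceives.put(channelId, payload);
    }

    // Assumed behaviour of the problematic close path: it removes the
    // channel's entry from the map that the caller may be iterating over.
    private void close(String channelId) {
        completedReceives.remove(channelId);
    }

    // Fragile: close() structurally modifies the map during iteration.
    // Returns true if the iteration failed with ConcurrentModificationException.
    boolean closeWhileIteratingThrows() {
        try {
            for (Map.Entry<String, String> e : completedReceives.entrySet()) {
                if (e.getValue().isEmpty()) close(e.getKey());
            }
            return false;
        } catch (ConcurrentModificationException ex) {
            return true;
        }
    }

    // Safer: collect the ids to close first, then close them in a second pass.
    List<String> collectThenClose() {
        List<String> toClose = new ArrayList<>();
        for (Map.Entry<String, String> e : completedReceives.entrySet()) {
            if (e.getValue().isEmpty()) toClose.add(e.getKey());
        }
        for (String id : toClose) close(id);
        return toClose;
    }

    int pending() {
        return completedReceives.size();
    }
}
```

In this sketch the first variant only fails when the closed entry is not the last one visited, which is part of what makes the pattern easy to miss in tests.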





[GitHub] [kafka] ijuma commented on a change in pull request #8705: KAFKA-10029; Don't update completedReceives when channels are closed to avoid ConcurrentModificationException

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #8705:
URL: https://github.com/apache/kafka/pull/8705#discussion_r431581763



##########
File path: core/src/test/scala/unit/kafka/network/SocketServerTest.scala
##########
@@ -1807,6 +1817,7 @@ class SocketServerTest {
           currentPollValues ++= newValues
         } else
           deferredValues ++= newValues
+        newValues.clear()

Review comment:
       What is the goal of this?







[GitHub] [kafka] rajinisivaram commented on pull request #8705: KAFKA-10029; Don't update completedReceives when channels are closed to avoid ConcurrentModificationException

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on pull request #8705:
URL: https://github.com/apache/kafka/pull/8705#issuecomment-635224994


   @ijuma Thanks for the review, have addressed the comments.





[GitHub] [kafka] ijuma commented on pull request #8705: KAFKA-10029; Don't update completedReceives when channels are closed to avoid ConcurrentModificationException

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #8705:
URL: https://github.com/apache/kafka/pull/8705#issuecomment-633725248


   @rajinisivaram What's the implication of not removing the completed receive in `doClose`?





[GitHub] [kafka] rajinisivaram commented on pull request #8705: KAFKA-10029; Don't update completedReceives when channels are closed to avoid ConcurrentModificationException

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on pull request #8705:
URL: https://github.com/apache/kafka/pull/8705#issuecomment-635846684


   @ijuma @chia7712 Thanks for the reviews, merging to trunk and 2.5.





[GitHub] [kafka] chia7712 commented on a change in pull request #8705: KAFKA-10029; Don't update completedReceives when channels are closed to avoid ConcurrentModificationException

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #8705:
URL: https://github.com/apache/kafka/pull/8705#discussion_r429100323



##########
File path: core/src/test/scala/unit/kafka/network/SocketServerTest.scala
##########
@@ -1742,6 +1746,12 @@ class SocketServerTest {
            selector = Some(testableSelector)
            testableSelector
         }
+
+        override private[network] def processException(errorMessage: String, throwable: Throwable, isUncaught: Boolean): Unit = {

Review comment:
       ```isUncaught``` is used only by tests, so it is a bit awkward to have in production code. Could you check the ```errorMessage``` instead of adding a new argument? For example:
   
   ```scala
   if (errorMessage == "Processor got uncaught exception.") uncaughtExceptions += 1
   ```







[GitHub] [kafka] ijuma commented on a change in pull request #8705: KAFKA-10029; Don't update completedReceives when channels are closed to avoid ConcurrentModificationException

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #8705:
URL: https://github.com/apache/kafka/pull/8705#discussion_r432209283



##########
File path: core/src/test/scala/unit/kafka/network/SocketServerTest.scala
##########
@@ -1886,19 +1926,13 @@ class SocketServerTest {
         super.channels.forEach(allChannels += _.id)
         allDisconnectedChannels ++= super.disconnected.asScala.keys
 
-        val completedReceivesMap: util.Map[String, NetworkReceive] = JTestUtils.fieldValue(this, classOf[Selector], "completedReceives")
-        def addToCompletedReceives(receive: NetworkReceive): Unit = {
-          val channelOpt = Option(super.channel(receive.source)).orElse(Option(super.closingChannel(receive.source)))
-          channelOpt.foreach { channel => completedReceivesMap.put(channel.id, receive) }
-        }
-
         // For each result type (completedReceives/completedSends/disconnected), defer the result to a subsequent poll()
         // if `minPerPoll` results are not yet available. When sufficient results are available, all available results
         // including previously deferred results are returned. This allows tests to process `minPerPoll` elements as the
         // results of a single poll iteration.

Review comment:
       I think your refactoring has added the comments to other places. Maybe we can add "This allows tests to process `minPerPoll` elements as the results of a single poll iteration" to the `update` method and remove this?







[GitHub] [kafka] rajinisivaram commented on a change in pull request #8705: KAFKA-10029; Don't update completedReceives when channels are closed to avoid ConcurrentModificationException

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on a change in pull request #8705:
URL: https://github.com/apache/kafka/pull/8705#discussion_r430299529



##########
File path: core/src/test/scala/unit/kafka/network/SocketServerTest.scala
##########
@@ -1742,6 +1746,12 @@ class SocketServerTest {
            selector = Some(testableSelector)
            testableSelector
         }
+
+        override private[network] def processException(errorMessage: String, throwable: Throwable, isUncaught: Boolean): Unit = {

Review comment:
       Funny you should say that, I was initially checking the error message and then felt that the test wouldn't fail if the error message was changed. But since then I had also updated the test which triggers the uncaught exception code path, so it is actually safe now to check the error message. Have updated the code.







[GitHub] [kafka] rajinisivaram commented on a change in pull request #8705: KAFKA-10029; Don't update completedReceives when channels are closed to avoid ConcurrentModificationException

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on a change in pull request #8705:
URL: https://github.com/apache/kafka/pull/8705#discussion_r431697371



##########
File path: core/src/test/scala/unit/kafka/network/SocketServerTest.scala
##########
@@ -1875,6 +1889,14 @@ class SocketServerTest {
         cachedCompletedReceives.update(super.completedReceives.asScala.toBuffer)
         cachedCompletedSends.update(super.completedSends.asScala)
         cachedDisconnected.update(super.disconnected.asScala.toBuffer)
+
+        val map: util.Map[String, NetworkReceive] = JTestUtils.fieldValue(this, classOf[Selector], "completedReceives")
+        cachedCompletedReceives.currentPollValues.foreach { receive =>
+          val channelOpt = Option(super.channel(receive.source)).orElse(Option(super.closingChannel(receive.source)))
+          channelOpt.foreach { channel => map.put(channel.id, receive) }
+        }
+        cachedCompletedSends.currentPollValues.foreach(super.completedSends.add)
+        cachedDisconnected.currentPollValues.foreach { case (id, state) => super.disconnected.put(id, state) }

Review comment:
       I have moved the second line that updates the current results into `update()`. They were done separately because each type uses a slightly different format, but it is clearer if they are together. Added comments as well.







[GitHub] [kafka] rajinisivaram commented on a change in pull request #8705: KAFKA-10029; Don't update completedReceives when channels are closed to avoid ConcurrentModificationException

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on a change in pull request #8705:
URL: https://github.com/apache/kafka/pull/8705#discussion_r431862643



##########
File path: core/src/test/scala/unit/kafka/network/SocketServerTest.scala
##########
@@ -1807,6 +1817,7 @@ class SocketServerTest {
           currentPollValues ++= newValues
         } else
           deferredValues ++= newValues
+        newValues.clear()

Review comment:
       Ah, I refactored a bit to clear the original buffer in each case.







[GitHub] [kafka] ijuma commented on pull request #8705: KAFKA-10029; Don't update completedReceives when channels are closed to avoid ConcurrentModificationException

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #8705:
URL: https://github.com/apache/kafka/pull/8705#issuecomment-634696784


   One more thing, can we improve `KafkaChannel.hashCode/equals` to avoid unnecessary work? The calls to `Objects.equals` and `Objects.hash` seem pointless.
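
As a rough illustration of this suggestion: when identity rests on a single field that is never null, `equals` and `hashCode` can delegate to that field directly. `Objects.equals` adds null handling that is then unnecessary, and `Objects.hash` takes a varargs array. The class `ChannelIdentitySketch` below is hypothetical and only assumes that the channel id is a non-null `String`; it is not the actual `KafkaChannel` code.

```java
import java.util.Objects;

// Hypothetical class whose identity is a single non-null id.
final class ChannelIdentitySketch {
    private final String id;

    ChannelIdentitySketch(String id) {
        // Reject null once, at construction, so equals/hashCode need no null checks.
        this.id = Objects.requireNonNull(id, "id");
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        // Direct call: id is known to be non-null on both sides.
        return id.equals(((ChannelIdentitySketch) o).id);
    }

    @Override
    public int hashCode() {
        // Direct call: avoids the varargs array that Objects.hash(id) would allocate.
        return id.hashCode();
    }
}
```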





[GitHub] [kafka] rajinisivaram commented on pull request #8705: KAFKA-10029; Don't update completedReceives when channels are closed to avoid ConcurrentModificationException

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on pull request #8705:
URL: https://github.com/apache/kafka/pull/8705#issuecomment-633934633


   @ijuma By not updating `completedReceives` when the channel is closed, we retain a reference to the channel until the next `poll()`. In terms of receives themselves, there is no impact. Looking at the code again, it seems unnecessary to use KafkaChannel as the key for the receive map. Like all other maps in Selector that store poll state for the channel, we can use the channel id instead. Updated the code.
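
A minimal sketch of the behaviour described in this comment, under stated assumptions: `PollStateSketch` is a hypothetical stand-in, not Kafka's `Selector`. It keys completed receives by channel id, leaves that map untouched when a channel is closed, and drops the prior results at the start of the next `poll()`.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for per-poll selector state; not Kafka's Selector.
final class PollStateSketch {
    private final Set<String> channels = new HashSet<>();
    // Keyed by channel id (a String), not by a channel object.
    private final Map<String, byte[]> completedReceives = new LinkedHashMap<>();

    void connect(String channelId) {
        channels.add(channelId);
    }

    // Results of the prior poll are dropped here, then new ones are recorded.
    void poll(Map<String, byte[]> arrived) {
        completedReceives.clear();
        for (Map.Entry<String, byte[]> e : arrived.entrySet()) {
            if (channels.contains(e.getKey())) {
                completedReceives.put(e.getKey(), e.getValue());
            }
        }
    }

    // Closing deliberately does not touch completedReceives, so callers may
    // close channels while iterating over the receives of the current poll.
    void close(String channelId) {
        channels.remove(channelId);
    }

    Map<String, byte[]> completedReceives() {
        return completedReceives;
    }

    boolean isOpen(String channelId) {
        return channels.contains(channelId);
    }
}
```

The trade-off in this sketch is the one discussed in the thread: the receive of a closed channel stays reachable until the next `poll()`.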





[GitHub] [kafka] rajinisivaram commented on pull request #8705: KAFKA-10029; Don't update completedReceives when channels are closed to avoid ConcurrentModificationException

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on pull request #8705:
URL: https://github.com/apache/kafka/pull/8705#issuecomment-634769086


   @ijuma Based on our discussion, I have added `Selector#clearCompletedSends()` and `Selector#clearCompletedReceives()` for SocketServer to clear buffers after they are processed. Also updated KafkaChannel. Can you review please? Thank you.
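
The idea of clearing results as soon as they have been processed might look roughly like the sketch below. Only the method names `clearCompletedReceives` and `clearCompletedSends` come from the comment above; the class `ClearableResultsSketch`, its fields, and the `processReceives` caller are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in; only the two clear* method names come from the thread.
final class ClearableResultsSketch {
    private final Map<String, byte[]> completedReceives = new LinkedHashMap<>();
    private final List<String> completedSends = new ArrayList<>();

    void recordReceive(String channelId, byte[] payload) {
        completedReceives.put(channelId, payload);
    }

    void recordSend(String channelId) {
        completedSends.add(channelId);
    }

    Collection<byte[]> completedReceives() {
        return completedReceives.values();
    }

    List<String> completedSends() {
        return completedSends;
    }

    // Lets the caller release references to receive buffers right after processing.
    void clearCompletedReceives() {
        completedReceives.clear();
    }

    void clearCompletedSends() {
        completedSends.clear();
    }

    // Hypothetical caller: process every receive, then clear once, outside the loop.
    static int processReceives(ClearableResultsSketch selector) {
        int bytes = 0;
        for (byte[] payload : selector.completedReceives()) {
            bytes += payload.length;
        }
        selector.clearCompletedReceives();
        return bytes;
    }
}
```

Clearing once after the loop keeps the iteration free of structural modifications while still dropping buffer references before the next poll.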





[GitHub] [kafka] ijuma commented on a change in pull request #8705: KAFKA-10029; Don't update completedReceives when channels are closed to avoid ConcurrentModificationException

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #8705:
URL: https://github.com/apache/kafka/pull/8705#discussion_r431583127



##########
File path: clients/src/main/java/org/apache/kafka/common/network/Selector.java
##########
@@ -804,7 +804,33 @@ private void maybeCloseOldestConnection(long currentTimeNanos) {
     }
 
     /**
-     * Clear the results from the prior poll
+     * Clears completed receives. This is used by SocketServer to remove references to
+     * receive buffers after processing completed receives, without waiting for the next
+     * poll() after all results have been processed.

Review comment:
       Nit: maybe `after all results have been processed` is a bit redundant? Same for the `clearCompletedSends` docs.







[GitHub] [kafka] ijuma commented on a change in pull request #8705: KAFKA-10029; Don't update completedReceives when channels are closed to avoid ConcurrentModificationException

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #8705:
URL: https://github.com/apache/kafka/pull/8705#discussion_r431581508



##########
File path: core/src/test/scala/unit/kafka/network/SocketServerTest.scala
##########
@@ -1875,6 +1889,14 @@ class SocketServerTest {
         cachedCompletedReceives.update(super.completedReceives.asScala.toBuffer)
         cachedCompletedSends.update(super.completedSends.asScala)
         cachedDisconnected.update(super.disconnected.asScala.toBuffer)
+
+        val map: util.Map[String, NetworkReceive] = JTestUtils.fieldValue(this, classOf[Selector], "completedReceives")
+        cachedCompletedReceives.currentPollValues.foreach { receive =>
+          val channelOpt = Option(super.channel(receive.source)).orElse(Option(super.closingChannel(receive.source)))
+          channelOpt.foreach { channel => map.put(channel.id, receive) }
+        }
+        cachedCompletedSends.currentPollValues.foreach(super.completedSends.add)
+        cachedDisconnected.currentPollValues.foreach { case (id, state) => super.disconnected.put(id, state) }

Review comment:
       Can we add a comment explaining what we're trying to do here? It's not clear why we do (for example):
   
   `cachedCompletedSends.update(super.completedSends.asScala)` followed by
   `cachedCompletedSends.currentPollValues.foreach(super.completedSends.add)`
   
   
   







[GitHub] [kafka] chia7712 commented on pull request #8705: KAFKA-10029; Don't update completedReceives when channels are closed to avoid ConcurrentModificationException

Posted by GitBox <gi...@apache.org>.
chia7712 commented on pull request #8705:
URL: https://github.com/apache/kafka/pull/8705#issuecomment-632196447


   How about iterating over a copy of ```completedReceives``` instead of traversing ```completedReceives``` itself?





[GitHub] [kafka] rajinisivaram merged pull request #8705: KAFKA-10029; Don't update completedReceives when channels are closed to avoid ConcurrentModificationException

Posted by GitBox <gi...@apache.org>.
rajinisivaram merged pull request #8705:
URL: https://github.com/apache/kafka/pull/8705


   





[GitHub] [kafka] rajinisivaram commented on a change in pull request #8705: KAFKA-10029; Don't update completedReceives when channels are closed to avoid ConcurrentModificationException

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on a change in pull request #8705:
URL: https://github.com/apache/kafka/pull/8705#discussion_r431698413



##########
File path: core/src/test/scala/unit/kafka/network/SocketServerTest.scala
##########
@@ -1807,6 +1817,7 @@ class SocketServerTest {
           currentPollValues ++= newValues
         } else
           deferredValues ++= newValues
+        newValues.clear()

Review comment:
       We return `minPerPoll` results together, so the current values are cleared and then populated as necessary. The code is now in the same place, so hopefully that is clearer.
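
The deferral logic described here can be sketched in Java as follows. This is a loose transliteration under assumptions, not the Scala test helper itself: the class name `PollDataSketch` is hypothetical, and the field names simply mirror those visible in the diff.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical Java rendering of the test helper's deferral idea.
final class PollDataSketch<T> {
    private final int minPerPoll;
    private final List<T> deferredValues = new ArrayList<>();
    private final List<T> currentPollValues = new ArrayList<>();

    PollDataSketch(int minPerPoll) {
        this.minPerPoll = minPerPoll;
    }

    // Clears the current values, then either releases everything available
    // (deferred + new) or defers the new values to a later poll.
    // The caller's buffer is emptied in both cases.
    void update(List<T> newValues) {
        currentPollValues.clear();
        if (deferredValues.size() + newValues.size() >= minPerPoll) {
            currentPollValues.addAll(deferredValues);
            currentPollValues.addAll(newValues);
            deferredValues.clear();
        } else {
            deferredValues.addAll(newValues);
        }
        newValues.clear();
    }

    List<T> currentPollValues() {
        return currentPollValues;
    }
}
```

With `minPerPoll` set to 2, a first update carrying one value exposes nothing, and a second update carrying one more value exposes both together, so a test can process them as the results of a single poll iteration.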







[GitHub] [kafka] ijuma commented on pull request #8705: KAFKA-10029; Don't update completedReceives when channels are closed to avoid ConcurrentModificationException

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #8705:
URL: https://github.com/apache/kafka/pull/8705#issuecomment-634692630


   @rajinisivaram We also retain a reference to the `NetworkReceive`, which is probably a bigger deal, right?

