Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/03/02 07:04:35 UTC

[GitHub] [kafka] dengziming opened a new pull request #10243: KAFKA-12398: Fix flaky test `ConsumerBounceTest.testClose`

dengziming opened a new pull request #10243:
URL: https://github.com/apache/kafka/pull/10243


   *More detailed description of your change*
   The test sometimes fails as follows:
   ![Pasted Graphic](https://user-images.githubusercontent.com/26023240/109610770-286d5080-7b68-11eb-92a5-ab6b45b79c2f.png)
   
   We should use `TestUtils.waitUntilTrue` instead of a fixed 1-second wait, because 1 second is sometimes longer than necessary and sometimes too short.
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dengziming commented on pull request #10243: KAFKA-12398: Fix flaky test `ConsumerBounceTest.testClose`

Posted by GitBox <gi...@apache.org>.
dengziming commented on pull request #10243:
URL: https://github.com/apache/kafka/pull/10243#issuecomment-797842164


   > Thanks for the update. Have you tried running the test repeatedly to verify that it resolves the issue?
   
   @dajac Yes, I ran it many times to verify. It only fails occasionally when I set `waitTimeMs` below 6000.





[GitHub] [kafka] dajac merged pull request #10243: KAFKA-12398: Fix flaky test `ConsumerBounceTest.testClose`

Posted by GitBox <gi...@apache.org>.
dajac merged pull request #10243:
URL: https://github.com/apache/kafka/pull/10243


   





[GitHub] [kafka] dajac commented on a change in pull request #10243: KAFKA-12398: Fix flaky test `ConsumerBounceTest.testClose`

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #10243:
URL: https://github.com/apache/kafka/pull/10243#discussion_r593505670



##########
File path: core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
##########
@@ -476,14 +476,18 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
     // New instance of consumer should be assigned partitions immediately and should see committed offsets.
     val assignSemaphore = new Semaphore(0)
     val consumer = createConsumerWithGroupId(groupId)
-    consumer.subscribe(Collections.singletonList(topic),  new ConsumerRebalanceListener {
+    consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener {
       def onPartitionsAssigned(partitions: Collection[TopicPartition]): Unit = {
         assignSemaphore.release()
       }
       def onPartitionsRevoked(partitions: Collection[TopicPartition]): Unit = {
       }})
-    consumer.poll(time.Duration.ofSeconds(3L))
-    assertTrue(assignSemaphore.tryAcquire(1, TimeUnit.SECONDS), "Assignment did not complete on time")
+
+    TestUtils.waitUntilTrue(() => {
+      consumer.poll(time.Duration.ZERO)

Review comment:
       Should we keep a small poll timeout here? Something like 100ms?







[GitHub] [kafka] dengziming commented on a change in pull request #10243: KAFKA-12398: Fix flaky test `ConsumerBounceTest.testClose`

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #10243:
URL: https://github.com/apache/kafka/pull/10243#discussion_r587026530



##########
File path: core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
##########
@@ -483,7 +483,8 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
       def onPartitionsRevoked(partitions: Collection[TopicPartition]): Unit = {
       }})
     consumer.poll(time.Duration.ofSeconds(3L))
-    assertTrue(assignSemaphore.tryAcquire(1, TimeUnit.SECONDS), "Assignment did not complete on time")
+    TestUtils.waitUntilTrue(() => assignSemaphore.tryAcquire(100, TimeUnit.MILLISECONDS),
+      "Assignment did not complete on time")

Review comment:
       Sorry for the late reply. `onPartitionsAssigned` is only called in `AbstractCoordinator.onJoinComplete`, so we need to call `poll` to trigger `onJoinComplete`.







[GitHub] [kafka] dajac commented on a change in pull request #10243: KAFKA-12398: Fix flaky test `ConsumerBounceTest.testClose`

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #10243:
URL: https://github.com/apache/kafka/pull/10243#discussion_r585387234



##########
File path: core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
##########
@@ -483,7 +483,8 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
       def onPartitionsRevoked(partitions: Collection[TopicPartition]): Unit = {
       }})
     consumer.poll(time.Duration.ofSeconds(3L))
-    assertTrue(assignSemaphore.tryAcquire(1, TimeUnit.SECONDS), "Assignment did not complete on time")
+    TestUtils.waitUntilTrue(() => assignSemaphore.tryAcquire(100, TimeUnit.MILLISECONDS),
+      "Assignment did not complete on time")

Review comment:
       Could the `ConsumerRebalanceListener` be called if we don't poll? 







[GitHub] [kafka] dengziming commented on a change in pull request #10243: KAFKA-12398: Fix flaky test `ConsumerBounceTest.testClose`

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #10243:
URL: https://github.com/apache/kafka/pull/10243#discussion_r587236489



##########
File path: core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
##########
@@ -483,7 +483,8 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
       def onPartitionsRevoked(partitions: Collection[TopicPartition]): Unit = {
       }})
     consumer.poll(time.Duration.ofSeconds(3L))
-    assertTrue(assignSemaphore.tryAcquire(1, TimeUnit.SECONDS), "Assignment did not complete on time")
+    TestUtils.waitUntilTrue(() => assignSemaphore.tryAcquire(100, TimeUnit.MILLISECONDS),
+      "Assignment did not complete on time")

Review comment:
       That makes sense. I printed the time taken by `consumer.poll`:
   ```
       println("start:" + System.currentTimeMillis() )
       consumer.poll(time.Duration.ofSeconds(3L))
       println("end:" + System.currentTimeMillis() )
   ```
   
   and found that `poll` takes the full 3 seconds, since no data is consumed before the timeout.
   ```
   start:1614844034207
   end:1614844037213
   ```
   
   We can save almost 3 seconds by moving `poll` into `waitUntilTrue`.
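   
   For illustration, here is a minimal self-contained sketch of that pattern. It is not the Kafka test code: `waitUntilTrue` below is a simplified, hypothetical stand-in for `TestUtils.waitUntilTrue`, and `FakeConsumer` is an invented class that only mimics the fact that the assignment callback runs inside `poll`. The default timeout and pause values are assumptions.
   
   ```scala
   import java.util.concurrent.Semaphore

   object PollInsideWaitSketch {
     // Hypothetical stand-in for TestUtils.waitUntilTrue:
     // re-evaluate `condition` until it holds or `waitTimeMs` has elapsed.
     def waitUntilTrue(condition: () => Boolean, msg: => String,
                       waitTimeMs: Long = 15000L, pauseMs: Long = 10L): Unit = {
       val deadline = System.currentTimeMillis() + waitTimeMs
       while (!condition()) {
         if (System.currentTimeMillis() > deadline) throw new AssertionError(msg)
         Thread.sleep(pauseMs)
       }
     }

     // Hypothetical fake consumer: the assignment callback only fires from
     // inside poll(), once the simulated join completes on call `pollsNeeded`.
     final class FakeConsumer(pollsNeeded: Int, onAssigned: () => Unit) {
       private var polls = 0
       def poll(): Unit = {
         polls += 1
         if (polls == pollsNeeded) onAssigned()
       }
       def pollCount: Int = polls
     }

     // Polls inside the wait loop and returns how many polls were needed.
     def run(pollsNeeded: Int): Int = {
       val assignSemaphore = new Semaphore(0)
       val consumer = new FakeConsumer(pollsNeeded, () => assignSemaphore.release())
       waitUntilTrue(() => {
         consumer.poll()              // each retry drives the consumer forward
         assignSemaphore.tryAcquire() // non-blocking check for the callback
       }, "Assignment did not complete on time")
       consumer.pollCount
     }
   }
   ```
   
   Because every retry calls `poll` again, the loop exits as soon as the callback has fired rather than after a fixed timeout, and a single slow `poll` no longer decides the outcome.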
   







[GitHub] [kafka] dajac commented on a change in pull request #10243: KAFKA-12398: Fix flaky test `ConsumerBounceTest.testClose`

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #10243:
URL: https://github.com/apache/kafka/pull/10243#discussion_r587215193



##########
File path: core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
##########
@@ -483,7 +483,8 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
       def onPartitionsRevoked(partitions: Collection[TopicPartition]): Unit = {
       }})
     consumer.poll(time.Duration.ofSeconds(3L))
-    assertTrue(assignSemaphore.tryAcquire(1, TimeUnit.SECONDS), "Assignment did not complete on time")
+    TestUtils.waitUntilTrue(() => assignSemaphore.tryAcquire(100, TimeUnit.MILLISECONDS),
+      "Assignment did not complete on time")

Review comment:
       Makes sense. So, if `assignSemaphore` is not released after `poll` is executed, it will never be released, right? Should we include `poll` within `waitUntilTrue`?







[GitHub] [kafka] dengziming commented on a change in pull request #10243: KAFKA-12398: Fix flaky test `ConsumerBounceTest.testClose`

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #10243:
URL: https://github.com/apache/kafka/pull/10243#discussion_r593536795



##########
File path: core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
##########
@@ -476,14 +476,18 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging {
     // New instance of consumer should be assigned partitions immediately and should see committed offsets.
     val assignSemaphore = new Semaphore(0)
     val consumer = createConsumerWithGroupId(groupId)
-    consumer.subscribe(Collections.singletonList(topic),  new ConsumerRebalanceListener {
+    consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener {
       def onPartitionsAssigned(partitions: Collection[TopicPartition]): Unit = {
         assignSemaphore.release()
       }
       def onPartitionsRevoked(partitions: Collection[TopicPartition]): Unit = {
       }})
-    consumer.poll(time.Duration.ofSeconds(3L))
-    assertTrue(assignSemaphore.tryAcquire(1, TimeUnit.SECONDS), "Assignment did not complete on time")
+
+    TestUtils.waitUntilTrue(() => {
+      consumer.poll(time.Duration.ZERO)

Review comment:
       Done!



