Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/12/22 07:25:54 UTC

[GitHub] [kafka] showuon opened a new pull request #9777: KAFKA-7940: wait until we've got the expected partition size

showuon opened a new pull request #9777:
URL: https://github.com/apache/kafka/pull/9777


   The test creates a topic with 99 partitions and expects the partition info to be available within 15 seconds. If the partition info is not available within 15 seconds, the test fails with:
   ```
   org.scalatest.exceptions.TestFailedException: Partition [group1_largeTopic,69] metadata not propagated after 15000 ms
   ```
   Evidently, 15 seconds is not always enough to finish creating 99 partitions. Fix this by explicitly waiting until the expected number of partitions is present before retrieving the info for each partition.
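
A minimal sketch of the idea described above, not the actual `TestUtils` code: poll until every broker's view of the topic reports the expected number of partitions, and only then look up individual partitions. `FakeBroker`, `WaitSketch`, `waitUntil`, and the pause interval are hypothetical stand-ins chosen for illustration.

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical stand-in for a broker's metadata cache: topic -> partition count.
final class FakeBroker {
  private val partitionCounts = TrieMap.empty[String, Int]
  def setPartitions(topic: String, n: Int): Unit = partitionCounts.put(topic, n)
  def numPartitions(topic: String): Option[Int] = partitionCounts.get(topic)
}

object WaitSketch {
  // Poll `condition` until it holds or `timeoutMs` elapses; returns whether it held.
  def waitUntil(condition: () => Boolean, timeoutMs: Long, pauseMs: Long = 10L): Boolean = {
    val deadline = System.currentTimeMillis() + timeoutMs
    while (!condition()) {
      if (System.currentTimeMillis() >= deadline) return false
      Thread.sleep(pauseMs)
    }
    true
  }

  // True once every broker reports exactly `expected` partitions for `topic`.
  def allPartitionsVisible(brokers: Seq[FakeBroker], topic: String, expected: Int, timeoutMs: Long): Boolean =
    waitUntil(() => brokers.forall(_.numPartitions(topic).contains(expected)), timeoutMs)
}
```

Waiting on the total count first means the per-partition lookups that follow do not each have to absorb the time it takes to create the whole topic.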
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #9777: KAFKA-7940: wait until we've got the expected partition size

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #9777:
URL: https://github.com/apache/kafka/pull/9777#issuecomment-764474867


   @rajinisivaram  @junrao , please help review this PR. Thanks.





[GitHub] [kafka] showuon commented on pull request #9777: KAFKA-7940: wait until we've got the expected partition size

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #9777:
URL: https://github.com/apache/kafka/pull/9777#issuecomment-749392634


   @rajinisivaram , please help review this PR. Thanks.





[GitHub] [kafka] hachikuji commented on a change in pull request #9777: KAFKA-7940: wait until we've got the expected partition size

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9777:
URL: https://github.com/apache/kafka/pull/9777#discussion_r567117397



##########
File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala
##########
@@ -349,10 +349,13 @@ object TestUtils extends Logging {
       !hasSessionExpirationException},
       s"Can't create topic $topic")
 
+    // wait until we've got the expected partition size
+    waitUntilMetadataIsPropagatedWithExpectedSize(servers, topic, numPartitions)

Review comment:
       As mentioned in the other comment, this is probably good enough. The extra validations below seem like overkill.







[GitHub] [kafka] showuon commented on pull request #9777: KAFKA-7940: wait until we've got the expected partition size

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #9777:
URL: https://github.com/apache/kafka/pull/9777#issuecomment-767999171


   @omkreddy @hachikuji , could you take a look at this PR?
   More and more test cases are failing with the same root cause: when we create a topic with many partitions, the metadata fails to propagate in time.
   
   https://ci-builds.apache.org/job/Kafka/job/kafka-trunk-jdk11/439/testReport/junit/kafka.server/MultipleListenersWithDefaultJaasContextTest/testProduceConsume__/
   https://ci-builds.apache.org/job/Kafka/job/kafka-trunk-jdk11/437/testReport/kafka.api/PlaintextConsumerTest/testMultiConsumerStickyAssignment__/





[GitHub] [kafka] showuon commented on pull request #9777: KAFKA-7940: wait until we've got the expected partition size

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #9777:
URL: https://github.com/apache/kafka/pull/9777#issuecomment-759913781


   @rajinisivaram  @junrao , please help review this PR. Thanks.








[GitHub] [kafka] hachikuji commented on a change in pull request #9777: KAFKA-7940: wait until we've got the expected partition size

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9777:
URL: https://github.com/apache/kafka/pull/9777#discussion_r567113645



##########
File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala
##########
@@ -885,6 +891,24 @@ object TestUtils extends Logging {
     ), "Timed out waiting for broker metadata to propagate to all servers", timeout)
   }
 
+  /**
+   * Wait until the expected number of partitions is in the metadata cache in each broker.
+   *
+   * @param servers The list of servers that the metadata should reach to
+   * @param topic The topic name
+   * @param expectedNumPartitions The expected number of partitions
+   */
+  def waitUntilMetadataIsPropagatedWithExpectedSize(servers: Seq[KafkaServer], topic: String, expectedNumPartitions: Int): Unit = {

Review comment:
       What do you think about more concise names?
   `waitUntilMetadataIsPropagatedWithExpectedSize` -> `waitForAllPartitionMetadata`
   `waitUntilMetadataIsPropagated` -> `waitForPartitionMetadata`
   
   I wonder if this would be more useful if we returned the partition metadata: `Map[TopicPartition, UpdateMetadataPartitionState]`. Then we could probably skip the calls to `waitUntilMetadataIsPropagated` and `waitUntilLeaderIsElectedOrChanged` above.
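
A rough sketch of what returning the metadata could look like, under heavy simplification: `TopicPartitionKey` and `PartitionState` are hypothetical stand-ins for `TopicPartition` and `UpdateMetadataPartitionState`, and `snapshot` stands in for reading a metadata cache. This is not the signature that was merged.

```scala
// Hypothetical, simplified stand-ins for TopicPartition and UpdateMetadataPartitionState.
final case class TopicPartitionKey(topic: String, partition: Int)
final case class PartitionState(leader: Int)

object ReturnMetadataSketch {
  // Poll `snapshot` until it holds `expected` partitions of `topic`,
  // then return those partitions; throw if the timeout elapses first.
  def waitForAllPartitions(
      snapshot: () => Map[TopicPartitionKey, PartitionState],
      topic: String,
      expected: Int,
      timeoutMs: Long
  ): Map[TopicPartitionKey, PartitionState] = {
    val deadline = System.currentTimeMillis() + timeoutMs
    var current = snapshot().filter { case (tp, _) => tp.topic == topic }
    while (current.size != expected) {
      if (System.currentTimeMillis() >= deadline)
        throw new AssertionError(
          s"Only ${current.size} of $expected partitions of $topic visible after $timeoutMs ms")
      Thread.sleep(10L)
      current = snapshot().filter { case (tp, _) => tp.topic == topic }
    }
    current
  }
}
```

A caller can then read the leader of each partition straight from the returned map, for example `result.map { case (tp, state) => tp.partition -> state.leader }`, instead of waiting on each partition separately.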

##########
File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala
##########
@@ -885,6 +891,24 @@ object TestUtils extends Logging {
     ), "Timed out waiting for broker metadata to propagate to all servers", timeout)
   }
 
+  /**
+   * Wait until the expected number of partitions is in the metadata cache in each broker.
+   *
+   * @param servers The list of servers that the metadata should reach to
+   * @param topic The topic name
+   * @param expectedNumPartitions The expected number of partitions
+   */
+  def waitUntilMetadataIsPropagatedWithExpectedSize(servers: Seq[KafkaServer], topic: String, expectedNumPartitions: Int): Unit = {
+    waitUntilTrue(
+      () => servers.forall { server =>
+        server.dataPlaneRequestProcessor.metadataCache.numPartitions(topic) match {

Review comment:
       Can we use `server.metadataCache`?







[GitHub] [kafka] showuon commented on a change in pull request #9777: KAFKA-7940: wait until we've got the expected partition size

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #9777:
URL: https://github.com/apache/kafka/pull/9777#discussion_r567650065



##########
File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala
##########
@@ -885,6 +891,24 @@ object TestUtils extends Logging {
     ), "Timed out waiting for broker metadata to propagate to all servers", timeout)
   }
 
+  /**
+   * Wait until the expected number of partitions is in the metadata cache in each broker.
+   *
+   * @param servers The list of servers that the metadata should reach to
+   * @param topic The topic name
+   * @param expectedNumPartitions The expected number of partitions
+   */
+  def waitUntilMetadataIsPropagatedWithExpectedSize(servers: Seq[KafkaServer], topic: String, expectedNumPartitions: Int): Unit = {
+    waitUntilTrue(
+      () => servers.forall { server =>
+        server.dataPlaneRequestProcessor.metadataCache.numPartitions(topic) match {

Review comment:
       Good suggestion. Updated.

##########
File path: core/src/test/scala/unit/kafka/utils/TestUtils.scala
##########
@@ -885,6 +891,24 @@ object TestUtils extends Logging {
     ), "Timed out waiting for broker metadata to propagate to all servers", timeout)
   }
 
+  /**
+   * Wait until the expected number of partitions is in the metadata cache in each broker.
+   *
+   * @param servers The list of servers that the metadata should reach to
+   * @param topic The topic name
+   * @param expectedNumPartitions The expected number of partitions
+   */
+  def waitUntilMetadataIsPropagatedWithExpectedSize(servers: Seq[KafkaServer], topic: String, expectedNumPartitions: Int): Unit = {

Review comment:
       Good suggestion. Updated.







[GitHub] [kafka] showuon commented on pull request #9777: KAFKA-7940: wait until we've got the expected partition size

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #9777:
URL: https://github.com/apache/kafka/pull/9777#issuecomment-755973862


   @rajinisivaram  , please help review this PR. Thanks.





[GitHub] [kafka] showuon commented on a change in pull request #9777: KAFKA-7940: wait until we've got the expected partition size

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #9777:
URL: https://github.com/apache/kafka/pull/9777#discussion_r567837343



##########
File path: core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
##########
@@ -137,12 +137,12 @@ class AddPartitionsTest extends BaseRequestTest {
     adminZkClient.addPartitions(topic3, topic3Assignment, adminZkClient.getBrokerMetadatas(), 7)
 
     // read metadata from a broker and verify the new topic partitions exist
-    TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 1)
-    TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 2)
-    TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 3)
-    TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 4)
-    TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 5)
-    TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 6)
+    TestUtils.waitForPartitionMetadata(servers, topic3, 1)
+    TestUtils.waitForPartitionMetadata(servers, topic3, 2)
+    TestUtils.waitForPartitionMetadata(servers, topic3, 3)
+    TestUtils.waitForPartitionMetadata(servers, topic3, 4)
+    TestUtils.waitForPartitionMetadata(servers, topic3, 5)
+    TestUtils.waitForPartitionMetadata(servers, topic3, 6)

Review comment:
       I didn't replace these with `waitForAllPartitionsMetadata` because I'm afraid that would defeat the original purpose of the test. The same applies in other places.







[GitHub] [kafka] showuon commented on pull request #9777: KAFKA-7940: wait until we've got the expected partition size

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #9777:
URL: https://github.com/apache/kafka/pull/9777#issuecomment-770691251


   @hachikuji , thanks for the comments. I've made the following updates:
   1. Renamed the methods:
   `waitUntilMetadataIsPropagatedWithExpectedSize` -> `waitForAllPartitionMetadata`
   `waitUntilMetadataIsPropagated` -> `waitForPartitionMetadata`
   2. Return the partition metadata from `waitForAllPartitionMetadata` and `waitForPartitionMetadata`.
   3. Skip the calls to `waitUntilMetadataIsPropagated` and `waitUntilLeaderIsElectedOrChanged`, and get the info from the returned partition metadata instead.
   4. Use `server.metadataCache` directly instead of `server.dataPlaneRequestProcessor.metadataCache`.
   
   Thank you.





[GitHub] [kafka] hachikuji merged pull request #9777: KAFKA-7940: wait until we've got the expected partition size

Posted by GitBox <gi...@apache.org>.
hachikuji merged pull request #9777:
URL: https://github.com/apache/kafka/pull/9777


   

