Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/07/23 04:58:47 UTC

[GitHub] [spark] HeartSaVioR edited a comment on issue #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector

URL: https://github.com/apache/spark/pull/25135#issuecomment-514048478
 
 
   Here's part of the test code Kafka uses with the new `poll(Duration)` API.
   
   https://github.com/apache/kafka/blob/f98e176746d663fadedbcd3c18312a7f476a20c8/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala#L1748-L1752
   
   ```
     private def awaitAssignment(consumer: Consumer[_, _], expectedAssignment: Set[TopicPartition]): Unit = {
       TestUtils.pollUntilTrue(consumer, () => consumer.assignment() == expectedAssignment.asJava,
         s"Timed out while awaiting expected assignment $expectedAssignment. " +
           s"The current assignment is ${consumer.assignment()}")
     }
   ```
   
   https://github.com/apache/kafka/blob/f98e176746d663fadedbcd3c18312a7f476a20c8/core/src/test/scala/unit/kafka/utils/TestUtils.scala#L767-L775
   
   ```
     def pollUntilTrue(consumer: Consumer[_, _],
                       action: () => Boolean,
                       msg: => String,
                       waitTimeMs: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Unit = {
       waitUntilTrue(() => {
         consumer.poll(Duration.ofMillis(50))
         action()
       }, msg = msg, pause = 0L, waitTimeMs = waitTimeMs)
     }
   ```
   
   Kafka still has some test code that relies on the deprecated `poll(0)`, so `poll(Duration)` and `poll(long)` are used side by side. There might be no technical reason for this, but the fact that they still rely on the old style might indicate that `poll(0)` is still needed in some cases.
   
   Sometimes Kafka calls `updateAssignmentMetadataIfNeeded` directly. This is the method that handles the metadata update inside `poll()`, and here it is given a timer of `Long.MAX_VALUE`, so the call effectively blocks. The method is intended for testing: it is defined as package-private.
   
   ```
   consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE));
   ```
   
   In many cases where test code calls `poll(Duration.ZERO)`, `updateAssignmentMetadataIfNeeded` is called beforehand. In the other cases, the verification code just seems to confirm that calling poll does nothing or returns already-fetched records.
   
   I guess in our case we need to either leverage `updateAssignmentMetadataIfNeeded` to deal only with metadata (it may require some hack, and they clarified it's for testing, so this is not the one for us), or call `poll` with a small timeout (50ms) and tolerate the case where no record is available to pull (the timeout then adds to latency regardless of whether metadata is available).
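   
   To make the second option concrete, here's a rough sketch of polling with a small timeout until an assignment shows up. It is not code from the PR or from Kafka: `PollingConsumer`, `WaitResult`, and `pollUntilAssigned` are hypothetical names, and the trait is only a stand-in for the two `Consumer` methods that matter here (`poll(Duration)` and `assignment()`), so the snippet compiles without the Kafka client on the classpath.
   
   ```scala
   import java.time.Duration
   import java.util.concurrent.TimeUnit
   
   // Hypothetical stand-in for the subset of Kafka's Consumer used in this sketch.
   trait PollingConsumer[R] {
     def poll(timeout: Duration): Seq[R]
     def assignment: Set[String]
   }
   
   // Whether an assignment was seen before the deadline, plus any records
   // that the intermediate polls happened to return.
   final case class WaitResult[R](assigned: Boolean, records: Vector[R])
   
   object AssignmentWait {
     // Polls with a small timeout (50ms by default) until the consumer reports
     // a non-empty assignment or maxWaitMs has elapsed. A poll that returns
     // no records is tolerated and the loop simply continues.
     def pollUntilAssigned[R](
         consumer: PollingConsumer[R],
         maxWaitMs: Long,
         pollMs: Long = 50L): WaitResult[R] = {
       val startNanos = System.nanoTime()
       val budgetNanos = TimeUnit.MILLISECONDS.toNanos(maxWaitMs)
       var fetched = Vector.empty[R]
       var assigned = consumer.assignment.nonEmpty
       while (!assigned && System.nanoTime() - startNanos < budgetNanos) {
         // Keep whatever the poll returns so the caller decides what to do with it.
         fetched = fetched ++ consumer.poll(Duration.ofMillis(pollMs))
         assigned = consumer.assignment.nonEmpty
       }
       WaitResult(assigned, fetched)
     }
   }
   ```
   
   The sketch hands back the records instead of dropping them, since a poll that updates metadata may also fetch data. Whether the connector would keep or discard them is a separate decision.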
   
   Btw, I'm seeing KIP-288, which proposed a new public API `waitForAssignment` similar to `updateAssignmentMetadataIfNeeded`, but it was discarded since KIP-266 superseded KIP-288, and KIP-266 didn't end up adding it. Not sure whether it was declined or just missed.
   https://cwiki.apache.org/confluence/display/KAFKA/KIP-288%3A+%5BDISCARDED%5D+Consumer.poll%28%29+timeout+semantic+change+and+new+waitForAssignment+method
