Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/02 13:54:03 UTC

[GitHub] [kafka] rajinisivaram opened a new pull request #10022: KAFKA-12268; Return from KafkaConsumer.poll only when records available or timeout

rajinisivaram opened a new pull request #10022:
URL: https://github.com/apache/kafka/pull/10022


   Changes from https://issues.apache.org/jira/browse/KAFKA-10866 cause KafkaConsumer.poll() to return early even when no records are available. We should respect the timeout specified in poll() and return before it expires only when records are available.
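   
   A caller-side sketch that loosely illustrates the documented contract this PR restores (the helper name and the logging are illustrative only, not part of this PR):
   
   ```java
   import java.time.Duration;
   import org.apache.kafka.clients.consumer.Consumer;
   import org.apache.kafka.clients.consumer.ConsumerRecords;
   
   final class PollContractSketch {
       // Documented contract: poll() either returns records or blocks for
       // (roughly) the full timeout before handing back an empty result.
       static <K, V> ConsumerRecords<K, V> pollRespectingTimeout(Consumer<K, V> consumer, Duration timeout) {
           long start = System.nanoTime();
           ConsumerRecords<K, V> records = consumer.poll(timeout);
           long elapsedMs = Duration.ofNanos(System.nanoTime() - start).toMillis();
           if (records.isEmpty() && elapsedMs < timeout.toMillis()) {
               // This is the early return KAFKA-12268 describes: an empty result
               // well before the timeout expired.
               System.err.printf("poll returned empty after %d ms (timeout %d ms)%n",
                       elapsedMs, timeout.toMillis());
           }
           return records;
       }
   }
   ```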
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #10022: KAFKA-12268; Return from KafkaConsumer.poll only when records available or timeout

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #10022:
URL: https://github.com/apache/kafka/pull/10022#discussion_r569581763



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##########
@@ -1236,7 +1236,7 @@ public void assign(Collection<TopicPartition> partitions) {
                 }
 
                 final FetchedRecords<K, V> records = pollForFetches(timer);
-                if (!records.isEmpty()) {
+                if (!records.records().isEmpty()) {

Review comment:
       Thanks, all. I agree with the conclusion that the system is operating as designed, so we should close this PR and either change the design or the system tests.
   
   I also think @rajinisivaram 's suggestion to discuss it on the mailing list is a good one, so that the discussion will be part of the history of the KIP and not lost to the sands of time in this thread.
   
   I'll send a new response to the vote thread to re-initialize that discussion.







[GitHub] [kafka] vvcephei commented on a change in pull request #10022: KAFKA-12268; Return from KafkaConsumer.poll only when records available or timeout

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #10022:
URL: https://github.com/apache/kafka/pull/10022#discussion_r569062203



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##########
@@ -1236,7 +1236,7 @@ public void assign(Collection<TopicPartition> partitions) {
                 }
 
                 final FetchedRecords<K, V> records = pollForFetches(timer);
-                if (!records.isEmpty()) {
+                if (!records.records().isEmpty()) {

Review comment:
       I made this change so that we would immediately send off the next round of fetches after handling _any_ fetch response, not just responses that contain records.
   
   Was my reasoning incorrect?







[GitHub] [kafka] chia7712 commented on a change in pull request #10022: KAFKA-12268; Return from KafkaConsumer.poll only when records available or timeout

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10022:
URL: https://github.com/apache/kafka/pull/10022#discussion_r569196050



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##########
@@ -1236,7 +1236,7 @@ public void assign(Collection<TopicPartition> partitions) {
                 }
 
                 final FetchedRecords<K, V> records = pollForFetches(timer);
-                if (!records.isEmpty()) {
+                if (!records.records().isEmpty()) {

Review comment:
       > At the moment, I think we could change the javadoc along with the PR if, so far, we have only seen our own tests break because they rely on this guarantee; if you have any common use cases that may be impacted by this behavior change, I'd love to hear them and revisit.
   
   just imagine a use case :)
   
   Users try to randomly access records by offset, so the consumer is NOT used in a poll loop. Instead, it works like a getter method.
   
   ```scala
   // `consumer: Consumer[K, V]` and `tp` are assumed to be in scope
   import scala.jdk.CollectionConverters._ // Scala 2.13 converters
   
   def randomAccess(offset: Long, duration: Duration): Seq[ConsumerRecord[K, V]] = {
     consumer.seek(tp, offset)
     // poll() returns a Java ConsumerRecords; convert it to a Scala Seq
     consumer.poll(duration).asScala.toSeq
   }
   ```
   
   In this case, users expect the method to return an empty result only if there are no available records. With the new behavior, users may receive empty results before the timeout expires, so they would have to rewrite the code like below.
   
   ```scala
   def randomAccess(offset: Long, duration: Duration): Seq[ConsumerRecord[K, V]] = {
     consumer.seek(tp, offset)
     val endTime = System.currentTimeMillis() + duration.toMillis
     while (System.currentTimeMillis() <= endTime) {
       // poll only for the remaining time, not the full duration on every iteration
       val remaining = Duration.ofMillis(math.max(0L, endTime - System.currentTimeMillis()))
       val records = consumer.poll(remaining).asScala.toSeq
       if (records.nonEmpty) return records
     }
     Seq.empty
   }
   ```
   
   







[GitHub] [kafka] rajinisivaram commented on pull request #10022: KAFKA-12268; Return from KafkaConsumer.poll only when records available or timeout

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on pull request #10022:
URL: https://github.com/apache/kafka/pull/10022#issuecomment-771813678


   @chia7712 Thanks for the review. There is a test that verifies that we get metadata even when there are no records (https://github.com/apache/kafka/blob/5c562efb2d76407011ea88c1ca1b2355079935bc/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L2158), so I assumed that we wanted to populate it for some reason. I wasn't sure whether we also rely on the consumer returning early to use that metadata, so I will wait for @vvcephei to take a look.





[GitHub] [kafka] guozhangwang commented on a change in pull request #10022: KAFKA-12268; Return from KafkaConsumer.poll only when records available or timeout

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #10022:
URL: https://github.com/apache/kafka/pull/10022#discussion_r569174212



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##########
@@ -1236,7 +1236,7 @@ public void assign(Collection<TopicPartition> partitions) {
                 }
 
                 final FetchedRecords<K, V> records = pollForFetches(timer);
-                if (!records.isEmpty()) {
+                if (!records.records().isEmpty()) {

Review comment:
       I thought about that with @vvcephei on the original PR that changed this behavior:
   
   https://github.com/apache/kafka/pull/9836#issuecomment-765592268
   
   We decided it was not going to impact users much in practice, and hence still went ahead with it, since at the end of the day we would still condition on `ConsumerRecords#isEmpty` or the for loop to process records if there are any; but I admit that I did not catch the javadoc statement back then.
   
   At the moment, I think we could change the javadoc along with the PR if, so far, we have only seen our own tests break because they rely on this guarantee; if you have any common use cases that may be impacted by this behavior change, I'd love to hear them and revisit.
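   
   For reference, the `ConsumerRecords#isEmpty` / for-loop pattern mentioned above is just the standard consumption loop; a minimal sketch (the handler argument is a placeholder):
   
   ```java
   import java.time.Duration;
   import org.apache.kafka.clients.consumer.Consumer;
   import org.apache.kafka.clients.consumer.ConsumerRecord;
   import org.apache.kafka.clients.consumer.ConsumerRecords;
   
   final class PollLoopSketch {
       // In a standard poll loop the caller iterates whatever poll() returns, so
       // an occasional empty batch only costs one extra iteration.
       static void run(Consumer<String, String> consumer,
                       java.util.function.Consumer<ConsumerRecord<String, String>> handler) {
           while (true) {
               ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
               if (records.isEmpty()) {
                   continue; // nothing this round; poll again
               }
               for (ConsumerRecord<String, String> record : records) {
                   handler.accept(record);
               }
           }
       }
   }
   ```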







[GitHub] [kafka] rajinisivaram closed pull request #10022: KAFKA-12268; Return from KafkaConsumer.poll only when records available or timeout

Posted by GitBox <gi...@apache.org>.
rajinisivaram closed pull request #10022:
URL: https://github.com/apache/kafka/pull/10022


   





[GitHub] [kafka] rajinisivaram commented on a change in pull request #10022: KAFKA-12268; Return from KafkaConsumer.poll only when records available or timeout

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on a change in pull request #10022:
URL: https://github.com/apache/kafka/pull/10022#discussion_r569292052



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##########
@@ -1236,7 +1236,7 @@ public void assign(Collection<TopicPartition> partitions) {
                 }
 
                 final FetchedRecords<K, V> records = pollForFetches(timer);
-                if (!records.isEmpty()) {
+                if (!records.records().isEmpty()) {

Review comment:
       Fixing the system tests would be straightforward, but as @chia7712 said, this could impact existing applications that rely on the current behaviour. For example, an application may `poll(longTimeout)` and exit if no records are returned, treating that as an error condition, similar to the system tests. Adding a new API with options sounds like the safe alternative for maintaining compatibility, but it may be good to follow up on the mailing list, perhaps on the KIP-695 discussion thread, to get wider opinions. Either way, the change proposed in this PR seems unsuitable; we can either fix the system tests or make the new behaviour optional.
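   
   A sketch of the kind of application pattern described above (hypothetical helper name, arbitrary five-minute timeout), which would break if poll() returned an empty result early:
   
   ```java
   import java.time.Duration;
   import org.apache.kafka.clients.consumer.Consumer;
   import org.apache.kafka.clients.consumer.ConsumerRecords;
   
   final class PollOrFailSketch {
       // Relies on the documented contract: an empty result means the full timeout
       // elapsed with no data. With an early return, it could simply mean a
       // record-less fetch response arrived, and the caller would fail spuriously.
       static <K, V> ConsumerRecords<K, V> fetchOrFail(Consumer<K, V> consumer) {
           ConsumerRecords<K, V> records = consumer.poll(Duration.ofMinutes(5));
           if (records.isEmpty()) {
               throw new IllegalStateException("no records within the expected window");
           }
           return records;
       }
   }
   ```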







[GitHub] [kafka] rajinisivaram commented on pull request #10022: KAFKA-12268; Return from KafkaConsumer.poll only when records available or timeout

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on pull request #10022:
URL: https://github.com/apache/kafka/pull/10022#issuecomment-772748095


   @vvcephei Thank you! I will close this PR since the discussion can continue on the mailing list.





[GitHub] [kafka] chia7712 commented on pull request #10022: KAFKA-12268; Return from KafkaConsumer.poll only when records available or timeout

Posted by GitBox <gi...@apache.org>.
chia7712 commented on pull request #10022:
URL: https://github.com/apache/kafka/pull/10022#issuecomment-771781849


   How about making ```Fetcher#fetchedRecords``` update metadata only if the records are not empty? In other words, [Fetcher.java#L643](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L643) could be moved to [Fetcher.java#L659](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L659). A ```FetchedRecords``` with no records always gets discarded, so we don't need to set metadata on ```FetchedRecords``` when the records are empty.





[GitHub] [kafka] chia7712 commented on a change in pull request #10022: KAFKA-12268; Return from KafkaConsumer.poll only when records available or timeout

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10022:
URL: https://github.com/apache/kafka/pull/10022#discussion_r569105085



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##########
@@ -1236,7 +1236,7 @@ public void assign(Collection<TopicPartition> partitions) {
                 }
 
                 final FetchedRecords<K, V> records = pollForFetches(timer);
-                if (!records.isEmpty()) {
+                if (!records.records().isEmpty()) {

Review comment:
       Not sure whether this is a kind of behavior change. The docs of ```KafkaConsumer#poll``` indicate that the ```timeout``` is used to wait for available records.
   
   ```
        * This method returns immediately if there are records available. Otherwise, it will await the passed timeout.
        * If the timeout expires, an empty record set will be returned. Note that this method may block beyond the
        * timeout in order to execute custom {@link ConsumerRebalanceListener} callbacks.
   ```
   
   Maybe we can introduce a new API ```poll(Duration, Options)``` (similar to KafkaAdmin). The options would let us adjust the poll behavior for specific use cases, and it also leaves room for other ```poll``` variants in the future.
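   
   A purely hypothetical sketch of the shape such an overload could take; none of these names exist in the Kafka API, they only illustrate the idea:
   
   ```java
   // Hypothetical options class, loosely mirroring the Admin client's *Options pattern.
   final class PollOptions {
       private boolean returnOnlyWhenRecordsAvailable = true;
   
       PollOptions returnOnlyWhenRecordsAvailable(boolean value) {
           this.returnOnlyWhenRecordsAvailable = value;
           return this;
       }
   
       boolean shouldReturnOnlyWhenRecordsAvailable() {
           return returnOnlyWhenRecordsAvailable;
       }
   }
   
   // Hypothetical call site: poll(Duration) keeps the documented behaviour, while
   // callers who are fine with an early empty return opt in explicitly, e.g.
   // consumer.poll(Duration.ofSeconds(5), new PollOptions().returnOnlyWhenRecordsAvailable(false));
   ```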







[GitHub] [kafka] vvcephei commented on a change in pull request #10022: KAFKA-12268; Return from KafkaConsumer.poll only when records available or timeout

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #10022:
URL: https://github.com/apache/kafka/pull/10022#discussion_r569635435



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##########
@@ -1236,7 +1236,7 @@ public void assign(Collection<TopicPartition> partitions) {
                 }
 
                 final FetchedRecords<K, V> records = pollForFetches(timer);
-                if (!records.isEmpty()) {
+                if (!records.records().isEmpty()) {

Review comment:
       Ok, @chia7712 and @rajinisivaram , I've restarted the VOTE thread with a new message.
   
   Hopefully, we can wrap up that discussion quickly, so I can circle back to either change the feature or the tests.
   
   Thanks for that counter-example, @chia7712 ! Actually, we were aware of that kind of case, and your proposed workaround is exactly what we had to do in the integration tests: https://github.com/apache/kafka/pull/9836/files#diff-735dcc2179315ebd78a7c75fd21b70b0ae81b90f3d5ec761740bc80abeae891fR1875-R1888 :)
   
   The key question, which I tried to pose on the mailing list, is whether this is really a "real" use case we have to support, or whether it's just something we happen to do in some tests or are able to imagine. We can certainly add a new method to the interface, but that also has nontrivial usability costs, as users need to understand the differences between the two methods, and we would also have to maintain and test both code paths. If it's not likely that someone outside of our own project will be harmed, it seems better to just make the change in place.
   
   Anyway, we should discuss on the mailing list; I just wanted to acknowledge your response.



