Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/12/03 19:13:17 UTC

[GitHub] [kafka] C0urante commented on a change in pull request #11046: KAFKA-12980: Return empty record batch from Consumer::poll when position advances due to aborted transactions

C0urante commented on a change in pull request #11046:
URL: https://github.com/apache/kafka/pull/11046#discussion_r762182442



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -728,6 +728,15 @@ public void onFailure(RuntimeException e) {
                     log.trace("Update fetching position to {} for partition {}", nextPosition, completedFetch.partition);
                     subscriptions.position(completedFetch.partition, nextPosition);
                     positionAdvanced = true;
+                    if (partRecords.isEmpty()) {
+                        log.debug(
+                                "Advanced position for partition {} without receiving any user-visible records. " 
+                                        + "This is likely due to skipping over control records in the current fetch, " 

Review comment:
       Sounds good to me 👍 

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##########
@@ -1176,9 +1176,10 @@ public void assign(Collection<TopicPartition> partitions) {
      * offset for the subscribed list of partitions
      *
      * <p>
-     * This method returns immediately if there are records available. Otherwise, it will await the passed timeout.
-     * If the timeout expires, an empty record set will be returned. Note that this method may block beyond the
-     * timeout in order to execute custom {@link ConsumerRebalanceListener} callbacks.
+     * This method returns immediately if there are records available or if the position advances past control records.

Review comment:
       Ack, done.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##########
@@ -1176,9 +1176,10 @@ public void assign(Collection<TopicPartition> partitions) {
      * offset for the subscribed list of partitions
      *
      * <p>
-     * This method returns immediately if there are records available. Otherwise, it will await the passed timeout.
-     * If the timeout expires, an empty record set will be returned. Note that this method may block beyond the
-     * timeout in order to execute custom {@link ConsumerRebalanceListener} callbacks.
+     * This method returns immediately if there are records available or if the position advances past control records.

Review comment:
       Ack, will add.
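
For readers following the thread, here is a minimal sketch of the return condition the revised Javadoc describes: poll returns right away either when records are available or when the position has advanced without any user-visible records (for example, past control records). This is an illustration only, not the code in Fetcher.java or KafkaConsumer.java. The names PollSketch, FetchResult, and shouldReturnImmediately are hypothetical and do not exist in the Kafka client.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class PollSketch {

    /** Hypothetical stand-in for the outcome of one fetch pass; not a Kafka class. */
    static final class FetchResult {
        final List<String> records;      // user-visible records only
        final boolean positionAdvanced;  // true if the consumer position moved forward

        FetchResult(List<String> records, boolean positionAdvanced) {
            this.records = records;
            this.positionAdvanced = positionAdvanced;
        }
    }

    /**
     * Assumed rule, paraphrasing the Javadoc change under review: return
     * immediately if there are records, or if the position advanced even
     * though no user-visible records were produced.
     */
    static boolean shouldReturnImmediately(FetchResult result) {
        return !result.records.isEmpty() || result.positionAdvanced;
    }

    public static void main(String[] args) {
        FetchResult withRecords = new FetchResult(Arrays.asList("a", "b"), true);
        FetchResult onlyControlRecords = new FetchResult(Collections.<String>emptyList(), true);
        FetchResult nothingFetched = new FetchResult(Collections.<String>emptyList(), false);

        System.out.println(shouldReturnImmediately(withRecords));
        System.out.println(shouldReturnImmediately(onlyControlRecords));
        System.out.println(shouldReturnImmediately(nothingFetched));
    }
}
```

One consequence for callers, under the same assumption: an empty batch from poll no longer implies that the timeout expired, so a loop should not treat an empty result as a signal that the partition is drained.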
