You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/12/06 21:55:57 UTC

[GitHub] [kafka] hachikuji commented on a change in pull request #11046: KAFKA-12980: Return empty record batch from Consumer::poll when position advances due to aborted transactions

hachikuji commented on a change in pull request #11046:
URL: https://github.com/apache/kafka/pull/11046#discussion_r763427186



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##########
@@ -1176,7 +1176,8 @@ public void assign(Collection<TopicPartition> partitions) {
      * offset for the subscribed list of partitions
      *
      * <p>
-     * This method returns immediately if there are records available or if the position advances past control records.
+     * This method returns immediately if there are records available or if the position advances past control records
+     * or aborted transactions when isolation.level=READ_COMMITTED.

Review comment:
       nit: Worth double-checking, but I think we require "read_committed" to be lower case.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -725,17 +725,13 @@ public void onFailure(RuntimeException e) {
                             completedFetch.nextFetchOffset,
                             completedFetch.lastEpoch,
                             position.currentLeader);
-                    log.trace("Update fetching position to {} for partition {}", nextPosition, completedFetch.partition);
+                    log.trace("Updating fetch position from {} to {} for partition {} and returning {} records from `poll()`",
+                            position, nextPosition, completedFetch.partition, partRecords.size());
                     subscriptions.position(completedFetch.partition, nextPosition);
                     positionAdvanced = true;
                     if (partRecords.isEmpty()) {
-                        log.debug(
-                                "Advanced position for partition {} without receiving any user-visible records. " 
-                                        + "This is likely due to skipping over control records in the current fetch, " 
-                                        + "and may result in the consumer returning an empty record batch when " 
-                                        + "polled before its poll timeout has elapsed.",
-                                completedFetch.partition
-                        );
+                        log.trace("Returning empty records from `poll()` " 
+                                + "since the consumer's position has advanced for at least one topic partition");

Review comment:
       nit: I think this comment made more sense in its original location in `KafkaConsumer`. At this level, it seems redundant after the changes in the log message above. We would already say "... and returning 0 records from `poll()`"




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org