Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/07/14 05:01:51 UTC

[GitHub] [kafka] C0urante opened a new pull request #11046: KAFKA-12980: Return empty record batch from Consumer::poll when position advances due to aborted transactions

C0urante opened a new pull request #11046:
URL: https://github.com/apache/kafka/pull/11046


   [Jira](https://issues.apache.org/jira/browse/KAFKA-12980)
   
   This is useful for reading to the end of a topic that contains aborted transactions. If an aborted transaction sits at the end of the topic, the consumer can now be expected to return from `poll` once it advances past that transaction, and users can query the consumer's latest `position` for the relevant topic partitions to see whether it has made it past the end of the topic (or rather, past what the end of the topic was when the read-to-end attempt began).
   
   For a concrete example of this logic, see the [`KafkaBasedLog::readToLogEnd`](https://github.com/apache/kafka/blob/5e5d5bff3bdaf807338ec9adeac982f8a5c98fbd/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java#L322-L345) method that Connect employs to refresh its view of internal topics.
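
   For readers unfamiliar with that pattern, here is a minimal, self-contained sketch of a read-to-end loop that benefits from this change. It is an illustration only, not the actual `KafkaBasedLog` code; the topic name, partition, and config values are assumptions:
   ```java
   import java.time.Duration;
   import java.util.Collections;
   import java.util.Properties;

   import org.apache.kafka.clients.consumer.Consumer;
   import org.apache.kafka.clients.consumer.ConsumerConfig;
   import org.apache.kafka.clients.consumer.ConsumerRecords;
   import org.apache.kafka.clients.consumer.KafkaConsumer;
   import org.apache.kafka.common.TopicPartition;

   public class ReadToEndExample {
       public static void main(String[] args) {
           Properties props = new Properties();
           props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
           props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");   // skip aborted transactions
           props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                   "org.apache.kafka.common.serialization.StringDeserializer");
           props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                   "org.apache.kafka.common.serialization.StringDeserializer");

           try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
               TopicPartition tp = new TopicPartition("connect-offsets", 0);     // hypothetical internal topic
               consumer.assign(Collections.singleton(tp));
               consumer.seekToBeginning(Collections.singleton(tp));

               // Snapshot the end offset once, then poll until the consumed position reaches it.
               long endOffset = consumer.endOffsets(Collections.singleton(tp)).get(tp);
               while (consumer.position(tp) < endOffset) {
                   // Before this change, poll() could block for its full timeout here if the only
                   // remaining data was an aborted transaction; with it, poll() returns an empty
                   // batch once the position advances past that transaction, so the loop can
                   // re-check the position and terminate.
                   ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                   records.forEach(record -> System.out.println(record.key() + " -> " + record.value()));
               }
           }
       }
   }
   ```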
   
   No new unit tests are added, but many existing unit tests are modified to ensure that aborted transactions are detected and reported correctly by `Fetcher::collectFetch`.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] C0urante commented on pull request #11046: KAFKA-12980: Return empty record batch from Consumer::poll when position advances due to aborted transactions

Posted by GitBox <gi...@apache.org>.
C0urante commented on pull request #11046:
URL: https://github.com/apache/kafka/pull/11046#issuecomment-937378866


   @hachikuji could you take a look at this when you have a chance?





[GitHub] [kafka] hachikuji commented on a change in pull request #11046: KAFKA-12980: Return empty record batch from Consumer::poll when position advances due to aborted transactions

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11046:
URL: https://github.com/apache/kafka/pull/11046#discussion_r758867566



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##########
@@ -1176,9 +1176,10 @@ public void assign(Collection<TopicPartition> partitions) {
      * offset for the subscribed list of partitions
      *
      * <p>
-     * This method returns immediately if there are records available. Otherwise, it will await the passed timeout.
-     * If the timeout expires, an empty record set will be returned. Note that this method may block beyond the
-     * timeout in order to execute custom {@link ConsumerRebalanceListener} callbacks.
+     * This method returns immediately if there are records available or if the position advances past control records.

Review comment:
       nit: past control records or aborted transactions when isolation.level=READ_COMMITTED

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -728,6 +728,15 @@ public void onFailure(RuntimeException e) {
                     log.trace("Update fetching position to {} for partition {}", nextPosition, completedFetch.partition);
                     subscriptions.position(completedFetch.partition, nextPosition);
                     positionAdvanced = true;
+                    if (partRecords.isEmpty()) {
+                        log.debug(
+                                "Advanced position for partition {} without receiving any user-visible records. " 
+                                        + "This is likely due to skipping over control records in the current fetch, " 

Review comment:
       How about this. First, we can augment the message above to something like this:
   ```java
     log.trace("Updating fetch position from {} to {} for partition {} and returning {} records from `poll()`",
       position, nextPosition, completedFetch.partition, partRecords.size());
   ```
   This gives us enough information in the logs to see which partitions caused the `poll()` to return, and tells us exactly where in the log the aborted transaction/control records exist. Second, maybe we can add back your previous log line, but make it a little more terse and put it at trace level:
   ```java
                           log.trace(
                                   "Returning empty records from `poll()` since the consumer's position has advanced "
                                           + "for at least one topic partition");
   ```
   







[GitHub] [kafka] C0urante commented on a change in pull request #11046: KAFKA-12980: Return empty record batch from Consumer::poll when position advances due to aborted transactions

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11046:
URL: https://github.com/apache/kafka/pull/11046#discussion_r753385489



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##########
@@ -1235,8 +1237,14 @@ public void assign(Collection<TopicPartition> partitions) {
                     }
                 }
 
-                final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
-                if (!records.isEmpty()) {
+                final Fetch<K, V> fetch = pollForFetches(timer);
+                if (!fetch.isEmpty()) {
+                    if (fetch.records().isEmpty()) {
+                        assert fetch.containsAborts();

Review comment:
       I've removed the check; I think that the cost of throwing an error to the user is a little too high since the alternative (an empty batch of records) seems less harmful.







[GitHub] [kafka] C0urante commented on pull request #11046: KAFKA-12980: Return empty record batch from Consumer::poll when position advances due to aborted transactions

Posted by GitBox <gi...@apache.org>.
C0urante commented on pull request #11046:
URL: https://github.com/apache/kafka/pull/11046#issuecomment-974264237


   Thanks for the review @hachikuji; I agree that it's simpler and more future-proof to make the logic more general and not worry about the specific case of aborted transactions. I've updated the PR (and title) accordingly.







[GitHub] [kafka] hachikuji merged pull request #11046: KAFKA-12980: Return empty record batch from Consumer::poll when position advances due to aborted transactions

Posted by GitBox <gi...@apache.org>.
hachikuji merged pull request #11046:
URL: https://github.com/apache/kafka/pull/11046


   





[GitHub] [kafka] C0urante commented on pull request #11046: KAFKA-12980: Return empty record batch from Consumer::poll when position advances due to aborted transactions

Posted by GitBox <gi...@apache.org>.
C0urante commented on pull request #11046:
URL: https://github.com/apache/kafka/pull/11046#issuecomment-975659468


   Thanks @hachikuji. I've addressed the nits and tried to address the more substantial comment regarding logging; interested in your thoughts when you have a moment.





[GitHub] [kafka] C0urante commented on a change in pull request #11046: KAFKA-12980: Return empty record batch from Consumer::poll when position advances due to aborted transactions

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11046:
URL: https://github.com/apache/kafka/pull/11046#discussion_r753386502



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -638,15 +636,15 @@ public void onFailure(RuntimeException e) {
     /**
      * Return the fetched records, empty the record buffer and update the consumed position.
      *
-     * NOTE: returning empty records guarantees the consumed position are NOT updated.
+     * NOTE: returning an {@link Fetch#isEmpty empty} fetch guarantees the consumed position is not updated.

Review comment:
       Thanks, this is much simpler. I've taken a stab at an approach that's agnostic with regard to the cause of the position advancing and only tracks whether any advancement has taken place; hope this looks something like what you had in mind.







[GitHub] [kafka] C0urante commented on a change in pull request #11046: KAFKA-12980: Return empty record batch from Consumer::poll when position advances due to aborted transactions

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11046:
URL: https://github.com/apache/kafka/pull/11046#discussion_r764163807



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -725,17 +725,13 @@ public void onFailure(RuntimeException e) {
                             completedFetch.nextFetchOffset,
                             completedFetch.lastEpoch,
                             position.currentLeader);
-                    log.trace("Update fetching position to {} for partition {}", nextPosition, completedFetch.partition);
+                    log.trace("Updating fetch position from {} to {} for partition {} and returning {} records from `poll()`",
+                            position, nextPosition, completedFetch.partition, partRecords.size());
                     subscriptions.position(completedFetch.partition, nextPosition);
                     positionAdvanced = true;
                     if (partRecords.isEmpty()) {
-                        log.debug(
-                                "Advanced position for partition {} without receiving any user-visible records. " 
-                                        + "This is likely due to skipping over control records in the current fetch, " 
-                                        + "and may result in the consumer returning an empty record batch when " 
-                                        + "polled before its poll timeout has elapsed.",
-                                completedFetch.partition
-                        );
+                        log.trace("Returning empty records from `poll()` " 
+                                + "since the consumer's position has advanced for at least one topic partition");

Review comment:
       This is true. I was hoping we could have something in here that explicitly states that this can happen because of the change in behavior implemented in this PR (i.e., skipping control records or aborted transactions). If you think it's worth calling that out in a log message, we can do that here; otherwise, the entire `if (partRecords.isEmpty())` branch is unnecessary.







[GitHub] [kafka] C0urante commented on a change in pull request #11046: KAFKA-12980: Return empty record batch from Consumer::poll when position advances due to aborted transactions

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11046:
URL: https://github.com/apache/kafka/pull/11046#discussion_r764381476



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -725,17 +725,13 @@ public void onFailure(RuntimeException e) {
                             completedFetch.nextFetchOffset,
                             completedFetch.lastEpoch,
                             position.currentLeader);
-                    log.trace("Update fetching position to {} for partition {}", nextPosition, completedFetch.partition);
+                    log.trace("Updating fetch position from {} to {} for partition {} and returning {} records from `poll()`",
+                            position, nextPosition, completedFetch.partition, partRecords.size());
                     subscriptions.position(completedFetch.partition, nextPosition);
                     positionAdvanced = true;
                     if (partRecords.isEmpty()) {
-                        log.debug(
-                                "Advanced position for partition {} without receiving any user-visible records. " 
-                                        + "This is likely due to skipping over control records in the current fetch, " 
-                                        + "and may result in the consumer returning an empty record batch when " 
-                                        + "polled before its poll timeout has elapsed.",
-                                completedFetch.partition
-                        );
+                        log.trace("Returning empty records from `poll()` " 
+                                + "since the consumer's position has advanced for at least one topic partition");

Review comment:
       Moved it to `KafkaConsumer`.







[GitHub] [kafka] hachikuji commented on a change in pull request #11046: KAFKA-12980: Return empty record batch from Consumer::poll when position advances due to aborted transactions

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11046:
URL: https://github.com/apache/kafka/pull/11046#discussion_r752683020



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -638,15 +636,15 @@ public void onFailure(RuntimeException e) {
     /**
      * Return the fetched records, empty the record buffer and update the consumed position.
      *
-     * NOTE: returning empty records guarantees the consumed position are NOT updated.
+     * NOTE: returning an {@link Fetch#isEmpty empty} fetch guarantees the consumed position is not updated.

Review comment:
       I was considering whether it would be possible to do something a little simpler without caring as much about the case of aborted data. In the future we may have more instances of control records which would probably need similar logic. The case of an empty committed transaction mentioned in the other comment is an example of this.
   
   I think the main thing we're interested in is when the position advances. We want to ensure that we return from poll() whenever this happens. In the code, this happens here: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L744. Could we structure this logic so that `Fetch` is tracking all the positions which have advanced?
   
   

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##########
@@ -1235,8 +1237,14 @@ public void assign(Collection<TopicPartition> partitions) {
                     }
                 }
 
-                final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
-                if (!records.isEmpty()) {
+                final Fetch<K, V> fetch = pollForFetches(timer);
+                if (!fetch.isEmpty()) {
+                    if (fetch.records().isEmpty()) {
+                        assert fetch.containsAborts();

Review comment:
       nit: we do not often use assertions. If you feel the check is worthwhile, maybe we can raise an IllegalStateException with a useful error?  
   

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetch.java
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public class Fetch<K, V> {
+    private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
+    private boolean containsAborts;

Review comment:
       Could we clarify what this means (maybe with a better name)? From the implementation, it looks like we set this for either aborted data or markers. I also wanted to mention that it is also possible for the position to advance due to an empty committed transaction. It wouldn't be a common case, but nothing in the protocol forces a producer to write to each partition that is included in the transaction.










[GitHub] [kafka] hachikuji commented on a change in pull request #11046: KAFKA-12980: Return empty record batch from Consumer::poll when position advances due to aborted transactions

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11046:
URL: https://github.com/apache/kafka/pull/11046#discussion_r753549068



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##########
@@ -1235,9 +1236,15 @@ public void assign(Collection<TopicPartition> partitions) {
                     }
                 }
 
-                final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
-                if (!records.isEmpty()) {
-                    // before returning the fetched records, we can send off the next round of fetches
+                final Fetch<K, V> fetch = pollForFetches(timer);
+                if (!fetch.isEmpty()) {
+                    if (fetch.records().isEmpty()) {
+                        log.debug(
+                                "Returning empty records from poll since the consumer's position has advanced "
+                                        + "for at least one topic partition; this may happen in the case of aborted or empty transactions"
+                        );
+                        return ConsumerRecords.empty();
+                    }                    // before returning the fetched records, we can send off the next round of fetches

Review comment:
       nit: move comment to following line

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetch.java
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public class Fetch<K, V> {
+    private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
+    private boolean positionAdvanced;
+    private int numRecords;
+
+    public static <K, V> Fetch<K, V> empty() {
+        return new Fetch<>(new HashMap<>(), false, 0);
+    }
+
+    public static <K, V> Fetch<K, V> forPartition(
+            TopicPartition partition,
+            List<ConsumerRecord<K, V>> records,
+            boolean positionAdvanced
+    ) {
+        Map<TopicPartition, List<ConsumerRecord<K, V>>> recordsMap = records.isEmpty()
+                ? new HashMap<>()
+                : mkMap(mkEntry(partition, records));
+        return new Fetch<>(recordsMap, positionAdvanced, records.size());
+    }
+
+    private Fetch(
+            Map<TopicPartition, List<ConsumerRecord<K, V>>> records,
+            boolean positionAdvanced,
+            int numRecords
+    ) {
+        this.records = records;
+        this.positionAdvanced = positionAdvanced;
+        this.numRecords = numRecords;
+    }
+
+    /**
+     * Add another {@link Fetch} to this one; all of its records will be added to this fetch's
+     * {@link #records()} records}, and if the other fetch
+     * {@link #positionAdvanced() advanced the consume position for any topic partition},
+     * this fetch will be marked as having advanced the consume position as well.
+     * @param fetch the other fetch to add; may not be null
+     */
+    public void add(Fetch<K, V> fetch) {
+        Objects.requireNonNull(fetch);
+        addRecords(fetch.records);
+        this.positionAdvanced |= fetch.positionAdvanced;
+    }
+
+    /**
+     * @return all of the non-control messages for this fetch, grouped by partition
+     */
+    public Map<TopicPartition, List<ConsumerRecord<K, V>>> records() {
+        return Collections.unmodifiableMap(records);
+    }
+
+    /**
+     * @return whether the fetch caused the consumer's
+     * {@link org.apache.kafka.clients.consumer.KafkaConsumer#position(TopicPartition) position} to advance for at
+     * least one of the topic partitions in this fetch
+     */
+    public boolean positionAdvanced() {
+        return positionAdvanced;
+    }
+
+    /**
+     * @return the total number of non-control messages for this fetch, across all partitions
+     */
+    public int numRecords() {
+        return numRecords;
+    }
+
+    /**
+     * @return {@code true} if and only if this fetch did not return any user-visible (i.e., non-control) records, and
+     * did not cause the consumer position to advance for any topic partitions
+     */
+    public boolean isEmpty() {
+        return numRecords == 0 && !positionAdvanced;
+    }
+
+    private void addRecords(Map<TopicPartition, List<ConsumerRecord<K, V>>> records) {
+        records.forEach((partition, partRecords) -> {
+            this.numRecords += partRecords.size();
+            List<ConsumerRecord<K, V>> currentRecords = this.records.get(partition);
+            if (currentRecords == null) {
+                this.records.put(partition, partRecords);
+            } else {
+                // this case shouldn't usually happen because we only send one fetch at a time per partition,
+                // but it might conceivably happen in some rare cases (such as partition leader changes).
+                // we have to copy to a new list because the old one may be immutable
+                List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(currentRecords.size() + partRecords.size());
+                newRecords.addAll(currentRecords);
+                newRecords.addAll(partRecords);
+                this.records.put(partition, newRecords);
+            }
+        });
+    }
+}

Review comment:
       nit: newline

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##########
@@ -1235,9 +1236,15 @@ public void assign(Collection<TopicPartition> partitions) {
                     }
                 }
 
-                final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
-                if (!records.isEmpty()) {
-                    // before returning the fetched records, we can send off the next round of fetches
+                final Fetch<K, V> fetch = pollForFetches(timer);

Review comment:
       Can we update the documentation for `poll()` to mention that it will return when the position advances due to aborted transactions or control records?

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##########
@@ -1235,9 +1236,15 @@ public void assign(Collection<TopicPartition> partitions) {
                     }
                 }
 
-                final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
-                if (!records.isEmpty()) {
-                    // before returning the fetched records, we can send off the next round of fetches
+                final Fetch<K, V> fetch = pollForFetches(timer);
+                if (!fetch.isEmpty()) {
+                    if (fetch.records().isEmpty()) {
+                        log.debug(

Review comment:
       I'm not so sure about the value of this log line. Maybe it would be more useful to ensure that we have enough logging in `Fetcher` that we can see when the position gets advanced without any record data? That would tell us not only why a poll() returned with empty data, but which partition caused it to do so. What do you think?







[GitHub] [kafka] C0urante commented on a change in pull request #11046: KAFKA-12980: Return empty record batch from Consumer::poll when position advances due to aborted transactions

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11046:
URL: https://github.com/apache/kafka/pull/11046#discussion_r754402147



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##########
@@ -1235,9 +1236,15 @@ public void assign(Collection<TopicPartition> partitions) {
                     }
                 }
 
-                final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
-                if (!records.isEmpty()) {
-                    // before returning the fetched records, we can send off the next round of fetches
+                final Fetch<K, V> fetch = pollForFetches(timer);
+                if (!fetch.isEmpty()) {
+                    if (fetch.records().isEmpty()) {
+                        log.debug(

Review comment:
       I think that works. My goal with this line was to help people understand the cause for this new behavior, as it may seem like a bug at first if `poll` returns empty batches before the poll timeout has elapsed. I've taken a stab at a message that has functional value (i.e., naming the specific topic partition whose position has advanced without any user-visible records) and still tries to let the user know about how/why this may affect `poll` behavior; LMKWYT.
   
   Also, this section was pretty buggy as it was, since it skipped the call to `client.transmitSends` and didn't pass the empty batch through the consumer's interceptors. I've fixed both of these; LMK if you think we should skip passing an empty batch to interceptors, though.
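
   To make the interceptor point concrete, a custom interceptor along the lines of this sketch (the class name and logging are made up for illustration) would now also observe empty batches when `poll` returns early because the position advanced without any user-visible records:
   ```java
   import java.util.Map;

   import org.apache.kafka.clients.consumer.ConsumerInterceptor;
   import org.apache.kafka.clients.consumer.ConsumerRecords;
   import org.apache.kafka.clients.consumer.OffsetAndMetadata;
   import org.apache.kafka.common.TopicPartition;

   public class EmptyBatchLoggingInterceptor<K, V> implements ConsumerInterceptor<K, V> {

       @Override
       public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
           // With this change, onConsume can be invoked with an empty batch when poll()
           // returns early because the consumer's position advanced past control records
           // or aborted transactions without producing any user-visible records.
           if (records.isEmpty()) {
               System.out.println("poll() returned an empty batch");
           }
           return records;
       }

       @Override
       public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
           // no-op
       }

       @Override
       public void close() {
           // no-op
       }

       @Override
       public void configure(Map<String, ?> configs) {
           // no-op
       }
   }
   ```
   Such an interceptor would be registered through the consumer's `interceptor.classes` config.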







[GitHub] [kafka] hachikuji commented on a change in pull request #11046: KAFKA-12980: Return empty record batch from Consumer::poll when position advances due to aborted transactions

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11046:
URL: https://github.com/apache/kafka/pull/11046#discussion_r764255533



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -725,17 +725,13 @@ public void onFailure(RuntimeException e) {
                             completedFetch.nextFetchOffset,
                             completedFetch.lastEpoch,
                             position.currentLeader);
-                    log.trace("Update fetching position to {} for partition {}", nextPosition, completedFetch.partition);
+                    log.trace("Updating fetch position from {} to {} for partition {} and returning {} records from `poll()`",
+                            position, nextPosition, completedFetch.partition, partRecords.size());
                     subscriptions.position(completedFetch.partition, nextPosition);
                     positionAdvanced = true;
                     if (partRecords.isEmpty()) {
-                        log.debug(
-                                "Advanced position for partition {} without receiving any user-visible records. " 
-                                        + "This is likely due to skipping over control records in the current fetch, " 
-                                        + "and may result in the consumer returning an empty record batch when " 
-                                        + "polled before its poll timeout has elapsed.",
-                                completedFetch.partition
-                        );
+                        log.trace("Returning empty records from `poll()` " 
+                                + "since the consumer's position has advanced for at least one topic partition");

Review comment:
       I'm inclined to either remove the log line entirely or move it back to its former location in `KafkaConsumer`. Will leave it up to you.







[GitHub] [kafka] C0urante commented on a change in pull request #11046: KAFKA-12980: Return empty record batch from Consumer::poll when position advances due to aborted transactions

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11046:
URL: https://github.com/apache/kafka/pull/11046#discussion_r764161599



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##########
@@ -1176,7 +1176,8 @@ public void assign(Collection<TopicPartition> partitions) {
      * offset for the subscribed list of partitions
      *
      * <p>
-     * This method returns immediately if there are records available or if the position advances past control records.
+     * This method returns immediately if there are records available or if the position advances past control records
+     * or aborted transactions when isolation.level=READ_COMMITTED.

Review comment:
       Sure, https://github.com/apache/kafka/blob/a2da309f134ec2930f70f1a3917eaade7548132f/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java#L557.







[GitHub] [kafka] C0urante commented on a change in pull request #11046: KAFKA-12980: Return empty record batch from Consumer::poll when position advances due to aborted transactions

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11046:
URL: https://github.com/apache/kafka/pull/11046#discussion_r762182442



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -728,6 +728,15 @@ public void onFailure(RuntimeException e) {
                     log.trace("Update fetching position to {} for partition {}", nextPosition, completedFetch.partition);
                     subscriptions.position(completedFetch.partition, nextPosition);
                     positionAdvanced = true;
+                    if (partRecords.isEmpty()) {
+                        log.debug(
+                                "Advanced position for partition {} without receiving any user-visible records. " 
+                                        + "This is likely due to skipping over control records in the current fetch, " 

Review comment:
       Sounds good to me 👍 

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##########
@@ -1176,9 +1176,10 @@ public void assign(Collection<TopicPartition> partitions) {
      * offset for the subscribed list of partitions
      *
      * <p>
-     * This method returns immediately if there are records available. Otherwise, it will await the passed timeout.
-     * If the timeout expires, an empty record set will be returned. Note that this method may block beyond the
-     * timeout in order to execute custom {@link ConsumerRebalanceListener} callbacks.
+     * This method returns immediately if there are records available or if the position advances past control records.

Review comment:
       Ack, done.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##########
@@ -1176,9 +1176,10 @@ public void assign(Collection<TopicPartition> partitions) {
      * offset for the subscribed list of partitions
      *
      * <p>
-     * This method returns immediately if there are records available. Otherwise, it will await the passed timeout.
-     * If the timeout expires, an empty record set will be returned. Note that this method may block beyond the
-     * timeout in order to execute custom {@link ConsumerRebalanceListener} callbacks.
+     * This method returns immediately if there are records available or if the position advances past control records.

Review comment:
       Ack, will add.







[GitHub] [kafka] hachikuji commented on a change in pull request #11046: KAFKA-12980: Return empty record batch from Consumer::poll when position advances due to aborted transactions

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11046:
URL: https://github.com/apache/kafka/pull/11046#discussion_r752683020



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -638,15 +636,15 @@ public void onFailure(RuntimeException e) {
     /**
      * Return the fetched records, empty the record buffer and update the consumed position.
      *
-     * NOTE: returning empty records guarantees the consumed position are NOT updated.
+     * NOTE: returning an {@link Fetch#isEmpty empty} fetch guarantees the consumed position is not updated.

Review comment:
       I was considering whether it would be possible to do something a little simpler without caring as much about the case of aborted data. In the future we may have more instances of control records which would probably need similar logic. The case of an empty committed transaction mentioned in the other comment is an example of this.
   
   I think the main thing we're interested in is when the position advances (regardless of the reason). We want to ensure that we return from poll() whenever this happens. In the code, this happens here: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L744. Could we structure this logic so that `Fetch` is tracking all the positions which have advanced? Then the condition for returning would only consider positions which have advanced.
   
   







[GitHub] [kafka] C0urante commented on a change in pull request #11046: KAFKA-12980: Return empty record batch from Consumer::poll when position advances due to aborted transactions

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11046:
URL: https://github.com/apache/kafka/pull/11046#discussion_r753385918



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetch.java
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public class Fetch<K, V> {
+    private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
+    private boolean containsAborts;

Review comment:
       Based on your other comment I've altered this to `positionAdvanced`; LMKWYT.







[GitHub] [kafka] hachikuji commented on a change in pull request #11046: KAFKA-12980: Return empty record batch from Consumer::poll when position advances due to aborted transactions

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11046:
URL: https://github.com/apache/kafka/pull/11046#discussion_r763427186



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##########
@@ -1176,7 +1176,8 @@ public void assign(Collection<TopicPartition> partitions) {
      * offset for the subscribed list of partitions
      *
      * <p>
-     * This method returns immediately if there are records available or if the position advances past control records.
+     * This method returns immediately if there are records available or if the position advances past control records
+     * or aborted transactions when isolation.level=READ_COMMITTED.

Review comment:
       nit: Worth double-checking, but I think we require "read_committed" to be lower case.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -725,17 +725,13 @@ public void onFailure(RuntimeException e) {
                             completedFetch.nextFetchOffset,
                             completedFetch.lastEpoch,
                             position.currentLeader);
-                    log.trace("Update fetching position to {} for partition {}", nextPosition, completedFetch.partition);
+                    log.trace("Updating fetch position from {} to {} for partition {} and returning {} records from `poll()`",
+                            position, nextPosition, completedFetch.partition, partRecords.size());
                     subscriptions.position(completedFetch.partition, nextPosition);
                     positionAdvanced = true;
                     if (partRecords.isEmpty()) {
-                        log.debug(
-                                "Advanced position for partition {} without receiving any user-visible records. " 
-                                        + "This is likely due to skipping over control records in the current fetch, " 
-                                        + "and may result in the consumer returning an empty record batch when " 
-                                        + "polled before its poll timeout has elapsed.",
-                                completedFetch.partition
-                        );
+                        log.trace("Returning empty records from `poll()` " 
+                                + "since the consumer's position has advanced for at least one topic partition");

Review comment:
       nit: I think this comment made more sense in its original location in `KafkaConsumer`. At this level, it seems redundant after the changes in the log message above. We would already say "... and returning 0 records from `poll()`"







[GitHub] [kafka] C0urante commented on a change in pull request #11046: KAFKA-12980: Return empty record batch from Consumer::poll when position advances due to aborted transactions

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11046:
URL: https://github.com/apache/kafka/pull/11046#discussion_r764161918



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##########
@@ -1176,7 +1176,8 @@ public void assign(Collection<TopicPartition> partitions) {
      * offset for the subscribed list of partitions
      *
      * <p>
-     * This method returns immediately if there are records available or if the position advances past control records.
+     * This method returns immediately if there are records available or if the position advances past control records
+     * or aborted transactions when isolation.level=READ_COMMITTED.

Review comment:
       ```suggestion
        * or aborted transactions when isolation.level=read_committed.
   ```



