Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/11/18 22:46:13 UTC

[GitHub] [kafka] hachikuji commented on a change in pull request #11046: KAFKA-12980: Return empty record batch from Consumer::poll when position advances due to aborted transactions

hachikuji commented on a change in pull request #11046:
URL: https://github.com/apache/kafka/pull/11046#discussion_r752683020



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -638,15 +636,15 @@ public void onFailure(RuntimeException e) {
     /**
      * Return the fetched records, empty the record buffer and update the consumed position.
      *
-     * NOTE: returning empty records guarantees the consumed position are NOT updated.
+     * NOTE: returning an {@link Fetch#isEmpty empty} fetch guarantees the consumed position is not updated.

Review comment:
       I was wondering whether we could do something a little simpler that does not depend so heavily on the aborted-data case. In the future we may have more kinds of control records, which would probably need similar logic. The empty committed transaction mentioned in the other comment is one example of this.
   
   I think the main thing we're interested in is when the position advances. We want to ensure that we return from poll() whenever this happens. In the code, this happens here: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L744. Could we structure this logic so that `Fetch` is tracking all the positions which have advanced?
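
A minimal sketch of what tracking advanced positions could look like, under stated assumptions: `FetchSketch`, `recordAdvance`, and `advancedPositions` are hypothetical names, and plain `String` values stand in for `TopicPartition` and `ConsumerRecord` so the example compiles without the Kafka client on the classpath. It is not the PR's `Fetch` class, only an illustration of the idea that emptiness is defined by position movement rather than by record count.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: none of these names come from the Kafka code base.
// A String stands in for TopicPartition and for ConsumerRecord.
final class FetchSketch {
    private final Map<String, List<String>> records = new HashMap<>();
    // Partition -> latest position reached, recorded even when no records came back.
    private final Map<String, Long> advancedPositions = new HashMap<>();

    // Intended to be called at the single place where the consumed position moves forward.
    void recordAdvance(String partition, long newPosition, List<String> fetched) {
        advancedPositions.put(partition, newPosition);
        if (!fetched.isEmpty()) {
            records.computeIfAbsent(partition, p -> new ArrayList<>()).addAll(fetched);
        }
    }

    Map<String, List<String>> records() {
        return Collections.unmodifiableMap(records);
    }

    Map<String, Long> advancedPositions() {
        return Collections.unmodifiableMap(advancedPositions);
    }

    // Empty only if no position advanced, regardless of why it advanced.
    boolean isEmpty() {
        return advancedPositions.isEmpty();
    }
}
```

With this shape, the caller does not need to know whether a position moved because of aborted data, a control record, or ordinary records: any advance makes `isEmpty()` return false, which is the signal to return from poll().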
   
   

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##########
@@ -1235,8 +1237,14 @@ public void assign(Collection<TopicPartition> partitions) {
                     }
                 }
 
-                final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
-                if (!records.isEmpty()) {
+                final Fetch<K, V> fetch = pollForFetches(timer);
+                if (!fetch.isEmpty()) {
+                    if (fetch.records().isEmpty()) {
+                        assert fetch.containsAborts();

Review comment:
       nit: we rarely use assertions. If you feel the check is worthwhile, could we throw an `IllegalStateException` with a useful error message instead?
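
For illustration, a sketch of the explicit check, assuming a hypothetical helper `FetchInvariant.requireAbortsWhenNoRecords` that takes the record count and the `containsAborts` flag as plain arguments. The helper name and the message wording are made up for this example. One practical difference from `assert`: Java assertions are disabled unless the JVM is started with `-ea`, whereas an explicit check always runs.

```java
// Hypothetical helper, not part of the PR: it mirrors the condition guarded by
// `assert fetch.containsAborts()` in the diff above.
final class FetchInvariant {
    private FetchInvariant() {
    }

    // Throws if a non-empty fetch carries no records and no aborted transactions.
    static void requireAbortsWhenNoRecords(int recordCount, boolean containsAborts) {
        if (recordCount == 0 && !containsAborts) {
            throw new IllegalStateException(
                "Non-empty fetch returned no records and no aborted transactions; "
                    + "cannot explain why the consumed position advanced");
        }
    }
}
```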
   

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetch.java
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public class Fetch<K, V> {
+    private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
+    private boolean containsAborts;

Review comment:
       Could we clarify what this means (maybe with a better name)? From the implementation, it looks like we set this for either aborted data or markers. I also wanted to mention that the position can advance due to an empty committed transaction. It wouldn't be a common case, but nothing in the protocol forces a producer to write to each partition that is included in the transaction.
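
To make the empty committed transaction case concrete, here is a toy model, not Kafka code. `ToyLog`, `Entry`, `Kind`, and `read` are hypothetical names, and the model deliberately ignores the filtering of aborted data. It only shows that a marker occupies an offset, so reading past it moves the position even though no records are returned.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model for illustration only: every name here is hypothetical.
final class ToyLog {
    enum Kind { DATA, COMMIT_MARKER, ABORT_MARKER }

    static final class Entry {
        final Kind kind;
        final String value; // null for markers

        Entry(Kind kind, String value) {
            this.kind = kind;
            this.value = value;
        }
    }

    static final class Result {
        final List<String> records;
        final long nextPosition;

        Result(List<String> records, long nextPosition) {
            this.records = records;
            this.nextPosition = nextPosition;
        }
    }

    // Reads from `position` to the end of the log. Markers are skipped but still
    // consume an offset, so the returned position moves past them.
    static Result read(List<Entry> log, long position) {
        List<String> out = new ArrayList<>();
        long next = position;
        for (int i = (int) position; i < log.size(); i++) {
            Entry entry = log.get(i);
            if (entry.kind == Kind.DATA) {
                out.add(entry.value);
            }
            next = i + 1;
        }
        return new Result(out, next);
    }
}
```

A partition whose only entry is a `COMMIT_MARKER` models the case described above: the read returns no records and no aborts are involved, yet the position moves forward by one.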



