You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/08/02 00:10:44 UTC

[kafka] branch trunk updated: KAFKA-7548; KafkaConsumer should not discard fetched data for paused partitions (#6988)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1546fc3  KAFKA-7548; KafkaConsumer should not discard fetched data for paused partitions (#6988)
1546fc3 is described below

commit 1546fc30af520f8d133aa3454cdc8a036bae0f3e
Author: Sean Glover <se...@seanglover.com>
AuthorDate: Thu Aug 1 20:10:19 2019 -0400

    KAFKA-7548; KafkaConsumer should not discard fetched data for paused partitions (#6988)
    
    This is an updated implementation of #5844 by @MayureshGharat (with Mayuresh's permission). As described in the original ticket:
    
    > Today when we call KafkaConsumer.poll(), it will fetch data from Kafka asynchronously and put it into a local buffer (completedFetches).
    >
    > If we now pause some TopicPartitions and call KafkaConsumer.poll(), we might throw away any data we have already buffered locally for these TopicPartitions. Generally, if an application pauses some TopicPartitions, it is likely to resume them in the near future, which would require KafkaConsumer to re-issue a fetch for the same data it had buffered earlier for these TopicPartitions. This is wasted effort from the application's point of view.
    
    This patch fixes the problem by retaining the paused data in the completed fetches queue, essentially moving it to the back of the queue on each call to `fetchedRecords`.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../kafka/clients/consumer/KafkaConsumer.java      |   2 +-
 .../kafka/clients/consumer/internals/Fetcher.java  | 120 ++++++++++-----
 .../clients/consumer/internals/FetcherTest.java    | 165 +++++++++++++++++++++
 3 files changed, 253 insertions(+), 34 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index b33b1f2..0d0efe1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1270,7 +1270,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         client.poll(pollTimer, () -> {
             // since a fetch might be completed by the background thread, we need this poll condition
             // to ensure that we do not block unnecessarily in poll()
-            return !fetcher.hasCompletedFetches();
+            return !fetcher.hasAvailableFetches();
         });
         timer.update(pollTimer.currentTimeMs());
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index d4d028d..c1c42bd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -50,8 +50,8 @@ import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Meter;
 import org.apache.kafka.common.metrics.stats.Min;
-import org.apache.kafka.common.metrics.stats.WindowedCount;
 import org.apache.kafka.common.metrics.stats.Value;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.BufferSupplier;
@@ -81,6 +81,7 @@ import org.slf4j.helpers.MessageFormatter;
 
 import java.io.Closeable;
 import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -93,6 +94,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.PriorityQueue;
+import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -140,7 +142,7 @@ public class Fetcher<K, V> implements Closeable {
     private final ConsumerMetadata metadata;
     private final FetchManagerMetrics sensors;
     private final SubscriptionState subscriptions;
-    private final ConcurrentLinkedQueue<CompletedFetch> completedFetches;
+    private final ConcurrentLinkedQueue<PartitionRecords> completedFetches;
     private final BufferSupplier decompressionBufferSupplier = BufferSupplier.create();
     private final Deserializer<K> keyDeserializer;
     private final Deserializer<V> valueDeserializer;
@@ -216,14 +218,23 @@ public class Fetcher<K, V> implements Closeable {
     }
 
     /**
-     * Return whether we have any completed fetches pending return to the user. This method is thread-safe.
+     * Return whether we have any completed fetches pending return to the user. This method is thread-safe. Has
+     * visibility for testing.
      * @return true if there are completed fetches, false otherwise
      */
-    public boolean hasCompletedFetches() {
+    protected boolean hasCompletedFetches() {
         return !completedFetches.isEmpty();
     }
 
     /**
+     * Return whether we have any completed fetches that are fetchable. This method is thread-safe.
+     * @return true if there are completed fetches that can be returned, false otherwise
+     */
+    public boolean hasAvailableFetches() {
+        return completedFetches.stream().anyMatch(fetch -> subscriptions.isFetchable(fetch.partition));
+    }
+
+    /**
      * Set-up a fetch request for any node that we have assigned partitions for which doesn't already have
      * an in-flight fetch or pending fetch data.
      * @return number of fetches sent
@@ -291,8 +302,11 @@ public class Fetcher<K, V> implements Closeable {
 
                                             log.debug("Fetch {} at offset {} for partition {} returned fetch data {}",
                                                     isolationLevel, fetchOffset, partition, fetchData);
-                                            completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator,
-                                                    resp.requestHeader().apiVersion()));
+
+                                            CompletedFetch completedFetch = new CompletedFetch(partition, fetchOffset,
+                                                    fetchData, metricAggregator, resp.requestHeader().apiVersion());
+
+                                            completedFetches.add(parseCompletedFetch(completedFetch));
                                         }
                                     }
 
@@ -577,33 +591,45 @@ public class Fetcher<K, V> implements Closeable {
      */
     public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
         Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap<>();
+        Queue<PartitionRecords> pausedCompletedFetches = new ArrayDeque<>();
         int recordsRemaining = maxPollRecords;
 
         try {
             while (recordsRemaining > 0) {
                 if (nextInLineRecords == null || nextInLineRecords.isFetched) {
-                    CompletedFetch completedFetch = completedFetches.peek();
-                    if (completedFetch == null) break;
-
-                    try {
-                        nextInLineRecords = parseCompletedFetch(completedFetch);
-                    } catch (Exception e) {
-                        // Remove a completedFetch upon a parse with exception if (1) it contains no records, and
-                        // (2) there are no fetched records with actual content preceding this exception.
-                        // The first condition ensures that the completedFetches is not stuck with the same completedFetch
-                        // in cases such as the TopicAuthorizationException, and the second condition ensures that no
-                        // potential data loss due to an exception in a following record.
-                        FetchResponse.PartitionData partition = completedFetch.partitionData;
-                        if (fetched.isEmpty() && (partition.records == null || partition.records.sizeInBytes() == 0)) {
-                            completedFetches.poll();
+                    PartitionRecords records = completedFetches.peek();
+                    if (records == null) break;
+
+                    if (records.notInitialized()) {
+                        try {
+                            nextInLineRecords = initializePartitionRecords(records);
+                        } catch (Exception e) {
+                            // Remove a completedFetch upon a parse with exception if (1) it contains no records, and
+                            // (2) there are no fetched records with actual content preceding this exception.
+                            // The first condition ensures that the completedFetches is not stuck with the same completedFetch
+                            // in cases such as the TopicAuthorizationException, and the second condition ensures that no
+                            // potential data loss due to an exception in a following record.
+                            FetchResponse.PartitionData partition = records.completedFetch.partitionData;
+                            if (fetched.isEmpty() && (partition.records == null || partition.records.sizeInBytes() == 0)) {
+                                completedFetches.poll();
+                            }
+                            throw e;
                         }
-                        throw e;
+                    } else {
+                        nextInLineRecords = records;
                     }
                     completedFetches.poll();
+                } else if (subscriptions.isPaused(nextInLineRecords.partition)) {
+                    // when the partition is paused we add the records back to the completedFetches queue instead of draining
+                    // them so that they can be returned on a subsequent poll if the partition is resumed at that time
+                    log.debug("Skipping fetching records for assigned partition {} because it is paused", nextInLineRecords.partition);
+                    pausedCompletedFetches.add(nextInLineRecords);
+                    nextInLineRecords = null;
                 } else {
                     List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineRecords, recordsRemaining);
-                    TopicPartition partition = nextInLineRecords.partition;
+
                     if (!records.isEmpty()) {
+                        TopicPartition partition = nextInLineRecords.partition;
                         List<ConsumerRecord<K, V>> currentRecords = fetched.get(partition);
                         if (currentRecords == null) {
                             fetched.put(partition, records);
@@ -623,7 +649,12 @@ public class Fetcher<K, V> implements Closeable {
         } catch (KafkaException e) {
             if (fetched.isEmpty())
                 throw e;
+        } finally {
+            // add any polled completed fetches for paused partitions back to the completed fetches queue to be
+            // re-evaluated in the next poll
+            completedFetches.addAll(pausedCompletedFetches);
         }
+
         return fetched;
     }
 
@@ -670,7 +701,9 @@ public class Fetcher<K, V> implements Closeable {
             }
         }
 
+        log.trace("Draining fetched records for partition {}", partitionRecords.partition);
         partitionRecords.drain();
+
         return emptyList();
     }
 
@@ -1025,7 +1058,7 @@ public class Fetcher<K, V> implements Closeable {
         if (nextInLineRecords != null && !nextInLineRecords.isFetched) {
             exclude.add(nextInLineRecords.partition);
         }
-        for (CompletedFetch completedFetch : completedFetches) {
+        for (PartitionRecords completedFetch : completedFetches) {
             exclude.add(completedFetch.partition);
         }
         return subscriptions.fetchablePartitions(tp -> !exclude.contains(tp));
@@ -1125,20 +1158,31 @@ public class Fetcher<K, V> implements Closeable {
     }
 
     /**
-     * The callback for fetch completion
+     * Parse a PartitionRecords object from a CompletedFetch
      */
     private PartitionRecords parseCompletedFetch(CompletedFetch completedFetch) {
         TopicPartition tp = completedFetch.partition;
         FetchResponse.PartitionData<Records> partition = completedFetch.partitionData;
+
+        Iterator<? extends RecordBatch> batches = partition.records.batches().iterator();
+        return new PartitionRecords(tp, completedFetch, batches);
+    }
+
+    /**
+     * Initialize a PartitionRecords object.
+     */
+    private PartitionRecords initializePartitionRecords(PartitionRecords partitionRecordsToInitialize) {
+        CompletedFetch completedFetch = partitionRecordsToInitialize.completedFetch;
+        TopicPartition tp = completedFetch.partition;
+        FetchResponse.PartitionData<Records> partition = completedFetch.partitionData;
         long fetchOffset = completedFetch.fetchedOffset;
         PartitionRecords partitionRecords = null;
         Errors error = partition.error;
 
         try {
-            if (!subscriptions.isFetchable(tp)) {
-                // this can happen when a rebalance happened or a partition consumption paused
-                // while fetch is still in-flight
-                log.debug("Ignoring fetched records for partition {} since it is no longer fetchable", tp);
+            if (!subscriptions.hasValidPosition(tp)) {
+                // this can happen when a rebalance happened while fetch is still in-flight
+                log.debug("Ignoring fetched records for partition {} since it no longer has valid position", tp);
             } else if (error == Errors.NONE) {
                 // we are interested in this fetch only if the beginning offset matches the
                 // current consumed position
@@ -1152,7 +1196,7 @@ public class Fetcher<K, V> implements Closeable {
                 log.trace("Preparing to read {} bytes of data for partition {} with offset {}",
                         partition.records.sizeInBytes(), tp, position);
                 Iterator<? extends RecordBatch> batches = partition.records.batches().iterator();
-                partitionRecords = new PartitionRecords(tp, completedFetch, batches);
+                partitionRecords = partitionRecordsToInitialize;
 
                 if (!batches.hasNext() && partition.records.sizeInBytes() > 0) {
                     if (completedFetch.responseVersion < 3) {
@@ -1196,6 +1240,8 @@ public class Fetcher<K, V> implements Closeable {
                     });
                 }
 
+
+                partitionRecordsToInitialize.initialized = true;
             } else if (error == Errors.NOT_LEADER_FOR_PARTITION ||
                        error == Errors.REPLICA_NOT_AVAILABLE ||
                        error == Errors.KAFKA_STORAGE_ERROR ||
@@ -1286,13 +1332,16 @@ public class Fetcher<K, V> implements Closeable {
      * @param assignedPartitions  newly assigned {@link TopicPartition}
      */
     public void clearBufferedDataForUnassignedPartitions(Collection<TopicPartition> assignedPartitions) {
-        Iterator<CompletedFetch> itr = completedFetches.iterator();
-        while (itr.hasNext()) {
-            TopicPartition tp = itr.next().partition;
+        Iterator<PartitionRecords> completedFetchesItr = completedFetches.iterator();
+        while (completedFetchesItr.hasNext()) {
+            PartitionRecords records = completedFetchesItr.next();
+            TopicPartition tp = records.partition;
             if (!assignedPartitions.contains(tp)) {
-                itr.remove();
+                records.drain();
+                completedFetchesItr.remove();
             }
         }
+
         if (nextInLineRecords != null && !assignedPartitions.contains(nextInLineRecords.partition)) {
             nextInLineRecords.drain();
             nextInLineRecords = null;
@@ -1345,6 +1394,7 @@ public class Fetcher<K, V> implements Closeable {
         private boolean isFetched = false;
         private Exception cachedRecordException = null;
         private boolean corruptLastRecord = false;
+        private boolean initialized = false;
 
         private PartitionRecords(TopicPartition partition,
                                  CompletedFetch completedFetch,
@@ -1546,6 +1596,10 @@ public class Fetcher<K, V> implements Closeable {
             Record firstRecord = batchIterator.next();
             return ControlRecordType.ABORT == ControlRecordType.parse(firstRecord.key());
         }
+
+        private boolean notInitialized() {
+            return !this.initialized;
+        }
     }
 
     private static class CompletedFetch {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 44c00c4..5f66689 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -898,6 +898,171 @@ public class FetcherTest {
     }
 
     @Test
+    public void testFetchOnCompletedFetchesForPausedAndResumedPartitions() {
+        buildFetcher();
+
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        assertEquals(1, fetcher.sendFetches());
+
+        subscriptions.pause(tp0);
+
+        client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NONE, 100L, 0));
+        consumerClient.poll(time.timer(0));
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetchedRecords();
+        assertEquals("Should not return any records when partition is paused", 0, fetchedRecords.size());
+        assertTrue("Should still contain completed fetches", fetcher.hasCompletedFetches());
+        assertFalse("Should not have any available (non-paused) completed fetches", fetcher.hasAvailableFetches());
+        assertNull(fetchedRecords.get(tp0));
+        assertEquals(0, fetcher.sendFetches());
+
+        subscriptions.resume(tp0);
+
+        assertTrue("Should have available (non-paused) completed fetches", fetcher.hasAvailableFetches());
+
+        consumerClient.poll(time.timer(0));
+        fetchedRecords = fetchedRecords();
+        assertEquals("Should return records when partition is resumed", 1, fetchedRecords.size());
+        assertNotNull(fetchedRecords.get(tp0));
+        assertEquals(3, fetchedRecords.get(tp0).size());
+
+        consumerClient.poll(time.timer(0));
+        fetchedRecords = fetchedRecords();
+        assertEquals("Should not return records after previously paused partitions are fetched", 0, fetchedRecords.size());
+        assertFalse("Should no longer contain completed fetches", fetcher.hasCompletedFetches());
+    }
+
+    @Test
+    public void testFetchOnCompletedFetchesForSomePausedPartitions() {
+        buildFetcher();
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords;
+
+        assignFromUser(Utils.mkSet(tp0, tp1));
+
+        // seek to tp0 and tp1 in two polls to generate 2 complete requests and responses
+
+        // #1 seek, request, poll, response
+        subscriptions.seek(tp0, 1);
+        assertEquals(1, fetcher.sendFetches());
+        client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NONE, 100L, 0));
+        consumerClient.poll(time.timer(0));
+
+        // #2 seek, request, poll, response
+        subscriptions.seek(tp1, 1);
+        assertEquals(1, fetcher.sendFetches());
+        client.prepareResponse(fullFetchResponse(tp1, this.nextRecords, Errors.NONE, 100L, 0));
+
+        subscriptions.pause(tp0);
+        consumerClient.poll(time.timer(0));
+
+        fetchedRecords = fetchedRecords();
+        assertEquals("Should return completed fetch for unpaused partitions", 1, fetchedRecords.size());
+        assertTrue("Should still contain completed fetches", fetcher.hasCompletedFetches());
+        assertNotNull(fetchedRecords.get(tp1));
+        assertNull(fetchedRecords.get(tp0));
+
+        fetchedRecords = fetchedRecords();
+        assertEquals("Should return no records for remaining paused partition", 0, fetchedRecords.size());
+        assertTrue("Should still contain completed fetches", fetcher.hasCompletedFetches());
+    }
+
+    @Test
+    public void testFetchOnCompletedFetchesForAllPausedPartitions() {
+        buildFetcher();
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords;
+
+        assignFromUser(Utils.mkSet(tp0, tp1));
+
+        // seek to tp0 and tp1 in two polls to generate 2 complete requests and responses
+
+        // #1 seek, request, poll, response
+        subscriptions.seek(tp0, 1);
+        assertEquals(1, fetcher.sendFetches());
+        client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NONE, 100L, 0));
+        consumerClient.poll(time.timer(0));
+
+        // #2 seek, request, poll, response
+        subscriptions.seek(tp1, 1);
+        assertEquals(1, fetcher.sendFetches());
+        client.prepareResponse(fullFetchResponse(tp1, this.nextRecords, Errors.NONE, 100L, 0));
+
+        subscriptions.pause(tp0);
+        subscriptions.pause(tp1);
+
+        consumerClient.poll(time.timer(0));
+
+        fetchedRecords = fetchedRecords();
+        assertEquals("Should return no records for all paused partitions", 0, fetchedRecords.size());
+        assertTrue("Should still contain completed fetches", fetcher.hasCompletedFetches());
+        assertFalse("Should not have any available (non-paused) completed fetches", fetcher.hasAvailableFetches());
+    }
+
+    @Test
+    public void testPartialFetchWithPausedPartitions() {
+        // this test sends creates a completed fetch with 3 records and a max poll of 2 records to assert
+        // that a fetch that must be returned over at least 2 polls can be cached successfully when its partition is
+        // paused, then returned successfully after its been resumed again later
+        buildFetcher(2);
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords;
+
+        assignFromUser(Utils.mkSet(tp0, tp1));
+
+        subscriptions.seek(tp0, 1);
+        assertEquals(1, fetcher.sendFetches());
+        client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NONE, 100L, 0));
+        consumerClient.poll(time.timer(0));
+
+        fetchedRecords = fetchedRecords();
+
+        assertEquals("Should return 2 records from fetch with 3 records", 2, fetchedRecords.get(tp0).size());
+        assertFalse("Should have no completed fetches", fetcher.hasCompletedFetches());
+
+        subscriptions.pause(tp0);
+        consumerClient.poll(time.timer(0));
+
+        fetchedRecords = fetchedRecords();
+
+        assertEquals("Should return no records for paused partitions", 0, fetchedRecords.size());
+        assertTrue("Should have 1 entry in completed fetches", fetcher.hasCompletedFetches());
+        assertFalse("Should not have any available (non-paused) completed fetches", fetcher.hasAvailableFetches());
+
+        subscriptions.resume(tp0);
+
+        consumerClient.poll(time.timer(0));
+
+        fetchedRecords = fetchedRecords();
+
+        assertEquals("Should return last remaining record", 1, fetchedRecords.get(tp0).size());
+        assertFalse("Should have no completed fetches", fetcher.hasCompletedFetches());
+    }
+
+    @Test
+    public void testFetchDiscardedAfterPausedPartitionResumedAndSeekedToNewOffset() {
+        buildFetcher();
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        assertEquals(1, fetcher.sendFetches());
+        subscriptions.pause(tp0);
+        client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NONE, 100L, 0));
+
+        subscriptions.seek(tp0, 3);
+        subscriptions.resume(tp0);
+        consumerClient.poll(time.timer(0));
+
+        assertTrue("Should have 1 entry in completed fetches", fetcher.hasCompletedFetches());
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetchedRecords();
+        assertEquals("Should not return any records because we seeked to a new offset", 0, fetchedRecords.size());
+        assertNull(fetchedRecords.get(tp0));
+        assertFalse("Should have no completed fetches", fetcher.hasCompletedFetches());
+    }
+
+    @Test
     public void testFetchNotLeaderForPartition() {
         buildFetcher();
         assignFromUser(singleton(tp0));