You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by li...@apache.org on 2018/09/11 04:42:49 UTC

[kafka] branch trunk updated: KAFKA-7096 : Clear buffered data for partitions that are explicitly unassigned by user

This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 25b13bc  KAFKA-7096 : Clear buffered data for partitions that are explicitly unassigned by user
25b13bc is described below

commit 25b13bca7b696b349bae2e9d5e41b75496bfdb1b
Author: mgharat <gh...@gmail.com>
AuthorDate: Mon Sep 10 21:40:08 2018 -0700

    KAFKA-7096 : Clear buffered data for partitions that are explicitly unassigned by user
    
    Author: mgharat <gh...@gmail.com>
    
    Reviewers: Dong Lin <li...@gmail.com>
    
    Closes #5289 from MayureshGharat/KAFKA-7096
---
 .../kafka/clients/consumer/KafkaConsumer.java      |  6 ++--
 .../kafka/clients/consumer/internals/Fetcher.java  | 34 ++++++++++++++++++++++
 .../clients/consumer/internals/FetcherTest.java    | 19 ++++++++++++
 3 files changed, 57 insertions(+), 2 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 4ea3cfd..4cdc4f8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -918,7 +918,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                 }
 
                 throwIfNoAssignorsConfigured();
-
+                fetcher.clearBufferedDataForUnassignedTopics(topics);
                 log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
                 this.subscriptions.subscribe(new HashSet<>(topics), listener);
                 metadata.setTopics(subscriptions.groupSubscription());
@@ -1019,10 +1019,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     public void unsubscribe() {
         acquireAndEnsureOpen();
         try {
-            log.info("Unsubscribed all topics or patterns and assigned partitions");
+            fetcher.clearBufferedDataForUnassignedPartitions(Collections.EMPTY_SET);
             this.subscriptions.unsubscribe();
             this.coordinator.maybeLeaveGroup();
             this.metadata.needMetadataForAllTopics(false);
+            log.info("Unsubscribed all topics or patterns and assigned partitions");
         } finally {
             release();
         }
@@ -1063,6 +1064,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                         throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic");
                     topics.add(topic);
                 }
+                fetcher.clearBufferedDataForUnassignedPartitions(partitions);
 
                 // make sure the offsets of topic partitions the consumer is unsubscribing from
                 // are committed since there will be no following rebalance
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index dc0daa2..a92f57e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -1015,6 +1015,40 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
         sensors.updatePartitionLagAndLeadSensors(assignment);
     }
 
+    /**
+     * Clear the buffered data (queued completed fetches and the next-in-line records) of partitions that are not newly assigned.
+     * Next-in-line records of a partition that is not newly assigned are drained before the reference is reset to null.
+     * @param assignedPartitions  newly assigned {@link TopicPartition}; buffered data of these partitions is retained
+     */
+    public void clearBufferedDataForUnassignedPartitions(Collection<TopicPartition> assignedPartitions) {
+        Iterator<CompletedFetch> itr = completedFetches.iterator();
+        while (itr.hasNext()) {
+            TopicPartition tp = itr.next().partition;
+            if (!assignedPartitions.contains(tp)) {
+                itr.remove();
+            }
+        }
+        if (nextInLineRecords != null && !assignedPartitions.contains(nextInLineRecords.partition)) {
+            nextInLineRecords.drain();
+            nextInLineRecords = null;
+        }
+    }
+
+    /**
+     * Clear the buffered data of partitions whose topic is not a part of the newly assigned topics.
+     * Only currently assigned partitions of those topics are passed on to {@code clearBufferedDataForUnassignedPartitions} as retained.
+     * @param assignedTopics  newly assigned topics
+     */
+    public void clearBufferedDataForUnassignedTopics(Collection<String> assignedTopics) {
+        Set<TopicPartition> currentTopicPartitions = new HashSet<>();
+        for (TopicPartition tp : subscriptions.assignedPartitions()) {
+            if (assignedTopics.contains(tp.topic())) {
+                currentTopicPartitions.add(tp);
+            }
+        }
+        clearBufferedDataForUnassignedPartitions(currentTopicPartitions);
+    }
+
     public static Sensor throttleTimeSensor(Metrics metrics, FetcherMetricsRegistry metricsRegistry) {
         Sensor fetchThrottleTimeSensor = metrics.sensor("fetch-throttle-time");
         fetchThrottleTimeSensor.add(metrics.metricInstance(metricsRegistry.fetchThrottleTimeAvg), new Avg());
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index d314a4d..afe5b2f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -288,6 +288,25 @@ public class FetcherTest {
     }
 
     @Test
+    public void testClearBufferedDataForTopicPartitions() {
+        subscriptions.assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        // normal fetch: one request is sent for tp0, and nothing is buffered before its response is polled
+        assertEquals(1, fetcher.sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+        // deliver the response for tp0; after polling, the fetcher reports a completed fetch
+        client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NONE, 100L, 0));
+        consumerClient.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+        Set<TopicPartition> newAssignedTopicPartitions = new HashSet<>();
+        newAssignedTopicPartitions.add(tp1);
+        // the new assignment holds only tp1, so the completed fetch buffered for tp0 must be discarded
+        fetcher.clearBufferedDataForUnassignedPartitions(newAssignedTopicPartitions);
+        assertFalse(fetcher.hasCompletedFetches());
+    }
+
+    @Test
     public void testFetchSkipsBlackedOutNodes() {
         subscriptions.assignFromUser(singleton(tp0));
         subscriptions.seek(tp0, 0);