You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by li...@apache.org on 2018/09/11 04:42:49 UTC
[kafka] branch trunk updated: KAFKA-7096 : Clear buffered data for
partitions that are explicitly unassigned by user
This is an automated email from the ASF dual-hosted git repository.
lindong pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 25b13bc KAFKA-7096 : Clear buffered data for partitions that are explicitly unassigned by user
25b13bc is described below
commit 25b13bca7b696b349bae2e9d5e41b75496bfdb1b
Author: mgharat <gh...@gmail.com>
AuthorDate: Mon Sep 10 21:40:08 2018 -0700
KAFKA-7096 : Clear buffered data for partitions that are explicitly unassigned by user
Author: mgharat <gh...@gmail.com>
Reviewers: Dong Lin <li...@gmail.com>
Closes #5289 from MayureshGharat/KAFKA-7096
---
.../kafka/clients/consumer/KafkaConsumer.java | 6 ++--
.../kafka/clients/consumer/internals/Fetcher.java | 34 ++++++++++++++++++++++
.../clients/consumer/internals/FetcherTest.java | 19 ++++++++++++
3 files changed, 57 insertions(+), 2 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 4ea3cfd..4cdc4f8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -918,7 +918,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
}
throwIfNoAssignorsConfigured();
-
+ fetcher.clearBufferedDataForUnassignedTopics(topics);
log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
this.subscriptions.subscribe(new HashSet<>(topics), listener);
metadata.setTopics(subscriptions.groupSubscription());
@@ -1019,10 +1019,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
public void unsubscribe() {
acquireAndEnsureOpen();
try {
- log.info("Unsubscribed all topics or patterns and assigned partitions");
+ fetcher.clearBufferedDataForUnassignedPartitions(Collections.EMPTY_SET);
this.subscriptions.unsubscribe();
this.coordinator.maybeLeaveGroup();
this.metadata.needMetadataForAllTopics(false);
+ log.info("Unsubscribed all topics or patterns and assigned partitions");
} finally {
release();
}
@@ -1063,6 +1064,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic");
topics.add(topic);
}
+ fetcher.clearBufferedDataForUnassignedPartitions(partitions);
// make sure the offsets of topic partitions the consumer is unsubscribing from
// are committed since there will be no following rebalance
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index dc0daa2..a92f57e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -1015,6 +1015,40 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
sensors.updatePartitionLagAndLeadSensors(assignment);
}
+ /**
+ * Clear buffered fetch data belonging to partitions that are not part of the newly assigned partitions.
+ * Entries are removed from completedFetches, and nextInLineRecords is drained and reset to null when its partition is not retained.
+ * @param assignedPartitions newly assigned {@link TopicPartition}s; buffered data for any partition not contained here is discarded
+ */
+ public void clearBufferedDataForUnassignedPartitions(Collection<TopicPartition> assignedPartitions) {
+ Iterator<CompletedFetch> itr = completedFetches.iterator();
+ while (itr.hasNext()) {
+ TopicPartition tp = itr.next().partition;
+ if (!assignedPartitions.contains(tp)) {
+ itr.remove(); // removal goes through the iterator because the collection is being traversed
+ }
+ }
+ if (nextInLineRecords != null && !assignedPartitions.contains(nextInLineRecords.partition)) {
+ nextInLineRecords.drain(); // drain() is defined outside this diff; presumably it discards the remaining records - confirm
+ nextInLineRecords = null;
+ }
+ }
+
+ /**
+ * Clear buffered fetch data for partitions whose topic is not part of the newly assigned topics.
+ * Collects the currently assigned partitions of the retained topics and passes them to clearBufferedDataForUnassignedPartitions.
+ * @param assignedTopics newly assigned topics; buffered data for partitions of any other topic is discarded
+ */
+ public void clearBufferedDataForUnassignedTopics(Collection<String> assignedTopics) {
+ Set<TopicPartition> currentTopicPartitions = new HashSet<>(); // currently assigned partitions whose topic is retained
+ for (TopicPartition tp : subscriptions.assignedPartitions()) {
+ if (assignedTopics.contains(tp.topic())) {
+ currentTopicPartitions.add(tp);
+ }
+ }
+ clearBufferedDataForUnassignedPartitions(currentTopicPartitions); // buffered data outside the retained set is cleared
+ }
+
public static Sensor throttleTimeSensor(Metrics metrics, FetcherMetricsRegistry metricsRegistry) {
Sensor fetchThrottleTimeSensor = metrics.sensor("fetch-throttle-time");
fetchThrottleTimeSensor.add(metrics.metricInstance(metricsRegistry.fetchThrottleTimeAvg), new Avg());
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index d314a4d..afe5b2f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -288,6 +288,25 @@ public class FetcherTest {
}
@Test
+ public void testClearBufferedDataForTopicPartitions() { // a completed fetch for tp0 must be cleared when only tp1 is passed as assigned
+ subscriptions.assignFromUser(singleton(tp0));
+ subscriptions.seek(tp0, 0);
+
+ // normal fetch: one request is sent and nothing is buffered before a response is polled
+ assertEquals(1, fetcher.sendFetches());
+ assertFalse(fetcher.hasCompletedFetches());
+
+ client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NONE, 100L, 0));
+ consumerClient.poll(time.timer(0));
+ assertTrue(fetcher.hasCompletedFetches()); // the tp0 response is now buffered
+ Set<TopicPartition> newAssignedTopicPartitions = new HashSet<>();
+ newAssignedTopicPartitions.add(tp1); // the new set deliberately excludes tp0
+
+ fetcher.clearBufferedDataForUnassignedPartitions(newAssignedTopicPartitions);
+ assertFalse(fetcher.hasCompletedFetches()); // the buffered tp0 fetch has been removed
+ }
+
+ @Test
public void testFetchSkipsBlackedOutNodes() {
subscriptions.assignFromUser(singleton(tp0));
subscriptions.seek(tp0, 0);