You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/10/14 03:48:56 UTC

kafka git commit: KAFKA-2632: move fetchable check ahead in handleFetchResponse

Repository: kafka
Updated Branches:
  refs/heads/trunk b1ce9494e -> d8c575079


KAFKA-2632: move fetchable check ahead in handleFetchResponse

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Jason Gustafson

Closes #295 from guozhangwang/K2632


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d8c57507
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d8c57507
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d8c57507

Branch: refs/heads/trunk
Commit: d8c5750799b9c5c4485514b6b02560679af8255d
Parents: b1ce949
Author: Guozhang Wang <wa...@gmail.com>
Authored: Tue Oct 13 18:48:45 2015 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Tue Oct 13 18:48:45 2015 -0700

----------------------------------------------------------------------
 .../clients/consumer/internals/Fetcher.java     | 24 +++++++++++++++-----
 1 file changed, 18 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d8c57507/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 4608959..4d68e74 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -302,13 +302,23 @@ public class Fetcher<K, V> {
             throwIfOffsetOutOfRange();
 
             for (PartitionRecords<K, V> part : this.records) {
-                if (!subscriptions.isFetchable(part.partition)) {
-                    log.debug("Ignoring fetched records for {} since it is no longer fetchable", part.partition);
+                if (!subscriptions.isAssigned(part.partition)) {
+                    // this can happen when a rebalance occurs before the fetched records are returned to the consumer's poll call
+                    log.debug("Not returning fetched records for partition {} since it is no longer assigned", part.partition);
                     continue;
                 }
 
-                Long consumed = subscriptions.consumed(part.partition);
-                if (consumed != null && part.fetchOffset == consumed) {
+                // note that the consumed position should always be available
+                // as long as the partition is still assigned
+                long consumed = subscriptions.consumed(part.partition);
+                if (!subscriptions.isFetchable(part.partition)) {
+                    // this can happen when consumption of a partition is paused before the fetched records are returned to the consumer's poll call
+                    log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", part.partition);
+
+                    // we also need to reset the fetch positions to pretend we did not fetch
+                    // this partition in the previous request at all
+                    subscriptions.fetched(part.partition, consumed);
+                } else if (part.fetchOffset == consumed) {
                     List<ConsumerRecord<K, V>> records = drained.get(part.partition);
                     if (records == null) {
                         records = part.records;
@@ -445,8 +455,10 @@ public class Fetcher<K, V> {
             for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
                 TopicPartition tp = entry.getKey();
                 FetchResponse.PartitionData partition = entry.getValue();
-                if (!subscriptions.assignedPartitions().contains(tp)) {
-                    log.debug("Ignoring fetched data for partition {} which is no longer assigned.", tp);
+                if (!subscriptions.isFetchable(tp)) {
+                    // this can happen when a rebalance occurs or consumption of a partition is paused
+                    // while the fetch is still in-flight
+                    log.debug("Ignoring fetched records for partition {} since it is no longer fetchable", tp);
                 } else if (partition.errorCode == Errors.NONE.code()) {
                     long fetchOffset = request.fetchData().get(tp).offset;