You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/10/14 03:48:56 UTC
kafka git commit: KAFKA-2632: move fetchable check ahead in handleFetchResponse
Repository: kafka
Updated Branches:
refs/heads/trunk b1ce9494e -> d8c575079
KAFKA-2632: move fetchable check ahead in handleFetchResponse
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Jason Gustafson
Closes #295 from guozhangwang/K2632
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d8c57507
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d8c57507
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d8c57507
Branch: refs/heads/trunk
Commit: d8c5750799b9c5c4485514b6b02560679af8255d
Parents: b1ce949
Author: Guozhang Wang <wa...@gmail.com>
Authored: Tue Oct 13 18:48:45 2015 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Tue Oct 13 18:48:45 2015 -0700
----------------------------------------------------------------------
.../clients/consumer/internals/Fetcher.java | 24 +++++++++++++++-----
1 file changed, 18 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/d8c57507/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 4608959..4d68e74 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -302,13 +302,23 @@ public class Fetcher<K, V> {
throwIfOffsetOutOfRange();
for (PartitionRecords<K, V> part : this.records) {
- if (!subscriptions.isFetchable(part.partition)) {
- log.debug("Ignoring fetched records for {} since it is no longer fetchable", part.partition);
+ if (!subscriptions.isAssigned(part.partition)) {
+ // this can happen when a rebalance happened before fetched records are returned to the consumer's poll call
+ log.debug("Not returning fetched records for partition {} since it is no longer assigned", part.partition);
continue;
}
- Long consumed = subscriptions.consumed(part.partition);
- if (consumed != null && part.fetchOffset == consumed) {
+ // note that the consumed position should always be available
+ // as long as the partition is still assigned
+ long consumed = subscriptions.consumed(part.partition);
+ if (!subscriptions.isFetchable(part.partition)) {
+ // this can happen when consumption of the partition is paused before fetched records are returned to the consumer's poll call
+ log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", part.partition);
+
+ // we also need to reset the fetch positions to pretend we did not fetch
+ // this partition in the previous request at all
+ subscriptions.fetched(part.partition, consumed);
+ } else if (part.fetchOffset == consumed) {
List<ConsumerRecord<K, V>> records = drained.get(part.partition);
if (records == null) {
records = part.records;
@@ -445,8 +455,10 @@ public class Fetcher<K, V> {
for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
TopicPartition tp = entry.getKey();
FetchResponse.PartitionData partition = entry.getValue();
- if (!subscriptions.assignedPartitions().contains(tp)) {
- log.debug("Ignoring fetched data for partition {} which is no longer assigned.", tp);
+ if (!subscriptions.isFetchable(tp)) {
+ // this can happen when a rebalance occurs or consumption of the partition is paused
+ // while the fetch is still in flight
+ log.debug("Ignoring fetched records for partition {} since it is no longer fetchable", tp);
} else if (partition.errorCode == Errors.NONE.code()) {
long fetchOffset = request.fetchData().get(tp).offset;