You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/10/20 04:28:36 UTC
kafka git commit: HOTFIX: check logic of KAFKA-2515 should be on buffer.limit()
Repository: kafka
Updated Branches:
refs/heads/trunk 14c128a16 -> 8f32617e6
HOTFIX: check logic of KAFKA-2515 should be on buffer.limit()
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Ewen Cheslack-Postava, Guozhang Wang
Closes #332 from guozhangwang/K2515-hotfix
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8f32617e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8f32617e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8f32617e
Branch: refs/heads/trunk
Commit: 8f32617e6506a5f67d4f922f4088507994ba91ef
Parents: 14c128a
Author: Guozhang Wang <wa...@gmail.com>
Authored: Mon Oct 19 19:33:27 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Oct 19 19:33:27 2015 -0700
----------------------------------------------------------------------
.../org/apache/kafka/clients/consumer/internals/Fetcher.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/8f32617e/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 3d02bfd..7e55d46 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -512,8 +512,8 @@ public class Fetcher<K, V> {
this.subscriptions.fetched(tp, record.offset() + 1);
this.records.add(new PartitionRecords<>(fetchOffset, tp, parsed));
this.sensors.recordsFetchLag.record(partition.highWatermark - record.offset());
- } else if (buffer.capacity() >= this.fetchSize) {
- // we did not read a single message from a max fetchable buffer
+ } else if (buffer.limit() > 0) {
+ // we did not read a single message from a non-empty buffer
// because that message's size is larger than fetch size, in this case
// record this exception
this.recordTooLargePartitions.put(tp, fetchOffset);