You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/10/20 04:28:36 UTC

kafka git commit: HOTFIX: check logic of KAFKA-2515 should be on buffer.limit()

Repository: kafka
Updated Branches:
  refs/heads/trunk 14c128a16 -> 8f32617e6


HOTFIX: check logic of KAFKA-2515 should be on buffer.limit()

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Ewen Cheslack-Postava, Guozhang Wang

Closes #332 from guozhangwang/K2515-hotfix


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8f32617e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8f32617e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8f32617e

Branch: refs/heads/trunk
Commit: 8f32617e6506a5f67d4f922f4088507994ba91ef
Parents: 14c128a
Author: Guozhang Wang <wa...@gmail.com>
Authored: Mon Oct 19 19:33:27 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Oct 19 19:33:27 2015 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/clients/consumer/internals/Fetcher.java     | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8f32617e/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 3d02bfd..7e55d46 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -512,8 +512,8 @@ public class Fetcher<K, V> {
                         this.subscriptions.fetched(tp, record.offset() + 1);
                         this.records.add(new PartitionRecords<>(fetchOffset, tp, parsed));
                         this.sensors.recordsFetchLag.record(partition.highWatermark - record.offset());
-                    } else if (buffer.capacity() >= this.fetchSize) {
-                        // we did not read a single message from a max fetchable buffer
+                    } else if (buffer.limit() > 0) {
+                        // we did not read a single message from a non-empty buffer
                         // because that message's size is larger than fetch size, in this case
                         // record this exception
                         this.recordTooLargePartitions.put(tp, fetchOffset);