You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/09/11 19:03:58 UTC

[13/36] git commit: kafka-994; High level consumer doesn't throw an exception when the message it is trying to fetch exceeds the configured fetch size; patched by Sam Meder; reviewed by Jay Kreps and Jun Rao

kafka-994; High level consumer doesn't throw an exception when the message it is trying to fetch exceeds the configured fetch size; patched by Sam Meder; reviewed by Jay Kreps and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0c8aefc2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0c8aefc2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0c8aefc2

Branch: refs/heads/trunk
Commit: 0c8aefc251d03ba824cecd6acbcbdf143fdebfb8
Parents: 76d3905
Author: Sam Meder <sa...@gmail.com>
Authored: Thu Aug 1 21:30:05 2013 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Aug 1 21:30:05 2013 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0c8aefc2/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala b/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
index 64b702b..9c779ce 100644
--- a/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
+++ b/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
@@ -62,6 +62,8 @@ class PartitionTopicInfo(val topic: String,
       debug("updated fetch offset of (%s) to %d".format(this, next))
       consumerTopicStats.getConsumerTopicStats(topic).byteRate.mark(size)
       consumerTopicStats.getConsumerAllTopicStats().byteRate.mark(size)
+    } else if(messages.sizeInBytes > 0) {
+      chunkQueue.put(new FetchedDataChunk(messages, this, fetchedOffset.get))
     }
   }