You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/09/14 01:29:46 UTC

git commit: KAFKA-1038; fetch response should use empty messageset instead of null when handling errors; patched by Jun Rao; reviewed by Neha Narkhede

Updated Branches:
  refs/heads/0.8 0c1885b80 -> aebf74619


KAFKA-1038; fetch response should use empty messageset instead of null when handling errors; patched by Jun Rao; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/aebf7461
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/aebf7461
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/aebf7461

Branch: refs/heads/0.8
Commit: aebf746190685d055358ca122aedc424fe024afa
Parents: 0c1885b
Author: Jun Rao <ju...@gmail.com>
Authored: Fri Sep 13 16:29:39 2013 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Fri Sep 13 16:29:39 2013 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/api/FetchRequest.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/aebf7461/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index a807c1f..fb2a230 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -25,6 +25,7 @@ import kafka.common.{ErrorMapping, TopicAndPartition}
 import kafka.consumer.ConsumerConfig
 import java.util.concurrent.atomic.AtomicInteger
 import kafka.network.RequestChannel
+import kafka.message.MessageSet
 
 
 case class PartitionFetchInfo(offset: Long, fetchSize: Int)
@@ -155,7 +156,7 @@ case class FetchRequest private[kafka] (versionId: Short = FetchRequest.CurrentV
   override  def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
     val fetchResponsePartitionData = requestInfo.map {
       case (topicAndPartition, data) =>
-        (topicAndPartition, FetchResponsePartitionData(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1, null))
+        (topicAndPartition, FetchResponsePartitionData(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1, MessageSet.Empty))
     }
     val errorResponse = FetchResponse(correlationId, fetchResponsePartitionData)
     requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(errorResponse)))