You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/09/14 01:29:46 UTC
git commit: KAFKA-1038; fetch response should use empty messageset instead of null when handling errors; patched by Jun Rao; reviewed by Neha Narkhede
Updated Branches:
refs/heads/0.8 0c1885b80 -> aebf74619
KAFKA-1038; fetch response should use empty messageset instead of null when handling errors; patched by Jun Rao; reviewed by Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/aebf7461
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/aebf7461
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/aebf7461
Branch: refs/heads/0.8
Commit: aebf746190685d055358ca122aedc424fe024afa
Parents: 0c1885b
Author: Jun Rao <ju...@gmail.com>
Authored: Fri Sep 13 16:29:39 2013 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Fri Sep 13 16:29:39 2013 -0700
----------------------------------------------------------------------
core/src/main/scala/kafka/api/FetchRequest.scala | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/aebf7461/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index a807c1f..fb2a230 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -25,6 +25,7 @@ import kafka.common.{ErrorMapping, TopicAndPartition}
import kafka.consumer.ConsumerConfig
import java.util.concurrent.atomic.AtomicInteger
import kafka.network.RequestChannel
+import kafka.message.MessageSet
case class PartitionFetchInfo(offset: Long, fetchSize: Int)
@@ -155,7 +156,7 @@ case class FetchRequest private[kafka] (versionId: Short = FetchRequest.CurrentV
override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
val fetchResponsePartitionData = requestInfo.map {
case (topicAndPartition, data) =>
- (topicAndPartition, FetchResponsePartitionData(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1, null))
+ (topicAndPartition, FetchResponsePartitionData(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1, MessageSet.Empty))
}
val errorResponse = FetchResponse(correlationId, fetchResponsePartitionData)
requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(errorResponse)))