You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/09/14 01:18:07 UTC
git commit: KAFKA-955 (followup patch) After a leader change,
messages sent with ack=0 are lost; reviewed by Neha Narkhede and Jun Rao
Updated Branches:
refs/heads/0.8 c12d2ea9e -> 0c1885b80
KAFKA-955 (followup patch) After a leader change, messages sent with ack=0 are lost; reviewed by Neha Narkhede and Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0c1885b8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0c1885b8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0c1885b8
Branch: refs/heads/0.8
Commit: 0c1885b800077e4d360935a6d91fe1068a684560
Parents: c12d2ea
Author: Guozhang Wang <gu...@linkedin.com>
Authored: Fri Sep 13 16:17:55 2013 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Fri Sep 13 16:17:55 2013 -0700
----------------------------------------------------------------------
core/src/main/scala/kafka/api/ProducerRequest.scala | 15 ++++++++++-----
1 file changed, 10 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/0c1885b8/core/src/main/scala/kafka/api/ProducerRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala
index fda3e39..c606351 100644
--- a/core/src/main/scala/kafka/api/ProducerRequest.scala
+++ b/core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -135,12 +135,17 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
}
override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
- val producerResponseStatus = data.map {
- case (topicAndPartition, data) =>
- (topicAndPartition, ProducerResponseStatus(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1l))
+ if(request.requestObj.asInstanceOf[ProducerRequest].requiredAcks == 0) {
+ requestChannel.closeConnection(request.processor, request)
+ }
+ else {
+ val producerResponseStatus = data.map {
+ case (topicAndPartition, data) =>
+ (topicAndPartition, ProducerResponseStatus(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1l))
+ }
+ val errorResponse = ProducerResponse(correlationId, producerResponseStatus)
+ requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
}
- val errorResponse = ProducerResponse(correlationId, producerResponseStatus)
- requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
}
def emptyData(){