You are viewing a plain text version of this content. The canonical link for it is not included in this plain-text copy.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/09/14 01:18:07 UTC

git commit: KAFKA-955 (followup patch) After a leader change, messages sent with ack=0 are lost; reviewed by Neha Narkhede and Jun Rao

Updated Branches:
  refs/heads/0.8 c12d2ea9e -> 0c1885b80


KAFKA-955 (followup patch) After a leader change, messages sent with ack=0 are lost; reviewed by Neha Narkhede and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0c1885b8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0c1885b8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0c1885b8

Branch: refs/heads/0.8
Commit: 0c1885b800077e4d360935a6d91fe1068a684560
Parents: c12d2ea
Author: Guozhang Wang <gu...@linkedin.com>
Authored: Fri Sep 13 16:17:55 2013 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Fri Sep 13 16:17:55 2013 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/api/ProducerRequest.scala | 15 ++++++++++-----
 1 file changed, 10 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0c1885b8/core/src/main/scala/kafka/api/ProducerRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala
index fda3e39..c606351 100644
--- a/core/src/main/scala/kafka/api/ProducerRequest.scala
+++ b/core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -135,12 +135,17 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
   }
 
   override  def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
-    val producerResponseStatus = data.map {
-      case (topicAndPartition, data) =>
-        (topicAndPartition, ProducerResponseStatus(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1l))
+    if(request.requestObj.asInstanceOf[ProducerRequest].requiredAcks == 0) {
+        requestChannel.closeConnection(request.processor, request)
+    }
+    else {
+      val producerResponseStatus = data.map {
+        case (topicAndPartition, data) =>
+          (topicAndPartition, ProducerResponseStatus(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1l))
+      }
+      val errorResponse = ProducerResponse(correlationId, producerResponseStatus)
+      requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
     }
-    val errorResponse = ProducerResponse(correlationId, producerResponseStatus)
-    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
   }
 
   def emptyData(){