You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/04/26 20:59:38 UTC

kafka git commit: KAFKA-3427; Broker should return correct version of FetchResponse on exception

Repository: kafka
Updated Branches:
  refs/heads/0.9.0 077cabbbc -> 5f316a58a


KAFKA-3427; Broker should return correct version of FetchResponse on exception

Merging the fix from: https://issues.apache.org/jira/browse/KAFKA-3427
The original version of the code, returned a response using V0 of the response protocol. This caused clients to break because they expected the throttle_time_ms field to be present.

Author: Aditya Auradkar <aa...@linkedin.com>

Reviewers: Jun Rao <ju...@apache.org>, Ismael Juma <is...@juma.me.uk>

Closes #1128 from auradkar/k-34


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5f316a58
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5f316a58
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5f316a58

Branch: refs/heads/0.9.0
Commit: 5f316a58a1058ec230e8b4bfdfec1ce081589a20
Parents: 077cabb
Author: Aditya Auradkar <aa...@linkedin.com>
Authored: Tue Apr 26 11:59:16 2016 -0700
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Tue Apr 26 11:59:16 2016 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/api/FetchRequest.scala    | 4 +++-
 core/src/main/scala/kafka/api/ProducerRequest.scala | 5 +++--
 2 files changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5f316a58/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index 04ca157..191c627 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -148,7 +148,9 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
       case (topicAndPartition, data) =>
         (topicAndPartition, FetchResponsePartitionData(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1, MessageSet.Empty))
     }
-    val errorResponse = FetchResponse(correlationId, fetchResponsePartitionData)
+
+    val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]
+    val errorResponse = FetchResponse(correlationId, fetchResponsePartitionData, fetchRequest.versionId)
     requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(request.connectionId, errorResponse)))
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5f316a58/core/src/main/scala/kafka/api/ProducerRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala
index ce78f78..0c7510e 100644
--- a/core/src/main/scala/kafka/api/ProducerRequest.scala
+++ b/core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -128,7 +128,8 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
   }
 
   override  def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
-    if(request.requestObj.asInstanceOf[ProducerRequest].requiredAcks == 0) {
+    val produceRequest = request.requestObj.asInstanceOf[ProducerRequest]
+    if(produceRequest.requiredAcks == 0) {
         requestChannel.closeConnection(request.processor, request)
     }
     else {
@@ -136,7 +137,7 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
         case (topicAndPartition, data) =>
           (topicAndPartition, ProducerResponseStatus(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1l))
       }
-      val errorResponse = ProducerResponse(correlationId, producerResponseStatus)
+      val errorResponse = ProducerResponse(correlationId, producerResponseStatus, produceRequest.versionId)
       requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
     }
   }