You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/02/25 07:13:00 UTC

[jira] [Commented] (KAFKA-3427) broker can return incorrect version of fetch response when the broker hits an unknown exception

    [ https://issues.apache.org/jira/browse/KAFKA-3427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16375968#comment-16375968 ] 

ASF GitHub Bot commented on KAFKA-3427:
---------------------------------------

hachikuji closed pull request #1128: KAFKA-3427 - Broker should return correct version of FetchResponse on exception
URL: https://github.com/apache/kafka/pull/1128
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index 04ca157717b..191c6274929 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -148,7 +148,9 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
       case (topicAndPartition, data) =>
         (topicAndPartition, FetchResponsePartitionData(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1, MessageSet.Empty))
     }
-    val errorResponse = FetchResponse(correlationId, fetchResponsePartitionData)
+
+    val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]
+    val errorResponse = FetchResponse(correlationId, fetchResponsePartitionData, fetchRequest.versionId)
     requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(request.connectionId, errorResponse)))
   }
 
diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala
index ce78f78168d..0c7510e82a7 100644
--- a/core/src/main/scala/kafka/api/ProducerRequest.scala
+++ b/core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -128,7 +128,8 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
   }
 
   override  def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
-    if(request.requestObj.asInstanceOf[ProducerRequest].requiredAcks == 0) {
+    val produceRequest = request.requestObj.asInstanceOf[ProducerRequest]
+    if(produceRequest.requiredAcks == 0) {
         requestChannel.closeConnection(request.processor, request)
     }
     else {
@@ -136,7 +137,7 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
         case (topicAndPartition, data) =>
           (topicAndPartition, ProducerResponseStatus(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1l))
       }
-      val errorResponse = ProducerResponse(correlationId, producerResponseStatus)
+      val errorResponse = ProducerResponse(correlationId, producerResponseStatus, produceRequest.versionId)
       requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
     }
   }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> broker can return incorrect version of fetch response when the broker hits an unknown exception
> -----------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-3427
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3427
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.9.0.1, 0.10.0.0
>            Reporter: Jun Rao
>            Assignee: Jun Rao
>            Priority: Blocker
>             Fix For: 0.10.0.0
>
>
> In FetchResponse.handleError(), we generate FetchResponse like the following, which always defaults to version 0 of the response. 
>     FetchResponse(correlationId, fetchResponsePartitionData)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)