You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2016/01/05 20:39:59 UTC

kafka git commit: KAFKA-3010; Include error in log when ack 0

Repository: kafka
Updated Branches:
  refs/heads/trunk 23d607dc2 -> 5db147c1d


KAFKA-3010; Include error in log when ack 0

I verified this by trying to produce to __consumer_offsets; the resulting log message looks like:

[2015-12-22 10:34:40,897] INFO [KafkaApi-0] Closing connection due to error during produce request with correlation id 1 from client id console-producer with ack=0
Topic and partition to exceptions: [__consumer_offsets,43] -> kafka.common.InvalidTopicException (kafka.server.KafkaApis)

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Jun Rao <ju...@gmail.com>

Closes #709 from ijuma/kafka-3010-include-error-in-log-when-ack-0


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5db147c1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5db147c1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5db147c1

Branch: refs/heads/trunk
Commit: 5db147c1d453c9dabcc277bd95435f17201b9c1c
Parents: 23d607d
Author: Ismael Juma <is...@juma.me.uk>
Authored: Tue Jan 5 11:39:56 2016 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Jan 5 11:39:56 2016 -0800

----------------------------------------------------------------------
 .../src/main/scala/kafka/server/KafkaApis.scala | 31 +++++++++-----------
 1 file changed, 14 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5db147c1/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 8589400..cbf5031 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -218,10 +218,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       val mergedCommitStatus = commitStatus ++ unauthorizedRequestInfo.mapValues(_ => ErrorMapping.TopicAuthorizationCode)
 
       mergedCommitStatus.foreach { case (topicAndPartition, errorCode) =>
-        // we only print warnings for known errors here; only replica manager could see an unknown
-        // exception while trying to write the offset message to the local log, and it will log
-        // an error message and write the error code in this case; hence it can be ignored here
-        if (errorCode != ErrorMapping.NoError && errorCode != ErrorMapping.UnknownCode) {
+        if (errorCode != ErrorMapping.NoError) {
           debug("Offset commit request with correlation id %d from client %s on partition %s failed due to %s"
             .format(offsetCommitRequest.correlationId, offsetCommitRequest.clientId,
               topicAndPartition, ErrorMapping.exceptionNameFor(errorCode)))
@@ -315,19 +312,19 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     // the callback for sending a produce response
     def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) {
-      var errorInResponse = false
+
       val mergedResponseStatus = responseStatus ++ unauthorizedRequestInfo.mapValues(_ => ProducerResponseStatus(ErrorMapping.TopicAuthorizationCode, -1))
 
+      var errorInResponse = false
+
       mergedResponseStatus.foreach { case (topicAndPartition, status) =>
-        // we only print warnings for known errors here; if it is unknown, it will cause
-        // an error message in the replica manager
-        if (status.error != ErrorMapping.NoError && status.error != ErrorMapping.UnknownCode) {
+        if (status.error != ErrorMapping.NoError) {
+          errorInResponse = true
           debug("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
             produceRequest.correlationId,
             produceRequest.clientId,
             topicAndPartition,
             ErrorMapping.exceptionNameFor(status.error)))
-          errorInResponse = true
         }
       }
 
@@ -338,10 +335,14 @@ class KafkaApis(val requestChannel: RequestChannel,
           // the request, since no response is expected by the producer, the server will close socket server so that
           // the producer client will know that some error has happened and will refresh its metadata
           if (errorInResponse) {
+            val exceptionsSummary = mergedResponseStatus.map { case (topicAndPartition, status) =>
+              topicAndPartition -> ErrorMapping.exceptionNameFor(status.error)
+            }.mkString(", ")
             info(
-              "Close connection due to error handling produce request with correlation id %d from client id %s with ack=0".format(
-                produceRequest.correlationId,
-                produceRequest.clientId))
+              s"Closing connection due to error during produce request with correlation id ${produceRequest.correlationId} " +
+                s"from client id ${produceRequest.clientId} with ack=0\n" +
+                s"Topic and partition to exceptions: $exceptionsSummary"
+            )
             requestChannel.closeConnection(request.processor, request)
           } else {
             requestChannel.noOperation(request.processor, request)
@@ -368,8 +369,6 @@ class KafkaApis(val requestChannel: RequestChannel,
     if (authorizedRequestInfo.isEmpty)
       sendResponseCallback(Map.empty)
     else {
-      // only allow appending to internal topic partitions
-      // if the client is not from admin
       val internalTopicsAllowed = produceRequest.clientId == AdminUtils.AdminClientId
 
       // call the replica manager to append messages to the replicas
@@ -404,9 +403,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       val mergedResponseStatus = responsePartitionData ++ unauthorizedResponseStatus
 
       mergedResponseStatus.foreach { case (topicAndPartition, data) =>
-        // we only print warnings for known errors here; if it is unknown, it will cause
-        // an error message in the replica manager already and hence can be ignored here
-        if (data.error != ErrorMapping.NoError && data.error != ErrorMapping.UnknownCode) {
+        if (data.error != ErrorMapping.NoError) {
           debug("Fetch request with correlation id %d from client %s on partition %s failed due to %s"
             .format(fetchRequest.correlationId, fetchRequest.clientId,
             topicAndPartition, ErrorMapping.exceptionNameFor(data.error)))