You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/03/22 17:32:31 UTC

git commit: KAFKA-816 Reduce noise in Kafka server logs due to NotLeaderForPartitionException; reviewed by Jun Rao

Updated Branches:
  refs/heads/0.8 08b2a37c3 -> 51421fcc0


KAFKA-816 Reduce noise in Kafka server logs due to NotLeaderForPartitionException; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/51421fcc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/51421fcc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/51421fcc

Branch: refs/heads/0.8
Commit: 51421fcc0111031bb77f779a6f6c00520d526a34
Parents: 08b2a37
Author: Neha Narkhede <ne...@gmail.com>
Authored: Fri Mar 22 09:32:27 2013 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Fri Mar 22 09:32:27 2013 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/server/KafkaApis.scala |   28 ++++++++++++++++-
 1 files changed, 27 insertions(+), 1 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/51421fcc/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index cfabfc1..87ca6b0 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -197,10 +197,19 @@ class KafkaApis(val requestChannel: RequestChannel,
               .format(messages.size, topicAndPartition.topic, topicAndPartition.partition, start, end))
         ProduceResult(topicAndPartition, start, end)
       } catch {
+        // NOTE: The failed produce requests metric is not incremented for UnknownTopicOrPartitionException and
+        // NotLeaderForPartitionException, since that metric is supposed to indicate a failure of the broker in handling
+        // a produce request for a partition it is the leader for
         case e: KafkaStorageException =>
           fatal("Halting due to unrecoverable I/O error while handling produce request: ", e)
           Runtime.getRuntime.halt(1)
           null
+        case utpe: UnknownTopicOrPartitionException =>
+          warn(utpe.getMessage)
+          new ProduceResult(topicAndPartition, utpe)
+        case nle: NotLeaderForPartitionException =>
+          warn(nle.getMessage)
+          new ProduceResult(topicAndPartition, nle)
         case e =>
           BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark()
           BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark()
@@ -278,7 +287,16 @@ class KafkaApis(val requestChannel: RequestChannel,
               new FetchResponsePartitionData(ErrorMapping.NoError, highWatermark, messages)
             }
           } catch {
-            case t: Throwable =>
+            // NOTE: The failed fetch requests metric is not incremented for UnknownTopicOrPartitionException and
+            // NotLeaderForPartitionException, since that metric is supposed to indicate a failure of the broker in handling
+            // a fetch request for a partition it is the leader for
+            case utpe: UnknownTopicOrPartitionException =>
+              warn(utpe.getMessage)
+              new FetchResponsePartitionData(ErrorMapping.codeFor(utpe.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty)
+            case nle: NotLeaderForPartitionException =>
+              warn(nle.getMessage)
+              new FetchResponsePartitionData(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty)
+            case t =>
               BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark()
               BrokerTopicStats.getBrokerAllTopicsStats.failedFetchRequestRate.mark()
               error("error when processing request " + (topic, partition, offset, fetchSize), t)
@@ -344,6 +362,14 @@ class KafkaApis(val requestChannel: RequestChannel,
         }
         (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.NoError, offsets))
       } catch {
+        // NOTE: UnknownTopicOrPartitionException and NotLeaderForPartitionException are special-cased since these errors
+        // are typically transient and there is no value in logging their entire stack traces
+        case utpe: UnknownTopicOrPartitionException =>
+          warn(utpe.getMessage)
+          (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(utpe.getClass.asInstanceOf[Class[Throwable]]), Nil) )
+        case nle: NotLeaderForPartitionException =>
+          warn(nle.getMessage)
+          (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]), Nil) )
         case e =>
           warn("Error while responding to offset request", e)
           (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), Nil) )