You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/03/22 17:32:31 UTC
git commit: KAFKA-816 Reduce noise in Kafka server logs due to
NotLeaderForPartitionException; reviewed by Jun Rao
Updated Branches:
refs/heads/0.8 08b2a37c3 -> 51421fcc0
KAFKA-816 Reduce noise in Kafka server logs due to NotLeaderForPartitionException; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/51421fcc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/51421fcc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/51421fcc
Branch: refs/heads/0.8
Commit: 51421fcc0111031bb77f779a6f6c00520d526a34
Parents: 08b2a37
Author: Neha Narkhede <ne...@gmail.com>
Authored: Fri Mar 22 09:32:27 2013 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Fri Mar 22 09:32:27 2013 -0700
----------------------------------------------------------------------
core/src/main/scala/kafka/server/KafkaApis.scala | 28 ++++++++++++++++-
1 files changed, 27 insertions(+), 1 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/51421fcc/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index cfabfc1..87ca6b0 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -197,10 +197,19 @@ class KafkaApis(val requestChannel: RequestChannel,
.format(messages.size, topicAndPartition.topic, topicAndPartition.partition, start, end))
ProduceResult(topicAndPartition, start, end)
} catch {
+ // NOTE: The failed produce requests metric is not incremented for UnknownTopicOrPartitionException and NotLeaderForPartitionException
+ // since that metric is supposed to indicate failure of a broker in handling a produce request
+ // for a partition it is the leader for
case e: KafkaStorageException =>
fatal("Halting due to unrecoverable I/O error while handling produce request: ", e)
Runtime.getRuntime.halt(1)
null
+ case utpe: UnknownTopicOrPartitionException =>
+ warn(utpe.getMessage)
+ new ProduceResult(topicAndPartition, utpe)
+ case nle: NotLeaderForPartitionException =>
+ warn(nle.getMessage)
+ new ProduceResult(topicAndPartition, nle)
case e =>
BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark()
BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark()
@@ -278,7 +287,16 @@ class KafkaApis(val requestChannel: RequestChannel,
new FetchResponsePartitionData(ErrorMapping.NoError, highWatermark, messages)
}
} catch {
- case t: Throwable =>
+ // NOTE: The failed fetch requests metric is not incremented for UnknownTopicOrPartitionException and NotLeaderForPartitionException
+ // since that metric is supposed to indicate failure of a broker in handling a fetch request
+ // for a partition it is the leader for
+ case utpe: UnknownTopicOrPartitionException =>
+ warn(utpe.getMessage)
+ new FetchResponsePartitionData(ErrorMapping.codeFor(utpe.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty)
+ case nle: NotLeaderForPartitionException =>
+ warn(nle.getMessage)
+ new FetchResponsePartitionData(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty)
+ case t =>
BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark()
BrokerTopicStats.getBrokerAllTopicsStats.failedFetchRequestRate.mark()
error("error when processing request " + (topic, partition, offset, fetchSize), t)
@@ -344,6 +362,14 @@ class KafkaApis(val requestChannel: RequestChannel,
}
(topicAndPartition, PartitionOffsetsResponse(ErrorMapping.NoError, offsets))
} catch {
+ // NOTE: UnknownTopicOrPartitionException and NotLeaderForPartitionException are special-cased since these errors
+ // are typically transient and there is no value in logging the entire stack trace for them
+ case utpe: UnknownTopicOrPartitionException =>
+ warn(utpe.getMessage)
+ (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(utpe.getClass.asInstanceOf[Class[Throwable]]), Nil) )
+ case nle: NotLeaderForPartitionException =>
+ warn(nle.getMessage)
+ (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]), Nil) )
case e =>
warn("Error while responding to offset request", e)
(topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), Nil) )