You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2015/03/03 20:46:29 UTC

[3/3] kafka git commit: KAFKA-1986; Request failure rate should not include invalid message size and offset out of range; reviewed by Joel Koshy

KAFKA-1986; Failed produce/fetch request rates should not count InvalidMessageSizeException (produce) or OffsetOutOfRangeException (fetch) errors; reviewed by Joel Koshy


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/57d38f67
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/57d38f67
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/57d38f67

Branch: refs/heads/trunk
Commit: 57d38f672bcb85fdb20d8ca3fab9bd60d1bc8965
Parents: c5d654a
Author: Aditya Auradkar <aa...@linkedin.com>
Authored: Tue Mar 3 11:21:04 2015 -0800
Committer: Joel Koshy <jj...@gmail.com>
Committed: Tue Mar 3 11:21:04 2015 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/server/ReplicaManager.scala | 4 ++++
 1 file changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/57d38f67/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 586cf4c..c527482 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -374,6 +374,8 @@ class ReplicaManager(val config: KafkaConfig,
             (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(mtl)))
           case mstl: MessageSetSizeTooLargeException =>
             (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(mstl)))
+          case imse : InvalidMessageSizeException =>
+            (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(imse)))
           case t: Throwable =>
             BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark()
             BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark()
@@ -483,6 +485,8 @@ class ReplicaManager(val config: KafkaConfig,
             LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, Some(nle))
           case rnae: ReplicaNotAvailableException =>
             LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, Some(rnae))
+          case oor : OffsetOutOfRangeException =>
+            LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty), -1L, fetchSize, Some(oor))
           case e: Throwable =>
             BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark()
             BrokerTopicStats.getBrokerAllTopicsStats().failedFetchRequestRate.mark()