You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/10/22 18:21:03 UTC
kafka git commit: KAFKA-2678; partition level lag metrics can be negative
Repository: kafka
Updated Branches:
refs/heads/trunk 65922b538 -> 2e25f899a
KAFKA-2678; partition level lag metrics can be negative
Author: Dong Lin <li...@cis.upenn.edu>
Author: Dong Lin <li...@gmail.com>
Reviewers: Guozhang Wang
Closes #346 from lindong28/KAFKA-2678
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2e25f899
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2e25f899
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2e25f899
Branch: refs/heads/trunk
Commit: 2e25f899a118e7d4d5eb89118e447a87ad02f71c
Parents: 65922b5
Author: Dong Lin <li...@cis.upenn.edu>
Authored: Thu Oct 22 09:26:05 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Oct 22 09:26:05 2015 -0700
----------------------------------------------------------------------
core/src/main/scala/kafka/server/AbstractFetcherThread.scala | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/2e25f899/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 21c7e3e..eba2d5a 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -132,7 +132,7 @@ abstract class AbstractFetcherThread(name: String,
case None => currentPartitionFetchState.offset
}
partitionMap.put(topicAndPartition, new PartitionFetchState(newOffset))
- fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.highWatermark - newOffset
+ fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = Math.max(0L, partitionData.highWatermark - newOffset)
fetcherStats.byteRate.mark(validBytes)
// Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
processPartitionData(topicAndPartition, currentPartitionFetchState.offset, partitionData)