You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/10/22 18:21:03 UTC

kafka git commit: KAFKA-2678; partition level lag metrics can be negative

Repository: kafka
Updated Branches:
  refs/heads/trunk 65922b538 -> 2e25f899a


KAFKA-2678; partition level lag metrics can be negative

Author: Dong Lin <li...@cis.upenn.edu>
Author: Dong Lin <li...@gmail.com>

Reviewers: Guozhang Wang

Closes #346 from lindong28/KAFKA-2678


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2e25f899
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2e25f899
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2e25f899

Branch: refs/heads/trunk
Commit: 2e25f899a118e7d4d5eb89118e447a87ad02f71c
Parents: 65922b5
Author: Dong Lin <li...@cis.upenn.edu>
Authored: Thu Oct 22 09:26:05 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Oct 22 09:26:05 2015 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/server/AbstractFetcherThread.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2e25f899/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 21c7e3e..eba2d5a 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -132,7 +132,7 @@ abstract class AbstractFetcherThread(name: String,
                       case None => currentPartitionFetchState.offset
                     }
                     partitionMap.put(topicAndPartition, new PartitionFetchState(newOffset))
-                    fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.highWatermark - newOffset
+                    fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = Math.max(0L, partitionData.highWatermark - newOffset)
                     fetcherStats.byteRate.mark(validBytes)
                     // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
                     processPartitionData(topicAndPartition, currentPartitionFetchState.offset, partitionData)