Posted to commits@kafka.apache.org by ij...@apache.org on 2016/06/19 00:23:13 UTC

kafka git commit: KAFKA-3632; remove fetcher metrics on shutdown and leader migration

Repository: kafka
Updated Branches:
  refs/heads/0.9.0 c2f8a53e7 -> ec276b38f


KAFKA-3632; remove fetcher metrics on shutdown and leader migration

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #1518 from hachikuji/port-kafka-3632-to-0.9


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ec276b38
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ec276b38
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ec276b38

Branch: refs/heads/0.9.0
Commit: ec276b38fb36ed6b31365d19f2d1b34a7a3dd79d
Parents: c2f8a53
Author: Jason Gustafson <ja...@confluent.io>
Authored: Sun Jun 19 02:23:04 2016 +0200
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Sun Jun 19 02:23:04 2016 +0200

----------------------------------------------------------------------
 .../kafka/server/AbstractFetcherThread.scala    |  59 +++++++--
 .../server/AbstractFetcherThreadTest.scala      | 127 +++++++++++++++++++
 2 files changed, 176 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
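
The patch gives both FetcherStats and FetcherLagStats an unregister() hook and calls them from
shutdown() and removePartitions(), so the yammer meters and gauges created by a fetcher thread no
longer outlive the thread or a migrated partition. A minimal sketch of the register/unregister
pattern involved (illustrative only, not part of the patch; it assumes the kafka.metrics.KafkaMetricsGroup
and com.yammer.metrics Gauge APIs that appear in the diff below):

    import java.util.concurrent.atomic.AtomicLong
    import com.yammer.metrics.core.Gauge
    import kafka.metrics.KafkaMetricsGroup

    // Illustrative class, not from the patch: register a gauge with explicit tags and keep
    // the tags so the metric can later be removed under exactly the same identity.
    class ExampleLagMetric(clientId: String, topic: String, partition: Int) extends KafkaMetricsGroup {
      private val lagVal = new AtomicLong(-1L)
      private val tags = Map(
        "clientId" -> clientId,
        "topic" -> topic,
        "partition" -> partition.toString)

      newGauge("ConsumerLag", new Gauge[Long] { def value = lagVal.get }, tags)

      // Called on shutdown or when the partition moves away; removes the gauge from the registry.
      def unregister(): Unit = removeMetric("ConsumerLag", tags)
    }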


http://git-wip-us.apache.org/repos/asf/kafka/blob/ec276b38/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index eba2d5a..1d69f89 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -76,6 +76,10 @@ abstract class AbstractFetcherThread(name: String,
       partitionMapCond.signalAll()
     }
     awaitShutdown()
+
+    // we don't need the lock since the thread has finished shutdown and metric removal is safe
+    fetcherStats.unregister()
+    fetcherLagStats.unregister()
   }
 
   override def doWork() {
@@ -132,7 +136,7 @@ abstract class AbstractFetcherThread(name: String,
                       case None => currentPartitionFetchState.offset
                     }
                     partitionMap.put(topicAndPartition, new PartitionFetchState(newOffset))
-                    fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = Math.max(0L, partitionData.highWatermark - newOffset)
+                    fetcherLagStats.getAndMaybePut(topic, partitionId).lag = Math.max(0L, partitionData.highWatermark - newOffset)
                     fetcherStats.byteRate.mark(validBytes)
                     // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
                     processPartitionData(topicAndPartition, currentPartitionFetchState.offset, partitionData)
@@ -206,8 +210,12 @@ abstract class AbstractFetcherThread(name: String,
 
   def removePartitions(topicAndPartitions: Set[TopicAndPartition]) {
     partitionMapLock.lockInterruptibly()
-    try topicAndPartitions.foreach(partitionMap.remove)
-    finally partitionMapLock.unlock()
+    try {
+      topicAndPartitions.foreach { topicAndPartition =>
+        partitionMap.remove(topicAndPartition)
+        fetcherLagStats.unregister(topicAndPartition.topic, topicAndPartition.partition)
+      }
+    } finally partitionMapLock.unlock()
   }
 
   def partitionCount() = {
@@ -234,15 +242,25 @@ object AbstractFetcherThread {
 
 }
 
+object FetcherMetrics {
+  val ConsumerLag = "ConsumerLag"
+  val RequestsPerSec = "RequestsPerSec"
+  val BytesPerSec = "BytesPerSec"
+}
+
 class FetcherLagMetrics(metricId: ClientIdTopicPartition) extends KafkaMetricsGroup {
+
   private[this] val lagVal = new AtomicLong(-1L)
-  newGauge("ConsumerLag",
+  private[this] val tags = Map(
+    "clientId" -> metricId.clientId,
+    "topic" -> metricId.topic,
+    "partition" -> metricId.partitionId.toString)
+
+  newGauge(FetcherMetrics.ConsumerLag,
     new Gauge[Long] {
       def value = lagVal.get
     },
-    Map("clientId" -> metricId.clientId,
-      "topic" -> metricId.topic,
-      "partition" -> metricId.partitionId.toString)
+    tags
   )
 
   def lag_=(newLag: Long) {
@@ -250,15 +268,30 @@ class FetcherLagMetrics(metricId: ClientIdTopicPartition) extends KafkaMetricsGr
   }
 
   def lag = lagVal.get
+
+  def unregister() {
+    removeMetric(FetcherMetrics.ConsumerLag, tags)
+  }
 }
 
 class FetcherLagStats(metricId: ClientIdAndBroker) {
   private val valueFactory = (k: ClientIdTopicPartition) => new FetcherLagMetrics(k)
   val stats = new Pool[ClientIdTopicPartition, FetcherLagMetrics](Some(valueFactory))
 
-  def getFetcherLagStats(topic: String, partitionId: Int): FetcherLagMetrics = {
+  def getAndMaybePut(topic: String, partitionId: Int): FetcherLagMetrics = {
     stats.getAndMaybePut(new ClientIdTopicPartition(metricId.clientId, topic, partitionId))
   }
+
+  def unregister(topic: String, partitionId: Int) {
+    val lagMetrics = stats.remove(new ClientIdTopicPartition(metricId.clientId, topic, partitionId))
+    if (lagMetrics != null) lagMetrics.unregister()
+  }
+
+  def unregister() {
+    stats.keys.toBuffer.foreach { key: ClientIdTopicPartition =>
+      unregister(key.topic, key.partitionId)
+    }
+  }
 }
 
 class FetcherStats(metricId: ClientIdAndBroker) extends KafkaMetricsGroup {
@@ -266,9 +299,15 @@ class FetcherStats(metricId: ClientIdAndBroker) extends KafkaMetricsGroup {
     "brokerHost" -> metricId.brokerHost,
     "brokerPort" -> metricId.brokerPort.toString)
 
-  val requestRate = newMeter("RequestsPerSec", "requests", TimeUnit.SECONDS, tags)
+  val requestRate = newMeter(FetcherMetrics.RequestsPerSec, "requests", TimeUnit.SECONDS, tags)
+
+  val byteRate = newMeter(FetcherMetrics.BytesPerSec, "bytes", TimeUnit.SECONDS, tags)
+
+  def unregister() {
+    removeMetric(FetcherMetrics.RequestsPerSec, tags)
+    removeMetric(FetcherMetrics.BytesPerSec, tags)
+  }
 
-  val byteRate = newMeter("BytesPerSec", "bytes", TimeUnit.SECONDS, tags)
 }
 
 case class ClientIdTopicPartition(clientId: String, topic: String, partitionId: Int) {

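With these hooks in place, shutdown() unregisters the per-thread request and byte meters plus any
remaining lag gauges, while removePartitions() unregisters the lag gauge of each partition it drops.
A quick way to confirm nothing is left behind, as the new test below does, is to scan the yammer
default registry (a sketch, assuming the default registry is in use and kafka core is on the classpath):

    import com.yammer.metrics.Metrics
    import kafka.server.FetcherMetrics
    import scala.collection.JavaConverters._

    // Collect any fetcher metric names still present in the default registry.
    val fetcherMetricNames = Set(FetcherMetrics.ConsumerLag, FetcherMetrics.RequestsPerSec, FetcherMetrics.BytesPerSec)
    val leftovers = Metrics.defaultRegistry().allMetrics().asScala.keySet
      .map(_.getName)
      .filter(fetcherMetricNames.contains)
    assert(leftovers.isEmpty, s"Fetcher metrics still registered: $leftovers")
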
http://git-wip-us.apache.org/repos/asf/kafka/blob/ec276b38/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
new file mode 100644
index 0000000..b95f2bf
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -0,0 +1,127 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.server
+
+import com.yammer.metrics.Metrics
+import kafka.cluster.BrokerEndPoint
+import kafka.common.TopicAndPartition
+import kafka.message.ByteBufferMessageSet
+import kafka.server.AbstractFetcherThread.{FetchRequest, PartitionData}
+import kafka.utils.TestUtils
+import org.apache.kafka.common.protocol.Errors
+import org.junit.Assert.{assertFalse, assertTrue}
+import org.junit.{Before, Test}
+
+import scala.collection.JavaConverters._
+
+class AbstractFetcherThreadTest {
+
+  @Before
+  def cleanMetricRegistry(): Unit = {
+    for (metricName <- Metrics.defaultRegistry().allMetrics().keySet().asScala)
+      Metrics.defaultRegistry().removeMetric(metricName)
+  }
+
+  @Test
+  def testMetricsRemovedOnShutdown() {
+    val partition = new TopicAndPartition("topic", 0)
+    val fetcherThread = new DummyFetcherThread("dummy", "client", new BrokerEndPoint(0, "localhost", 9092))
+
+    fetcherThread.start()
+
+    // add one partition to create the consumer lag metric
+    fetcherThread.addPartitions(Map(partition -> 0L))
+
+    // wait until all fetcher metrics are present
+    TestUtils.waitUntilTrue(() =>
+      allMetricsNames == Set(FetcherMetrics.BytesPerSec, FetcherMetrics.RequestsPerSec, FetcherMetrics.ConsumerLag),
+      "Failed waiting for all fetcher metrics to be registered")
+
+    fetcherThread.shutdown()
+
+    // after shutdown, they should be gone
+    assertTrue(Metrics.defaultRegistry().allMetrics().isEmpty)
+  }
+
+  @Test
+  def testConsumerLagRemovedWithPartition() {
+    val partition = new TopicAndPartition("topic", 0)
+    val fetcherThread = new DummyFetcherThread("dummy", "client", new BrokerEndPoint(0, "localhost", 9092))
+
+    fetcherThread.start()
+
+    // add one partition to create the consumer lag metric
+    fetcherThread.addPartitions(Map(partition -> 0L))
+
+    // wait until lag metric is present
+    TestUtils.waitUntilTrue(() => allMetricsNames(FetcherMetrics.ConsumerLag),
+      "Failed waiting for consumer lag metric")
+
+    // remove the partition to simulate leader migration
+    fetcherThread.removePartitions(Set(partition))
+
+    // the lag metric should now be gone
+    assertFalse(allMetricsNames(FetcherMetrics.ConsumerLag))
+
+    fetcherThread.shutdown()
+  }
+
+  private def allMetricsNames = Metrics.defaultRegistry().allMetrics().asScala.keySet.map(_.getName)
+
+  class DummyFetchRequest(val offsets: collection.Map[TopicAndPartition, Long]) extends FetchRequest {
+    override def isEmpty: Boolean = offsets.isEmpty
+
+    override def offset(topicAndPartition: TopicAndPartition): Long = offsets(topicAndPartition)
+  }
+
+  class DummyPartitionData extends PartitionData {
+    override def errorCode: Short = Errors.NONE.code
+
+    override def toByteBufferMessageSet: ByteBufferMessageSet = new ByteBufferMessageSet()
+
+    override def highWatermark: Long = 0L
+
+    override def exception: Option[Throwable] = None
+  }
+
+  class DummyFetcherThread(name: String,
+                           clientId: String,
+                           sourceBroker: BrokerEndPoint)
+    extends AbstractFetcherThread(name, clientId, sourceBroker, 0) {
+
+    type REQ = DummyFetchRequest
+    type PD = PartitionData
+
+    override def processPartitionData(topicAndPartition: TopicAndPartition,
+                                      fetchOffset: Long,
+                                      partitionData: PartitionData): Unit = {}
+
+    override def handleOffsetOutOfRange(topicAndPartition: TopicAndPartition): Long = 0L
+
+    override def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition]): Unit = {}
+
+    override protected def fetch(fetchRequest: DummyFetchRequest): collection.Map[TopicAndPartition, DummyPartitionData] = {
+      fetchRequest.offsets.mapValues(_ => new DummyPartitionData)
+    }
+
+    override protected def buildFetchRequest(partitionMap: collection.Map[TopicAndPartition, PartitionFetchState]): DummyFetchRequest = {
+      new DummyFetchRequest(partitionMap.mapValues(_.offset))
+    }
+  }
+
+}