You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/06/19 00:23:13 UTC
kafka git commit: KAFKA-3632; remove fetcher metrics on shutdown and leader migration
Repository: kafka
Updated Branches:
refs/heads/0.9.0 c2f8a53e7 -> ec276b38f
KAFKA-3632; remove fetcher metrics on shutdown and leader migration
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #1518 from hachikuji/port-kafka-3632-to-0.9
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ec276b38
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ec276b38
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ec276b38
Branch: refs/heads/0.9.0
Commit: ec276b38fb36ed6b31365d19f2d1b34a7a3dd79d
Parents: c2f8a53
Author: Jason Gustafson <ja...@confluent.io>
Authored: Sun Jun 19 02:23:04 2016 +0200
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Sun Jun 19 02:23:04 2016 +0200
----------------------------------------------------------------------
.../kafka/server/AbstractFetcherThread.scala | 59 +++++++--
.../server/AbstractFetcherThreadTest.scala | 127 +++++++++++++++++++
2 files changed, 176 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/ec276b38/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index eba2d5a..1d69f89 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -76,6 +76,10 @@ abstract class AbstractFetcherThread(name: String,
partitionMapCond.signalAll()
}
awaitShutdown()
+
+ // we don't need the lock since the thread has finished shutdown and metric removal is safe
+ fetcherStats.unregister()
+ fetcherLagStats.unregister()
}
override def doWork() {
@@ -132,7 +136,7 @@ abstract class AbstractFetcherThread(name: String,
case None => currentPartitionFetchState.offset
}
partitionMap.put(topicAndPartition, new PartitionFetchState(newOffset))
- fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = Math.max(0L, partitionData.highWatermark - newOffset)
+ fetcherLagStats.getAndMaybePut(topic, partitionId).lag = Math.max(0L, partitionData.highWatermark - newOffset)
fetcherStats.byteRate.mark(validBytes)
// Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
processPartitionData(topicAndPartition, currentPartitionFetchState.offset, partitionData)
@@ -206,8 +210,12 @@ abstract class AbstractFetcherThread(name: String,
def removePartitions(topicAndPartitions: Set[TopicAndPartition]) {
partitionMapLock.lockInterruptibly()
- try topicAndPartitions.foreach(partitionMap.remove)
- finally partitionMapLock.unlock()
+ try {
+ topicAndPartitions.foreach { topicAndPartition =>
+ partitionMap.remove(topicAndPartition)
+ fetcherLagStats.unregister(topicAndPartition.topic, topicAndPartition.partition)
+ }
+ } finally partitionMapLock.unlock()
}
def partitionCount() = {
@@ -234,15 +242,25 @@ object AbstractFetcherThread {
}
+object FetcherMetrics {
+ val ConsumerLag = "ConsumerLag"
+ val RequestsPerSec = "RequestsPerSec"
+ val BytesPerSec = "BytesPerSec"
+}
+
class FetcherLagMetrics(metricId: ClientIdTopicPartition) extends KafkaMetricsGroup {
+
private[this] val lagVal = new AtomicLong(-1L)
- newGauge("ConsumerLag",
+ private[this] val tags = Map(
+ "clientId" -> metricId.clientId,
+ "topic" -> metricId.topic,
+ "partition" -> metricId.partitionId.toString)
+
+ newGauge(FetcherMetrics.ConsumerLag,
new Gauge[Long] {
def value = lagVal.get
},
- Map("clientId" -> metricId.clientId,
- "topic" -> metricId.topic,
- "partition" -> metricId.partitionId.toString)
+ tags
)
def lag_=(newLag: Long) {
@@ -250,15 +268,30 @@ class FetcherLagMetrics(metricId: ClientIdTopicPartition) extends KafkaMetricsGr
}
def lag = lagVal.get
+
+ def unregister() {
+ removeMetric(FetcherMetrics.ConsumerLag, tags)
+ }
}
class FetcherLagStats(metricId: ClientIdAndBroker) {
private val valueFactory = (k: ClientIdTopicPartition) => new FetcherLagMetrics(k)
val stats = new Pool[ClientIdTopicPartition, FetcherLagMetrics](Some(valueFactory))
- def getFetcherLagStats(topic: String, partitionId: Int): FetcherLagMetrics = {
+ def getAndMaybePut(topic: String, partitionId: Int): FetcherLagMetrics = {
stats.getAndMaybePut(new ClientIdTopicPartition(metricId.clientId, topic, partitionId))
}
+
+ def unregister(topic: String, partitionId: Int) {
+ val lagMetrics = stats.remove(new ClientIdTopicPartition(metricId.clientId, topic, partitionId))
+ if (lagMetrics != null) lagMetrics.unregister()
+ }
+
+ def unregister() {
+ stats.keys.toBuffer.foreach { key: ClientIdTopicPartition =>
+ unregister(key.topic, key.partitionId)
+ }
+ }
}
class FetcherStats(metricId: ClientIdAndBroker) extends KafkaMetricsGroup {
@@ -266,9 +299,15 @@ class FetcherStats(metricId: ClientIdAndBroker) extends KafkaMetricsGroup {
"brokerHost" -> metricId.brokerHost,
"brokerPort" -> metricId.brokerPort.toString)
- val requestRate = newMeter("RequestsPerSec", "requests", TimeUnit.SECONDS, tags)
+ val requestRate = newMeter(FetcherMetrics.RequestsPerSec, "requests", TimeUnit.SECONDS, tags)
+
+ val byteRate = newMeter(FetcherMetrics.BytesPerSec, "bytes", TimeUnit.SECONDS, tags)
+
+ def unregister() {
+ removeMetric(FetcherMetrics.RequestsPerSec, tags)
+ removeMetric(FetcherMetrics.BytesPerSec, tags)
+ }
- val byteRate = newMeter("BytesPerSec", "bytes", TimeUnit.SECONDS, tags)
}
case class ClientIdTopicPartition(clientId: String, topic: String, partitionId: Int) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/ec276b38/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
new file mode 100644
index 0000000..b95f2bf
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import com.yammer.metrics.Metrics
+import kafka.cluster.BrokerEndPoint
+import kafka.common.TopicAndPartition
+import kafka.message.ByteBufferMessageSet
+import kafka.server.AbstractFetcherThread.{FetchRequest, PartitionData}
+import kafka.utils.TestUtils
+import org.apache.kafka.common.protocol.Errors
+import org.junit.Assert.{assertFalse, assertTrue}
+import org.junit.{Before, Test}
+
+import scala.collection.JavaConverters._
+
+class AbstractFetcherThreadTest {
+
+ @Before
+ def cleanMetricRegistry(): Unit = {
+ for (metricName <- Metrics.defaultRegistry().allMetrics().keySet().asScala)
+ Metrics.defaultRegistry().removeMetric(metricName)
+ }
+
+ @Test
+ def testMetricsRemovedOnShutdown() {
+ val partition = new TopicAndPartition("topic", 0)
+ val fetcherThread = new DummyFetcherThread("dummy", "client", new BrokerEndPoint(0, "localhost", 9092))
+
+ fetcherThread.start()
+
+ // add one partition to create the consumer lag metric
+ fetcherThread.addPartitions(Map(partition -> 0L))
+
+ // wait until all fetcher metrics are present
+ TestUtils.waitUntilTrue(() =>
+ allMetricsNames == Set(FetcherMetrics.BytesPerSec, FetcherMetrics.RequestsPerSec, FetcherMetrics.ConsumerLag),
+ "Failed waiting for all fetcher metrics to be registered")
+
+ fetcherThread.shutdown()
+
+ // after shutdown, they should be gone
+ assertTrue(Metrics.defaultRegistry().allMetrics().isEmpty)
+ }
+
+ @Test
+ def testConsumerLagRemovedWithPartition() {
+ val partition = new TopicAndPartition("topic", 0)
+ val fetcherThread = new DummyFetcherThread("dummy", "client", new BrokerEndPoint(0, "localhost", 9092))
+
+ fetcherThread.start()
+
+ // add one partition to create the consumer lag metric
+ fetcherThread.addPartitions(Map(partition -> 0L))
+
+ // wait until lag metric is present
+ TestUtils.waitUntilTrue(() => allMetricsNames(FetcherMetrics.ConsumerLag),
+ "Failed waiting for consumer lag metric")
+
+ // remove the partition to simulate leader migration
+ fetcherThread.removePartitions(Set(partition))
+
+ // the lag metric should now be gone
+ assertFalse(allMetricsNames(FetcherMetrics.ConsumerLag))
+
+ fetcherThread.shutdown()
+ }
+
+ private def allMetricsNames = Metrics.defaultRegistry().allMetrics().asScala.keySet.map(_.getName)
+
+ class DummyFetchRequest(val offsets: collection.Map[TopicAndPartition, Long]) extends FetchRequest {
+ override def isEmpty: Boolean = offsets.isEmpty
+
+ override def offset(topicAndPartition: TopicAndPartition): Long = offsets(topicAndPartition)
+ }
+
+ class DummyPartitionData extends PartitionData {
+ override def errorCode: Short = Errors.NONE.code
+
+ override def toByteBufferMessageSet: ByteBufferMessageSet = new ByteBufferMessageSet()
+
+ override def highWatermark: Long = 0L
+
+ override def exception: Option[Throwable] = None
+ }
+
+ class DummyFetcherThread(name: String,
+ clientId: String,
+ sourceBroker: BrokerEndPoint)
+ extends AbstractFetcherThread(name, clientId, sourceBroker, 0) {
+
+ type REQ = DummyFetchRequest
+ type PD = PartitionData
+
+ override def processPartitionData(topicAndPartition: TopicAndPartition,
+ fetchOffset: Long,
+ partitionData: PartitionData): Unit = {}
+
+ override def handleOffsetOutOfRange(topicAndPartition: TopicAndPartition): Long = 0L
+
+ override def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition]): Unit = {}
+
+ override protected def fetch(fetchRequest: DummyFetchRequest): collection.Map[TopicAndPartition, DummyPartitionData] = {
+ fetchRequest.offsets.mapValues(_ => new DummyPartitionData)
+ }
+
+ override protected def buildFetchRequest(partitionMap: collection.Map[TopicAndPartition, PartitionFetchState]): DummyFetchRequest = {
+ new DummyFetchRequest(partitionMap.mapValues(_.offset))
+ }
+ }
+
+}