You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2020/05/18 21:19:04 UTC
[kafka] branch trunk updated: KIP-551: Expose disk read and write metrics (#8569)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1f2ff73 KIP-551: Expose disk read and write metrics (#8569)
1f2ff73 is described below
commit 1f2ff73b28ddb538c4fda263c31b56368787970d
Author: Colin Patrick McCabe <cm...@confluent.io>
AuthorDate: Mon May 18 14:18:16 2020 -0700
KIP-551: Expose disk read and write metrics (#8569)
Reviewers: David Arthur <mu...@gmail.com>, Mickael Maison <mi...@gmail.com>
---
.../kafka/metrics/LinuxIoMetricsCollector.scala | 102 +++++++++++++++++++++
core/src/main/scala/kafka/server/KafkaServer.scala | 7 ++
.../metrics/LinuxIoMetricsCollectorTest.scala | 84 +++++++++++++++++
3 files changed, 193 insertions(+)
diff --git a/core/src/main/scala/kafka/metrics/LinuxIoMetricsCollector.scala b/core/src/main/scala/kafka/metrics/LinuxIoMetricsCollector.scala
new file mode 100644
index 0000000..bf5dd6f
--- /dev/null
+++ b/core/src/main/scala/kafka/metrics/LinuxIoMetricsCollector.scala
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.nio.file.{Files, Paths}
+
+import org.apache.kafka.common.utils.Time
+import org.slf4j.Logger
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * Retrieves Linux /proc/self/io metrics.
+ *
+ * The file is read lazily: `readBytes()` and `writeBytes()` return cached values and only
+ * re-read the file when `time.milliseconds()` differs from the time of the last successful read.
+ *
+ * NOTE(review): this file lives under kafka/metrics/ but declares `package kafka.server`.
+ *
+ * @param procRoot the proc filesystem root (e.g. "/proc"); the file read is `procRoot/self/io`
+ * @param time     clock used to decide whether the cached values are stale
+ * @param logger   logger used to report read and parse failures
+ */
+class LinuxIoMetricsCollector(procRoot: String, val time: Time, val logger: Logger) {
+  import LinuxIoMetricsCollector._
+  import scala.util.control.NonFatal
+
+  // time.milliseconds() at the last successful read; -1 means the file has not been read yet.
+  var lastUpdateMs = -1L
+  // Counters from the last read; -1 if the last read failed or the corresponding line was absent.
+  var cachedReadBytes = 0L
+  var cachedWriteBytes = 0L
+  val path = Paths.get(procRoot, "self", "io")
+
+  /** Returns the `read_bytes` counter, re-reading the file first if the cache is stale. */
+  def readBytes(): Long = this.synchronized {
+    refreshIfStale()
+    cachedReadBytes
+  }
+
+  /** Returns the `write_bytes` counter, re-reading the file first if the cache is stale. */
+  def writeBytes(): Long = this.synchronized {
+    refreshIfStale()
+    cachedWriteBytes
+  }
+
+  // Re-reads the file unless it was already read successfully during the current millisecond.
+  // Must be called while holding this object's lock.
+  private def refreshIfStale(): Unit = {
+    val curMs = time.milliseconds()
+    if (curMs != lastUpdateMs) {
+      updateValues(curMs)
+    }
+  }
+
+  /**
+   * Read /proc/self/io.
+   *
+   * Generally, each line in this file contains a prefix followed by a colon and a number.
+   *
+   * For example, it might contain this:
+   * rchar: 4052
+   * wchar: 0
+   * syscr: 13
+   * syscw: 0
+   * read_bytes: 0
+   * write_bytes: 0
+   * cancelled_write_bytes: 0
+   *
+   * Counters whose line is absent are reported as -1. The new values are parsed into locals and
+   * committed together, so a failure part-way through never leaves one counter fresh and the
+   * other stale. On failure both counters are set to -1, the failure is logged, and
+   * `lastUpdateMs` is left unchanged so the next getter call retries.
+   *
+   * @param now the current time in milliseconds, recorded as the last update time on success
+   * @return true if the file was read and parsed successfully, false otherwise
+   */
+  def updateValues(now: Long): Boolean = this.synchronized {
+    try {
+      var newReadBytes = -1L
+      var newWriteBytes = -1L
+      Files.readAllLines(path).asScala.foreach { line =>
+        if (line.startsWith(READ_BYTES_PREFIX)) {
+          newReadBytes = parseCounter(line, READ_BYTES_PREFIX)
+        } else if (line.startsWith(WRITE_BYTES_PREFIX)) {
+          newWriteBytes = parseCounter(line, WRITE_BYTES_PREFIX)
+        }
+      }
+      cachedReadBytes = newReadBytes
+      cachedWriteBytes = newWriteBytes
+      lastUpdateMs = now
+      true
+    } catch {
+      // NonFatal lets VirtualMachineError, InterruptedException, etc. propagate instead of
+      // being swallowed as a metrics failure.
+      case NonFatal(e) =>
+        cachedReadBytes = -1
+        cachedWriteBytes = -1
+        logger.warn("Unable to update IO metrics", e)
+        false
+    }
+  }
+
+  // Parses the number that follows `prefix` on `line`, ignoring surrounding whitespace.
+  private def parseCounter(line: String, prefix: String): Long =
+    line.substring(prefix.length).trim.toLong
+
+  /**
+   * Performs an initial read and reports whether this collector can be used.
+   *
+   * @return false if the file does not exist or could not be read and parsed, true otherwise
+   */
+  def usable(): Boolean = {
+    if (Files.exists(path)) {
+      updateValues(time.milliseconds())
+    } else {
+      logger.debug(s"disabling IO metrics collection because ${path} does not exist.")
+      false
+    }
+  }
+}
+
+object LinuxIoMetricsCollector {
+  /** Prefix of the /proc/self/io line that carries the `read_bytes` counter. */
+  val READ_BYTES_PREFIX: String = "read_bytes: "
+
+  /** Prefix of the /proc/self/io line that carries the `write_bytes` counter. */
+  val WRITE_BYTES_PREFIX: String = "write_bytes: "
+}
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index f40be9e..05a6257 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -189,6 +189,13 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
newGauge("ClusterId", () => clusterId)
newGauge("yammer-metrics-count", () => KafkaYammerMetrics.defaultRegistry.allMetrics.size)
+ // KIP-551: expose this process's disk I/O counters (read from /proc/self/io) as gauges.
+ val linuxIoMetricsCollector = new LinuxIoMetricsCollector("/proc", time, logger.underlying)
+
+ // usable() performs an initial read of /proc/self/io; the gauges are registered only if the
+ // file exists and that read succeeds, so hosts without the file get no disk gauges.
+ if (linuxIoMetricsCollector.usable()) {
+ // Each gauge read returns the collector's cached counter, re-read when the clock has advanced.
+ newGauge("linux-disk-read-bytes", () => linuxIoMetricsCollector.readBytes())
+ newGauge("linux-disk-write-bytes", () => linuxIoMetricsCollector.writeBytes())
+ }
+
/**
* Start up API for bringing up a single instance of the Kafka server.
* Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
diff --git a/core/src/test/scala/kafka/metrics/LinuxIoMetricsCollectorTest.scala b/core/src/test/scala/kafka/metrics/LinuxIoMetricsCollectorTest.scala
new file mode 100644
index 0000000..3c2741a
--- /dev/null
+++ b/core/src/test/scala/kafka/metrics/LinuxIoMetricsCollectorTest.scala
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.nio.charset.StandardCharsets
+import java.nio.file.Files
+
+import kafka.utils.{Logging, MockTime}
+import org.apache.kafka.test.TestUtils
+import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
+import org.junit.{Rule, Test}
+import org.junit.rules.Timeout
+
+class LinuxIoMetricsCollectorTest extends Logging {
+  @Rule
+  def globalTimeout = Timeout.millis(120000)
+
+  /** A temporary directory laid out like a proc filesystem root, i.e. one containing `self/io`. */
+  class TestDirectory() {
+    val baseDir = TestUtils.tempDirectory()
+    val selfDir = Files.createDirectories(baseDir.toPath.resolve("self"))
+
+    /** Writes `self/io` with the given counters; the other fields mirror the real file's layout. */
+    def writeProcFile(readBytes: Long, writeBytes: Long) = {
+      writeRawProcFile(Seq(
+        "rchar: 0",
+        "wchar: 0",
+        "syscr: 0",
+        "syscw: 0",
+        s"read_bytes: $readBytes",
+        s"write_bytes: $writeBytes",
+        "cancelled_write_bytes: 0"
+      ).map(_ + System.lineSeparator()).mkString)
+    }
+
+    /** Writes `contents` verbatim to `self/io`. */
+    def writeRawProcFile(contents: String) = {
+      Files.write(selfDir.resolve("io"), contents.getBytes(StandardCharsets.UTF_8))
+    }
+  }
+
+  @Test
+  def testReadProcFile(): Unit = {
+    val testDirectory = new TestDirectory()
+    val time = new MockTime(100, 1000)
+    testDirectory.writeProcFile(123L, 456L)
+    val collector = new LinuxIoMetricsCollector(testDirectory.baseDir.getAbsolutePath,
+      time, logger.underlying)
+
+    // Test that we can read the values we wrote.
+    assertTrue(collector.usable())
+    assertEquals(123L, collector.readBytes())
+    assertEquals(456L, collector.writeBytes())
+    testDirectory.writeProcFile(124L, 457L)
+
+    // The previous values should still be cached.
+    assertEquals(123L, collector.readBytes())
+    assertEquals(456L, collector.writeBytes())
+
+    // Update the time, and the values should be re-read.
+    time.sleep(1)
+    assertEquals(124L, collector.readBytes())
+    assertEquals(457L, collector.writeBytes())
+  }
+
+  @Test
+  def testUnableToReadNonexistentProcFile(): Unit = {
+    val testDirectory = new TestDirectory()
+    val time = new MockTime(100, 1000)
+    val collector = new LinuxIoMetricsCollector(testDirectory.baseDir.getAbsolutePath,
+      time, logger.underlying)
+
+    // Test that we can't read the file, since it hasn't been written.
+    assertFalse(collector.usable())
+  }
+
+  @Test
+  def testUnableToParseMalformedProcFile(): Unit = {
+    val testDirectory = new TestDirectory()
+    val time = new MockTime(100, 1000)
+    testDirectory.writeRawProcFile("read_bytes: not-a-number" + System.lineSeparator())
+    val collector = new LinuxIoMetricsCollector(testDirectory.baseDir.getAbsolutePath,
+      time, logger.underlying)
+
+    // The file exists but cannot be parsed, so the collector is unusable and reports -1.
+    assertFalse(collector.usable())
+    assertEquals(-1L, collector.readBytes())
+    assertEquals(-1L, collector.writeBytes())
+  }
+
+  @Test
+  def testMissingCountersAreReportedAsMinusOne(): Unit = {
+    val testDirectory = new TestDirectory()
+    val time = new MockTime(100, 1000)
+    testDirectory.writeRawProcFile("rchar: 0" + System.lineSeparator())
+    val collector = new LinuxIoMetricsCollector(testDirectory.baseDir.getAbsolutePath,
+      time, logger.underlying)
+
+    // The file parses, but neither counter line is present.
+    assertTrue(collector.usable())
+    assertEquals(-1L, collector.readBytes())
+    assertEquals(-1L, collector.writeBytes())
+  }
+}