Posted to commits@kafka.apache.org by cm...@apache.org on 2020/05/18 21:19:04 UTC

[kafka] branch trunk updated: KIP-551: Expose disk read and write metrics (#8569)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1f2ff73  KIP-551: Expose disk read and write metrics (#8569)
1f2ff73 is described below

commit 1f2ff73b28ddb538c4fda263c31b56368787970d
Author: Colin Patrick McCabe <cm...@confluent.io>
AuthorDate: Mon May 18 14:18:16 2020 -0700

    KIP-551: Expose disk read and write metrics (#8569)
    
    Reviewers: David Arthur <mu...@gmail.com>, Mickael Maison <mi...@gmail.com>
---
 .../kafka/metrics/LinuxIoMetricsCollector.scala    | 102 +++++++++++++++++++++
 core/src/main/scala/kafka/server/KafkaServer.scala |   7 ++
 .../metrics/LinuxIoMetricsCollectorTest.scala      |  84 +++++++++++++++++
 3 files changed, 193 insertions(+)

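For context, the new LinuxIoMetricsCollector parses the kernel's
per-process I/O accounting file, /proc/self/io. A minimal stand-alone
sketch of reading the same two counters directly (Scala; assumes a
Linux host with procfs mounted at /proc; the object name is
illustrative only):

    import java.nio.file.{Files, Paths}
    import scala.jdk.CollectionConverters._

    object ProcIoSketch {
      def main(args: Array[String]): Unit = {
        // Each line of /proc/self/io has the form "<field>: <number>".
        // Only the cumulative read_bytes and write_bytes counters matter here.
        val lines = Files.readAllLines(Paths.get("/proc", "self", "io")).asScala
        lines.filter(l => l.startsWith("read_bytes: ") || l.startsWith("write_bytes: "))
          .foreach(println)
      }
    }
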
diff --git a/core/src/main/scala/kafka/metrics/LinuxIoMetricsCollector.scala b/core/src/main/scala/kafka/metrics/LinuxIoMetricsCollector.scala
new file mode 100644
index 0000000..bf5dd6f
--- /dev/null
+++ b/core/src/main/scala/kafka/metrics/LinuxIoMetricsCollector.scala
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.nio.file.{Files, Paths}
+
+import org.apache.kafka.common.utils.Time
+import org.slf4j.Logger
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * Retrieves Linux /proc/self/io metrics.
+ */
+class LinuxIoMetricsCollector(procRoot: String, val time: Time, val logger: Logger) {
+  import LinuxIoMetricsCollector._
+  var lastUpdateMs = -1L
+  var cachedReadBytes = 0L
+  var cachedWriteBytes = 0L
+  val path = Paths.get(procRoot, "self", "io")
+
+  def readBytes(): Long = this.synchronized {
+    val curMs = time.milliseconds()
+    if (curMs != lastUpdateMs) {
+      updateValues(curMs)
+    }
+    cachedReadBytes
+  }
+
+  def writeBytes(): Long = this.synchronized {
+    val curMs = time.milliseconds()
+    if (curMs != lastUpdateMs) {
+      updateValues(curMs)
+    }
+    cachedWriteBytes
+  }
+
+  /**
+   * Read /proc/self/io.
+   *
+   * Generally, each line in this file contains a prefix followed by a colon and a number.
+   *
+   * For example, it might contain this:
+   * rchar: 4052
+   * wchar: 0
+   * syscr: 13
+   * syscw: 0
+   * read_bytes: 0
+   * write_bytes: 0
+   * cancelled_write_bytes: 0
+   */
+  def updateValues(now: Long): Boolean = this.synchronized {
+    try {
+      cachedReadBytes = -1
+      cachedWriteBytes = -1
+      val lines = Files.readAllLines(path).asScala
+      lines.foreach(line => {
+        if (line.startsWith(READ_BYTES_PREFIX)) {
+          cachedReadBytes = line.substring(READ_BYTES_PREFIX.size).toLong
+        } else if (line.startsWith(WRITE_BYTES_PREFIX)) {
+          cachedWriteBytes = line.substring(WRITE_BYTES_PREFIX.size).toLong
+        }
+      })
+      lastUpdateMs = now
+      true
+    } catch {
+      case t: Throwable => {
+        logger.warn("Unable to update IO metrics", t)
+        false
+      }
+    }
+  }
+
+  def usable(): Boolean = {
+    if (path.toFile().exists()) {
+      updateValues(time.milliseconds())
+    } else {
+      logger.debug(s"disabling IO metrics collection because ${path} does not exist.")
+      false
+    }
+  }
+}
+
+object LinuxIoMetricsCollector {
+  val READ_BYTES_PREFIX = "read_bytes: "
+  val WRITE_BYTES_PREFIX = "write_bytes: "
+}
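
For illustration, a hypothetical stand-alone use of the class above,
mirroring how KafkaServer wires it up in the next hunk (Time.SYSTEM and
an slf4j Logger are its only dependencies; the object and logger names
are placeholders):

    import kafka.server.LinuxIoMetricsCollector
    import org.apache.kafka.common.utils.Time
    import org.slf4j.LoggerFactory

    object CollectorSketch {
      def main(args: Array[String]): Unit = {
        // "/proc" is the real procfs root; the test below substitutes a temp dir.
        val collector = new LinuxIoMetricsCollector("/proc", Time.SYSTEM,
          LoggerFactory.getLogger("CollectorSketch"))
        if (collector.usable())
          println(s"read_bytes=${collector.readBytes()} write_bytes=${collector.writeBytes()}")
      }
    }
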
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index f40be9e..05a6257 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -189,6 +189,13 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
   newGauge("ClusterId", () => clusterId)
   newGauge("yammer-metrics-count", () =>  KafkaYammerMetrics.defaultRegistry.allMetrics.size)
 
+  val linuxIoMetricsCollector = new LinuxIoMetricsCollector("/proc", time, logger.underlying)
+
+  if (linuxIoMetricsCollector.usable()) {
+    newGauge("linux-disk-read-bytes", () => linuxIoMetricsCollector.readBytes())
+    newGauge("linux-disk-write-bytes", () => linuxIoMetricsCollector.writeBytes())
+  }
+
   /**
    * Start up API for bringing up a single instance of the Kafka server.
    * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
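
Once registered, KIP-551 exposes these gauges over JMX as
kafka.server:type=KafkaServer,name=linux-disk-read-bytes and
kafka.server:type=KafkaServer,name=linux-disk-write-bytes. A sketch of
polling them from inside the broker JVM (the Yammer JMX reporter
exposes a gauge's current reading via its "Value" attribute; this only
works when usable() returned true and the gauges were registered):

    import java.lang.management.ManagementFactory
    import javax.management.ObjectName

    object DiskMetricsJmxSketch {
      def main(args: Array[String]): Unit = {
        val mbs = ManagementFactory.getPlatformMBeanServer
        for (metric <- Seq("linux-disk-read-bytes", "linux-disk-write-bytes")) {
          // MBean naming per KIP-551.
          val objectName = new ObjectName(s"kafka.server:type=KafkaServer,name=$metric")
          val value = mbs.getAttribute(objectName, "Value")
          println(s"$metric = $value")
        }
      }
    }
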
diff --git a/core/src/test/scala/kafka/metrics/LinuxIoMetricsCollectorTest.scala b/core/src/test/scala/kafka/metrics/LinuxIoMetricsCollectorTest.scala
new file mode 100644
index 0000000..3c2741a
--- /dev/null
+++ b/core/src/test/scala/kafka/metrics/LinuxIoMetricsCollectorTest.scala
@@ -0,0 +1,84 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.server
+
+import java.nio.charset.StandardCharsets
+import java.nio.file.Files
+
+import kafka.utils.{Logging, MockTime}
+import org.apache.kafka.test.TestUtils
+import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
+import org.junit.{Rule, Test}
+import org.junit.rules.Timeout
+
+class LinuxIoMetricsCollectorTest extends Logging {
+  @Rule
+  def globalTimeout = Timeout.millis(120000)
+
+  class TestDirectory() {
+    val baseDir = TestUtils.tempDirectory()
+    val selfDir = Files.createDirectories(baseDir.toPath.resolve("self"))
+
+    def writeProcFile(readBytes: Long, writeBytes: Long) = {
+      val bld = new StringBuilder()
+      bld.append("rchar: 0%n".format())
+      bld.append("wchar: 0%n".format())
+      bld.append("syschr: 0%n".format())
+      bld.append("syscw: 0%n".format())
+      bld.append("read_bytes: %d%n".format(readBytes))
+      bld.append("write_bytes: %d%n".format(writeBytes))
+      bld.append("cancelled_write_bytes: 0%n".format())
+      Files.write(selfDir.resolve("io"), bld.toString().getBytes(StandardCharsets.UTF_8))
+    }
+  }
+
+  @Test
+  def testReadProcFile(): Unit = {
+    val testDirectory = new TestDirectory()
+    val time = new MockTime(100, 1000)
+    testDirectory.writeProcFile(123L, 456L)
+    val collector = new LinuxIoMetricsCollector(testDirectory.baseDir.getAbsolutePath,
+      time, logger.underlying)
+
+    // Test that we can read the values we wrote.
+    assertTrue(collector.usable())
+    assertEquals(123L, collector.readBytes())
+    assertEquals(456L, collector.writeBytes())
+    testDirectory.writeProcFile(124L, 457L)
+
+    // The previous values should still be cached.
+    assertEquals(123L, collector.readBytes())
+    assertEquals(456L, collector.writeBytes())
+
+    // Update the time, and the values should be re-read.
+    time.sleep(1)
+    assertEquals(124L, collector.readBytes())
+    assertEquals(457L, collector.writeBytes())
+  }
+
+  @Test
+  def testUnableToReadNonexistentProcFile(): Unit = {
+    val testDirectory = new TestDirectory()
+    val time = new MockTime(100, 1000)
+    val collector = new LinuxIoMetricsCollector(testDirectory.baseDir.getAbsolutePath,
+      time, logger.underlying)
+
+    // Test that we can't read the file, since it hasn't been written.
+    assertFalse(collector.usable())
+  }
+}
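
A closing note on consuming these metrics: the counters in /proc/self/io
are cumulative since process start, so a monitoring system derives
throughput by differencing two samples. A toy helper (hypothetical, not
part of this commit):

    object ThroughputSketch {
      // Bytes/sec derived from two cumulative samples taken dtMs milliseconds apart.
      def bytesPerSecond(prevBytes: Long, curBytes: Long, dtMs: Long): Double =
        (curBytes - prevBytes) * 1000.0 / dtMs

      def main(args: Array[String]): Unit =
        println(bytesPerSecond(1000L, 6000L, 1000L)) // 5000.0 bytes/sec
    }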