Posted to commits@kafka.apache.org by cm...@apache.org on 2021/02/08 20:44:31 UTC

[kafka] branch 2.8 updated: MINOR: Add StorageTool as specified in KIP-631 (#10043)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new 576156b  MINOR: Add StorageTool as specified in KIP-631 (#10043)
576156b is described below

commit 576156bb3d089aefa10158854c671a96b7c3bd65
Author: Colin Patrick McCabe <cm...@confluent.io>
AuthorDate: Mon Feb 8 12:42:40 2021 -0800

    MINOR: Add StorageTool as specified in KIP-631 (#10043)
    
    Add StorageTool as specified in KIP-631. It can format the Kafka log directories on a node and describe their current state. Also fix a bug in `ZkMetaProperties#toString`, which printed "LegacyMetaProperties" instead of the class name.
    
    Reviewers: David Arthur <mu...@gmail.com>
---
 bin/kafka-storage.sh                               |  17 ++
 .../kafka/server/BrokerMetadataCheckpoint.scala    |   6 +-
 core/src/main/scala/kafka/tools/StorageTool.scala  | 238 +++++++++++++++++++++
 .../scala/unit/kafka/tools/StorageToolTest.scala   | 187 ++++++++++++++++
 4 files changed, 445 insertions(+), 3 deletions(-)
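
For orientation before the diff itself: the new tool wires up three subcommands (info, format, and random-uuid) via argparse4j, so a typical first-time workflow looks like the sketch below. The config path is illustrative; it must point at a kip-500-mode configuration, since format refuses legacy configs.

    # Generate a cluster ID to pass to --cluster-id
    bin/kafka-storage.sh random-uuid

    # Format the log directories named in the config; fails on already-formatted
    # directories unless --ignore-formatted is given
    bin/kafka-storage.sh format --config <server.properties> --cluster-id <uuid>

    # Describe the log directories on this node and report any problems
    bin/kafka-storage.sh info --config <server.properties>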

diff --git a/bin/kafka-storage.sh b/bin/kafka-storage.sh
new file mode 100755
index 0000000..eef9342
--- /dev/null
+++ b/bin/kafka-storage.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+exec $(dirname $0)/kafka-run-class.sh kafka.tools.StorageTool "$@"
diff --git a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
index 93a0abe..e3d5c4b 100755
--- a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
+++ b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
@@ -88,9 +88,9 @@ class RawMetaProperties(val props: Properties = new Properties()) {
   }
 
   override def toString: String = {
-    "RawMetaProperties(" + props.keySet().asScala.toList.asInstanceOf[List[String]].sorted.map {
+    "{" + props.keySet().asScala.toList.asInstanceOf[List[String]].sorted.map {
       key => key + "=" + props.get(key)
-    }.mkString(", ") + ")"
+    }.mkString(", ") + "}"
   }
 }
 
@@ -130,7 +130,7 @@ case class ZkMetaProperties(
   }
 
   override def toString: String = {
-    s"LegacyMetaProperties(brokerId=$brokerId, clusterId=$clusterId)"
+    s"ZkMetaProperties(brokerId=$brokerId, clusterId=$clusterId)"
   }
 }
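
For reference, with these two toString fixes the metadata summary printed by the new tool takes the brace-delimited form asserted in StorageToolTest below, e.g.:

    Found metadata: {cluster.id=XcZZOzUqS4yHOjhMQB6JLQ, version=1}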
 
diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala b/core/src/main/scala/kafka/tools/StorageTool.scala
new file mode 100644
index 0000000..ff84007
--- /dev/null
+++ b/core/src/main/scala/kafka/tools/StorageTool.scala
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import java.io.PrintStream
+import java.nio.file.{Files, Paths}
+
+import kafka.server.{BrokerMetadataCheckpoint, KafkaConfig, MetaProperties, RawMetaProperties}
+import kafka.utils.{Exit, Logging}
+import net.sourceforge.argparse4j.ArgumentParsers
+import net.sourceforge.argparse4j.impl.Arguments.{store, storeTrue}
+import org.apache.kafka.common.Uuid
+import org.apache.kafka.common.utils.Utils
+
+import scala.collection.mutable
+
+object StorageTool extends Logging {
+  def main(args: Array[String]): Unit = {
+    try {
+      val parser = ArgumentParsers.
+        newArgumentParser("kafka-storage").
+        defaultHelp(true).
+        description("The Kafka storage tool.")
+      val subparsers = parser.addSubparsers().dest("command")
+
+      val infoParser = subparsers.addParser("info").
+        help("Get information about the Kafka log directories on this node.")
+      val formatParser = subparsers.addParser("format").
+        help("Format the Kafka log directories on this node.")
+      subparsers.addParser("random-uuid").help("Print a random UUID.")
+      List(infoParser, formatParser).foreach(subparser => {
+        subparser.addArgument("--config", "-c").
+          action(store()).
+          required(true).
+          help("The Kafka configuration file to use.")
+      })
+      formatParser.addArgument("--cluster-id", "-t").
+        action(store()).
+        required(true).
+        help("The cluster ID to use.")
+      formatParser.addArgument("--ignore-formatted", "-g").
+        action(storeTrue())
+
+      val namespace = parser.parseArgsOrFail(args)
+      val command = namespace.getString("command")
+      val config = Option(namespace.getString("config")).map(
+        p => new KafkaConfig(Utils.loadProps(p)))
+
+      command match {
+        case "info" =>
+          val directories = configToLogDirectories(config.get)
+          val kip500Mode = configToKip500Mode(config.get)
+          Exit.exit(infoCommand(System.out, kip500Mode, directories))
+
+        case "format" =>
+          val directories = configToLogDirectories(config.get)
+          val clusterId = namespace.getString("cluster_id")
+          val metaProperties = buildMetadataProperties(clusterId, config.get)
+          val ignoreFormatted = namespace.getBoolean("ignore_formatted")
+          if (!configToKip500Mode(config.get)) {
+            throw new TerseFailure("The kafka configuration file appears to be for " +
+              "a legacy cluster. Formatting is only supported for kip-500 clusters.")
+          }
+          Exit.exit(formatCommand(System.out, directories, metaProperties, ignoreFormatted))
+
+        case "random-uuid" =>
+          System.out.println(Uuid.randomUuid)
+          Exit.exit(0)
+
+        case _ =>
+          throw new RuntimeException(s"Unknown command $command")
+      }
+    } catch {
+      case e: TerseFailure =>
+        System.err.println(e.getMessage)
+        System.exit(1)
+    }
+  }
+
+  def configToLogDirectories(config: KafkaConfig): Seq[String] = {
+    val directories = new mutable.TreeSet[String]
+    directories ++= config.logDirs
+    Option(config.metadataLogDir).foreach(directories.add)
+    directories.toSeq
+  }
+
+  def configToKip500Mode(config: KafkaConfig): Boolean = config.processRoles.nonEmpty
+
+  def infoCommand(stream: PrintStream, kip500Mode: Boolean, directories: Seq[String]): Int = {
+    val problems = new mutable.ArrayBuffer[String]
+    val foundDirectories = new mutable.ArrayBuffer[String]
+    var prevMetadata: Option[RawMetaProperties] = None
+    directories.sorted.foreach(directory => {
+      val directoryPath = Paths.get(directory)
+      if (!Files.isDirectory(directoryPath)) {
+        if (!Files.exists(directoryPath)) {
+          problems += s"$directoryPath does not exist"
+        } else {
+          problems += s"$directoryPath is not a directory"
+        }
+      } else {
+        foundDirectories += directoryPath.toString
+        val metaPath = directoryPath.resolve("meta.properties")
+        if (!Files.exists(metaPath)) {
+          problems += s"$directoryPath is not formatted."
+        } else {
+          val properties = Utils.loadProps(metaPath.toString)
+          val rawMetaProperties = new RawMetaProperties(properties)
+
+          val curMetadata = rawMetaProperties.version match {
+            case 0 | 1 => Some(rawMetaProperties)
+            case v =>
+              problems += s"Unsupported version for $metaPath: $v"
+              None
+          }
+
+          if (prevMetadata.isEmpty) {
+            prevMetadata = curMetadata
+          } else {
+            // curMetadata may be None here (unsupported version, reported above),
+            // so guard the comparison rather than calling .get on it directly.
+            if (!curMetadata.forall(_.equals(prevMetadata.get))) {
+              problems += s"Metadata for $metaPath was ${curMetadata.get}, " +
+                s"but other directories featured ${prevMetadata.get}"
+            }
+          }
+        }
+      }
+    })
+
+    prevMetadata.foreach { prev =>
+      if (kip500Mode) {
+        if (prev.version == 0) {
+          problems += "The kafka configuration file appears to be for a kip-500 cluster, but " +
+            "the directories are formatted for legacy mode."
+        }
+      } else if (prev.version == 1) {
+        problems += "The kafka configuration file appears to be for a legacy cluster, but " +
+          "the directories are formatted for kip-500."
+      }
+    }
+
+    if (directories.isEmpty) {
+      stream.println("No directories specified.")
+      0
+    } else {
+      if (foundDirectories.nonEmpty) {
+        if (foundDirectories.size == 1) {
+          stream.println("Found log directory:")
+        } else {
+          stream.println("Found log directories:")
+        }
+        foundDirectories.foreach(d => stream.println("  %s".format(d)))
+        stream.println("")
+      }
+
+      prevMetadata.foreach { prev =>
+        stream.println(s"Found metadata: ${prev}")
+        stream.println("")
+      }
+
+      if (problems.nonEmpty) {
+        if (problems.size == 1) {
+          stream.println("Found problem:")
+        } else {
+          stream.println("Found problems:")
+        }
+        problems.foreach(d => stream.println("  %s".format(d)))
+        stream.println("")
+        1
+      } else {
+        0
+      }
+    }
+  }
+
+  def buildMetadataProperties(
+    clusterIdStr: String,
+    config: KafkaConfig
+  ): MetaProperties = {
+    val effectiveClusterId = try {
+      Uuid.fromString(clusterIdStr)
+    } catch {
+      case e: Throwable => throw new TerseFailure(s"Cluster ID string $clusterIdStr " +
+        s"does not appear to be a valid UUID: ${e.getMessage}")
+    }
+    require(config.nodeId >= 0, "The node.id must be set to a non-negative integer.")
+    new MetaProperties(effectiveClusterId, config.nodeId)
+  }
+
+  def formatCommand(stream: PrintStream,
+                    directories: Seq[String],
+                    metaProperties: MetaProperties,
+                    ignoreFormatted: Boolean): Int = {
+    if (directories.isEmpty) {
+      throw new TerseFailure("No log directories found in the configuration.")
+    }
+    val unformattedDirectories = directories.filter(directory => {
+      if (!Files.isDirectory(Paths.get(directory)) || !Files.exists(Paths.get(directory, "meta.properties"))) {
+        true
+      } else if (!ignoreFormatted) {
+        throw new TerseFailure(s"Log directory ${directory} is already formatted. " +
+          "Use --ignore-formatted to ignore this directory and format the others.")
+      } else {
+        false
+      }
+    })
+    if (unformattedDirectories.isEmpty) {
+      stream.println("All of the log directories are already formatted.")
+    }
+    unformattedDirectories.foreach(directory => {
+      try {
+        Files.createDirectories(Paths.get(directory))
+      } catch {
+        case e: Throwable => throw new TerseFailure(s"Unable to create storage " +
+          s"directory ${directory}: ${e.getMessage}")
+      }
+      val metaPropertiesPath = Paths.get(directory, "meta.properties")
+      val checkpoint = new BrokerMetadataCheckpoint(metaPropertiesPath.toFile)
+      checkpoint.write(metaProperties.toProperties)
+      stream.println(s"Formatting ${directory}")
+    })
+    0
+  }
+}
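
For context on what format actually writes: each empty log directory gets a meta.properties checkpoint via BrokerMetadataCheckpoint. Given the MetaProperties(clusterId, nodeId) built above, the file contents look roughly like the following (the exact key set comes from MetaProperties#toProperties, so treat this as a sketch):

    version=1
    cluster.id=XcZZOzUqS4yHOjhMQB6JLQ
    node.id=2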
diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
new file mode 100644
index 0000000..d601e36
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import java.io.{ByteArrayOutputStream, PrintStream}
+import java.nio.charset.StandardCharsets
+import java.nio.file.Files
+import java.util
+import java.util.Properties
+
+import kafka.server.{KafkaConfig, MetaProperties}
+import kafka.utils.TestUtils
+import org.apache.kafka.common.Uuid
+import org.apache.kafka.common.utils.Utils
+import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
+import org.junit.jupiter.api.{Test, Timeout}
+
+
+@Timeout(value = 40)
+class StorageToolTest {
+  private def newKip500Properties() = {
+    val properties = new Properties()
+    properties.setProperty(KafkaConfig.LogDirsProp, "/tmp/foo,/tmp/bar")
+    properties.setProperty(KafkaConfig.ProcessRolesProp, "controller")
+    properties.setProperty(KafkaConfig.NodeIdProp, "2")
+    properties
+  }
+
+  @Test
+  def testConfigToLogDirectories(): Unit = {
+    val config = new KafkaConfig(newKip500Properties())
+    assertEquals(Seq("/tmp/bar", "/tmp/foo"), StorageTool.configToLogDirectories(config))
+  }
+
+  @Test
+  def testConfigToLogDirectoriesWithMetaLogDir(): Unit = {
+    val properties = newKip500Properties()
+    properties.setProperty(KafkaConfig.MetadataLogDirProp, "/tmp/baz")
+    val config = new KafkaConfig(properties)
+    assertEquals(Seq("/tmp/bar", "/tmp/baz", "/tmp/foo"),
+      StorageTool.configToLogDirectories(config))
+  }
+
+  @Test
+  def testInfoCommandOnEmptyDirectory(): Unit = {
+    val stream = new ByteArrayOutputStream()
+    val tempDir = TestUtils.tempDir()
+    try {
+      assertEquals(1, StorageTool.
+        infoCommand(new PrintStream(stream), true, Seq(tempDir.toString)))
+      assertEquals(s"""Found log directory:
+  ${tempDir.toString}
+
+Found problem:
+  ${tempDir.toString} is not formatted.
+
+""", stream.toString())
+    } finally Utils.delete(tempDir)
+  }
+
+  @Test
+  def testInfoCommandOnMissingDirectory(): Unit = {
+    val stream = new ByteArrayOutputStream()
+    val tempDir = TestUtils.tempDir()
+    tempDir.delete()
+    try {
+      assertEquals(1, StorageTool.
+        infoCommand(new PrintStream(stream), true, Seq(tempDir.toString)))
+      assertEquals(s"""Found problem:
+  ${tempDir.toString} does not exist
+
+""", stream.toString())
+    } finally Utils.delete(tempDir)
+  }
+
+  @Test
+  def testInfoCommandOnDirectoryAsFile(): Unit = {
+    val stream = new ByteArrayOutputStream()
+    val tempFile = TestUtils.tempFile()
+    try {
+      assertEquals(1, StorageTool.
+        infoCommand(new PrintStream(stream), true, Seq(tempFile.toString)))
+      assertEquals(s"""Found problem:
+  ${tempFile.toString} is not a directory
+
+""", stream.toString())
+    } finally tempFile.delete()
+  }
+
+  @Test
+  def testInfoWithMismatchedLegacyKafkaConfig(): Unit = {
+    val stream = new ByteArrayOutputStream()
+    val tempDir = TestUtils.tempDir()
+    try {
+      Files.write(tempDir.toPath.resolve("meta.properties"),
+        String.join("\n", util.Arrays.asList(
+          "version=1",
+          "cluster.id=XcZZOzUqS4yHOjhMQB6JLQ")).
+            getBytes(StandardCharsets.UTF_8))
+      assertEquals(1, StorageTool.
+        infoCommand(new PrintStream(stream), false, Seq(tempDir.toString)))
+      assertEquals(s"""Found log directory:
+  ${tempDir.toString}
+
+Found metadata: {cluster.id=XcZZOzUqS4yHOjhMQB6JLQ, version=1}
+
+Found problem:
+  The kafka configuration file appears to be for a legacy cluster, but the directories are formatted for kip-500.
+
+""", stream.toString())
+    } finally Utils.delete(tempDir)
+  }
+
+  @Test
+  def testInfoWithMismatchedKip500KafkaConfig(): Unit = {
+    val stream = new ByteArrayOutputStream()
+    val tempDir = TestUtils.tempDir()
+    try {
+      Files.write(tempDir.toPath.resolve("meta.properties"),
+        String.join("\n", util.Arrays.asList(
+          "version=0",
+          "broker.id=1",
+          "cluster.id=26c36907-4158-4a35-919d-6534229f5241")).
+          getBytes(StandardCharsets.UTF_8))
+      assertEquals(1, StorageTool.
+        infoCommand(new PrintStream(stream), true, Seq(tempDir.toString)))
+      assertEquals(s"""Found log directory:
+  ${tempDir.toString}
+
+Found metadata: {broker.id=1, cluster.id=26c36907-4158-4a35-919d-6534229f5241, version=0}
+
+Found problem:
+  The kafka configuration file appears to be for a kip-500 cluster, but the directories are formatted for legacy mode.
+
+""", stream.toString())
+    } finally Utils.delete(tempDir)
+  }
+
+  @Test
+  def testFormatEmptyDirectory(): Unit = {
+    val tempDir = TestUtils.tempDir()
+    try {
+      val metaProperties = MetaProperties(
+        clusterId = Uuid.fromString("XcZZOzUqS4yHOjhMQB6JLQ"), nodeId = 2)
+      val stream = new ByteArrayOutputStream()
+      assertEquals(0, StorageTool.
+        formatCommand(new PrintStream(stream), Seq(tempDir.toString), metaProperties, false))
+      assertEquals("Formatting %s%n".format(tempDir), stream.toString())
+
+      try assertEquals(1, StorageTool.
+        formatCommand(new PrintStream(new ByteArrayOutputStream()), Seq(tempDir.toString), metaProperties, false)) catch {
+        case e: TerseFailure => assertEquals(s"Log directory ${tempDir} is already " +
+          "formatted. Use --ignore-formatted to ignore this directory and format the " +
+          "others.", e.getMessage)
+      }
+
+      val stream2 = new ByteArrayOutputStream()
+      assertEquals(0, StorageTool.
+        formatCommand(new PrintStream(stream2), Seq(tempDir.toString), metaProperties, true))
+      assertEquals("All of the log directories are already formatted.%n".format(), stream2.toString())
+    } finally Utils.delete(tempDir)
+  }
+
+  @Test
+  def testFormatWithInvalidClusterId(): Unit = {
+    val config = new KafkaConfig(newKip500Properties())
+    assertEquals("Cluster ID string invalid does not appear to be a valid UUID: " +
+      "Input string `invalid` decoded as 5 bytes, which is not equal to the expected " +
+        "16 bytes of a base64-encoded UUID", assertThrows(classOf[TerseFailure],
+          () => StorageTool.buildMetadataProperties("invalid", config)).getMessage)
+  }
+}
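
To run just this suite locally, the standard Kafka Gradle test filter should work (assuming a normal checkout and JDK setup):

    ./gradlew core:test --tests StorageToolTest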