You are viewing a plain text version of this content. The canonical link to the original message was lost in the plain-text conversion; the message can be located in the commits@kafka.apache.org mailing-list archive.
Posted to commits@kafka.apache.org by cm...@apache.org on 2021/02/08 20:44:31 UTC
[kafka] branch 2.8 updated: MINOR: Add StorageTool as specified in
KIP-631 (#10043)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.8 by this push:
new 576156b MINOR: Add StorageTool as specified in KIP-631 (#10043)
576156b is described below
commit 576156bb3d089aefa10158854c671a96b7c3bd65
Author: Colin Patrick McCabe <cm...@confluent.io>
AuthorDate: Mon Feb 8 12:42:40 2021 -0800
MINOR: Add StorageTool as specified in KIP-631 (#10043)
Add StorageTool as specified in KIP-631. It can format and describe storage directories. Fix a bug in `ZkMetaProperties#toString`.
Reviewers: David Arthur <mu...@gmail.com>
---
bin/kafka-storage.sh | 17 ++
.../kafka/server/BrokerMetadataCheckpoint.scala | 6 +-
core/src/main/scala/kafka/tools/StorageTool.scala | 238 +++++++++++++++++++++
.../scala/unit/kafka/tools/StorageToolTest.scala | 187 ++++++++++++++++
4 files changed, 445 insertions(+), 3 deletions(-)
diff --git a/bin/kafka-storage.sh b/bin/kafka-storage.sh
new file mode 100755
index 0000000..eef9342
--- /dev/null
+++ b/bin/kafka-storage.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+exec $(dirname $0)/kafka-run-class.sh kafka.tools.StorageTool "$@"
diff --git a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
index 93a0abe..e3d5c4b 100755
--- a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
+++ b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
@@ -88,9 +88,9 @@ class RawMetaProperties(val props: Properties = new Properties()) {
}
override def toString: String = {
- "RawMetaProperties(" + props.keySet().asScala.toList.asInstanceOf[List[String]].sorted.map {
+ "{" + props.keySet().asScala.toList.asInstanceOf[List[String]].sorted.map {
key => key + "=" + props.get(key)
- }.mkString(", ") + ")"
+ }.mkString(", ") + "}"
}
}
@@ -130,7 +130,7 @@ case class ZkMetaProperties(
}
override def toString: String = {
- s"LegacyMetaProperties(brokerId=$brokerId, clusterId=$clusterId)"
+ s"ZkMetaProperties(brokerId=$brokerId, clusterId=$clusterId)"
}
}
diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala b/core/src/main/scala/kafka/tools/StorageTool.scala
new file mode 100644
index 0000000..ff84007
--- /dev/null
+++ b/core/src/main/scala/kafka/tools/StorageTool.scala
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import java.io.PrintStream
+import java.nio.file.{Files, Paths}
+
+import kafka.server.{BrokerMetadataCheckpoint, KafkaConfig, MetaProperties, RawMetaProperties}
+import kafka.utils.{Exit, Logging}
+import net.sourceforge.argparse4j.ArgumentParsers
+import net.sourceforge.argparse4j.impl.Arguments.{store, storeTrue}
+import org.apache.kafka.common.Uuid
+import org.apache.kafka.common.utils.Utils
+
+import scala.collection.mutable
+
+object StorageTool extends Logging {
+ def main(args: Array[String]): Unit = {
+ try {
+ val parser = ArgumentParsers.
+ newArgumentParser("kafka-storage").
+ defaultHelp(true).
+ description("The Kafka storage tool.")
+ val subparsers = parser.addSubparsers().dest("command")
+
+ val infoParser = subparsers.addParser("info").
+ help("Get information about the Kafka log directories on this node.")
+ val formatParser = subparsers.addParser("format").
+ help("Format the Kafka log directories on this node.")
+ subparsers.addParser("random-uuid").help("Print a random UUID.")
+ List(infoParser, formatParser).foreach(parser => {
+ parser.addArgument("--config", "-c").
+ action(store()).
+ required(true).
+ help("The Kafka configuration file to use.")
+ })
+ formatParser.addArgument("--cluster-id", "-t").
+ action(store()).
+ required(true).
+ help("The cluster ID to use.")
+ formatParser.addArgument("--ignore-formatted", "-g").
+ action(storeTrue())
+
+ val namespace = parser.parseArgsOrFail(args)
+ val command = namespace.getString("command")
+ val config = Option(namespace.getString("config")).flatMap(
+ p => Some(new KafkaConfig(Utils.loadProps(p))))
+
+ command match {
+ case "info" =>
+ val directories = configToLogDirectories(config.get)
+ val kip500Mode = configToKip500Mode(config.get)
+ Exit.exit(infoCommand(System.out, kip500Mode, directories))
+
+ case "format" =>
+ val directories = configToLogDirectories(config.get)
+ val clusterId = namespace.getString("cluster_id")
+ val metaProperties = buildMetadataProperties(clusterId, config.get)
+ val ignoreFormatted = namespace.getBoolean("ignore_formatted")
+ if (!configToKip500Mode(config.get)) {
+ throw new TerseFailure("The kafka configuration file appears to be for " +
+ "a legacy cluster. Formatting is only supported for kip-500 clusters.")
+ }
+ Exit.exit(formatCommand(System.out, directories, metaProperties, ignoreFormatted ))
+
+ case "random-uuid" =>
+ System.out.println(Uuid.randomUuid)
+ Exit.exit(0)
+
+ case _ =>
+ throw new RuntimeException(s"Unknown command $command")
+ }
+ } catch {
+ case e: TerseFailure =>
+ System.err.println(e.getMessage)
+ System.exit(1)
+ }
+ }
+
+ def configToLogDirectories(config: KafkaConfig): Seq[String] = {
+ val directories = new mutable.TreeSet[String]
+ directories ++= config.logDirs
+ Option(config.metadataLogDir).foreach(directories.add)
+ directories.toSeq
+ }
+
+ def configToKip500Mode(config: KafkaConfig): Boolean = config.processRoles.nonEmpty
+
+ def infoCommand(stream: PrintStream, kip500Mode: Boolean, directories: Seq[String]): Int = {
+ val problems = new mutable.ArrayBuffer[String]
+ val foundDirectories = new mutable.ArrayBuffer[String]
+ var prevMetadata: Option[RawMetaProperties] = None
+ directories.sorted.foreach(directory => {
+ val directoryPath = Paths.get(directory)
+ if (!Files.isDirectory(directoryPath)) {
+ if (!Files.exists(directoryPath)) {
+ problems += s"$directoryPath does not exist"
+ } else {
+ problems += s"$directoryPath is not a directory"
+ }
+ } else {
+ foundDirectories += directoryPath.toString
+ val metaPath = directoryPath.resolve("meta.properties")
+ if (!Files.exists(metaPath)) {
+ problems += s"$directoryPath is not formatted."
+ } else {
+ val properties = Utils.loadProps(metaPath.toString)
+ val rawMetaProperties = new RawMetaProperties(properties)
+
+ val curMetadata = rawMetaProperties.version match {
+ case 0 | 1 => Some(rawMetaProperties)
+ case v =>
+ problems += s"Unsupported version for $metaPath: $v"
+ None
+ }
+
+ if (prevMetadata.isEmpty) {
+ prevMetadata = curMetadata
+ } else {
+ if (!prevMetadata.get.equals(curMetadata.get)) {
+ problems += s"Metadata for $metaPath was ${curMetadata.get}, " +
+ s"but other directories featured ${prevMetadata.get}"
+ }
+ }
+ }
+ }
+ })
+
+ prevMetadata.foreach { prev =>
+ if (kip500Mode) {
+ if (prev.version == 0) {
+ problems += "The kafka configuration file appears to be for a kip-500 cluster, but " +
+ "the directories are formatted for legacy mode."
+ }
+ } else if (prev.version == 1) {
+ problems += "The kafka configuration file appears to be for a legacy cluster, but " +
+ "the directories are formatted for kip-500."
+ }
+ }
+
+ if (directories.isEmpty) {
+ stream.println("No directories specified.")
+ 0
+ } else {
+ if (foundDirectories.nonEmpty) {
+ if (foundDirectories.size == 1) {
+ stream.println("Found log directory:")
+ } else {
+ stream.println("Found log directories:")
+ }
+ foundDirectories.foreach(d => stream.println(" %s".format(d)))
+ stream.println("")
+ }
+
+ prevMetadata.foreach { prev =>
+ stream.println(s"Found metadata: ${prev}")
+ stream.println("")
+ }
+
+ if (problems.nonEmpty) {
+ if (problems.size == 1) {
+ stream.println("Found problem:")
+ } else {
+ stream.println("Found problems:")
+ }
+ problems.foreach(d => stream.println(" %s".format(d)))
+ stream.println("")
+ 1
+ } else {
+ 0
+ }
+ }
+ }
+
+ def buildMetadataProperties(
+ clusterIdStr: String,
+ config: KafkaConfig
+ ): MetaProperties = {
+ val effectiveClusterId = try {
+ Uuid.fromString(clusterIdStr)
+ } catch {
+ case e: Throwable => throw new TerseFailure(s"Cluster ID string $clusterIdStr " +
+ s"does not appear to be a valid UUID: ${e.getMessage}")
+ }
+ require(config.nodeId >= 0, s"The node.id must be set to a non-negative integer.")
+ new MetaProperties(effectiveClusterId, config.nodeId)
+ }
+
+ def formatCommand(stream: PrintStream,
+ directories: Seq[String],
+ metaProperties: MetaProperties,
+ ignoreFormatted: Boolean): Int = {
+ if (directories.isEmpty) {
+ throw new TerseFailure("No log directories found in the configuration.")
+ }
+ val unformattedDirectories = directories.filter(directory => {
+ if (!Files.isDirectory(Paths.get(directory)) || !Files.exists(Paths.get(directory, "meta.properties"))) {
+ true
+ } else if (!ignoreFormatted) {
+ throw new TerseFailure(s"Log directory ${directory} is already formatted. " +
+ "Use --ignore-formatted to ignore this directory and format the others.")
+ } else {
+ false
+ }
+ })
+ if (unformattedDirectories.isEmpty) {
+ stream.println("All of the log directories are already formatted.")
+ }
+ unformattedDirectories.foreach(directory => {
+ try {
+ Files.createDirectories(Paths.get(directory))
+ } catch {
+ case e: Throwable => throw new TerseFailure(s"Unable to create storage " +
+ s"directory ${directory}: ${e.getMessage}")
+ }
+ val metaPropertiesPath = Paths.get(directory, "meta.properties")
+ val checkpoint = new BrokerMetadataCheckpoint(metaPropertiesPath.toFile)
+ checkpoint.write(metaProperties.toProperties)
+ stream.println(s"Formatting ${directory}")
+ })
+ 0
+ }
+}
diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
new file mode 100644
index 0000000..d601e36
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import java.io.{ByteArrayOutputStream, PrintStream}
+import java.nio.charset.StandardCharsets
+import java.nio.file.Files
+import java.util
+import java.util.Properties
+
+import kafka.server.{KafkaConfig, MetaProperties}
+import kafka.utils.TestUtils
+import org.apache.kafka.common.Uuid
+import org.apache.kafka.common.utils.Utils
+import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
+import org.junit.jupiter.api.{Test, Timeout}
+
+
+@Timeout(value = 40)
+class StorageToolTest {
+ private def newKip500Properties() = {
+ val properties = new Properties()
+ properties.setProperty(KafkaConfig.LogDirsProp, "/tmp/foo,/tmp/bar")
+ properties.setProperty(KafkaConfig.ProcessRolesProp, "controller")
+ properties.setProperty(KafkaConfig.NodeIdProp, "2")
+ properties
+ }
+
+ @Test
+ def testConfigToLogDirectories(): Unit = {
+ val config = new KafkaConfig(newKip500Properties())
+ assertEquals(Seq("/tmp/bar", "/tmp/foo"), StorageTool.configToLogDirectories(config))
+ }
+
+ @Test
+ def testConfigToLogDirectoriesWithMetaLogDir(): Unit = {
+ val properties = newKip500Properties()
+ properties.setProperty(KafkaConfig.MetadataLogDirProp, "/tmp/baz")
+ val config = new KafkaConfig(properties)
+ assertEquals(Seq("/tmp/bar", "/tmp/baz", "/tmp/foo"),
+ StorageTool.configToLogDirectories(config))
+ }
+
+ @Test
+ def testInfoCommandOnEmptyDirectory(): Unit = {
+ val stream = new ByteArrayOutputStream()
+ val tempDir = TestUtils.tempDir()
+ try {
+ assertEquals(1, StorageTool.
+ infoCommand(new PrintStream(stream), true, Seq(tempDir.toString)))
+ assertEquals(s"""Found log directory:
+ ${tempDir.toString}
+
+Found problem:
+ ${tempDir.toString} is not formatted.
+
+""", stream.toString())
+ } finally Utils.delete(tempDir)
+ }
+
+ @Test
+ def testInfoCommandOnMissingDirectory(): Unit = {
+ val stream = new ByteArrayOutputStream()
+ val tempDir = TestUtils.tempDir()
+ tempDir.delete()
+ try {
+ assertEquals(1, StorageTool.
+ infoCommand(new PrintStream(stream), true, Seq(tempDir.toString)))
+ assertEquals(s"""Found problem:
+ ${tempDir.toString} does not exist
+
+""", stream.toString())
+ } finally Utils.delete(tempDir)
+ }
+
+ @Test
+ def testInfoCommandOnDirectoryAsFile(): Unit = {
+ val stream = new ByteArrayOutputStream()
+ val tempFile = TestUtils.tempFile()
+ try {
+ assertEquals(1, StorageTool.
+ infoCommand(new PrintStream(stream), true, Seq(tempFile.toString)))
+ assertEquals(s"""Found problem:
+ ${tempFile.toString} is not a directory
+
+""", stream.toString())
+ } finally tempFile.delete()
+ }
+
+ @Test
+ def testInfoWithMismatchedLegacyKafkaConfig(): Unit = {
+ val stream = new ByteArrayOutputStream()
+ val tempDir = TestUtils.tempDir()
+ try {
+ Files.write(tempDir.toPath.resolve("meta.properties"),
+ String.join("\n", util.Arrays.asList(
+ "version=1",
+ "cluster.id=XcZZOzUqS4yHOjhMQB6JLQ")).
+ getBytes(StandardCharsets.UTF_8))
+ assertEquals(1, StorageTool.
+ infoCommand(new PrintStream(stream), false, Seq(tempDir.toString)))
+ assertEquals(s"""Found log directory:
+ ${tempDir.toString}
+
+Found metadata: {cluster.id=XcZZOzUqS4yHOjhMQB6JLQ, version=1}
+
+Found problem:
+ The kafka configuration file appears to be for a legacy cluster, but the directories are formatted for kip-500.
+
+""", stream.toString())
+ } finally Utils.delete(tempDir)
+ }
+
+ @Test
+ def testInfoWithMismatchedKip500KafkaConfig(): Unit = {
+ val stream = new ByteArrayOutputStream()
+ val tempDir = TestUtils.tempDir()
+ try {
+ Files.write(tempDir.toPath.resolve("meta.properties"),
+ String.join("\n", util.Arrays.asList(
+ "version=0",
+ "broker.id=1",
+ "cluster.id=26c36907-4158-4a35-919d-6534229f5241")).
+ getBytes(StandardCharsets.UTF_8))
+ assertEquals(1, StorageTool.
+ infoCommand(new PrintStream(stream), true, Seq(tempDir.toString)))
+ assertEquals(s"""Found log directory:
+ ${tempDir.toString}
+
+Found metadata: {broker.id=1, cluster.id=26c36907-4158-4a35-919d-6534229f5241, version=0}
+
+Found problem:
+ The kafka configuration file appears to be for a kip-500 cluster, but the directories are formatted for legacy mode.
+
+""", stream.toString())
+ } finally Utils.delete(tempDir)
+ }
+
+ @Test
+ def testFormatEmptyDirectory(): Unit = {
+ val tempDir = TestUtils.tempDir()
+ try {
+ val metaProperties = MetaProperties(
+ clusterId = Uuid.fromString("XcZZOzUqS4yHOjhMQB6JLQ"), nodeId = 2)
+ val stream = new ByteArrayOutputStream()
+ assertEquals(0, StorageTool.
+ formatCommand(new PrintStream(stream), Seq(tempDir.toString), metaProperties, false))
+ assertEquals("Formatting %s%n".format(tempDir), stream.toString())
+
+ try assertEquals(1, StorageTool.
+ formatCommand(new PrintStream(new ByteArrayOutputStream()), Seq(tempDir.toString), metaProperties, false)) catch {
+ case e: TerseFailure => assertEquals(s"Log directory ${tempDir} is already " +
+ "formatted. Use --ignore-formatted to ignore this directory and format the " +
+ "others.", e.getMessage)
+ }
+
+ val stream2 = new ByteArrayOutputStream()
+ assertEquals(0, StorageTool.
+ formatCommand(new PrintStream(stream2), Seq(tempDir.toString), metaProperties, true))
+ assertEquals("All of the log directories are already formatted.%n".format(), stream2.toString())
+ } finally Utils.delete(tempDir)
+ }
+
+ @Test
+ def testFormatWithInvalidClusterId(): Unit = {
+ val config = new KafkaConfig(newKip500Properties())
+ assertEquals("Cluster ID string invalid does not appear to be a valid UUID: " +
+ "Input string `invalid` decoded as 5 bytes, which is not equal to the expected " +
+ "16 bytes of a base64-encoded UUID", assertThrows(classOf[TerseFailure],
+ () => StorageTool.buildMetadataProperties("invalid", config)).getMessage)
+ }
+}