You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by de...@apache.org on 2023/10/30 11:11:05 UTC
(kafka) branch trunk updated: MINOR: Rename log dir UUIDs (#14517)
This is an automated email from the ASF dual-hosted git repository.
dengziming pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9dbee599f13 MINOR: Rename log dir UUIDs (#14517)
9dbee599f13 is described below
commit 9dbee599f13997effd8f7e278fd7256b850c8813
Author: Igor Soarez <i...@soarez.me>
AuthorDate: Mon Oct 30 11:10:57 2023 +0000
MINOR: Rename log dir UUIDs (#14517)
After a late discussion in the voting thread for KIP-858 we
decided to improve the names for the designated reserved
log directory UUID values.
Reviewers: Christo Lolov <lo...@amazon.com>, Ismael Juma <is...@juma.me.uk>, Ziming Deng <de...@gmail.com>.
---
.../main/java/org/apache/kafka/common/Uuid.java | 41 ++-----------
core/src/main/scala/kafka/log/LogManager.scala | 4 +-
core/src/main/scala/kafka/tools/StorageTool.scala | 4 +-
.../java/org/apache/kafka/common/DirectoryId.java | 70 ++++++++++++++++++++++
4 files changed, 79 insertions(+), 40 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/Uuid.java b/clients/src/main/java/org/apache/kafka/common/Uuid.java
index d8247f8eeee..b39d52eb96f 100644
--- a/clients/src/main/java/org/apache/kafka/common/Uuid.java
+++ b/clients/src/main/java/org/apache/kafka/common/Uuid.java
@@ -46,45 +46,14 @@ public class Uuid implements Comparable<Uuid> {
*/
public static final Uuid ZERO_UUID = new Uuid(0L, 0L);
- /**
- * A UUID that is used to identify new or unknown dir assignments.
- */
- public static final Uuid UNKNOWN_DIR = ZERO_UUID;
-
- /**
- * A UUID that is used to represent unspecified offline dirs.
- */
- public static final Uuid OFFLINE_DIR = ONE_UUID;
-
- /**
- * A UUID that is used to represent and unspecified log directory,
- * that is expected to have been previously selected to host an
- * associated replica. This contrasts with {@code UNKNOWN_DIR},
- * which is associated with (typically new) replicas that may not
- * yet have been placed in any log directory.
- */
- public static final Uuid SELECTED_DIR = new Uuid(0L, 2L);
-
/**
* The set of reserved UUIDs that will never be returned by the randomUuid method.
*/
- public static final Set<Uuid> RESERVED;
-
- static {
- HashSet<Uuid> reserved = new HashSet<>(Arrays.asList(
- METADATA_TOPIC_ID,
- ZERO_UUID,
- ONE_UUID,
- UNKNOWN_DIR,
- OFFLINE_DIR,
- SELECTED_DIR
- ));
- // The first 100 UUIDs are reserved for future use.
- for (long i = 0L; i < 100L; i++) {
- reserved.add(new Uuid(0L, i));
- }
- RESERVED = Collections.unmodifiableSet(reserved);
- }
+ public static final Set<Uuid> RESERVED = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+ METADATA_TOPIC_ID,
+ ZERO_UUID,
+ ONE_UUID
+ )));
private final long mostSignificantBits;
private final long leastSignificantBits;
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 10b4ece3e9c..61fe4168cd3 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -25,7 +25,7 @@ import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.server.metadata.ConfigRepository
import kafka.server._
import kafka.utils._
-import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
+import org.apache.kafka.common.{DirectoryId, KafkaException, TopicPartition, Uuid}
import org.apache.kafka.common.utils.{KafkaThread, Time, Utils}
import org.apache.kafka.common.errors.{InconsistentTopicIdException, KafkaStorageException, LogDirNotFoundException}
@@ -287,7 +287,7 @@ class LogManager(logDirs: Seq[File],
val uuid = rawMetaProperties.directoryId match {
case Some(uuidStr) => Uuid.fromString(uuidStr)
case None =>
- val uuid = Uuid.randomUuid()
+ val uuid = DirectoryId.random()
rawMetaProperties.directoryId = uuid.toString
metadataCheckpoint.write(rawMetaProperties.props)
uuid
diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala b/core/src/main/scala/kafka/tools/StorageTool.scala
index 2aa1e02853e..e6a10e21a51 100644
--- a/core/src/main/scala/kafka/tools/StorageTool.scala
+++ b/core/src/main/scala/kafka/tools/StorageTool.scala
@@ -24,7 +24,7 @@ import kafka.utils.{Exit, Logging}
import net.sourceforge.argparse4j.ArgumentParsers
import net.sourceforge.argparse4j.impl.Arguments.{append, store, storeTrue}
import net.sourceforge.argparse4j.inf.Namespace
-import org.apache.kafka.common.Uuid
+import org.apache.kafka.common.{DirectoryId, Uuid}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.metadata.bootstrap.{BootstrapDirectory, BootstrapMetadata}
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
@@ -430,7 +430,7 @@ object StorageTool extends Logging {
}
val metaPropertiesPath = Paths.get(directory, KafkaServer.brokerMetaPropsFile)
val checkpoint = new BrokerMetadataCheckpoint(metaPropertiesPath.toFile)
- checkpoint.write(metaProperties.toPropertiesWithDirectoryId(Uuid.randomUuid().toString))
+ checkpoint.write(metaProperties.toPropertiesWithDirectoryId(DirectoryId.random().toString))
val bootstrapDirectory = new BootstrapDirectory(directory, Optional.empty())
bootstrapDirectory.writeBinaryFile(bootstrapMetadata)
diff --git a/server-common/src/main/java/org/apache/kafka/common/DirectoryId.java b/server-common/src/main/java/org/apache/kafka/common/DirectoryId.java
new file mode 100644
index 00000000000..b9917730f91
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/common/DirectoryId.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+public class DirectoryId {
+
+ /**
+ * A UUID that is used to identify new or unknown dir assignments.
+ */
+ public static final Uuid UNASSIGNED = new Uuid(0L, 0L);
+
+ /**
+ * A UUID that is used to represent unspecified offline dirs.
+ */
+ public static final Uuid LOST = new Uuid(0L, 1L);
+
+ /**
+ * A UUID that is used to represent and unspecified log directory,
+ * that is expected to have been previously selected to host an
+ * associated replica. This contrasts with {@code UNASSIGNED_DIR},
+ * which is associated with (typically new) replicas that may not
+ * yet have been placed in any log directory.
+ */
+ public static final Uuid MIGRATING = new Uuid(0L, 2L);
+
+ /**
+ * The set of reserved UUIDs that will never be returned by the random method.
+ */
+ public static final Set<Uuid> RESERVED;
+
+ static {
+ HashSet<Uuid> reserved = new HashSet<>(Uuid.RESERVED);
+ // The first 100 UUIDs are reserved for future use.
+ for (long i = 0L; i < 100L; i++) {
+ reserved.add(new Uuid(0L, i));
+ }
+ RESERVED = Collections.unmodifiableSet(reserved);
+ }
+
+ /**
+ * Static factory to generate a directory ID.
+ *
+ * This will not generate a reserved UUID (first 100), or one whose string representation starts with a dash ("-")
+ */
+ public static Uuid random() {
+ Uuid uuid = Uuid.randomUuid();
+ while (RESERVED.contains(uuid) || uuid.toString().startsWith("-")) {
+ uuid = Uuid.randomUuid();
+ }
+ return uuid;
+ }
+}