You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by de...@apache.org on 2023/10/30 11:11:05 UTC

(kafka) branch trunk updated: MINOR: Rename log dir UUIDs (#14517)

This is an automated email from the ASF dual-hosted git repository.

dengziming pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9dbee599f13 MINOR: Rename log dir UUIDs (#14517)
9dbee599f13 is described below

commit 9dbee599f13997effd8f7e278fd7256b850c8813
Author: Igor Soarez <i...@soarez.me>
AuthorDate: Mon Oct 30 11:10:57 2023 +0000

    MINOR: Rename log dir UUIDs (#14517)
    
    After a late discussion in the voting thread for KIP-858 we
    decided to improve the names for the designated reserved
    log directory UUID values.
    
    Reviewers: Christo Lolov <lo...@amazon.com>, Ismael Juma <is...@juma.me.uk>, Ziming Deng <de...@gmail.com>.
---
 .../main/java/org/apache/kafka/common/Uuid.java    | 41 ++-----------
 core/src/main/scala/kafka/log/LogManager.scala     |  4 +-
 core/src/main/scala/kafka/tools/StorageTool.scala  |  4 +-
 .../java/org/apache/kafka/common/DirectoryId.java  | 70 ++++++++++++++++++++++
 4 files changed, 79 insertions(+), 40 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/Uuid.java b/clients/src/main/java/org/apache/kafka/common/Uuid.java
index d8247f8eeee..b39d52eb96f 100644
--- a/clients/src/main/java/org/apache/kafka/common/Uuid.java
+++ b/clients/src/main/java/org/apache/kafka/common/Uuid.java
@@ -46,45 +46,14 @@ public class Uuid implements Comparable<Uuid> {
      */
     public static final Uuid ZERO_UUID = new Uuid(0L, 0L);
 
-    /**
-     * A UUID that is used to identify new or unknown dir assignments.
-     */
-    public static final Uuid UNKNOWN_DIR = ZERO_UUID;
-
-    /**
-     * A UUID that is used to represent unspecified offline dirs.
-     */
-    public static final Uuid OFFLINE_DIR = ONE_UUID;
-
-    /**
-     * A UUID that is used to represent and unspecified log directory,
-     * that is expected to have been previously selected to host an
-     * associated replica. This contrasts with {@code UNKNOWN_DIR},
-     * which is associated with (typically new) replicas that may not
-     * yet have been placed in any log directory.
-     */
-    public static final Uuid SELECTED_DIR = new Uuid(0L, 2L);
-
     /**
      * The set of reserved UUIDs that will never be returned by the randomUuid method.
      */
-    public static final Set<Uuid> RESERVED;
-
-    static {
-        HashSet<Uuid> reserved = new HashSet<>(Arrays.asList(
-                METADATA_TOPIC_ID,
-                ZERO_UUID,
-                ONE_UUID,
-                UNKNOWN_DIR,
-                OFFLINE_DIR,
-                SELECTED_DIR
-        ));
-        // The first 100 UUIDs are reserved for future use.
-        for (long i = 0L; i < 100L; i++) {
-            reserved.add(new Uuid(0L, i));
-        }
-        RESERVED = Collections.unmodifiableSet(reserved);
-    }
+    public static final Set<Uuid> RESERVED = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            METADATA_TOPIC_ID,
+            ZERO_UUID,
+            ONE_UUID
+    )));
 
     private final long mostSignificantBits;
     private final long leastSignificantBits;
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 10b4ece3e9c..61fe4168cd3 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -25,7 +25,7 @@ import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.server.metadata.ConfigRepository
 import kafka.server._
 import kafka.utils._
-import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
+import org.apache.kafka.common.{DirectoryId, KafkaException, TopicPartition, Uuid}
 import org.apache.kafka.common.utils.{KafkaThread, Time, Utils}
 import org.apache.kafka.common.errors.{InconsistentTopicIdException, KafkaStorageException, LogDirNotFoundException}
 
@@ -287,7 +287,7 @@ class LogManager(logDirs: Seq[File],
           val uuid = rawMetaProperties.directoryId match {
             case Some(uuidStr) => Uuid.fromString(uuidStr)
             case None =>
-              val uuid = Uuid.randomUuid()
+              val uuid = DirectoryId.random()
               rawMetaProperties.directoryId = uuid.toString
               metadataCheckpoint.write(rawMetaProperties.props)
               uuid
diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala b/core/src/main/scala/kafka/tools/StorageTool.scala
index 2aa1e02853e..e6a10e21a51 100644
--- a/core/src/main/scala/kafka/tools/StorageTool.scala
+++ b/core/src/main/scala/kafka/tools/StorageTool.scala
@@ -24,7 +24,7 @@ import kafka.utils.{Exit, Logging}
 import net.sourceforge.argparse4j.ArgumentParsers
 import net.sourceforge.argparse4j.impl.Arguments.{append, store, storeTrue}
 import net.sourceforge.argparse4j.inf.Namespace
-import org.apache.kafka.common.Uuid
+import org.apache.kafka.common.{DirectoryId, Uuid}
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.metadata.bootstrap.{BootstrapDirectory, BootstrapMetadata}
 import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
@@ -430,7 +430,7 @@ object StorageTool extends Logging {
       }
       val metaPropertiesPath = Paths.get(directory, KafkaServer.brokerMetaPropsFile)
       val checkpoint = new BrokerMetadataCheckpoint(metaPropertiesPath.toFile)
-      checkpoint.write(metaProperties.toPropertiesWithDirectoryId(Uuid.randomUuid().toString))
+      checkpoint.write(metaProperties.toPropertiesWithDirectoryId(DirectoryId.random().toString))
 
       val bootstrapDirectory = new BootstrapDirectory(directory, Optional.empty())
       bootstrapDirectory.writeBinaryFile(bootstrapMetadata)
diff --git a/server-common/src/main/java/org/apache/kafka/common/DirectoryId.java b/server-common/src/main/java/org/apache/kafka/common/DirectoryId.java
new file mode 100644
index 00000000000..b9917730f91
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/common/DirectoryId.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+public class DirectoryId {
+
+    /**
+     * A UUID that is used to identify new or unknown dir assignments.
+     */
+    public static final Uuid UNASSIGNED = new Uuid(0L, 0L);
+
+    /**
+     * A UUID that is used to represent unspecified offline dirs.
+     */
+    public static final Uuid LOST = new Uuid(0L, 1L);
+
+    /**
+     * A UUID that is used to represent an unspecified log directory
+     * that is expected to have been previously selected to host an
+     * associated replica. This contrasts with {@code UNASSIGNED},
+     * which is associated with (typically new) replicas that may not
+     * yet have been placed in any log directory.
+     */
+    public static final Uuid MIGRATING = new Uuid(0L, 2L);
+
+    /**
+     * The set of reserved UUIDs that will never be returned by the random method.
+     */
+    public static final Set<Uuid> RESERVED;
+
+    static {
+        HashSet<Uuid> reserved = new HashSet<>(Uuid.RESERVED);
+        // The first 100 UUIDs are reserved for future use.
+        for (long i = 0L; i < 100L; i++) {
+            reserved.add(new Uuid(0L, i));
+        }
+        RESERVED = Collections.unmodifiableSet(reserved);
+    }
+
+    /**
+     * Static factory to generate a directory ID.
+     *
+     * This will not generate a reserved UUID (first 100), or one whose string representation starts with a dash ("-").
+     */
+    public static Uuid random() {
+        Uuid uuid = Uuid.randomUuid();
+        while (RESERVED.contains(uuid) || uuid.toString().startsWith("-")) {
+            uuid = Uuid.randomUuid();
+        }
+        return uuid;
+    }
+}