You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2021/03/19 18:27:08 UTC

[kafka] branch 2.8 updated: MINOR: The new KIP-500 code should treat cluster ID as a string (#10357)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new 85f35bf  MINOR: The new KIP-500 code should treat cluster ID as a string (#10357)
85f35bf is described below

commit 85f35bf877716bb88a7750b340dacafb9f6b28f9
Author: Ron Dagostino <rd...@confluent.io>
AuthorDate: Fri Mar 19 12:56:04 2021 -0400

    MINOR: The new KIP-500 code should treat cluster ID as a string (#10357)
    
    Cluster ID has traditionally been treated as a string by the Kafka protocol (for example,
    AdminClient returns it as a string).  The new KIP-500 code should continue this practice.  If
    we don't do this, upgrading existing clusters may be harder to do.
    
    Reviewers: Colin P. McCabe <cm...@apache.org>, Jason Gustafson <ja...@confluent.io>
---
 .../common/message/BrokerRegistrationRequest.json  |  2 +-
 .../kafka/common/requests/RequestResponseTest.java |  2 +-
 .../kafka/server/BrokerLifecycleManager.scala      |  6 +--
 .../kafka/server/BrokerMetadataCheckpoint.scala    | 17 ++-------
 core/src/main/scala/kafka/tools/StorageTool.scala  |  2 +-
 .../main/scala/kafka/tools/TestRaftServer.scala    |  2 +-
 .../server/BrokerMetadataCheckpointTest.scala      | 44 +++++++++++-----------
 .../kafka/server/BrokerLifecycleManagerTest.scala  |  4 +-
 .../unit/kafka/server/ControllerApisTest.scala     |  2 +-
 .../unit/kafka/server/KafkaRaftServerTest.scala    | 16 ++++----
 .../scala/unit/kafka/tools/StorageToolTest.scala   |  3 +-
 .../kafka/controller/QuorumControllerTest.java     |  2 +-
 12 files changed, 44 insertions(+), 58 deletions(-)

diff --git a/clients/src/main/resources/common/message/BrokerRegistrationRequest.json b/clients/src/main/resources/common/message/BrokerRegistrationRequest.json
index 3e27cf1..e0ad06a 100644
--- a/clients/src/main/resources/common/message/BrokerRegistrationRequest.json
+++ b/clients/src/main/resources/common/message/BrokerRegistrationRequest.json
@@ -23,7 +23,7 @@
   "fields": [
     { "name": "BrokerId", "type": "int32", "versions": "0+",
       "about": "The broker ID." },
-    { "name": "ClusterId", "type": "uuid", "versions": "0+",
+    { "name": "ClusterId", "type": "string", "versions": "0+",
       "about": "The cluster id of the broker process." },
     { "name": "IncarnationId", "type": "uuid", "versions": "0+",
       "about": "The incarnation id of the broker process." },
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 920a951..de88025 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -2672,7 +2672,7 @@ public class RequestResponseTest {
     private BrokerRegistrationRequest createBrokerRegistrationRequest(short v) {
         BrokerRegistrationRequestData data = new BrokerRegistrationRequestData()
                 .setBrokerId(1)
-                .setClusterId(Uuid.randomUuid())
+                .setClusterId(Uuid.randomUuid().toString())
                 .setRack("1")
                 .setFeatures(new BrokerRegistrationRequestData.FeatureCollection(singletonList(
                         new BrokerRegistrationRequestData.Feature()).iterator()))
diff --git a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
index b15fc1c..7569fc24 100644
--- a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
@@ -148,7 +148,7 @@ class BrokerLifecycleManager(val config: KafkaConfig,
    * The cluster ID, or null if this manager has not been started yet.  This variable can
    * only be read or written from the event queue thread.
    */
-  private var _clusterId: Uuid = _
+  private var _clusterId: String = _
 
   /**
    * The listeners which this broker advertises.  This variable can only be read or
@@ -182,7 +182,7 @@ class BrokerLifecycleManager(val config: KafkaConfig,
    */
   def start(highestMetadataOffsetProvider: () => Long,
             channelManager: BrokerToControllerChannelManager,
-            clusterId: Uuid,
+            clusterId: String,
             advertisedListeners: ListenerCollection,
             supportedFeatures: util.Map[String, VersionRange]): Unit = {
     eventQueue.append(new StartupEvent(highestMetadataOffsetProvider,
@@ -245,7 +245,7 @@ class BrokerLifecycleManager(val config: KafkaConfig,
 
   private class StartupEvent(highestMetadataOffsetProvider: () => Long,
                      channelManager: BrokerToControllerChannelManager,
-                     clusterId: Uuid,
+                     clusterId: String,
                      advertisedListeners: ListenerCollection,
                      supportedFeatures: util.Map[String, VersionRange]) extends EventQueue.Event {
     override def run(): Unit = {
diff --git a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
index 49e033d..e85144b 100755
--- a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
+++ b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
@@ -24,7 +24,6 @@ import java.util.Properties
 import kafka.common.{InconsistentBrokerMetadataException, KafkaException}
 import kafka.server.RawMetaProperties._
 import kafka.utils._
-import org.apache.kafka.common.Uuid
 import org.apache.kafka.common.utils.Utils
 
 import scala.collection.mutable
@@ -97,7 +96,7 @@ class RawMetaProperties(val props: Properties = new Properties()) {
 object MetaProperties {
   def parse(properties: RawMetaProperties): MetaProperties = {
     properties.requireVersion(expectedVersion = 1)
-    val clusterId = requireClusterId(properties)
+    val clusterId = require(ClusterIdKey, properties.clusterId)
     val nodeId = require(NodeIdKey, properties.nodeId)
     new MetaProperties(clusterId, nodeId)
   }
@@ -105,16 +104,6 @@ object MetaProperties {
   def require[T](key: String, value: Option[T]): T = {
     value.getOrElse(throw new RuntimeException(s"Failed to find required property $key."))
   }
-
-  def requireClusterId(properties: RawMetaProperties): Uuid = {
-    val value = require(ClusterIdKey, properties.clusterId)
-    try {
-      Uuid.fromString(value)
-    } catch {
-      case e: Throwable => throw new RuntimeException(s"Failed to parse $ClusterIdKey property " +
-        s"as a UUID: ${e.getMessage}")
-    }
-  }
 }
 
 case class ZkMetaProperties(
@@ -135,13 +124,13 @@ case class ZkMetaProperties(
 }
 
 case class MetaProperties(
-  clusterId: Uuid,
+  clusterId: String,
   nodeId: Int,
 ) {
   def toProperties: Properties = {
     val properties = new RawMetaProperties()
     properties.version = 1
-    properties.clusterId = clusterId.toString
+    properties.clusterId = clusterId
     properties.nodeId = nodeId
     properties.props
   }
diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala b/core/src/main/scala/kafka/tools/StorageTool.scala
index ff84007..549019d 100644
--- a/core/src/main/scala/kafka/tools/StorageTool.scala
+++ b/core/src/main/scala/kafka/tools/StorageTool.scala
@@ -198,7 +198,7 @@ object StorageTool extends Logging {
         s"does not appear to be a valid UUID: ${e.getMessage}")
     }
     require(config.nodeId >= 0, s"The node.id must be set to a non-negative integer.")
-    new MetaProperties(effectiveClusterId, config.nodeId)
+    new MetaProperties(effectiveClusterId.toString, config.nodeId)
   }
 
   def formatCommand(stream: PrintStream,
diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala
index ba6ab407..c25e551 100644
--- a/core/src/main/scala/kafka/tools/TestRaftServer.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala
@@ -74,7 +74,7 @@ class TestRaftServer(
     socketServer.startup(startProcessingRequests = false)
 
     val metaProperties = MetaProperties(
-      clusterId = Uuid.ZERO_UUID,
+      clusterId = Uuid.ZERO_UUID.toString,
       nodeId = config.nodeId
     )
 
diff --git a/core/src/test/scala/kafka/server/BrokerMetadataCheckpointTest.scala b/core/src/test/scala/kafka/server/BrokerMetadataCheckpointTest.scala
index 49cb366..c7ce0ac 100644
--- a/core/src/test/scala/kafka/server/BrokerMetadataCheckpointTest.scala
+++ b/core/src/test/scala/kafka/server/BrokerMetadataCheckpointTest.scala
@@ -15,13 +15,13 @@ package kafka.server
 import java.io.File
 import java.util.Properties
 
-import org.apache.kafka.common.Uuid
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.test.TestUtils
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
 
 class BrokerMetadataCheckpointTest {
+  private val clusterIdBase64 = "H3KKO4NTRPaCWtEmm3vW7A"
 
   @Test
   def testReadWithNonExistentFile(): Unit = {
@@ -63,46 +63,44 @@ class BrokerMetadataCheckpointTest {
 
   @Test
   def testCreateMetadataProperties(): Unit = {
-    val meta = MetaProperties(
-      clusterId = Uuid.fromString("H3KKO4NTRPaCWtEmm3vW7A"),
-      nodeId = 5
-    )
-    val properties = new RawMetaProperties(meta.toProperties)
-    val meta2 = MetaProperties.parse(properties)
-    assertEquals(meta, meta2)
+    confirmValidForMetaProperties(clusterIdBase64)
   }
 
   @Test
   def testMetaPropertiesWithMissingVersion(): Unit = {
     val properties = new RawMetaProperties()
-    properties.clusterId = "H3KKO4NTRPaCWtEmm3vW7A"
+    properties.clusterId = clusterIdBase64
     properties.nodeId = 1
     assertThrows(classOf[RuntimeException], () => MetaProperties.parse(properties))
   }
 
   @Test
-  def testMetaPropertiesDoesNotAllowHexEncodedUUIDs(): Unit = {
-    val properties = new RawMetaProperties()
-    properties.version = 1
-    properties.clusterId = "7bc79ca1-9746-42a3-a35a-efb3cde44492"
-    properties.nodeId = 1
-    assertThrows(classOf[RuntimeException], () => MetaProperties.parse(properties))
+  def testMetaPropertiesAllowsHexEncodedUUIDs(): Unit = {
+    val clusterId = "7bc79ca1-9746-42a3-a35a-efb3cde44492"
+    confirmValidForMetaProperties(clusterId)
   }
 
   @Test
-  def testMetaPropertiesWithInvalidClusterId(): Unit = {
-    val properties = new RawMetaProperties()
-    properties.version = 1
-    properties.clusterId = "not a valid uuid"
-    properties.nodeId = 1
-    assertThrows(classOf[RuntimeException], () => MetaProperties.parse(properties))
+  def testMetaPropertiesWithNonUuidClusterId(): Unit = {
+    val clusterId = "not a valid uuid"
+    confirmValidForMetaProperties(clusterId)
+  }
+
+  private def confirmValidForMetaProperties(clusterId: String) = {
+    val meta = MetaProperties(
+      clusterId = clusterId,
+      nodeId = 5
+    )
+    val properties = new RawMetaProperties(meta.toProperties)
+    val meta2 = MetaProperties.parse(properties)
+    assertEquals(meta, meta2)
   }
 
   @Test
   def testMetaPropertiesWithMissingBrokerId(): Unit = {
     val properties = new RawMetaProperties()
     properties.version = 1
-    properties.clusterId = "H3KKO4NTRPaCWtEmm3vW7A"
+    properties.clusterId = clusterIdBase64
     assertThrows(classOf[RuntimeException], () => MetaProperties.parse(properties))
   }
 
@@ -110,7 +108,7 @@ class BrokerMetadataCheckpointTest {
   def testMetaPropertiesWithMissingControllerId(): Unit = {
     val properties = new RawMetaProperties()
     properties.version = 1
-    properties.clusterId = "H3KKO4NTRPaCWtEmm3vW7A"
+    properties.clusterId = clusterIdBase64
     assertThrows(classOf[RuntimeException], () => MetaProperties.parse(properties))
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
index d3bcfef..aa717c8 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
 import kafka.utils.{MockTime, TestUtils}
 import org.apache.kafka.clients.{Metadata, MockClient, NodeApiVersions}
 import org.apache.kafka.common.config.SaslConfigs
-import org.apache.kafka.common.{Node, Uuid}
+import org.apache.kafka.common.Node
 import org.apache.kafka.common.internals.ClusterResourceListeners
 import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion
 import org.apache.kafka.common.message.BrokerRegistrationRequestData.{Listener, ListenerCollection}
@@ -76,7 +76,7 @@ class BrokerLifecycleManagerTest {
     }.toList.asJava)
     val mockChannelManager = new MockBrokerToControllerChannelManager(mockClient,
       time, controllerNodeProvider, nodeApiVersions)
-    val clusterId = Uuid.fromString("x4AJGXQSRnephtTZzujw4w")
+    val clusterId = "x4AJGXQSRnephtTZzujw4w"
     val advertisedListeners = new ListenerCollection()
     config.advertisedListeners.foreach { ep =>
       advertisedListeners.add(new Listener().setHost(ep.host).
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index b3b74ab..3e2e255 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -88,7 +88,7 @@ class ControllerApisTest {
       new KafkaConfig(props),
 
       // FIXME: Would make more sense to set controllerId here
-      MetaProperties(Uuid.fromString("JgxuGe9URy-E-ceaL04lEw"), nodeId = nodeId),
+      MetaProperties("JgxuGe9URy-E-ceaL04lEw", nodeId = nodeId),
       Seq.empty,
       new SimpleApiVersionManager(ListenerType.CONTROLLER)
     )
diff --git a/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala b/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
index 6166d73..9ada6d5 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
@@ -19,7 +19,6 @@ package kafka.server
 import java.io.File
 import java.nio.file.Files
 import java.util.Properties
-
 import kafka.common.{InconsistentBrokerMetadataException, InconsistentNodeIdException, KafkaException}
 import kafka.log.Log
 import org.apache.kafka.common.Uuid
@@ -29,10 +28,11 @@ import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
 
 class KafkaRaftServerTest {
+  private val clusterIdBase64 = "H3KKO4NTRPaCWtEmm3vW7A"
 
   @Test
   def testSuccessfulLoadMetaProperties(): Unit = {
-    val clusterId = Uuid.randomUuid()
+    val clusterId = clusterIdBase64
     val nodeId = 0
     val metaProperties = MetaProperties(clusterId, nodeId)
 
@@ -49,7 +49,7 @@ class KafkaRaftServerTest {
 
   @Test
   def testLoadMetaPropertiesWithInconsistentNodeId(): Unit = {
-    val clusterId = Uuid.randomUuid()
+    val clusterId = clusterIdBase64
     val metaNodeId = 1
     val configNodeId = 0
 
@@ -90,7 +90,7 @@ class KafkaRaftServerTest {
 
   @Test
   def testStartupFailsIfMetaPropertiesMissingInSomeLogDir(): Unit = {
-    val clusterId = Uuid.randomUuid()
+    val clusterId = clusterIdBase64
     val nodeId = 1
 
     // One log dir is online and has properly formatted `meta.properties`.
@@ -110,7 +110,7 @@ class KafkaRaftServerTest {
 
   @Test
   def testStartupFailsIfMetaLogDirIsOffline(): Unit = {
-    val clusterId = Uuid.randomUuid()
+    val clusterId = clusterIdBase64
     val nodeId = 1
 
     // One log dir is online and has properly formatted `meta.properties`
@@ -131,7 +131,7 @@ class KafkaRaftServerTest {
 
   @Test
   def testStartupDoesNotFailIfDataDirIsOffline(): Unit = {
-    val clusterId = Uuid.randomUuid()
+    val clusterId = clusterIdBase64
     val nodeId = 1
 
     // One log dir is online and has properly formatted `meta.properties`
@@ -155,7 +155,7 @@ class KafkaRaftServerTest {
   @Test
   def testStartupFailsIfUnexpectedMetadataDir(): Unit = {
     val nodeId = 1
-    val clusterId = Uuid.randomUuid()
+    val clusterId = clusterIdBase64
 
     // Create two directories with valid `meta.properties`
     val metadataDir = TestUtils.tempDirectory()
@@ -186,7 +186,7 @@ class KafkaRaftServerTest {
 
     // Create a random clusterId in each log dir
     Seq(logDir1, logDir2).foreach { dir =>
-      writeMetaProperties(dir, MetaProperties(clusterId = Uuid.randomUuid(), nodeId))
+      writeMetaProperties(dir, MetaProperties(clusterId = Uuid.randomUuid().toString, nodeId))
     }
 
     val configProperties = new Properties
diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
index d601e36..787c767 100644
--- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
@@ -25,7 +25,6 @@ import java.util.Properties
 
 import kafka.server.{KafkaConfig, MetaProperties}
 import kafka.utils.TestUtils
-import org.apache.kafka.common.Uuid
 import org.apache.kafka.common.utils.Utils
 import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
 import org.junit.jupiter.api.{Test, Timeout}
@@ -156,7 +155,7 @@ Found problem:
     val tempDir = TestUtils.tempDir()
     try {
       val metaProperties = MetaProperties(
-        clusterId = Uuid.fromString("XcZZOzUqS4yHOjhMQB6JLQ"), nodeId = 2)
+        clusterId = "XcZZOzUqS4yHOjhMQB6JLQ", nodeId = 2)
       val stream = new ByteArrayOutputStream()
       assertEquals(0, StorageTool.
         formatCommand(new PrintStream(stream), Seq(tempDir.toString), metaProperties, false))
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index fcc47f1..e820811 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -139,7 +139,7 @@ public class QuorumControllerTest {
                 CompletableFuture<BrokerRegistrationReply> reply = active.registerBroker(
                     new BrokerRegistrationRequestData().
                         setBrokerId(0).
-                        setClusterId(Uuid.fromString("06B-K3N1TBCNYFgruEVP0Q")).
+                        setClusterId("06B-K3N1TBCNYFgruEVP0Q").
                         setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
                         setListeners(listeners));
                 assertEquals(0L, reply.get().epoch());