You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2021/03/19 18:27:08 UTC
[kafka] branch 2.8 updated: MINOR: The new KIP-500 code should
treat cluster ID as a string (#10357)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.8 by this push:
new 85f35bf MINOR: The new KIP-500 code should treat cluster ID as a string (#10357)
85f35bf is described below
commit 85f35bf877716bb88a7750b340dacafb9f6b28f9
Author: Ron Dagostino <rd...@confluent.io>
AuthorDate: Fri Mar 19 12:56:04 2021 -0400
MINOR: The new KIP-500 code should treat cluster ID as a string (#10357)
Cluster ID has traditionally been treated as a string by the Kafka protocol (for example,
AdminClient returns it as a string). The new KIP-500 code should continue this practice. If
we don't do this, upgrading existing clusters may be harder to do.
Reviewers: Colin P. McCabe <cm...@apache.org>, Jason Gustafson <ja...@confluent.io>
---
.../common/message/BrokerRegistrationRequest.json | 2 +-
.../kafka/common/requests/RequestResponseTest.java | 2 +-
.../kafka/server/BrokerLifecycleManager.scala | 6 +--
.../kafka/server/BrokerMetadataCheckpoint.scala | 17 ++-------
core/src/main/scala/kafka/tools/StorageTool.scala | 2 +-
.../main/scala/kafka/tools/TestRaftServer.scala | 2 +-
.../server/BrokerMetadataCheckpointTest.scala | 44 +++++++++++-----------
.../kafka/server/BrokerLifecycleManagerTest.scala | 4 +-
.../unit/kafka/server/ControllerApisTest.scala | 2 +-
.../unit/kafka/server/KafkaRaftServerTest.scala | 16 ++++----
.../scala/unit/kafka/tools/StorageToolTest.scala | 3 +-
.../kafka/controller/QuorumControllerTest.java | 2 +-
12 files changed, 44 insertions(+), 58 deletions(-)
diff --git a/clients/src/main/resources/common/message/BrokerRegistrationRequest.json b/clients/src/main/resources/common/message/BrokerRegistrationRequest.json
index 3e27cf1..e0ad06a 100644
--- a/clients/src/main/resources/common/message/BrokerRegistrationRequest.json
+++ b/clients/src/main/resources/common/message/BrokerRegistrationRequest.json
@@ -23,7 +23,7 @@
"fields": [
{ "name": "BrokerId", "type": "int32", "versions": "0+",
"about": "The broker ID." },
- { "name": "ClusterId", "type": "uuid", "versions": "0+",
+ { "name": "ClusterId", "type": "string", "versions": "0+",
"about": "The cluster id of the broker process." },
{ "name": "IncarnationId", "type": "uuid", "versions": "0+",
"about": "The incarnation id of the broker process." },
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 920a951..de88025 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -2672,7 +2672,7 @@ public class RequestResponseTest {
private BrokerRegistrationRequest createBrokerRegistrationRequest(short v) {
BrokerRegistrationRequestData data = new BrokerRegistrationRequestData()
.setBrokerId(1)
- .setClusterId(Uuid.randomUuid())
+ .setClusterId(Uuid.randomUuid().toString())
.setRack("1")
.setFeatures(new BrokerRegistrationRequestData.FeatureCollection(singletonList(
new BrokerRegistrationRequestData.Feature()).iterator()))
diff --git a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
index b15fc1c..7569fc24 100644
--- a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
@@ -148,7 +148,7 @@ class BrokerLifecycleManager(val config: KafkaConfig,
* The cluster ID, or null if this manager has not been started yet. This variable can
* only be read or written from the event queue thread.
*/
- private var _clusterId: Uuid = _
+ private var _clusterId: String = _
/**
* The listeners which this broker advertises. This variable can only be read or
@@ -182,7 +182,7 @@ class BrokerLifecycleManager(val config: KafkaConfig,
*/
def start(highestMetadataOffsetProvider: () => Long,
channelManager: BrokerToControllerChannelManager,
- clusterId: Uuid,
+ clusterId: String,
advertisedListeners: ListenerCollection,
supportedFeatures: util.Map[String, VersionRange]): Unit = {
eventQueue.append(new StartupEvent(highestMetadataOffsetProvider,
@@ -245,7 +245,7 @@ class BrokerLifecycleManager(val config: KafkaConfig,
private class StartupEvent(highestMetadataOffsetProvider: () => Long,
channelManager: BrokerToControllerChannelManager,
- clusterId: Uuid,
+ clusterId: String,
advertisedListeners: ListenerCollection,
supportedFeatures: util.Map[String, VersionRange]) extends EventQueue.Event {
override def run(): Unit = {
diff --git a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
index 49e033d..e85144b 100755
--- a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
+++ b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
@@ -24,7 +24,6 @@ import java.util.Properties
import kafka.common.{InconsistentBrokerMetadataException, KafkaException}
import kafka.server.RawMetaProperties._
import kafka.utils._
-import org.apache.kafka.common.Uuid
import org.apache.kafka.common.utils.Utils
import scala.collection.mutable
@@ -97,7 +96,7 @@ class RawMetaProperties(val props: Properties = new Properties()) {
object MetaProperties {
def parse(properties: RawMetaProperties): MetaProperties = {
properties.requireVersion(expectedVersion = 1)
- val clusterId = requireClusterId(properties)
+ val clusterId = require(ClusterIdKey, properties.clusterId)
val nodeId = require(NodeIdKey, properties.nodeId)
new MetaProperties(clusterId, nodeId)
}
@@ -105,16 +104,6 @@ object MetaProperties {
def require[T](key: String, value: Option[T]): T = {
value.getOrElse(throw new RuntimeException(s"Failed to find required property $key."))
}
-
- def requireClusterId(properties: RawMetaProperties): Uuid = {
- val value = require(ClusterIdKey, properties.clusterId)
- try {
- Uuid.fromString(value)
- } catch {
- case e: Throwable => throw new RuntimeException(s"Failed to parse $ClusterIdKey property " +
- s"as a UUID: ${e.getMessage}")
- }
- }
}
case class ZkMetaProperties(
@@ -135,13 +124,13 @@ case class ZkMetaProperties(
}
case class MetaProperties(
- clusterId: Uuid,
+ clusterId: String,
nodeId: Int,
) {
def toProperties: Properties = {
val properties = new RawMetaProperties()
properties.version = 1
- properties.clusterId = clusterId.toString
+ properties.clusterId = clusterId
properties.nodeId = nodeId
properties.props
}
diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala b/core/src/main/scala/kafka/tools/StorageTool.scala
index ff84007..549019d 100644
--- a/core/src/main/scala/kafka/tools/StorageTool.scala
+++ b/core/src/main/scala/kafka/tools/StorageTool.scala
@@ -198,7 +198,7 @@ object StorageTool extends Logging {
s"does not appear to be a valid UUID: ${e.getMessage}")
}
require(config.nodeId >= 0, s"The node.id must be set to a non-negative integer.")
- new MetaProperties(effectiveClusterId, config.nodeId)
+ new MetaProperties(effectiveClusterId.toString, config.nodeId)
}
def formatCommand(stream: PrintStream,
diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala
index ba6ab407..c25e551 100644
--- a/core/src/main/scala/kafka/tools/TestRaftServer.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala
@@ -74,7 +74,7 @@ class TestRaftServer(
socketServer.startup(startProcessingRequests = false)
val metaProperties = MetaProperties(
- clusterId = Uuid.ZERO_UUID,
+ clusterId = Uuid.ZERO_UUID.toString,
nodeId = config.nodeId
)
diff --git a/core/src/test/scala/kafka/server/BrokerMetadataCheckpointTest.scala b/core/src/test/scala/kafka/server/BrokerMetadataCheckpointTest.scala
index 49cb366..c7ce0ac 100644
--- a/core/src/test/scala/kafka/server/BrokerMetadataCheckpointTest.scala
+++ b/core/src/test/scala/kafka/server/BrokerMetadataCheckpointTest.scala
@@ -15,13 +15,13 @@ package kafka.server
import java.io.File
import java.util.Properties
-import org.apache.kafka.common.Uuid
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.test.TestUtils
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
class BrokerMetadataCheckpointTest {
+ private val clusterIdBase64 = "H3KKO4NTRPaCWtEmm3vW7A"
@Test
def testReadWithNonExistentFile(): Unit = {
@@ -63,46 +63,44 @@ class BrokerMetadataCheckpointTest {
@Test
def testCreateMetadataProperties(): Unit = {
- val meta = MetaProperties(
- clusterId = Uuid.fromString("H3KKO4NTRPaCWtEmm3vW7A"),
- nodeId = 5
- )
- val properties = new RawMetaProperties(meta.toProperties)
- val meta2 = MetaProperties.parse(properties)
- assertEquals(meta, meta2)
+ confirmValidForMetaProperties(clusterIdBase64)
}
@Test
def testMetaPropertiesWithMissingVersion(): Unit = {
val properties = new RawMetaProperties()
- properties.clusterId = "H3KKO4NTRPaCWtEmm3vW7A"
+ properties.clusterId = clusterIdBase64
properties.nodeId = 1
assertThrows(classOf[RuntimeException], () => MetaProperties.parse(properties))
}
@Test
- def testMetaPropertiesDoesNotAllowHexEncodedUUIDs(): Unit = {
- val properties = new RawMetaProperties()
- properties.version = 1
- properties.clusterId = "7bc79ca1-9746-42a3-a35a-efb3cde44492"
- properties.nodeId = 1
- assertThrows(classOf[RuntimeException], () => MetaProperties.parse(properties))
+ def testMetaPropertiesAllowsHexEncodedUUIDs(): Unit = {
+ val clusterId = "7bc79ca1-9746-42a3-a35a-efb3cde44492"
+ confirmValidForMetaProperties(clusterId)
}
@Test
- def testMetaPropertiesWithInvalidClusterId(): Unit = {
- val properties = new RawMetaProperties()
- properties.version = 1
- properties.clusterId = "not a valid uuid"
- properties.nodeId = 1
- assertThrows(classOf[RuntimeException], () => MetaProperties.parse(properties))
+ def testMetaPropertiesWithNonUuidClusterId(): Unit = {
+ val clusterId = "not a valid uuid"
+ confirmValidForMetaProperties(clusterId)
+ }
+
+ private def confirmValidForMetaProperties(clusterId: String) = {
+ val meta = MetaProperties(
+ clusterId = clusterId,
+ nodeId = 5
+ )
+ val properties = new RawMetaProperties(meta.toProperties)
+ val meta2 = MetaProperties.parse(properties)
+ assertEquals(meta, meta2)
}
@Test
def testMetaPropertiesWithMissingBrokerId(): Unit = {
val properties = new RawMetaProperties()
properties.version = 1
- properties.clusterId = "H3KKO4NTRPaCWtEmm3vW7A"
+ properties.clusterId = clusterIdBase64
assertThrows(classOf[RuntimeException], () => MetaProperties.parse(properties))
}
@@ -110,7 +108,7 @@ class BrokerMetadataCheckpointTest {
def testMetaPropertiesWithMissingControllerId(): Unit = {
val properties = new RawMetaProperties()
properties.version = 1
- properties.clusterId = "H3KKO4NTRPaCWtEmm3vW7A"
+ properties.clusterId = clusterIdBase64
assertThrows(classOf[RuntimeException], () => MetaProperties.parse(properties))
}
diff --git a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
index d3bcfef..aa717c8 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
import kafka.utils.{MockTime, TestUtils}
import org.apache.kafka.clients.{Metadata, MockClient, NodeApiVersions}
import org.apache.kafka.common.config.SaslConfigs
-import org.apache.kafka.common.{Node, Uuid}
+import org.apache.kafka.common.Node
import org.apache.kafka.common.internals.ClusterResourceListeners
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion
import org.apache.kafka.common.message.BrokerRegistrationRequestData.{Listener, ListenerCollection}
@@ -76,7 +76,7 @@ class BrokerLifecycleManagerTest {
}.toList.asJava)
val mockChannelManager = new MockBrokerToControllerChannelManager(mockClient,
time, controllerNodeProvider, nodeApiVersions)
- val clusterId = Uuid.fromString("x4AJGXQSRnephtTZzujw4w")
+ val clusterId = "x4AJGXQSRnephtTZzujw4w"
val advertisedListeners = new ListenerCollection()
config.advertisedListeners.foreach { ep =>
advertisedListeners.add(new Listener().setHost(ep.host).
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index b3b74ab..3e2e255 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -88,7 +88,7 @@ class ControllerApisTest {
new KafkaConfig(props),
// FIXME: Would make more sense to set controllerId here
- MetaProperties(Uuid.fromString("JgxuGe9URy-E-ceaL04lEw"), nodeId = nodeId),
+ MetaProperties("JgxuGe9URy-E-ceaL04lEw", nodeId = nodeId),
Seq.empty,
new SimpleApiVersionManager(ListenerType.CONTROLLER)
)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala b/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
index 6166d73..9ada6d5 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
@@ -19,7 +19,6 @@ package kafka.server
import java.io.File
import java.nio.file.Files
import java.util.Properties
-
import kafka.common.{InconsistentBrokerMetadataException, InconsistentNodeIdException, KafkaException}
import kafka.log.Log
import org.apache.kafka.common.Uuid
@@ -29,10 +28,11 @@ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
class KafkaRaftServerTest {
+ private val clusterIdBase64 = "H3KKO4NTRPaCWtEmm3vW7A"
@Test
def testSuccessfulLoadMetaProperties(): Unit = {
- val clusterId = Uuid.randomUuid()
+ val clusterId = clusterIdBase64
val nodeId = 0
val metaProperties = MetaProperties(clusterId, nodeId)
@@ -49,7 +49,7 @@ class KafkaRaftServerTest {
@Test
def testLoadMetaPropertiesWithInconsistentNodeId(): Unit = {
- val clusterId = Uuid.randomUuid()
+ val clusterId = clusterIdBase64
val metaNodeId = 1
val configNodeId = 0
@@ -90,7 +90,7 @@ class KafkaRaftServerTest {
@Test
def testStartupFailsIfMetaPropertiesMissingInSomeLogDir(): Unit = {
- val clusterId = Uuid.randomUuid()
+ val clusterId = clusterIdBase64
val nodeId = 1
// One log dir is online and has properly formatted `meta.properties`.
@@ -110,7 +110,7 @@ class KafkaRaftServerTest {
@Test
def testStartupFailsIfMetaLogDirIsOffline(): Unit = {
- val clusterId = Uuid.randomUuid()
+ val clusterId = clusterIdBase64
val nodeId = 1
// One log dir is online and has properly formatted `meta.properties`
@@ -131,7 +131,7 @@ class KafkaRaftServerTest {
@Test
def testStartupDoesNotFailIfDataDirIsOffline(): Unit = {
- val clusterId = Uuid.randomUuid()
+ val clusterId = clusterIdBase64
val nodeId = 1
// One log dir is online and has properly formatted `meta.properties`
@@ -155,7 +155,7 @@ class KafkaRaftServerTest {
@Test
def testStartupFailsIfUnexpectedMetadataDir(): Unit = {
val nodeId = 1
- val clusterId = Uuid.randomUuid()
+ val clusterId = clusterIdBase64
// Create two directories with valid `meta.properties`
val metadataDir = TestUtils.tempDirectory()
@@ -186,7 +186,7 @@ class KafkaRaftServerTest {
// Create a random clusterId in each log dir
Seq(logDir1, logDir2).foreach { dir =>
- writeMetaProperties(dir, MetaProperties(clusterId = Uuid.randomUuid(), nodeId))
+ writeMetaProperties(dir, MetaProperties(clusterId = Uuid.randomUuid().toString, nodeId))
}
val configProperties = new Properties
diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
index d601e36..787c767 100644
--- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
@@ -25,7 +25,6 @@ import java.util.Properties
import kafka.server.{KafkaConfig, MetaProperties}
import kafka.utils.TestUtils
-import org.apache.kafka.common.Uuid
import org.apache.kafka.common.utils.Utils
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
import org.junit.jupiter.api.{Test, Timeout}
@@ -156,7 +155,7 @@ Found problem:
val tempDir = TestUtils.tempDir()
try {
val metaProperties = MetaProperties(
- clusterId = Uuid.fromString("XcZZOzUqS4yHOjhMQB6JLQ"), nodeId = 2)
+ clusterId = "XcZZOzUqS4yHOjhMQB6JLQ", nodeId = 2)
val stream = new ByteArrayOutputStream()
assertEquals(0, StorageTool.
formatCommand(new PrintStream(stream), Seq(tempDir.toString), metaProperties, false))
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index fcc47f1..e820811 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -139,7 +139,7 @@ public class QuorumControllerTest {
CompletableFuture<BrokerRegistrationReply> reply = active.registerBroker(
new BrokerRegistrationRequestData().
setBrokerId(0).
- setClusterId(Uuid.fromString("06B-K3N1TBCNYFgruEVP0Q")).
+ setClusterId("06B-K3N1TBCNYFgruEVP0Q").
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
setListeners(listeners));
assertEquals(0L, reply.get().epoch());