You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2021/08/09 20:02:03 UTC
[kafka] branch 3.0 updated: KAFKA-13165: Validate KRaft node id,
process role and quorum voters (#11179)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 385eb29 KAFKA-13165: Validate KRaft node id, process role and quorum voters (#11179)
385eb29 is described below
commit 385eb296b87c2afedc557a8b2d479b031c375308
Author: Ryan Dielhenn <rd...@confluent.io>
AuthorDate: Mon Aug 9 12:57:23 2021 -0700
KAFKA-13165: Validate KRaft node id, process role and quorum voters (#11179)
Validate that KRaft controllers are members of the KRaft quorum, and non-controllers are not.
This validation assumes that controllers and brokers have the same ID only when they are
co-located.
Reviewers: David Arthur <mu...@gmail.com>, José Armando García Sancio <js...@gmail.com>, Luke Chen <sh...@gmail.com>
---
config/kraft/broker.properties | 2 +-
core/src/main/scala/kafka/server/KafkaConfig.scala | 21 ++++--
.../scala/kafka/raft/KafkaMetadataLogTest.scala | 5 +-
.../test/scala/unit/kafka/KafkaConfigTest.scala | 77 +++++++++++++++++++++-
.../scala/unit/kafka/raft/RaftManagerTest.scala | 32 ++++++---
.../kafka/server/BrokerLifecycleManagerTest.scala | 1 +
.../unit/kafka/server/ControllerApisTest.scala | 1 +
.../scala/unit/kafka/server/KafkaApisTest.scala | 2 +
.../scala/unit/kafka/server/KafkaConfigTest.scala | 12 +++-
.../unit/kafka/server/KafkaRaftServerTest.scala | 7 ++
.../test/scala/unit/kafka/server/ServerTest.scala | 1 +
.../scala/unit/kafka/tools/StorageToolTest.scala | 1 +
12 files changed, 141 insertions(+), 21 deletions(-)
diff --git a/config/kraft/broker.properties b/config/kraft/broker.properties
index 1b71803..dfbd6ec 100644
--- a/config/kraft/broker.properties
+++ b/config/kraft/broker.properties
@@ -24,7 +24,7 @@
process.roles=broker
# The node id associated with this instance's roles
-node.id=1
+node.id=2
# The connect string for the controller quorum
controller.quorum.voters=1@localhost:9093
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 703d796..a30d31d 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -390,6 +390,7 @@ object KafkaConfig {
val MetadataLogSegmentMillisProp = "metadata.log.segment.ms"
val MetadataMaxRetentionBytesProp = "metadata.max.retention.bytes"
val MetadataMaxRetentionMillisProp = "metadata.max.retention.ms"
+ val QuorumVotersProp = RaftConfig.QUORUM_VOTERS_CONFIG
/************* Authorizer Configuration ***********/
val AuthorizerClassNameProp = "authorizer.class.name"
@@ -1919,6 +1920,21 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
throw new ConfigException(s"Missing configuration `${KafkaConfig.NodeIdProp}` which is required " +
s"when `process.roles` is defined (i.e. when running in KRaft mode).")
}
+
+ // Validate process.roles with controller.quorum.voters
+ val voterIds: Set[Integer] = RaftConfig.parseVoterConnections(quorumVoters).asScala.keySet.toSet
+ if (voterIds.isEmpty) {
+ throw new ConfigException(s"If using ${KafkaConfig.ProcessRolesProp}, ${KafkaConfig.QuorumVotersProp} must contain a parseable set of voters.")
+ } else if (processRoles.contains(ControllerRole)) {
+ // Ensure that controllers use their node.id as a voter in controller.quorum.voters
+ require(voterIds.contains(nodeId), s"If ${KafkaConfig.ProcessRolesProp} contains the 'controller' role, the node id $nodeId must be included in the set of voters ${KafkaConfig.QuorumVotersProp}=$voterIds")
+ } else {
+ // Ensure that the broker's node.id is not an id in controller.quorum.voters
+ require(!voterIds.contains(nodeId), s"If ${KafkaConfig.ProcessRolesProp} does not contain the 'controller' role, the node id $nodeId must not be included in the set of voters ${KafkaConfig.QuorumVotersProp}=$voterIds")
+ }
+
+ require(getClass(KafkaConfig.AlterConfigPolicyClassNameProp) == null, s"${KafkaConfig.AlterConfigPolicyClassNameProp} is not supported in KRaft.")
+ require(getClass(KafkaConfig.CreateTopicPolicyClassNameProp) == null, s"${KafkaConfig.CreateTopicPolicyClassNameProp} is not supported in KRaft.")
}
require(logRollTimeMillis >= 1, "log.roll.ms must be greater than or equal to 1")
require(logRollTimeJitterMillis >= 0, "log.roll.jitter.ms must be greater than or equal to 0")
@@ -2008,10 +2024,5 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
require(principalBuilderClass != null, s"${KafkaConfig.PrincipalBuilderClassProp} must be non-null")
require(classOf[KafkaPrincipalSerde].isAssignableFrom(principalBuilderClass),
s"${KafkaConfig.PrincipalBuilderClassProp} must implement KafkaPrincipalSerde")
-
- if (usesSelfManagedQuorum) {
- require(getClass(KafkaConfig.AlterConfigPolicyClassNameProp) == null, s"${KafkaConfig.AlterConfigPolicyClassNameProp} is not supported in KRaft.")
- require(getClass(KafkaConfig.CreateTopicPolicyClassNameProp) == null, s"${KafkaConfig.CreateTopicPolicyClassNameProp} is not supported in KRaft.")
- }
}
}
diff --git a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
index 415b694..e02529b 100644
--- a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
+++ b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
@@ -17,7 +17,7 @@
package kafka.raft
import kafka.log.{Defaults, Log, SegmentDeletion}
-import kafka.server.KafkaConfig.{MetadataLogSegmentBytesProp, MetadataLogSegmentMillisProp, MetadataLogSegmentMinBytesProp, NodeIdProp, ProcessRolesProp}
+import kafka.server.KafkaConfig.{MetadataLogSegmentBytesProp, MetadataLogSegmentMillisProp, MetadataLogSegmentMinBytesProp, NodeIdProp, ProcessRolesProp, QuorumVotersProp}
import kafka.server.{KafkaConfig, KafkaRaftServer}
import kafka.utils.{MockTime, TestUtils}
import org.apache.kafka.common.errors.{InvalidConfigurationException, RecordTooLargeException}
@@ -59,7 +59,8 @@ final class KafkaMetadataLogTest {
def testConfig(): Unit = {
val props = new Properties()
props.put(ProcessRolesProp, util.Arrays.asList("broker"))
- props.put(NodeIdProp, Int.box(1))
+ props.put(QuorumVotersProp, "1@localhost:9092")
+ props.put(NodeIdProp, Int.box(2))
props.put(MetadataLogSegmentBytesProp, Int.box(10240))
props.put(MetadataLogSegmentMillisProp, Int.box(10 * 1024))
assertThrows(classOf[InvalidConfigurationException], () => {
diff --git a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
index d7e130e..bf056e8 100644
--- a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
@@ -19,14 +19,16 @@ package kafka
import java.io.File
import java.nio.file.Files
import java.util
+import java.util.Properties
import kafka.server.KafkaConfig
import kafka.utils.Exit
+import org.apache.kafka.common.config.ConfigException
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.internals.FatalExitError
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.api.Assertions._
-import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import scala.jdk.CollectionConverters._
@@ -80,6 +82,79 @@ class KafkaTest {
}
@Test
+ def testBrokerRoleNodeIdValidation(): Unit = {
+ // Ensure that validation is happening at startup to check that brokers do not use their node.id as a voter in controller.quorum.voters
+ val propertiesFile = new Properties
+ propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "broker")
+ propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
+ propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, "1@localhost:9092")
+ setListenerProps(propertiesFile)
+ assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(propertiesFile))
+
+ // Ensure that with a valid config no exception is thrown
+ propertiesFile.setProperty(KafkaConfig.NodeIdProp, "2")
+ KafkaConfig.fromProps(propertiesFile)
+ }
+
+ @Test
+ def testControllerRoleNodeIdValidation(): Unit = {
+ // Ensure that validation is happening at startup to check that controllers use their node.id as a voter in controller.quorum.voters
+ val propertiesFile = new Properties
+ propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "controller")
+ propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
+ propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9092")
+ setListenerProps(propertiesFile)
+ assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(propertiesFile))
+
+ // Ensure that with a valid config no exception is thrown
+ propertiesFile.setProperty(KafkaConfig.NodeIdProp, "2")
+ KafkaConfig.fromProps(propertiesFile)
+ }
+
+ @Test
+ def testColocatedRoleNodeIdValidation(): Unit = {
+ // Ensure that validation is happening at startup to check that colocated processes use their node.id as a voter in controller.quorum.voters
+ val propertiesFile = new Properties
+ propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "controller,broker")
+ propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
+ propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9092")
+ setListenerProps(propertiesFile)
+ assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(propertiesFile))
+
+ // Ensure that with a valid config no exception is thrown
+ propertiesFile.setProperty(KafkaConfig.NodeIdProp, "2")
+ KafkaConfig.fromProps(propertiesFile)
+ }
+
+ @Test
+ def testMustContainQuorumVotersIfUsingProcessRoles(): Unit = {
+ // Ensure that validation is happening at startup to check that if process.roles is set controller.quorum.voters is not empty
+ val propertiesFile = new Properties
+ propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "controller,broker")
+ propertiesFile.setProperty(KafkaConfig.NodeIdProp, "1")
+ propertiesFile.setProperty(KafkaConfig.QuorumVotersProp, "")
+ setListenerProps(propertiesFile)
+ assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(propertiesFile))
+
+ // Ensure that if neither process.roles nor controller.quorum.voters is populated, then an exception is thrown if zookeeper.connect is not defined
+ propertiesFile.setProperty(KafkaConfig.ProcessRolesProp, "")
+ assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(propertiesFile))
+
+ // Ensure that no exception is thrown once zookeeper.connect is defined
+ propertiesFile.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
+ KafkaConfig.fromProps(propertiesFile)
+ }
+
+ private def setListenerProps(props: Properties): Unit = {
+ props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9093")
+ props.setProperty(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT")
+ if (props.getProperty(KafkaConfig.ProcessRolesProp).contains("broker")) {
+ props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "PLAINTEXT")
+ props.setProperty(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:9092")
+ }
+ }
+
+ @Test
def testKafkaSslPasswords(): Unit = {
val propertiesFile = prepareDefaultConfig()
val config = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "ssl.keystore.password=keystore_password",
diff --git a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
index 40256ae..827d42e 100644
--- a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
@@ -28,27 +28,41 @@ import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.utils.Time
import org.apache.kafka.raft.KafkaRaftClient
import org.apache.kafka.raft.RaftConfig
+import org.apache.kafka.test.TestUtils
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.mockito.Mockito._
+import java.io.File
+
class RaftManagerTest {
- private def instantiateRaftManagerWithConfigs(processRoles: String, nodeId:String) = {
- def configWithProcessRolesAndNodeId(processRoles: String, nodeId: String): KafkaConfig = {
+ private def instantiateRaftManagerWithConfigs(topicPartition: TopicPartition, processRoles: String, nodeId: String) = {
+ def configWithProcessRolesAndNodeId(processRoles: String, nodeId: String, logDir: File): KafkaConfig = {
val props = new Properties
+ props.setProperty(KafkaConfig.MetadataLogDirProp, logDir.getPath)
props.setProperty(KafkaConfig.ProcessRolesProp, processRoles)
props.setProperty(KafkaConfig.NodeIdProp, nodeId)
props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9093")
props.setProperty(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT")
- props.setProperty(RaftConfig.QUORUM_VOTERS_CONFIG, nodeId.concat("@localhost:9093"))
- if (processRoles.contains("broker"))
+ if (processRoles.contains("broker")) {
props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "PLAINTEXT")
props.setProperty(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:9092")
+ if (!processRoles.contains("controller")) {
+ val voterId = (nodeId.toInt + 1)
+ props.setProperty(KafkaConfig.QuorumVotersProp, s"${voterId}@localhost:9093")
+ }
+ }
+
+ if (processRoles.contains("controller")) {
+ props.setProperty(KafkaConfig.QuorumVotersProp, s"${nodeId}@localhost:9093")
+ }
+
new KafkaConfig(props)
}
- val config = configWithProcessRolesAndNodeId(processRoles, nodeId)
+ val logDir = TestUtils.tempDirectory()
+ val config = configWithProcessRolesAndNodeId(processRoles, nodeId, logDir)
val topicId = new Uuid(0L, 2L)
val metaProperties = MetaProperties(
clusterId = Uuid.randomUuid.toString,
@@ -59,7 +73,7 @@ class RaftManagerTest {
metaProperties,
config,
new ByteArraySerde,
- new TopicPartition("__taft_id_test", 0),
+ topicPartition,
topicId,
Time.SYSTEM,
new Metrics(Time.SYSTEM),
@@ -70,21 +84,21 @@ class RaftManagerTest {
@Test
def testSentinelNodeIdIfBrokerRoleOnly(): Unit = {
- val raftManager = instantiateRaftManagerWithConfigs("broker", "1")
+ val raftManager = instantiateRaftManagerWithConfigs(new TopicPartition("__raft_id_test", 0), "broker", "1")
assertFalse(raftManager.client.nodeId.isPresent)
raftManager.shutdown()
}
@Test
def testNodeIdPresentIfControllerRoleOnly(): Unit = {
- val raftManager = instantiateRaftManagerWithConfigs("controller", "1")
+ val raftManager = instantiateRaftManagerWithConfigs(new TopicPartition("__raft_id_test", 0), "controller", "1")
assertTrue(raftManager.client.nodeId.getAsInt == 1)
raftManager.shutdown()
}
@Test
def testNodeIdPresentIfColocated(): Unit = {
- val raftManager = instantiateRaftManagerWithConfigs("controller,broker", "1")
+ val raftManager = instantiateRaftManagerWithConfigs(new TopicPartition("__raft_id_test", 0), "controller,broker", "1")
assertTrue(raftManager.client.nodeId.getAsInt == 1)
raftManager.shutdown()
}
diff --git a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
index 288077c..a551288 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
@@ -48,6 +48,7 @@ class BrokerLifecycleManagerTest {
properties.setProperty(KafkaConfig.LogDirsProp, "/tmp/foo")
properties.setProperty(KafkaConfig.ProcessRolesProp, "broker")
properties.setProperty(KafkaConfig.NodeIdProp, "1")
+ properties.setProperty(KafkaConfig.QuorumVotersProp, s"2@localhost:9093")
properties.setProperty(KafkaConfig.InitialBrokerRegistrationTimeoutMsProp, "300000")
properties
}
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index 7ab8a0b..768c06a 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -96,6 +96,7 @@ class ControllerApisTest {
props.put(KafkaConfig.NodeIdProp, nodeId: java.lang.Integer)
props.put(KafkaConfig.ProcessRolesProp, "controller")
props.put(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT")
+ props.put(KafkaConfig.QuorumVotersProp, s"$nodeId@localhost:9093")
new ControllerApis(
requestChannel,
authorizer,
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 6b91cfb7..c5ba162 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -132,6 +132,8 @@ class KafkaApisTest {
val properties = TestUtils.createBrokerConfig(brokerId, "")
properties.put(KafkaConfig.NodeIdProp, brokerId.toString)
properties.put(KafkaConfig.ProcessRolesProp, "broker")
+ val voterId = (brokerId + 1)
+ properties.put(KafkaConfig.QuorumVotersProp, s"$voterId@localhost:9093")
properties
} else {
TestUtils.createBrokerConfig(brokerId, "zk")
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index b1d54d3..2e38df0 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -262,6 +262,8 @@ class KafkaConfigTest {
props.put(KafkaConfig.ProcessRolesProp, "controller")
props.put(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9092")
props.put(KafkaConfig.NodeIdProp, "1")
+ props.put(KafkaConfig.QuorumVotersProp, "1@localhost:9092")
+
assertFalse(isValidKafkaConfig(props))
val caught = assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
assertTrue(caught.getMessage.contains("controller.listener.names cannot be empty if the server has the controller role"))
@@ -1053,7 +1055,7 @@ class KafkaConfigTest {
private def assertInvalidQuorumVoters(value: String): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
- props.put(RaftConfig.QUORUM_VOTERS_CONFIG, value)
+ props.put(KafkaConfig.QuorumVotersProp, value)
assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
}
@@ -1078,7 +1080,7 @@ class KafkaConfigTest {
private def assertValidQuorumVoters(value: String, expectedVoters: util.Map[Integer, AddressSpec]): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
- props.put(RaftConfig.QUORUM_VOTERS_CONFIG, value)
+ props.put(KafkaConfig.QuorumVotersProp, value)
val raftConfig = new RaftConfig(KafkaConfig.fromProps(props))
assertEquals(expectedVoters, raftConfig.quorumVoterConnections())
}
@@ -1091,6 +1093,7 @@ class KafkaConfigTest {
val largeBrokerId = 2000
val props = new Properties()
props.put(KafkaConfig.ProcessRolesProp, "broker")
+ props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
props.put(KafkaConfig.NodeIdProp, largeBrokerId.toString)
assertTrue(isValidKafkaConfig(props))
}
@@ -1117,6 +1120,7 @@ class KafkaConfigTest {
val props = new Properties()
props.put(KafkaConfig.ProcessRolesProp, "broker")
props.put(KafkaConfig.BrokerIdGenerationEnableProp, "false")
+ props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
assertFalse(isValidKafkaConfig(props))
}
@@ -1193,6 +1197,7 @@ class KafkaConfigTest {
props.put(KafkaConfig.ProcessRolesProp, "broker")
props.put(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9092")
props.put(KafkaConfig.NodeIdProp, "1")
+ props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
assertTrue(isValidKafkaConfig(props))
}
@@ -1206,6 +1211,7 @@ class KafkaConfigTest {
props.put(KafkaConfig.MetadataLogDirProp, metadataDir)
props.put(KafkaConfig.LogDirProp, dataDir)
props.put(KafkaConfig.NodeIdProp, "1")
+ props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
assertTrue(isValidKafkaConfig(props))
val config = KafkaConfig.fromProps(props)
@@ -1222,11 +1228,11 @@ class KafkaConfigTest {
props.put(KafkaConfig.ProcessRolesProp, "broker")
props.put(KafkaConfig.LogDirProp, s"$dataDir1,$dataDir2")
props.put(KafkaConfig.NodeIdProp, "1")
+ props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
assertTrue(isValidKafkaConfig(props))
val config = KafkaConfig.fromProps(props)
assertEquals(dataDir1, config.metadataLogDir)
assertEquals(Seq(dataDir1, dataDir2), config.logDirs)
}
-
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala b/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
index 6990a34..4b4a86b 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
@@ -40,6 +40,7 @@ class KafkaRaftServerTest {
configProperties.put(KafkaConfig.ProcessRolesProp, "broker,controller")
configProperties.put(KafkaConfig.NodeIdProp, nodeId.toString)
configProperties.put(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://127.0.0.1:9092")
+ configProperties.put(KafkaConfig.QuorumVotersProp, s"$nodeId@localhost:9092")
configProperties.put(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT")
val (loadedMetaProperties, offlineDirs) =
@@ -60,6 +61,7 @@ class KafkaRaftServerTest {
configProperties.put(KafkaConfig.ProcessRolesProp, "controller")
configProperties.put(KafkaConfig.NodeIdProp, configNodeId.toString)
+ configProperties.put(KafkaConfig.QuorumVotersProp, s"$configNodeId@localhost:9092")
configProperties.put(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT")
assertThrows(classOf[InconsistentNodeIdException], () =>
@@ -105,6 +107,7 @@ class KafkaRaftServerTest {
val configProperties = new Properties
configProperties.put(KafkaConfig.ProcessRolesProp, "broker")
configProperties.put(KafkaConfig.NodeIdProp, nodeId.toString)
+ configProperties.put(KafkaConfig.QuorumVotersProp, s"${(nodeId + 1)}@localhost:9092")
configProperties.put(KafkaConfig.LogDirProp, Seq(logDir1, logDir2).map(_.getAbsolutePath).mkString(","))
val config = KafkaConfig.fromProps(configProperties)
@@ -124,6 +127,7 @@ class KafkaRaftServerTest {
val invalidDir = TestUtils.tempFile("blah")
val configProperties = new Properties
configProperties.put(KafkaConfig.ProcessRolesProp, "broker")
+ configProperties.put(KafkaConfig.QuorumVotersProp, s"${(nodeId + 1)}@localhost:9092")
configProperties.put(KafkaConfig.NodeIdProp, nodeId.toString)
configProperties.put(KafkaConfig.MetadataLogDirProp, invalidDir.getAbsolutePath)
configProperties.put(KafkaConfig.LogDirProp, validDir.getAbsolutePath)
@@ -146,6 +150,7 @@ class KafkaRaftServerTest {
val configProperties = new Properties
configProperties.put(KafkaConfig.ProcessRolesProp, "broker")
configProperties.put(KafkaConfig.NodeIdProp, nodeId.toString)
+ configProperties.put(KafkaConfig.QuorumVotersProp, s"${(nodeId + 1)}@localhost:9092")
configProperties.put(KafkaConfig.MetadataLogDirProp, validDir.getAbsolutePath)
configProperties.put(KafkaConfig.LogDirProp, invalidDir.getAbsolutePath)
val config = KafkaConfig.fromProps(configProperties)
@@ -174,6 +179,7 @@ class KafkaRaftServerTest {
val configProperties = new Properties
configProperties.put(KafkaConfig.ProcessRolesProp, "broker")
configProperties.put(KafkaConfig.NodeIdProp, nodeId.toString)
+ configProperties.put(KafkaConfig.QuorumVotersProp, s"${(nodeId + 1)}@localhost:9092")
configProperties.put(KafkaConfig.MetadataLogDirProp, metadataDir.getAbsolutePath)
configProperties.put(KafkaConfig.LogDirProp, dataDir.getAbsolutePath)
val config = KafkaConfig.fromProps(configProperties)
@@ -194,6 +200,7 @@ class KafkaRaftServerTest {
val configProperties = new Properties
configProperties.put(KafkaConfig.ProcessRolesProp, "broker")
+ configProperties.put(KafkaConfig.QuorumVotersProp, s"${(nodeId + 1)}@localhost:9092")
configProperties.put(KafkaConfig.NodeIdProp, nodeId.toString)
configProperties.put(KafkaConfig.LogDirProp, Seq(logDir1, logDir2).map(_.getAbsolutePath).mkString(","))
val config = KafkaConfig.fromProps(configProperties)
diff --git a/core/src/test/scala/unit/kafka/server/ServerTest.scala b/core/src/test/scala/unit/kafka/server/ServerTest.scala
index e79a46b..79f8cd9 100644
--- a/core/src/test/scala/unit/kafka/server/ServerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerTest.scala
@@ -35,6 +35,7 @@ class ServerTest {
val props = new Properties()
props.put(KafkaConfig.ProcessRolesProp, "broker")
props.put(KafkaConfig.NodeIdProp, nodeId.toString)
+ props.put(KafkaConfig.QuorumVotersProp, s"${(nodeId + 1)}@localhost:9092")
val config = KafkaConfig.fromProps(props)
val context = Server.createKafkaMetricsContext(config, clusterId)
diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
index f46f082..0242c33 100644
--- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
@@ -37,6 +37,7 @@ class StorageToolTest {
properties.setProperty(KafkaConfig.LogDirsProp, "/tmp/foo,/tmp/bar")
properties.setProperty(KafkaConfig.ProcessRolesProp, "controller")
properties.setProperty(KafkaConfig.NodeIdProp, "2")
+ properties.setProperty(KafkaConfig.QuorumVotersProp, s"2@localhost:9092")
properties.setProperty(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT")
properties
}