You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2022/10/24 22:47:35 UTC
[kafka] 02/02: MINOR; Fix error message when validating KRaft config (#12717)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit e8c6a7bb69fe30cdcfb41cb648276083f74f65ba
Author: José Armando García Sancio <js...@users.noreply.github.com>
AuthorDate: Fri Oct 7 10:40:55 2022 -0700
MINOR; Fix error message when validating KRaft config (#12717)
The error message reported when advertised.listeners is used in
controller only is confusing. When the KRaft server is configured to
controller only the following must be true:
1. `advertised.listeners` is not set
2. `listeners` contains a listener for every name in `controller.listener.names`
3. `controller.listener.names` contains a name for every listener in `listeners`
Reviewers: Jason Gustafson <ja...@confluent.io>, Igor Soarez <i...@soarez.me>
---
core/src/main/scala/kafka/server/KafkaConfig.scala | 18 +-
.../scala/unit/kafka/server/KafkaConfigTest.scala | 524 +++++++++++----------
.../test/scala/unit/kafka/utils/TestUtils.scala | 7 +-
3 files changed, 285 insertions(+), 264 deletions(-)
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 2df40e97b72..ca7c474fdcf 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -2162,14 +2162,16 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
// KRaft controller-only
validateNonEmptyQuorumVotersForKRaft()
validateControlPlaneListenerEmptyForKRaft()
- // advertised listeners must be empty when not also running the broker role
- val sourceOfAdvertisedListeners: String =
- if (getString(KafkaConfig.AdvertisedListenersProp) != null)
- s"${KafkaConfig.AdvertisedListenersProp}"
- else
- s"${KafkaConfig.ListenersProp}"
- require(effectiveAdvertisedListeners.isEmpty,
- s"The $sourceOfAdvertisedListeners config must only contain KRaft controller listeners from ${KafkaConfig.ControllerListenerNamesProp} when ${KafkaConfig.ProcessRolesProp}=controller")
+ // advertised listeners must be empty when only the controller is configured
+ require(
+ getString(KafkaConfig.AdvertisedListenersProp) == null,
+ s"The ${KafkaConfig.AdvertisedListenersProp} config must be empty when ${KafkaConfig.ProcessRolesProp}=controller"
+ )
+ // listeners should only contain listeners also enumerated in the controller listener
+ require(
+ effectiveAdvertisedListeners.isEmpty,
+ s"The ${KafkaConfig.ListenersProp} config must only contain KRaft controller listeners from ${KafkaConfig.ControllerListenerNamesProp} when ${KafkaConfig.ProcessRolesProp}=controller"
+ )
validateControllerQuorumVotersMustContainNodeIdForKRaftController()
validateControllerListenerExistsForKRaftController()
validateControllerListenerNamesMustAppearInListenersForKRaftController()
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 78adff5f8fc..b9eee175503 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -49,7 +49,7 @@ class KafkaConfigTest {
@Test
def testLogRetentionTimeHoursProvided(): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
- props.put(KafkaConfig.LogRetentionTimeHoursProp, "1")
+ props.setProperty(KafkaConfig.LogRetentionTimeHoursProp, "1")
val cfg = KafkaConfig.fromProps(props)
assertEquals(60L * 60L * 1000L, cfg.logRetentionTimeMillis)
@@ -58,7 +58,7 @@ class KafkaConfigTest {
@Test
def testLogRetentionTimeMinutesProvided(): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
- props.put(KafkaConfig.LogRetentionTimeMinutesProp, "30")
+ props.setProperty(KafkaConfig.LogRetentionTimeMinutesProp, "30")
val cfg = KafkaConfig.fromProps(props)
assertEquals(30 * 60L * 1000L, cfg.logRetentionTimeMillis)
@@ -67,7 +67,7 @@ class KafkaConfigTest {
@Test
def testLogRetentionTimeMsProvided(): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
- props.put(KafkaConfig.LogRetentionTimeMillisProp, "1800000")
+ props.setProperty(KafkaConfig.LogRetentionTimeMillisProp, "1800000")
val cfg = KafkaConfig.fromProps(props)
assertEquals(30 * 60L * 1000L, cfg.logRetentionTimeMillis)
@@ -84,8 +84,8 @@ class KafkaConfigTest {
@Test
def testLogRetentionTimeBothMinutesAndHoursProvided(): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
- props.put(KafkaConfig.LogRetentionTimeMinutesProp, "30")
- props.put(KafkaConfig.LogRetentionTimeHoursProp, "1")
+ props.setProperty(KafkaConfig.LogRetentionTimeMinutesProp, "30")
+ props.setProperty(KafkaConfig.LogRetentionTimeHoursProp, "1")
val cfg = KafkaConfig.fromProps(props)
assertEquals( 30 * 60L * 1000L, cfg.logRetentionTimeMillis)
@@ -94,8 +94,8 @@ class KafkaConfigTest {
@Test
def testLogRetentionTimeBothMinutesAndMsProvided(): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
- props.put(KafkaConfig.LogRetentionTimeMillisProp, "1800000")
- props.put(KafkaConfig.LogRetentionTimeMinutesProp, "10")
+ props.setProperty(KafkaConfig.LogRetentionTimeMillisProp, "1800000")
+ props.setProperty(KafkaConfig.LogRetentionTimeMinutesProp, "10")
val cfg = KafkaConfig.fromProps(props)
assertEquals( 30 * 60L * 1000L, cfg.logRetentionTimeMillis)
@@ -109,9 +109,9 @@ class KafkaConfigTest {
val props4 = TestUtils.createBrokerConfig(0,TestUtils.MockZkConnect, port = 8181)
val props5 = TestUtils.createBrokerConfig(0,TestUtils.MockZkConnect, port = 8181)
- props1.put("log.retention.ms", "-1")
- props2.put("log.retention.minutes", "-1")
- props3.put("log.retention.hours", "-1")
+ props1.setProperty("log.retention.ms", "-1")
+ props2.setProperty("log.retention.minutes", "-1")
+ props3.setProperty("log.retention.hours", "-1")
val cfg1 = KafkaConfig.fromProps(props1)
val cfg2 = KafkaConfig.fromProps(props2)
@@ -120,13 +120,13 @@ class KafkaConfigTest {
assertEquals(-1, cfg2.logRetentionTimeMillis, "Should be -1")
assertEquals(-1, cfg3.logRetentionTimeMillis, "Should be -1")
- props4.put("log.retention.ms", "-1")
- props4.put("log.retention.minutes", "30")
+ props4.setProperty("log.retention.ms", "-1")
+ props4.setProperty("log.retention.minutes", "30")
val cfg4 = KafkaConfig.fromProps(props4)
assertEquals(-1, cfg4.logRetentionTimeMillis, "Should be -1")
- props5.put("log.retention.ms", "0")
+ props5.setProperty("log.retention.ms", "0")
assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props5))
}
@@ -137,9 +137,9 @@ class KafkaConfigTest {
val props2 = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
val props3 = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
- props1.put("log.retention.ms", "0")
- props2.put("log.retention.minutes", "0")
- props3.put("log.retention.hours", "0")
+ props1.setProperty("log.retention.ms", "0")
+ props2.setProperty("log.retention.minutes", "0")
+ props3.setProperty("log.retention.hours", "0")
assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props1))
assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props2))
@@ -152,9 +152,9 @@ class KafkaConfigTest {
val port = 9999
val hostName = "fake-host"
val props = new Properties()
- props.put(KafkaConfig.BrokerIdProp, "1")
- props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
- props.put(KafkaConfig.ListenersProp, s"PLAINTEXT://$hostName:$port")
+ props.setProperty(KafkaConfig.BrokerIdProp, "1")
+ props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
+ props.setProperty(KafkaConfig.ListenersProp, s"PLAINTEXT://$hostName:$port")
val serverConfig = KafkaConfig.fromProps(props)
val endpoints = serverConfig.effectiveAdvertisedListeners
@@ -170,7 +170,7 @@ class KafkaConfigTest {
val advertisedPort = 1234
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
- props.put(KafkaConfig.AdvertisedListenersProp, s"PLAINTEXT://$advertisedHostName:$advertisedPort")
+ props.setProperty(KafkaConfig.AdvertisedListenersProp, s"PLAINTEXT://$advertisedHostName:$advertisedPort")
val serverConfig = KafkaConfig.fromProps(props)
val endpoints = serverConfig.effectiveAdvertisedListeners
@@ -183,35 +183,35 @@ class KafkaConfigTest {
@Test
def testDuplicateListeners(): Unit = {
val props = new Properties()
- props.put(KafkaConfig.BrokerIdProp, "1")
- props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
+ props.setProperty(KafkaConfig.BrokerIdProp, "1")
+ props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
// listeners with duplicate port
- props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9091,SSL://localhost:9091")
+ props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9091,SSL://localhost:9091")
assertBadConfigContainingMessage(props, "Each listener must have a different port")
// listeners with duplicate name
- props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9091,PLAINTEXT://localhost:9092")
+ props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9091,PLAINTEXT://localhost:9092")
assertBadConfigContainingMessage(props, "Each listener must have a different name")
// advertised listeners can have duplicate ports
- props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "HOST:SASL_SSL,LB:SASL_SSL")
- props.put(KafkaConfig.InterBrokerListenerNameProp, "HOST")
- props.put(KafkaConfig.ListenersProp, "HOST://localhost:9091,LB://localhost:9092")
- props.put(KafkaConfig.AdvertisedListenersProp, "HOST://localhost:9091,LB://localhost:9091")
+ props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, "HOST:SASL_SSL,LB:SASL_SSL")
+ props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "HOST")
+ props.setProperty(KafkaConfig.ListenersProp, "HOST://localhost:9091,LB://localhost:9092")
+ props.setProperty(KafkaConfig.AdvertisedListenersProp, "HOST://localhost:9091,LB://localhost:9091")
KafkaConfig.fromProps(props)
// but not duplicate names
- props.put(KafkaConfig.AdvertisedListenersProp, "HOST://localhost:9091,HOST://localhost:9091")
+ props.setProperty(KafkaConfig.AdvertisedListenersProp, "HOST://localhost:9091,HOST://localhost:9091")
assertBadConfigContainingMessage(props, "Each listener must have a different name")
}
@Test
def testControlPlaneListenerName(): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
- props.put("listeners", "PLAINTEXT://localhost:0,CONTROLLER://localhost:5000")
- props.put("listener.security.protocol.map", "PLAINTEXT:PLAINTEXT,CONTROLLER:SSL")
- props.put("control.plane.listener.name", "CONTROLLER")
+ props.setProperty("listeners", "PLAINTEXT://localhost:0,CONTROLLER://localhost:5000")
+ props.setProperty("listener.security.protocol.map", "PLAINTEXT:PLAINTEXT,CONTROLLER:SSL")
+ props.setProperty("control.plane.listener.name", "CONTROLLER")
KafkaConfig.fromProps(props)
val serverConfig = KafkaConfig.fromProps(props)
@@ -234,12 +234,12 @@ class KafkaConfigTest {
@Test
def testControllerListenerNames(): Unit = {
val props = new Properties()
- props.put(KafkaConfig.ProcessRolesProp, "broker,controller")
- props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:0,CONTROLLER://localhost:5000")
- props.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
- props.put(KafkaConfig.NodeIdProp, "2")
- props.put(KafkaConfig.QuorumVotersProp, "2@localhost:5000")
- props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,CONTROLLER:SASL_SSL")
+ props.setProperty(KafkaConfig.ProcessRolesProp, "broker,controller")
+ props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:0,CONTROLLER://localhost:5000")
+ props.setProperty(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+ props.setProperty(KafkaConfig.NodeIdProp, "2")
+ props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:5000")
+ props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,CONTROLLER:SASL_SSL")
val serverConfig = KafkaConfig.fromProps(props)
val controllerEndpoints = serverConfig.controllerListeners
@@ -253,12 +253,12 @@ class KafkaConfigTest {
@Test
def testControlPlaneListenerNameNotAllowedWithKRaft(): Unit = {
val props = new Properties()
- props.put(KafkaConfig.ProcessRolesProp, "broker,controller")
- props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093")
- props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
- props.put(KafkaConfig.NodeIdProp, "2")
- props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
- props.put(KafkaConfig.ControlPlaneListenerNameProp, "SSL")
+ props.setProperty(KafkaConfig.ProcessRolesProp, "broker,controller")
+ props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093")
+ props.setProperty(KafkaConfig.ControllerListenerNamesProp, "SSL")
+ props.setProperty(KafkaConfig.NodeIdProp, "2")
+ props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
+ props.setProperty(KafkaConfig.ControlPlaneListenerNameProp, "SSL")
assertFalse(isValidKafkaConfig(props))
assertBadConfigContainingMessage(props, "control.plane.listener.name is not supported in KRaft mode.")
@@ -270,119 +270,119 @@ class KafkaConfigTest {
@Test
def testControllerListenerDefinedForKRaftController(): Unit = {
val props = new Properties()
- props.put(KafkaConfig.ProcessRolesProp, "controller")
- props.put(KafkaConfig.ListenersProp, "SSL://localhost:9093")
- props.put(KafkaConfig.NodeIdProp, "2")
- props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
+ props.setProperty(KafkaConfig.ProcessRolesProp, "controller")
+ props.setProperty(KafkaConfig.ListenersProp, "SSL://localhost:9093")
+ props.setProperty(KafkaConfig.NodeIdProp, "2")
+ props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
assertBadConfigContainingMessage(props, "The listeners config must only contain KRaft controller listeners from controller.listener.names when process.roles=controller")
- props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
+ props.setProperty(KafkaConfig.ControllerListenerNamesProp, "SSL")
KafkaConfig.fromProps(props)
// confirm that redirecting via listener.security.protocol.map is acceptable
- props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:SSL")
- props.put(KafkaConfig.ListenersProp, "CONTROLLER://localhost:9093")
- props.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+ props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:SSL")
+ props.setProperty(KafkaConfig.ListenersProp, "CONTROLLER://localhost:9093")
+ props.setProperty(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
KafkaConfig.fromProps(props)
}
@Test
def testControllerListenerDefinedForKRaftBroker(): Unit = {
val props = new Properties()
- props.put(KafkaConfig.ProcessRolesProp, "broker")
- props.put(KafkaConfig.NodeIdProp, "1")
- props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
+ props.setProperty(KafkaConfig.ProcessRolesProp, "broker")
+ props.setProperty(KafkaConfig.NodeIdProp, "1")
+ props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
assertFalse(isValidKafkaConfig(props))
assertBadConfigContainingMessage(props, "controller.listener.names must contain at least one value when running KRaft with just the broker role")
- props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
+ props.setProperty(KafkaConfig.ControllerListenerNamesProp, "SSL")
KafkaConfig.fromProps(props)
// confirm that redirecting via listener.security.protocol.map is acceptable
- props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,CONTROLLER:SSL")
- props.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+ props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,CONTROLLER:SSL")
+ props.setProperty(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
KafkaConfig.fromProps(props)
}
@Test
def testPortInQuorumVotersNotRequiredToMatchFirstControllerListenerPortForThisKRaftController(): Unit = {
val props = new Properties()
- props.put(KafkaConfig.ProcessRolesProp, "controller,broker")
- props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094")
- props.put(KafkaConfig.NodeIdProp, "2")
- props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093,3@anotherhost:9094")
- props.put(KafkaConfig.ControllerListenerNamesProp, "SSL,SASL_SSL")
+ props.setProperty(KafkaConfig.ProcessRolesProp, "controller,broker")
+ props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094")
+ props.setProperty(KafkaConfig.NodeIdProp, "2")
+ props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9093,3@anotherhost:9094")
+ props.setProperty(KafkaConfig.ControllerListenerNamesProp, "SSL,SASL_SSL")
KafkaConfig.fromProps(props)
// change each of the 4 ports to port 5555 -- should pass in all circumstances since we can't validate the
// controller.quorum.voters ports (which are the ports that clients use and are semantically "advertised" ports
// even though the controller configuration doesn't list them in advertised.listeners) against the
// listener ports (which are semantically different then the ports that clients use).
- props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:5555,SASL_SSL://localhost:9094")
+ props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:5555,SASL_SSL://localhost:9094")
KafkaConfig.fromProps(props)
- props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:5555")
+ props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:5555")
KafkaConfig.fromProps(props)
- props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094") // reset to original value
- props.put(KafkaConfig.QuorumVotersProp, "2@localhost:5555,3@anotherhost:9094")
+ props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094") // reset to original value
+ props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:5555,3@anotherhost:9094")
KafkaConfig.fromProps(props)
- props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093,3@anotherhost:5555")
+ props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9093,3@anotherhost:5555")
KafkaConfig.fromProps(props)
}
@Test
def testSeparateControllerListenerDefinedForKRaftBrokerController(): Unit = {
val props = new Properties()
- props.put(KafkaConfig.ProcessRolesProp, "broker,controller")
- props.put(KafkaConfig.ListenersProp, "SSL://localhost:9093")
- props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
- props.put(KafkaConfig.NodeIdProp, "2")
- props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
+ props.setProperty(KafkaConfig.ProcessRolesProp, "broker,controller")
+ props.setProperty(KafkaConfig.ListenersProp, "SSL://localhost:9093")
+ props.setProperty(KafkaConfig.ControllerListenerNamesProp, "SSL")
+ props.setProperty(KafkaConfig.NodeIdProp, "2")
+ props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
assertFalse(isValidKafkaConfig(props))
assertBadConfigContainingMessage(props, "There must be at least one advertised listener. Perhaps all listeners appear in controller.listener.names?")
- props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093")
+ props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093")
KafkaConfig.fromProps(props)
// confirm that redirecting via listener.security.protocol.map is acceptable
- props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093")
- props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,CONTROLLER:SSL")
- props.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+ props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093")
+ props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,CONTROLLER:SSL")
+ props.setProperty(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
KafkaConfig.fromProps(props)
}
@Test
def testControllerListenerNameMapsToPlaintextByDefaultForKRaft(): Unit = {
val props = new Properties()
- props.put(KafkaConfig.ProcessRolesProp, "broker")
- props.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
- props.put(KafkaConfig.NodeIdProp, "1")
- props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
+ props.setProperty(KafkaConfig.ProcessRolesProp, "broker")
+ props.setProperty(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+ props.setProperty(KafkaConfig.NodeIdProp, "1")
+ props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
val controllerListenerName = new ListenerName("CONTROLLER")
assertEquals(Some(SecurityProtocol.PLAINTEXT),
KafkaConfig.fromProps(props).effectiveListenerSecurityProtocolMap.get(controllerListenerName))
// ensure we don't map it to PLAINTEXT when there is a SSL or SASL controller listener
- props.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER,SSL")
+ props.setProperty(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER,SSL")
val controllerNotFoundInMapMessage = "Controller listener with name CONTROLLER defined in controller.listener.names not found in listener.security.protocol.map"
assertBadConfigContainingMessage(props, controllerNotFoundInMapMessage)
// ensure we don't map it to PLAINTEXT when there is a SSL or SASL listener
- props.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
- props.put(KafkaConfig.ListenersProp, "SSL://localhost:9092")
+ props.setProperty(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+ props.setProperty(KafkaConfig.ListenersProp, "SSL://localhost:9092")
assertBadConfigContainingMessage(props, controllerNotFoundInMapMessage)
props.remove(KafkaConfig.ListenersProp)
// ensure we don't map it to PLAINTEXT when it is explicitly mapped otherwise
- props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,CONTROLLER:SSL")
+ props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,CONTROLLER:SSL")
assertEquals(Some(SecurityProtocol.SSL),
KafkaConfig.fromProps(props).effectiveListenerSecurityProtocolMap.get(controllerListenerName))
// ensure we don't map it to PLAINTEXT when anything is explicitly given
// (i.e. it is only part of the default value, even with KRaft)
- props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT")
+ props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT")
assertBadConfigContainingMessage(props, controllerNotFoundInMapMessage)
// ensure we can map it to a non-PLAINTEXT security protocol by default (i.e. when nothing is given)
props.remove(KafkaConfig.ListenerSecurityProtocolMapProp)
- props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
+ props.setProperty(KafkaConfig.ControllerListenerNamesProp, "SSL")
assertEquals(Some(SecurityProtocol.SSL),
KafkaConfig.fromProps(props).effectiveListenerSecurityProtocolMap.get(new ListenerName("SSL")))
}
@@ -390,11 +390,11 @@ class KafkaConfigTest {
@Test
def testMultipleControllerListenerNamesMapToPlaintextByDefaultForKRaft(): Unit = {
val props = new Properties()
- props.put(KafkaConfig.ProcessRolesProp, "controller")
- props.put(KafkaConfig.ListenersProp, "CONTROLLER1://localhost:9092,CONTROLLER2://localhost:9093")
- props.put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER1,CONTROLLER2")
- props.put(KafkaConfig.NodeIdProp, "1")
- props.put(KafkaConfig.QuorumVotersProp, "1@localhost:9092")
+ props.setProperty(KafkaConfig.ProcessRolesProp, "controller")
+ props.setProperty(KafkaConfig.ListenersProp, "CONTROLLER1://localhost:9092,CONTROLLER2://localhost:9093")
+ props.setProperty(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER1,CONTROLLER2")
+ props.setProperty(KafkaConfig.NodeIdProp, "1")
+ props.setProperty(KafkaConfig.QuorumVotersProp, "1@localhost:9092")
assertEquals(Some(SecurityProtocol.PLAINTEXT),
KafkaConfig.fromProps(props).effectiveListenerSecurityProtocolMap.get(new ListenerName("CONTROLLER1")))
assertEquals(Some(SecurityProtocol.PLAINTEXT),
@@ -404,22 +404,22 @@ class KafkaConfigTest {
@Test
def testControllerListenerNameDoesNotMapToPlaintextByDefaultForNonKRaft(): Unit = {
val props = new Properties()
- props.put(KafkaConfig.BrokerIdProp, "1")
- props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
- props.put(KafkaConfig.ListenersProp, "CONTROLLER://localhost:9092")
+ props.setProperty(KafkaConfig.BrokerIdProp, "1")
+ props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
+ props.setProperty(KafkaConfig.ListenersProp, "CONTROLLER://localhost:9092")
assertBadConfigContainingMessage(props,
"Error creating broker listeners from 'CONTROLLER://localhost:9092': No security protocol defined for listener CONTROLLER")
// Valid now
- props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092")
+ props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092")
assertEquals(None, KafkaConfig.fromProps(props).effectiveListenerSecurityProtocolMap.get(new ListenerName("CONTROLLER")))
}
@Test
def testBadListenerProtocol(): Unit = {
val props = new Properties()
- props.put(KafkaConfig.BrokerIdProp, "1")
- props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
- props.put(KafkaConfig.ListenersProp, "BAD://localhost:9091")
+ props.setProperty(KafkaConfig.BrokerIdProp, "1")
+ props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
+ props.setProperty(KafkaConfig.ListenersProp, "BAD://localhost:9091")
assertFalse(isValidKafkaConfig(props))
}
@@ -427,12 +427,12 @@ class KafkaConfigTest {
@Test
def testListenerNamesWithAdvertisedListenerUnset(): Unit = {
val props = new Properties()
- props.put(KafkaConfig.BrokerIdProp, "1")
- props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
+ props.setProperty(KafkaConfig.BrokerIdProp, "1")
+ props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
- props.put(KafkaConfig.ListenersProp, "CLIENT://localhost:9091,REPLICATION://localhost:9092,INTERNAL://localhost:9093")
- props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "CLIENT:SSL,REPLICATION:SSL,INTERNAL:PLAINTEXT")
- props.put(KafkaConfig.InterBrokerListenerNameProp, "REPLICATION")
+ props.setProperty(KafkaConfig.ListenersProp, "CLIENT://localhost:9091,REPLICATION://localhost:9092,INTERNAL://localhost:9093")
+ props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, "CLIENT:SSL,REPLICATION:SSL,INTERNAL:PLAINTEXT")
+ props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "REPLICATION")
val config = KafkaConfig.fromProps(props)
val expectedListeners = Seq(
EndPoint("localhost", 9091, new ListenerName("CLIENT"), SecurityProtocol.SSL),
@@ -451,13 +451,13 @@ class KafkaConfigTest {
@Test
def testListenerAndAdvertisedListenerNames(): Unit = {
val props = new Properties()
- props.put(KafkaConfig.BrokerIdProp, "1")
- props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
+ props.setProperty(KafkaConfig.BrokerIdProp, "1")
+ props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
- props.put(KafkaConfig.ListenersProp, "EXTERNAL://localhost:9091,INTERNAL://localhost:9093")
- props.put(KafkaConfig.AdvertisedListenersProp, "EXTERNAL://lb1.example.com:9000,INTERNAL://host1:9093")
- props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "EXTERNAL:SSL,INTERNAL:PLAINTEXT")
- props.put(KafkaConfig.InterBrokerListenerNameProp, "INTERNAL")
+ props.setProperty(KafkaConfig.ListenersProp, "EXTERNAL://localhost:9091,INTERNAL://localhost:9093")
+ props.setProperty(KafkaConfig.AdvertisedListenersProp, "EXTERNAL://lb1.example.com:9000,INTERNAL://host1:9093")
+ props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, "EXTERNAL:SSL,INTERNAL:PLAINTEXT")
+ props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "INTERNAL")
val config = KafkaConfig.fromProps(props)
val expectedListeners = Seq(
@@ -482,43 +482,43 @@ class KafkaConfigTest {
@Test
def testListenerNameMissingFromListenerSecurityProtocolMap(): Unit = {
val props = new Properties()
- props.put(KafkaConfig.BrokerIdProp, "1")
- props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
+ props.setProperty(KafkaConfig.BrokerIdProp, "1")
+ props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
- props.put(KafkaConfig.ListenersProp, "SSL://localhost:9091,REPLICATION://localhost:9092")
- props.put(KafkaConfig.InterBrokerListenerNameProp, "SSL")
+ props.setProperty(KafkaConfig.ListenersProp, "SSL://localhost:9091,REPLICATION://localhost:9092")
+ props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "SSL")
assertFalse(isValidKafkaConfig(props))
}
@Test
def testInterBrokerListenerNameMissingFromListenerSecurityProtocolMap(): Unit = {
val props = new Properties()
- props.put(KafkaConfig.BrokerIdProp, "1")
- props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
+ props.setProperty(KafkaConfig.BrokerIdProp, "1")
+ props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
- props.put(KafkaConfig.ListenersProp, "SSL://localhost:9091")
- props.put(KafkaConfig.InterBrokerListenerNameProp, "REPLICATION")
+ props.setProperty(KafkaConfig.ListenersProp, "SSL://localhost:9091")
+ props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "REPLICATION")
assertFalse(isValidKafkaConfig(props))
}
@Test
def testInterBrokerListenerNameAndSecurityProtocolSet(): Unit = {
val props = new Properties()
- props.put(KafkaConfig.BrokerIdProp, "1")
- props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
+ props.setProperty(KafkaConfig.BrokerIdProp, "1")
+ props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
- props.put(KafkaConfig.ListenersProp, "SSL://localhost:9091")
- props.put(KafkaConfig.InterBrokerListenerNameProp, "SSL")
- props.put(KafkaConfig.InterBrokerSecurityProtocolProp, "SSL")
+ props.setProperty(KafkaConfig.ListenersProp, "SSL://localhost:9091")
+ props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "SSL")
+ props.setProperty(KafkaConfig.InterBrokerSecurityProtocolProp, "SSL")
assertFalse(isValidKafkaConfig(props))
}
@Test
def testCaseInsensitiveListenerProtocol(): Unit = {
val props = new Properties()
- props.put(KafkaConfig.BrokerIdProp, "1")
- props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
- props.put(KafkaConfig.ListenersProp, "plaintext://localhost:9091,SsL://localhost:9092")
+ props.setProperty(KafkaConfig.BrokerIdProp, "1")
+ props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
+ props.setProperty(KafkaConfig.ListenersProp, "plaintext://localhost:9091,SsL://localhost:9092")
val config = KafkaConfig.fromProps(props)
assertEquals(Some("SSL://localhost:9092"), config.listeners.find(_.listenerName.value == "SSL").map(_.connectionString))
assertEquals(Some("PLAINTEXT://localhost:9091"), config.listeners.find(_.listenerName.value == "PLAINTEXT").map(_.connectionString))
@@ -531,8 +531,8 @@ class KafkaConfigTest {
@Test
def testListenerDefaults(): Unit = {
val props = new Properties()
- props.put(KafkaConfig.BrokerIdProp, "1")
- props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
+ props.setProperty(KafkaConfig.BrokerIdProp, "1")
+ props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
// configuration with no listeners
val conf = KafkaConfig.fromProps(props)
@@ -545,21 +545,21 @@ class KafkaConfigTest {
@Test
def testVersionConfiguration(): Unit = {
val props = new Properties()
- props.put(KafkaConfig.BrokerIdProp, "1")
- props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
+ props.setProperty(KafkaConfig.BrokerIdProp, "1")
+ props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
val conf = KafkaConfig.fromProps(props)
assertEquals(MetadataVersion.latest, conf.interBrokerProtocolVersion)
- props.put(KafkaConfig.InterBrokerProtocolVersionProp, "0.8.2.0")
+ props.setProperty(KafkaConfig.InterBrokerProtocolVersionProp, "0.8.2.0")
// We need to set the message format version to make the configuration valid.
- props.put(KafkaConfig.LogMessageFormatVersionProp, "0.8.2.0")
+ props.setProperty(KafkaConfig.LogMessageFormatVersionProp, "0.8.2.0")
val conf2 = KafkaConfig.fromProps(props)
assertEquals(IBP_0_8_2, conf2.interBrokerProtocolVersion)
// check that 0.8.2.0 is the same as 0.8.2.1
- props.put(KafkaConfig.InterBrokerProtocolVersionProp, "0.8.2.1")
+ props.setProperty(KafkaConfig.InterBrokerProtocolVersionProp, "0.8.2.1")
// We need to set the message format version to make the configuration valid
- props.put(KafkaConfig.LogMessageFormatVersionProp, "0.8.2.1")
+ props.setProperty(KafkaConfig.LogMessageFormatVersionProp, "0.8.2.1")
val conf3 = KafkaConfig.fromProps(props)
assertEquals(IBP_0_8_2, conf3.interBrokerProtocolVersion)
@@ -587,7 +587,7 @@ class KafkaConfigTest {
@Test
def testUncleanElectionDisabled(): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
- props.put(KafkaConfig.UncleanLeaderElectionEnableProp, String.valueOf(false))
+ props.setProperty(KafkaConfig.UncleanLeaderElectionEnableProp, String.valueOf(false))
val serverConfig = KafkaConfig.fromProps(props)
assertEquals(serverConfig.uncleanLeaderElectionEnable, false)
@@ -596,7 +596,7 @@ class KafkaConfigTest {
@Test
def testUncleanElectionEnabled(): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
- props.put(KafkaConfig.UncleanLeaderElectionEnableProp, String.valueOf(true))
+ props.setProperty(KafkaConfig.UncleanLeaderElectionEnableProp, String.valueOf(true))
val serverConfig = KafkaConfig.fromProps(props)
assertEquals(serverConfig.uncleanLeaderElectionEnable, true)
@@ -605,7 +605,7 @@ class KafkaConfigTest {
@Test
def testUncleanElectionInvalid(): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
- props.put(KafkaConfig.UncleanLeaderElectionEnableProp, "invalid")
+ props.setProperty(KafkaConfig.UncleanLeaderElectionEnableProp, "invalid")
assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
}
@@ -613,7 +613,7 @@ class KafkaConfigTest {
@Test
def testLogRollTimeMsProvided(): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
- props.put(KafkaConfig.LogRollTimeMillisProp, "1800000")
+ props.setProperty(KafkaConfig.LogRollTimeMillisProp, "1800000")
val cfg = KafkaConfig.fromProps(props)
assertEquals(30 * 60L * 1000L, cfg.logRollTimeMillis)
@@ -622,8 +622,8 @@ class KafkaConfigTest {
@Test
def testLogRollTimeBothMsAndHoursProvided(): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
- props.put(KafkaConfig.LogRollTimeMillisProp, "1800000")
- props.put(KafkaConfig.LogRollTimeHoursProp, "1")
+ props.setProperty(KafkaConfig.LogRollTimeMillisProp, "1800000")
+ props.setProperty(KafkaConfig.LogRollTimeHoursProp, "1")
val cfg = KafkaConfig.fromProps(props)
assertEquals( 30 * 60L * 1000L, cfg.logRollTimeMillis)
@@ -648,7 +648,7 @@ class KafkaConfigTest {
@Test
def testValidCompressionType(): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
- props.put("compression.type", "gzip")
+ props.setProperty("compression.type", "gzip")
val serverConfig = KafkaConfig.fromProps(props)
assertEquals(serverConfig.compressionType, "gzip")
@@ -657,34 +657,34 @@ class KafkaConfigTest {
@Test
def testInvalidCompressionType(): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
- props.put(KafkaConfig.CompressionTypeProp, "abc")
+ props.setProperty(KafkaConfig.CompressionTypeProp, "abc")
assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
}
@Test
def testInvalidInterBrokerSecurityProtocol(): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
- props.put(KafkaConfig.ListenersProp, "SSL://localhost:0")
- props.put(KafkaConfig.InterBrokerSecurityProtocolProp, SecurityProtocol.PLAINTEXT.toString)
+ props.setProperty(KafkaConfig.ListenersProp, "SSL://localhost:0")
+ props.setProperty(KafkaConfig.InterBrokerSecurityProtocolProp, SecurityProtocol.PLAINTEXT.toString)
assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
}
@Test
def testEqualAdvertisedListenersProtocol(): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
- props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093")
- props.put(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093")
+ props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093")
+ props.setProperty(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093")
KafkaConfig.fromProps(props)
}
@Test
def testInvalidAdvertisedListenersProtocol(): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
- props.put(KafkaConfig.ListenersProp, "TRACE://localhost:9091,SSL://localhost:9093")
- props.put(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:9092")
+ props.setProperty(KafkaConfig.ListenersProp, "TRACE://localhost:9091,SSL://localhost:9093")
+ props.setProperty(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:9092")
assertBadConfigContainingMessage(props, "No security protocol defined for listener TRACE")
- props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,TRACE:PLAINTEXT,SSL:SSL")
+ props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, "PLAINTEXT:PLAINTEXT,TRACE:PLAINTEXT,SSL:SSL")
assertBadConfigContainingMessage(props, "advertised.listeners listener names must be equal to or a subset of the ones defined in listeners")
}
@@ -693,8 +693,8 @@ class KafkaConfigTest {
def testInterBrokerVersionMessageFormatCompatibility(): Unit = {
def buildConfig(interBrokerProtocol: MetadataVersion, messageFormat: MetadataVersion): KafkaConfig = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
- props.put(KafkaConfig.InterBrokerProtocolVersionProp, interBrokerProtocol.version)
- props.put(KafkaConfig.LogMessageFormatVersionProp, messageFormat.version)
+ props.setProperty(KafkaConfig.InterBrokerProtocolVersionProp, interBrokerProtocol.version)
+ props.setProperty(KafkaConfig.LogMessageFormatVersionProp, messageFormat.version)
KafkaConfig.fromProps(props)
}
@@ -718,7 +718,7 @@ class KafkaConfigTest {
def testFromPropsInvalid(): Unit = {
def baseProperties: Properties = {
val validRequiredProperties = new Properties()
- validRequiredProperties.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
+ validRequiredProperties.setProperty(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
validRequiredProperties
}
// to ensure a basis is valid - bootstraps all needed validation
@@ -988,7 +988,7 @@ class KafkaConfigTest {
def testDynamicLogConfigs(): Unit = {
def baseProperties: Properties = {
val validRequiredProperties = new Properties()
- validRequiredProperties.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
+ validRequiredProperties.setProperty(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
validRequiredProperties
}
@@ -997,7 +997,7 @@ class KafkaConfigTest {
def assertDynamic(property: String, value: Any, accessor: () => Any): Unit = {
val initial = accessor()
- props.put(property, value.toString)
+ props.setProperty(property, value.toString)
config.updateCurrentConfig(new KafkaConfig(props))
assertNotEquals(initial, accessor())
}
@@ -1069,23 +1069,23 @@ class KafkaConfigTest {
@Test
def testSpecificProperties(): Unit = {
val defaults = new Properties()
- defaults.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
+ defaults.setProperty(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
// For ZkConnectionTimeoutMs
- defaults.put(KafkaConfig.ZkSessionTimeoutMsProp, "1234")
- defaults.put(KafkaConfig.BrokerIdGenerationEnableProp, "false")
- defaults.put(KafkaConfig.MaxReservedBrokerIdProp, "1")
- defaults.put(KafkaConfig.BrokerIdProp, "1")
- defaults.put(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:1122")
- defaults.put(KafkaConfig.MaxConnectionsPerIpOverridesProp, "127.0.0.1:2, 127.0.0.2:3")
- defaults.put(KafkaConfig.LogDirProp, "/tmp1,/tmp2")
- defaults.put(KafkaConfig.LogRollTimeHoursProp, "12")
- defaults.put(KafkaConfig.LogRollTimeJitterHoursProp, "11")
- defaults.put(KafkaConfig.LogRetentionTimeHoursProp, "10")
+ defaults.setProperty(KafkaConfig.ZkSessionTimeoutMsProp, "1234")
+ defaults.setProperty(KafkaConfig.BrokerIdGenerationEnableProp, "false")
+ defaults.setProperty(KafkaConfig.MaxReservedBrokerIdProp, "1")
+ defaults.setProperty(KafkaConfig.BrokerIdProp, "1")
+ defaults.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:1122")
+ defaults.setProperty(KafkaConfig.MaxConnectionsPerIpOverridesProp, "127.0.0.1:2, 127.0.0.2:3")
+ defaults.setProperty(KafkaConfig.LogDirProp, "/tmp1,/tmp2")
+ defaults.setProperty(KafkaConfig.LogRollTimeHoursProp, "12")
+ defaults.setProperty(KafkaConfig.LogRollTimeJitterHoursProp, "11")
+ defaults.setProperty(KafkaConfig.LogRetentionTimeHoursProp, "10")
//For LogFlushIntervalMsProp
- defaults.put(KafkaConfig.LogFlushSchedulerIntervalMsProp, "123")
- defaults.put(KafkaConfig.OffsetsTopicCompressionCodecProp, SnappyCompressionCodec.codec.toString)
+ defaults.setProperty(KafkaConfig.LogFlushSchedulerIntervalMsProp, "123")
+ defaults.setProperty(KafkaConfig.OffsetsTopicCompressionCodecProp, SnappyCompressionCodec.codec.toString)
// For MetricRecordingLevelProp
- defaults.put(KafkaConfig.MetricRecordingLevelProp, Sensor.RecordingLevel.DEBUG.toString)
+ defaults.setProperty(KafkaConfig.MetricRecordingLevelProp, Sensor.RecordingLevel.DEBUG.toString)
val config = KafkaConfig.fromProps(defaults)
assertEquals("127.0.0.1:2181", config.zkConnect)
@@ -1107,7 +1107,7 @@ class KafkaConfigTest {
assertEquals(24 * 60L * 60L * 1000L, config.delegationTokenExpiryTimeMs)
assertEquals(1 * 60L * 1000L * 60, config.delegationTokenExpiryCheckIntervalMs)
- defaults.put(KafkaConfig.DelegationTokenSecretKeyProp, "1234567890")
+ defaults.setProperty(KafkaConfig.DelegationTokenSecretKeyProp, "1234567890")
val config1 = KafkaConfig.fromProps(defaults)
assertEquals(true, config1.tokenAuthEnabled)
}
@@ -1115,19 +1115,19 @@ class KafkaConfigTest {
@Test
def testNonroutableAdvertisedListeners(): Unit = {
val props = new Properties()
- props.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
- props.put(KafkaConfig.ListenersProp, "PLAINTEXT://0.0.0.0:9092")
+ props.setProperty(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
+ props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://0.0.0.0:9092")
assertFalse(isValidKafkaConfig(props))
}
@Test
def testMaxConnectionsPerIpProp(): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
- props.put(KafkaConfig.MaxConnectionsPerIpProp, "0")
+ props.setProperty(KafkaConfig.MaxConnectionsPerIpProp, "0")
assertFalse(isValidKafkaConfig(props))
- props.put(KafkaConfig.MaxConnectionsPerIpOverridesProp, "127.0.0.1:100")
+ props.setProperty(KafkaConfig.MaxConnectionsPerIpOverridesProp, "127.0.0.1:100")
KafkaConfig.fromProps(props)
- props.put(KafkaConfig.MaxConnectionsPerIpOverridesProp, "127.0.0.0#:100")
+ props.setProperty(KafkaConfig.MaxConnectionsPerIpOverridesProp, "127.0.0.0#:100")
assertFalse(isValidKafkaConfig(props))
}
@@ -1145,19 +1145,19 @@ class KafkaConfigTest {
@Test
def testDistinctControllerAndAdvertisedListenersAllowedForKRaftBroker(): Unit = {
val props = new Properties()
- props.put(KafkaConfig.ProcessRolesProp, "broker")
- props.put(KafkaConfig.ListenersProp, "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094")
- props.put(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://A:9092,SSL://B:9093") // explicitly setting it in KRaft
- props.put(KafkaConfig.ControllerListenerNamesProp, "SASL_SSL")
- props.put(KafkaConfig.NodeIdProp, "2")
- props.put(KafkaConfig.QuorumVotersProp, "3@localhost:9094")
+ props.setProperty(KafkaConfig.ProcessRolesProp, "broker")
+ props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094")
+ props.setProperty(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://A:9092,SSL://B:9093") // explicitly setting it in KRaft
+ props.setProperty(KafkaConfig.ControllerListenerNamesProp, "SASL_SSL")
+ props.setProperty(KafkaConfig.NodeIdProp, "2")
+ props.setProperty(KafkaConfig.QuorumVotersProp, "3@localhost:9094")
// invalid due to extra listener also appearing in controller listeners
assertBadConfigContainingMessage(props,
"controller.listener.names must not contain a value appearing in the 'listeners' configuration when running KRaft with just the broker role")
// Valid now
- props.put(KafkaConfig.ListenersProp, "PLAINTEXT://A:9092,SSL://B:9093")
+ props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://A:9092,SSL://B:9093")
KafkaConfig.fromProps(props)
// Also valid if we let advertised listeners be derived from listeners/controller.listener.names
@@ -1169,19 +1169,19 @@ class KafkaConfigTest {
@Test
def testControllerListenersCannotBeAdvertisedForKRaftBroker(): Unit = {
val props = new Properties()
- props.put(KafkaConfig.ProcessRolesProp, "broker,controller")
+ props.setProperty(KafkaConfig.ProcessRolesProp, "broker,controller")
val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
- props.put(KafkaConfig.ListenersProp, listeners)
- props.put(KafkaConfig.AdvertisedListenersProp, listeners) // explicitly setting it in KRaft
- props.put(KafkaConfig.InterBrokerListenerNameProp, "SASL_SSL")
- props.put(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT,SSL")
- props.put(KafkaConfig.NodeIdProp, "2")
- props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9092")
+ props.setProperty(KafkaConfig.ListenersProp, listeners)
+ props.setProperty(KafkaConfig.AdvertisedListenersProp, listeners) // explicitly setting it in KRaft
+ props.setProperty(KafkaConfig.InterBrokerListenerNameProp, "SASL_SSL")
+ props.setProperty(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT,SSL")
+ props.setProperty(KafkaConfig.NodeIdProp, "2")
+ props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9092")
assertBadConfigContainingMessage(props,
"The advertised.listeners config must not contain KRaft controller listeners from controller.listener.names when process.roles contains the broker role")
// Valid now
- props.put(KafkaConfig.AdvertisedListenersProp, "SASL_SSL://C:9094")
+ props.setProperty(KafkaConfig.AdvertisedListenersProp, "SASL_SSL://C:9094")
KafkaConfig.fromProps(props)
// Also valid if we allow advertised listeners to derive from listeners/controller.listener.names
@@ -1191,28 +1191,44 @@ class KafkaConfigTest {
@Test
def testAdvertisedListenersDisallowedForKRaftControllerOnlyRole(): Unit = {
- val props = new Properties()
- props.put(KafkaConfig.ProcessRolesProp, "controller")
- val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
- props.put(KafkaConfig.ListenersProp, listeners)
- props.put(KafkaConfig.AdvertisedListenersProp, listeners) // explicitly setting it in KRaft
- props.put(KafkaConfig.ControllerListenerNamesProp, "PLAINTEXT,SSL")
- props.put(KafkaConfig.NodeIdProp, "2")
- props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9092")
- val expectedExceptionContainsTextSuffix = " config must only contain KRaft controller listeners from controller.listener.names when process.roles=controller"
- assertBadConfigContainingMessage(props, "The advertised.listeners" + expectedExceptionContainsTextSuffix)
-
- // Still invalid due to extra listener if we set advertised listeners explicitly to be correct
+ // Test that advertised listeners cannot be set when KRaft and server is controller only.
+ // Test that listeners must enumerate every controller listener
+ // Test that controller listener must enumerate every listener
val correctListeners = "PLAINTEXT://A:9092,SSL://B:9093"
- props.put(KafkaConfig.AdvertisedListenersProp, correctListeners)
- assertBadConfigContainingMessage(props, "The advertised.listeners" + expectedExceptionContainsTextSuffix)
+ val incorrectListeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
- // Still invalid due to extra listener if we allow advertised listeners to derive from listeners/controller.listener.names
+ val correctControllerListenerNames = "PLAINTEXT,SSL"
+
+ val props = new Properties()
+ props.setProperty(KafkaConfig.ProcessRolesProp, "controller")
+ props.setProperty(KafkaConfig.ListenersProp, correctListeners)
+ props.setProperty(KafkaConfig.AdvertisedListenersProp, incorrectListeners)
+ props.setProperty(KafkaConfig.ControllerListenerNamesProp, correctControllerListenerNames)
+ props.setProperty(KafkaConfig.NodeIdProp, "2")
+ props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9092")
+ var expectedExceptionContainsText = "The advertised.listeners config must be empty when process.roles=controller"
+ assertBadConfigContainingMessage(props, expectedExceptionContainsText)
+
+ // Invalid if advertised listeners is explicitly to the set
+ props.setProperty(KafkaConfig.AdvertisedListenersProp, correctListeners)
+ assertBadConfigContainingMessage(props, expectedExceptionContainsText)
+
+ // Invalid if listeners contains names not in controller.listener.names
props.remove(KafkaConfig.AdvertisedListenersProp)
- assertBadConfigContainingMessage(props, "The listeners" + expectedExceptionContainsTextSuffix)
+ props.setProperty(KafkaConfig.ListenersProp, incorrectListeners)
+ expectedExceptionContainsText = """The listeners config must only contain KRaft controller listeners from
+ |controller.listener.names when process.roles=controller""".stripMargin.replaceAll("\n", " ")
+ assertBadConfigContainingMessage(props, expectedExceptionContainsText)
+
+ // Invalid if listeners doesn't contain every name in controller.listener.names
+ props.setProperty(KafkaConfig.ListenersProp, correctListeners)
+ props.setProperty(KafkaConfig.ControllerListenerNamesProp, correctControllerListenerNames + ",SASL_SSL")
+ expectedExceptionContainsText = """controller.listener.names must only contain values appearing in the 'listeners'
+ |configuration when running the KRaft controller role""".stripMargin.replaceAll("\n", " ")
+ assertBadConfigContainingMessage(props, expectedExceptionContainsText)
// Valid now
- props.put(KafkaConfig.ListenersProp, correctListeners)
+ props.setProperty(KafkaConfig.ControllerListenerNamesProp, correctControllerListenerNames)
KafkaConfig.fromProps(props)
}
@@ -1244,7 +1260,7 @@ class KafkaConfigTest {
private def assertInvalidQuorumVoters(value: String): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
- props.put(KafkaConfig.QuorumVotersProp, value)
+ props.setProperty(KafkaConfig.QuorumVotersProp, value)
assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
}
@@ -1269,7 +1285,7 @@ class KafkaConfigTest {
private def assertValidQuorumVoters(value: String, expectedVoters: util.Map[Integer, AddressSpec]): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
- props.put(KafkaConfig.QuorumVotersProp, value)
+ props.setProperty(KafkaConfig.QuorumVotersProp, value)
val raftConfig = new RaftConfig(KafkaConfig.fromProps(props))
assertEquals(expectedVoters, raftConfig.quorumVoterConnections())
}
@@ -1281,11 +1297,11 @@ class KafkaConfigTest {
// and make sure it is allowed despite broker.id.generation.enable=true (true is the default)
val largeBrokerId = 2000
val props = new Properties()
- props.put(KafkaConfig.ProcessRolesProp, "broker")
- props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092")
- props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
- props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
- props.put(KafkaConfig.NodeIdProp, largeBrokerId.toString)
+ props.setProperty(KafkaConfig.ProcessRolesProp, "broker")
+ props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092")
+ props.setProperty(KafkaConfig.ControllerListenerNamesProp, "SSL")
+ props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
+ props.setProperty(KafkaConfig.NodeIdProp, largeBrokerId.toString)
KafkaConfig.fromProps(props)
}
@@ -1293,7 +1309,7 @@ class KafkaConfigTest {
def testRejectsNegativeNodeIdForRaftBasedBrokerCaseWithAutoGenEnabled(): Unit = {
// -1 is the default for both node.id and broker.id
val props = new Properties()
- props.put(KafkaConfig.ProcessRolesProp, "broker")
+ props.setProperty(KafkaConfig.ProcessRolesProp, "broker")
assertFalse(isValidKafkaConfig(props))
}
@@ -1301,7 +1317,7 @@ class KafkaConfigTest {
def testRejectsNegativeNodeIdForRaftBasedControllerCaseWithAutoGenEnabled(): Unit = {
// -1 is the default for both node.id and broker.id
val props = new Properties()
- props.put(KafkaConfig.ProcessRolesProp, "controller")
+ props.setProperty(KafkaConfig.ProcessRolesProp, "controller")
assertFalse(isValidKafkaConfig(props))
}
@@ -1309,9 +1325,9 @@ class KafkaConfigTest {
def testRejectsNegativeNodeIdForRaftBasedCaseWithAutoGenDisabled(): Unit = {
// -1 is the default for both node.id and broker.id
val props = new Properties()
- props.put(KafkaConfig.ProcessRolesProp, "broker")
- props.put(KafkaConfig.BrokerIdGenerationEnableProp, "false")
- props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
+ props.setProperty(KafkaConfig.ProcessRolesProp, "broker")
+ props.setProperty(KafkaConfig.BrokerIdGenerationEnableProp, "false")
+ props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
assertFalse(isValidKafkaConfig(props))
}
@@ -1323,8 +1339,8 @@ class KafkaConfigTest {
val largeBrokerId = 2000
val props = TestUtils.createBrokerConfig(largeBrokerId, TestUtils.MockZkConnect, port = TestUtils.MockZkPort)
val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
- props.put(KafkaConfig.ListenersProp, listeners)
- props.put(KafkaConfig.AdvertisedListenersProp, listeners)
+ props.setProperty(KafkaConfig.ListenersProp, listeners)
+ props.setProperty(KafkaConfig.AdvertisedListenersProp, listeners)
assertFalse(isValidKafkaConfig(props))
}
@@ -1333,8 +1349,8 @@ class KafkaConfigTest {
// -1 is the default for both node.id and broker.id; it implies "auto-generate" and should succeed
val props = TestUtils.createBrokerConfig(-1, TestUtils.MockZkConnect, port = TestUtils.MockZkPort)
val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
- props.put(KafkaConfig.ListenersProp, listeners)
- props.put(KafkaConfig.AdvertisedListenersProp, listeners)
+ props.setProperty(KafkaConfig.ListenersProp, listeners)
+ props.setProperty(KafkaConfig.AdvertisedListenersProp, listeners)
KafkaConfig.fromProps(props)
}
@@ -1344,10 +1360,10 @@ class KafkaConfigTest {
val negativeTwoNodeId = -2
val props = TestUtils.createBrokerConfig(negativeTwoNodeId, TestUtils.MockZkConnect, port = TestUtils.MockZkPort)
val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
- props.put(KafkaConfig.ListenersProp, listeners)
- props.put(KafkaConfig.AdvertisedListenersProp, listeners)
- props.put(KafkaConfig.NodeIdProp, negativeTwoNodeId.toString)
- props.put(KafkaConfig.BrokerIdProp, negativeTwoNodeId.toString)
+ props.setProperty(KafkaConfig.ListenersProp, listeners)
+ props.setProperty(KafkaConfig.AdvertisedListenersProp, listeners)
+ props.setProperty(KafkaConfig.NodeIdProp, negativeTwoNodeId.toString)
+ props.setProperty(KafkaConfig.BrokerIdProp, negativeTwoNodeId.toString)
assertFalse(isValidKafkaConfig(props))
}
@@ -1358,9 +1374,9 @@ class KafkaConfigTest {
val largeBrokerId = 2000
val props = TestUtils.createBrokerConfig(largeBrokerId, TestUtils.MockZkConnect, port = TestUtils.MockZkPort)
val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
- props.put(KafkaConfig.ListenersProp, listeners)
- props.put(KafkaConfig.AdvertisedListenersProp, listeners)
- props.put(KafkaConfig.BrokerIdGenerationEnableProp, "false")
+ props.setProperty(KafkaConfig.ListenersProp, listeners)
+ props.setProperty(KafkaConfig.AdvertisedListenersProp, listeners)
+ props.setProperty(KafkaConfig.BrokerIdGenerationEnableProp, "false")
KafkaConfig.fromProps(props)
}
@@ -1369,27 +1385,27 @@ class KafkaConfigTest {
// -1 is the default for both node.id and broker.id
val props = TestUtils.createBrokerConfig(-1, TestUtils.MockZkConnect, port = TestUtils.MockZkPort)
val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
- props.put(KafkaConfig.ListenersProp, listeners)
- props.put(KafkaConfig.BrokerIdGenerationEnableProp, "false")
+ props.setProperty(KafkaConfig.ListenersProp, listeners)
+ props.setProperty(KafkaConfig.BrokerIdGenerationEnableProp, "false")
assertFalse(isValidKafkaConfig(props))
}
@Test
def testZookeeperConnectRequiredIfEmptyProcessRoles(): Unit = {
val props = new Properties()
- props.put(KafkaConfig.ProcessRolesProp, "")
- props.put(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9092")
+ props.setProperty(KafkaConfig.ProcessRolesProp, "")
+ props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9092")
assertFalse(isValidKafkaConfig(props))
}
@Test
def testZookeeperConnectNotRequiredIfNonEmptyProcessRoles(): Unit = {
val props = new Properties()
- props.put(KafkaConfig.ProcessRolesProp, "broker")
- props.put(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9092")
- props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
- props.put(KafkaConfig.NodeIdProp, "1")
- props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
+ props.setProperty(KafkaConfig.ProcessRolesProp, "broker")
+ props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9092")
+ props.setProperty(KafkaConfig.ControllerListenerNamesProp, "SSL")
+ props.setProperty(KafkaConfig.NodeIdProp, "1")
+ props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
KafkaConfig.fromProps(props)
}
@@ -1399,12 +1415,12 @@ class KafkaConfigTest {
val dataDir = "/path/to/data/dir"
val props = new Properties()
- props.put(KafkaConfig.ProcessRolesProp, "broker")
- props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
- props.put(KafkaConfig.MetadataLogDirProp, metadataDir)
- props.put(KafkaConfig.LogDirProp, dataDir)
- props.put(KafkaConfig.NodeIdProp, "1")
- props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
+ props.setProperty(KafkaConfig.ProcessRolesProp, "broker")
+ props.setProperty(KafkaConfig.ControllerListenerNamesProp, "SSL")
+ props.setProperty(KafkaConfig.MetadataLogDirProp, metadataDir)
+ props.setProperty(KafkaConfig.LogDirProp, dataDir)
+ props.setProperty(KafkaConfig.NodeIdProp, "1")
+ props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
KafkaConfig.fromProps(props)
val config = KafkaConfig.fromProps(props)
@@ -1418,11 +1434,11 @@ class KafkaConfigTest {
val dataDir2 = "/path/to/data/dir/2"
val props = new Properties()
- props.put(KafkaConfig.ProcessRolesProp, "broker")
- props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
- props.put(KafkaConfig.LogDirProp, s"$dataDir1,$dataDir2")
- props.put(KafkaConfig.NodeIdProp, "1")
- props.put(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
+ props.setProperty(KafkaConfig.ProcessRolesProp, "broker")
+ props.setProperty(KafkaConfig.ControllerListenerNamesProp, "SSL")
+ props.setProperty(KafkaConfig.LogDirProp, s"$dataDir1,$dataDir2")
+ props.setProperty(KafkaConfig.NodeIdProp, "1")
+ props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
KafkaConfig.fromProps(props)
val config = KafkaConfig.fromProps(props)
@@ -1478,7 +1494,7 @@ class KafkaConfigTest {
def testNodeIdIsInferredByBrokerIdWithKraft(): Unit = {
val props = new Properties()
props.setProperty(KafkaConfig.ProcessRolesProp, "broker")
- props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
+ props.setProperty(KafkaConfig.ControllerListenerNamesProp, "SSL")
props.setProperty(KafkaConfig.BrokerIdProp, "3")
props.setProperty(KafkaConfig.QuorumVotersProp, "2@localhost:9093")
val config = KafkaConfig.fromProps(props)
@@ -1500,7 +1516,7 @@ class KafkaConfigTest {
@Test
def testBrokerIdIsInferredByNodeIdWithKraft(): Unit = {
- val props = new Properties(kraftProps())
+ val props = new Properties()
props.putAll(kraftProps())
val config = KafkaConfig.fromProps(props)
assertEquals(3, config.brokerId)
@@ -1513,7 +1529,7 @@ class KafkaConfigTest {
@Test
def testSaslJwksEndpointRetryDefaults(): Unit = {
val props = new Properties()
- props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
+ props.setProperty(KafkaConfig.ZkConnectProp, "localhost:2181")
val config = KafkaConfig.fromProps(props)
assertNotNull(config.getLong(KafkaConfig.SaslOAuthBearerJwksEndpointRetryBackoffMsProp))
assertNotNull(config.getLong(KafkaConfig.SaslOAuthBearerJwksEndpointRetryBackoffMaxMsProp))
@@ -1531,7 +1547,7 @@ class KafkaConfigTest {
@Test
def testInvalidSecurityInterBrokerProtocol(): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
- props.put(KafkaConfig.InterBrokerSecurityProtocolProp, "abc")
+ props.setProperty(KafkaConfig.InterBrokerSecurityProtocolProp, "abc")
val ce = assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
assertTrue(ce.getMessage.contains(KafkaConfig.InterBrokerSecurityProtocolProp))
}
@@ -1597,7 +1613,7 @@ class KafkaConfigTest {
def testDefaultInterBrokerProtocolVersionKRaft(): Unit = {
val props = new Properties()
props.putAll(kraftProps())
- val config = new KafkaConfig(props)
+ val config = KafkaConfig.fromProps(props)
assertEquals(config.interBrokerProtocolVersion, MetadataVersion.MINIMUM_KRAFT_VERSION)
}
}
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 1e0d5981dac..a12da68f66c 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -2028,7 +2028,10 @@ object TestUtils extends Logging {
fail("Expected illegal configuration but instead it was legal")
} catch {
case caught @ (_: ConfigException | _: IllegalArgumentException) =>
- assertTrue(caught.getMessage.contains(expectedExceptionContainsText))
+ assertTrue(
+ caught.getMessage.contains(expectedExceptionContainsText),
+ s""""${caught.getMessage}" doesn't contain "$expectedExceptionContainsText""""
+ )
}
}
@@ -2312,4 +2315,4 @@ object TestUtils extends Logging {
timedOut.set(true)
}
}
-}
\ No newline at end of file
+}