You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2022/08/04 20:31:20 UTC
[kafka] 01/02: KAFKA-14111 Fix sensitive dynamic broker configs in KRaft (#12455)
This is an automated email from the ASF dual-hosted git repository.
davidarthur pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 4e049c706fa741164bc5ad65768236a48f75e64d
Author: David Arthur <mu...@gmail.com>
AuthorDate: Wed Aug 3 13:28:06 2022 -0400
KAFKA-14111 Fix sensitive dynamic broker configs in KRaft (#12455)
Enable some of the dynamic broker reconfiguration tests in KRaft mode
---
.../src/main/scala/kafka/admin/ConfigCommand.scala | 2 +-
.../scala/kafka/server/DynamicBrokerConfig.scala | 8 +-
core/src/main/scala/kafka/server/KafkaBroker.scala | 1 +
core/src/main/scala/kafka/server/KafkaConfig.scala | 2 +-
.../main/scala/kafka/utils/PasswordEncoder.scala | 45 ++++++--
.../server/DynamicBrokerReconfigurationTest.scala | 128 +++++++++++++--------
.../unit/kafka/utils/PasswordEncoderTest.scala | 10 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 25 ++--
8 files changed, 145 insertions(+), 76 deletions(-)
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 5e5ccefa45..126be3a81a 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -212,7 +212,7 @@ object ConfigCommand extends Config {
encoderConfigs.get(KafkaConfig.PasswordEncoderSecretProp)
val encoderSecret = encoderConfigs.getOrElse(KafkaConfig.PasswordEncoderSecretProp,
throw new IllegalArgumentException("Password encoder secret not specified"))
- new PasswordEncoder(new Password(encoderSecret),
+ PasswordEncoder.encrypting(new Password(encoderSecret),
None,
encoderConfigs.get(KafkaConfig.PasswordEncoderCipherAlgorithmProp).getOrElse(Defaults.PasswordEncoderCipherAlgorithm),
encoderConfigs.get(KafkaConfig.PasswordEncoderKeyLengthProp).map(_.toInt).getOrElse(Defaults.PasswordEncoderKeyLength),
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 2a4fd9501b..a40444507b 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -209,7 +209,11 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
private val brokerReconfigurables = new CopyOnWriteArrayList[BrokerReconfigurable]()
private val lock = new ReentrantReadWriteLock
private var currentConfig: KafkaConfig = null
- private val dynamicConfigPasswordEncoder = maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderSecret)
+ private val dynamicConfigPasswordEncoder = if (kafkaConfig.processRoles.isEmpty) {
+ maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderSecret)
+ } else {
+ Some(PasswordEncoder.noop())
+ }
private[server] def initialize(zkClientOpt: Option[KafkaZkClient]): Unit = {
currentConfig = new KafkaConfig(kafkaConfig.props, false, None)
@@ -338,7 +342,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
private def maybeCreatePasswordEncoder(secret: Option[Password]): Option[PasswordEncoder] = {
secret.map { secret =>
- new PasswordEncoder(secret,
+ PasswordEncoder.encrypting(secret,
kafkaConfig.passwordEncoderKeyFactoryAlgorithm,
kafkaConfig.passwordEncoderCipherAlgorithm,
kafkaConfig.passwordEncoderKeyLength,
diff --git a/core/src/main/scala/kafka/server/KafkaBroker.scala b/core/src/main/scala/kafka/server/KafkaBroker.scala
index f4c6abc306..d4210b55f4 100644
--- a/core/src/main/scala/kafka/server/KafkaBroker.scala
+++ b/core/src/main/scala/kafka/server/KafkaBroker.scala
@@ -88,6 +88,7 @@ trait KafkaBroker extends KafkaMetricsGroup {
def shutdown(): Unit
def brokerTopicStats: BrokerTopicStats
def credentialProvider: CredentialProvider
+ def clientToControllerChannelManager: BrokerToControllerChannelManager
// For backwards compatibility, we need to keep older metrics tied
// to their original name when this class was named `KafkaServer`
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 3282ef6c57..be2c8e72b9 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -1474,6 +1474,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
// Cache the current config to avoid acquiring read lock to access from dynamicConfig
@volatile private var currentConfig = this
+ val processRoles: Set[ProcessRole] = parseProcessRoles()
private[server] val dynamicConfig = dynamicConfigOverride.getOrElse(new DynamicBrokerConfig(this))
private[server] def updateCurrentConfig(newConfig: KafkaConfig): Unit = {
@@ -1593,7 +1594,6 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
val maxReservedBrokerId: Int = getInt(KafkaConfig.MaxReservedBrokerIdProp)
var brokerId: Int = getInt(KafkaConfig.BrokerIdProp)
val nodeId: Int = getInt(KafkaConfig.NodeIdProp)
- val processRoles: Set[ProcessRole] = parseProcessRoles()
val initialRegistrationTimeoutMs: Int = getInt(KafkaConfig.InitialBrokerRegistrationTimeoutMsProp)
val brokerHeartbeatIntervalMs: Int = getInt(KafkaConfig.BrokerHeartbeatIntervalMsProp)
val brokerSessionTimeoutMs: Int = getInt(KafkaConfig.BrokerSessionTimeoutMsProp)
diff --git a/core/src/main/scala/kafka/utils/PasswordEncoder.scala b/core/src/main/scala/kafka/utils/PasswordEncoder.scala
index f748a455c6..3373223e36 100644
--- a/core/src/main/scala/kafka/utils/PasswordEncoder.scala
+++ b/core/src/main/scala/kafka/utils/PasswordEncoder.scala
@@ -38,6 +38,33 @@ object PasswordEncoder {
val IterationsProp = "iterations"
val EncyrptedPasswordProp = "encryptedPassword"
val PasswordLengthProp = "passwordLength"
+
+ def encrypting(secret: Password,
+ keyFactoryAlgorithm: Option[String],
+ cipherAlgorithm: String,
+ keyLength: Int,
+ iterations: Int): EncryptingPasswordEncoder = {
+ new EncryptingPasswordEncoder(secret, keyFactoryAlgorithm, cipherAlgorithm, keyLength, iterations)
+ }
+
+ def noop(): NoOpPasswordEncoder = {
+ new NoOpPasswordEncoder()
+ }
+}
+
+trait PasswordEncoder {
+ def encode(password: Password): String
+ def decode(encodedPassword: String): Password
+
+ private[utils] def base64Decode(encoded: String): Array[Byte] = Base64.getDecoder.decode(encoded)
+}
+
+/**
+ * A password encoder that does not modify the given password. This is used in KRaft mode only.
+ */
+class NoOpPasswordEncoder extends PasswordEncoder {
+ override def encode(password: Password): String = password.value()
+ override def decode(encodedPassword: String): Password = new Password(encodedPassword)
}
/**
@@ -55,16 +82,18 @@ object PasswordEncoder {
* The values used for encoding are stored along with the encoded password and the stored values are used for decoding.
*
*/
-class PasswordEncoder(secret: Password,
- keyFactoryAlgorithm: Option[String],
- cipherAlgorithm: String,
- keyLength: Int,
- iterations: Int) extends Logging {
+class EncryptingPasswordEncoder(
+ secret: Password,
+ keyFactoryAlgorithm: Option[String],
+ cipherAlgorithm: String,
+ keyLength: Int,
+ iterations: Int
+) extends PasswordEncoder with Logging {
private val secureRandom = new SecureRandom
private val cipherParamsEncoder = cipherParamsInstance(cipherAlgorithm)
- def encode(password: Password): String = {
+ override def encode(password: Password): String = {
val salt = new Array[Byte](256)
secureRandom.nextBytes(salt)
val cipher = Cipher.getInstance(cipherAlgorithm)
@@ -84,7 +113,7 @@ class PasswordEncoder(secret: Password,
encryptedMap.map { case (k, v) => s"$k:$v" }.mkString(",")
}
- def decode(encodedPassword: String): Password = {
+ override def decode(encodedPassword: String): Password = {
val params = CoreUtils.parseCsvMap(encodedPassword)
val keyFactoryAlg = params(KeyFactoryAlgorithmProp)
val cipherAlg = params(CipherAlgorithmProp)
@@ -131,8 +160,6 @@ class PasswordEncoder(secret: Password,
private def base64Encode(bytes: Array[Byte]): String = Base64.getEncoder.encodeToString(bytes)
- private[utils] def base64Decode(encoded: String): Array[Byte] = Base64.getDecoder.decode(encoded)
-
private def cipherParamsInstance(cipherAlgorithm: String): CipherParamsEncoder = {
val aesPattern = "AES/(.*)/.*".r
cipherAlgorithm match {
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index dfff9075f8..c3d1c68c71 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -31,7 +31,7 @@ import com.yammer.metrics.core.MetricName
import kafka.admin.ConfigCommand
import kafka.api.{KafkaSasl, SaslSetup}
import kafka.controller.{ControllerBrokerStateInfo, ControllerChannelManager}
-import kafka.log.{CleanerConfig, LogConfig}
+import kafka.log.{CleanerConfig, LogConfig, UnifiedLog}
import kafka.message.ProducerCompressionCodec
import kafka.metrics.KafkaYammerMetrics
import kafka.network.{Processor, RequestChannel}
@@ -64,6 +64,8 @@ import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializ
import org.apache.kafka.test.{TestSslUtils, TestUtils => JTestUtils}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
import scala.annotation.nowarn
import scala.collection._
@@ -80,7 +82,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
import DynamicBrokerReconfigurationTest._
- private val servers = new ArrayBuffer[KafkaServer]
+ private val servers = new ArrayBuffer[KafkaBroker]
private val numServers = 3
private val numPartitions = 10
private val producers = new ArrayBuffer[KafkaProducer[String, String]]
@@ -111,15 +113,22 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
(0 until numServers).foreach { brokerId =>
- val props = TestUtils.createBrokerConfig(brokerId, zkConnect)
+ val props = if (isKRaftTest()) {
+ val properties = TestUtils.createBrokerConfig(brokerId, null)
+ properties.put(KafkaConfig.AdvertisedListenersProp, s"$SecureInternal://localhost:0, $SecureExternal://localhost:0")
+ properties
+ } else {
+ val properties = TestUtils.createBrokerConfig(brokerId, zkConnect)
+ properties.put(KafkaConfig.ZkEnableSecureAclsProp, "true")
+ properties
+ }
props ++= securityProps(sslProperties1, TRUSTSTORE_PROPS)
// Ensure that we can support multiple listeners per security protocol and multiple security protocols
props.put(KafkaConfig.ListenersProp, s"$SecureInternal://localhost:0, $SecureExternal://localhost:0")
- props.put(KafkaConfig.ListenerSecurityProtocolMapProp, s"$SecureInternal:SSL, $SecureExternal:SASL_SSL")
+ props.put(KafkaConfig.ListenerSecurityProtocolMapProp, s"$SecureInternal:SSL, $SecureExternal:SASL_SSL, CONTROLLER:$controllerListenerSecurityProtocol")
props.put(KafkaConfig.InterBrokerListenerNameProp, SecureInternal)
props.put(KafkaConfig.SslClientAuthProp, "requested")
props.put(KafkaConfig.SaslMechanismInterBrokerProtocolProp, "PLAIN")
- props.put(KafkaConfig.ZkEnableSecureAclsProp, "true")
props.put(KafkaConfig.SaslEnabledMechanismsProp, kafkaServerSaslMechanisms.mkString(","))
props.put(KafkaConfig.LogSegmentBytesProp, "2000") // low value to test log rolling on config update
props.put(KafkaConfig.NumReplicaFetchersProp, "2") // greater than one to test reducing threads
@@ -138,17 +147,21 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
props ++= securityProps(sslProperties1, KEYSTORE_PROPS, listenerPrefix(SecureExternal))
val kafkaConfig = KafkaConfig.fromProps(props)
- configureDynamicKeystoreInZooKeeper(kafkaConfig, sslProperties1)
+ if (!isKRaftTest()) {
+ configureDynamicKeystoreInZooKeeper(kafkaConfig, sslProperties1)
+ }
- servers += TestUtils.createServer(kafkaConfig)
+ servers += createBroker(kafkaConfig)
}
- TestUtils.createTopic(zkClient, topic, numPartitions, replicationFactor = numServers, servers)
- TestUtils.createTopic(zkClient, Topic.GROUP_METADATA_TOPIC_NAME, servers.head.config.offsetsTopicPartitions,
- replicationFactor = numServers, servers, servers.head.groupCoordinator.offsetsTopicConfigs)
-
createAdminClient(SecurityProtocol.SSL, SecureInternal)
+ TestUtils.createTopicWithAdmin(adminClients.head, topic, servers, numPartitions, replicationFactor = numServers)
+ TestUtils.createTopicWithAdmin(adminClients.head, Topic.GROUP_METADATA_TOPIC_NAME, servers,
+ numPartitions = servers.head.config.offsetsTopicPartitions,
+ replicationFactor = numServers,
+ topicConfig = servers.head.groupCoordinator.offsetsTopicConfigs)
+
TestMetricsReporter.testReporters.clear()
}
@@ -166,8 +179,9 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
closeSasl()
}
- @Test
- def testConfigDescribeUsingAdminClient(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testConfigDescribeUsingAdminClient(quorum: String): Unit = {
def verifyConfig(configName: String, configEntry: ConfigEntry, isSensitive: Boolean, isReadOnly: Boolean,
expectedProps: Properties): Unit = {
@@ -226,9 +240,12 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
val adminClient = adminClients.head
alterSslKeystoreUsingConfigCommand(sslProperties1, SecureExternal)
- val configDesc = describeConfig(adminClient)
- verifySslConfig("listener.name.external.", sslProperties1, configDesc)
- verifySslConfig("", invalidSslProperties, configDesc)
+ val configDesc = TestUtils.tryUntilNoAssertionError() {
+ val describeConfigsResult = describeConfig(adminClient)
+ verifySslConfig("listener.name.external.", sslProperties1, describeConfigsResult)
+ verifySslConfig("", invalidSslProperties, describeConfigsResult)
+ describeConfigsResult
+ }
// Verify a few log configs with and without synonyms
val expectedProps = new Properties
@@ -262,8 +279,9 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
assertEquals(List((KafkaConfig.LogCleanerThreadsProp, ConfigSource.DEFAULT_CONFIG)), synonymsList(logCleanerThreads))
}
- @Test
- def testUpdatesUsingConfigProvider(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testUpdatesUsingConfigProvider(quorum: String): Unit = {
val PollingIntervalVal = f"$${file:polling.interval:interval}"
val PollingIntervalUpdateVal = f"$${file:polling.interval:updinterval}"
val SslTruststoreTypeVal = f"$${file:ssl.truststore.type:storetype}"
@@ -309,11 +327,13 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
assertFalse(reporter.kafkaMetrics.isEmpty, "No metrics found")
}
- // fetch from ZK, values should be unresolved
- val props = fetchBrokerConfigsFromZooKeeper(servers.head)
- assertTrue(props.getProperty(TestMetricsReporter.PollingIntervalProp) == PollingIntervalVal, "polling interval is not updated in ZK")
- assertTrue(props.getProperty(configPrefix+KafkaConfig.SslTruststoreTypeProp) == SslTruststoreTypeVal, "store type is not updated in ZK")
- assertTrue(props.getProperty(configPrefix+KafkaConfig.SslKeystorePasswordProp) == SslKeystorePasswordVal, "keystore password is not updated in ZK")
+ if (!isKRaftTest()) {
+ // fetch from ZK, values should be unresolved
+ val props = fetchBrokerConfigsFromZooKeeper(servers.head)
+ assertTrue(props.getProperty(TestMetricsReporter.PollingIntervalProp) == PollingIntervalVal, "polling interval is not updated in ZK")
+ assertTrue(props.getProperty(configPrefix + KafkaConfig.SslTruststoreTypeProp) == SslTruststoreTypeVal, "store type is not updated in ZK")
+ assertTrue(props.getProperty(configPrefix + KafkaConfig.SslKeystorePasswordProp) == SslKeystorePasswordVal, "keystore password is not updated in ZK")
+ }
// verify the update
// 1. verify update not occurring if the value of property is same.
@@ -332,10 +352,10 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
}
}
- @Test
+ @Test // TODO KAFKA-14126 add KRaft support
def testKeyStoreAlter(): Unit = {
val topic2 = "testtopic2"
- TestUtils.createTopic(zkClient, topic2, numPartitions, replicationFactor = numServers, servers)
+ TestUtils.createTopicWithAdmin(adminClients.head, topic2, servers, numPartitions, replicationFactor = numServers)
// Start a producer and consumer that work with the current broker keystore.
// This should continue working while changes are made
@@ -399,7 +419,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
stopAndVerifyProduceConsume(producerThread, consumerThread)
}
- @Test
+ @Test // TODO KAFKA-14126 add KRaft support
def testTrustStoreAlter(): Unit = {
val producerBuilder = ProducerBuilder().listenerName(SecureInternal).securityProtocol(SecurityProtocol.SSL)
@@ -481,7 +501,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
verifySslProduceConsume(sslProperties2, "alter-truststore-7")
waitForAuthenticationFailure(producerBuilder.keyStoreProps(sslProperties1))
- val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get
+ val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get.asInstanceOf[KafkaServer]
val controllerChannelManager = controller.kafkaController.controllerChannelManager
val brokerStateInfo: mutable.HashMap[Int, ControllerBrokerStateInfo] =
JTestUtils.fieldValue(controllerChannelManager, classOf[ControllerChannelManager], "brokerStateInfo")
@@ -492,8 +512,9 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
verifyBrokerToControllerCall(controller)
}
- @Test
- def testLogCleanerConfig(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testLogCleanerConfig(quorum: String): Unit = {
val (producerThread, consumerThread) = startProduceConsume(retries = 0)
verifyThreads("kafka-log-cleaner-thread-", countPerBroker = 1)
@@ -537,13 +558,23 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
stopAndVerifyProduceConsume(producerThread, consumerThread)
}
- @Test
- def testConsecutiveConfigChange(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testConsecutiveConfigChange(quorum: String): Unit = {
val topic2 = "testtopic2"
val topicProps = new Properties
topicProps.put(KafkaConfig.MinInSyncReplicasProp, "2")
- TestUtils.createTopic(zkClient, topic2, 1, replicationFactor = numServers, servers, topicProps)
- var log = servers.head.logManager.getLog(new TopicPartition(topic2, 0)).getOrElse(throw new IllegalStateException("Log not found"))
+ TestUtils.createTopicWithAdmin(adminClients.head, topic2, servers, numPartitions = 1, replicationFactor = numServers, topicConfig = topicProps)
+
+ def getLogOrThrow(tp: TopicPartition): UnifiedLog = {
+ var (logOpt, found) = TestUtils.computeUntilTrue {
+ servers.head.logManager.getLog(tp)
+ }(_.isDefined)
+ assertTrue(found, "Log not found")
+ logOpt.get
+ }
+
+ var log = getLogOrThrow(new TopicPartition(topic2, 0))
assertTrue(log.config.overriddenConfigs.contains(KafkaConfig.MinInSyncReplicasProp))
assertEquals("2", log.config.originals().get(KafkaConfig.MinInSyncReplicasProp).toString)
@@ -558,7 +589,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
}
}
- log = servers.head.logManager.getLog(new TopicPartition(topic2, 0)).getOrElse(throw new IllegalStateException("Log not found"))
+ log = getLogOrThrow(new TopicPartition(topic2, 0))
assertTrue(log.config.overriddenConfigs.contains(KafkaConfig.MinInSyncReplicasProp))
assertEquals("2", log.config.originals().get(KafkaConfig.MinInSyncReplicasProp).toString) // Verify topic-level config survives
@@ -566,7 +597,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
props.clear()
props.put(KafkaConfig.LogRetentionTimeMillisProp, "604800000")
reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.LogRetentionTimeMillisProp, "604800000"))
- log = servers.head.logManager.getLog(new TopicPartition(topic2, 0)).getOrElse(throw new IllegalStateException("Log not found"))
+ log = getLogOrThrow(new TopicPartition(topic2, 0))
assertTrue(log.config.overriddenConfigs.contains(KafkaConfig.MinInSyncReplicasProp))
assertEquals("2", log.config.originals().get(KafkaConfig.MinInSyncReplicasProp).toString) // Verify topic-level config still survives
}
@@ -974,6 +1005,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
}
@Test
+ // Modifying advertised listeners is not supported in KRaft
def testAdvertisedListenerUpdate(): Unit = {
val adminClient = adminClients.head
val externalAdminClient = createAdminClient(SecurityProtocol.SASL_SSL, SecureExternal)
@@ -994,11 +1026,13 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
}
// Verify that endpoints have been updated in ZK for all brokers
- servers.foreach(validateEndpointsInZooKeeper(_, endpoints => endpoints.contains(invalidHost)))
+ servers.foreach { server =>
+ validateEndpointsInZooKeeper(server.asInstanceOf[KafkaServer], endpoints => endpoints.contains(invalidHost))
+ }
// Trigger session expiry and ensure that controller registers new advertised listener after expiry
val controllerEpoch = zkClient.getControllerEpoch
- val controllerServer = servers(zkClient.getControllerId.getOrElse(throw new IllegalStateException("No controller")))
+ val controllerServer = servers(zkClient.getControllerId.getOrElse(throw new IllegalStateException("No controller"))).asInstanceOf[KafkaServer]
val controllerZkClient = controllerServer.zkClient
val sessionExpiringClient = createZooKeeperClientToTriggerSessionExpiry(controllerZkClient.currentZooKeeper)
sessionExpiringClient.close()
@@ -1022,7 +1056,9 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
.getCause.isInstanceOf[org.apache.kafka.common.errors.TimeoutException])
alterAdvertisedListener(adminClient, externalAdminClient, invalidHost, "localhost")
- servers.foreach(validateEndpointsInZooKeeper(_, endpoints => !endpoints.contains(invalidHost)))
+ servers.foreach { server =>
+ validateEndpointsInZooKeeper(server.asInstanceOf[KafkaServer], endpoints => !endpoints.contains(invalidHost))
+ }
// Verify that produce/consume work now
val topic2 = "testtopic2"
@@ -1119,7 +1155,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
assertTrue(partitions.exists(_.leader == null), "Did not find partitions with no leader")
}
- private def addListener(servers: Seq[KafkaServer], listenerName: String, securityProtocol: SecurityProtocol,
+ private def addListener(servers: Seq[KafkaBroker], listenerName: String, securityProtocol: SecurityProtocol,
saslMechanisms: Seq[String]): Unit = {
val config = servers.head.config
val existingListenerCount = config.listeners.size
@@ -1264,11 +1300,11 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
verifyProduceConsume(producer, consumer, numRecords = 10, topic)
}
- private def hasListenerMetric(server: KafkaServer, listenerName: String): Boolean = {
+ private def hasListenerMetric(server: KafkaBroker, listenerName: String): Boolean = {
server.socketServer.metrics.metrics.keySet.asScala.exists(_.tags.get("listener") == listenerName)
}
- private def fetchBrokerConfigsFromZooKeeper(server: KafkaServer): Properties = {
+ private def fetchBrokerConfigsFromZooKeeper(server: KafkaBroker): Properties = {
val props = adminZkClient.fetchEntityConfig(ConfigType.Broker, server.config.brokerId.toString)
server.config.dynamicConfig.fromPersistentProps(props, perBrokerConfig = true)
}
@@ -1322,7 +1358,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
}, "Did not fail authentication with invalid config")
}
- private def describeConfig(adminClient: Admin, servers: Seq[KafkaServer] = this.servers): Config = {
+ private def describeConfig(adminClient: Admin, servers: Seq[KafkaBroker] = this.servers): Config = {
val configResources = servers.map { server =>
new ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString)
}
@@ -1419,7 +1455,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
}
@nowarn("cat=deprecation")
- private def alterConfigsOnServer(server: KafkaServer, props: Properties): Unit = {
+ private def alterConfigsOnServer(server: KafkaBroker, props: Properties): Unit = {
val configEntries = props.asScala.map { case (k, v) => new ConfigEntry(k, v) }.toList.asJava
val newConfig = new Config(configEntries)
val configs = Map(new ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString) -> newConfig).asJava
@@ -1428,7 +1464,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
}
@nowarn("cat=deprecation")
- private def alterConfigs(servers: Seq[KafkaServer], adminClient: Admin, props: Properties,
+ private def alterConfigs(servers: Seq[KafkaBroker], adminClient: Admin, props: Properties,
perBrokerConfig: Boolean): AlterConfigsResult = {
val configEntries = props.asScala.map { case (k, v) => new ConfigEntry(k, v) }.toList.asJava
val newConfig = new Config(configEntries)
@@ -1507,7 +1543,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
private def createPasswordEncoder(config: KafkaConfig, secret: Option[Password]): PasswordEncoder = {
val encoderSecret = secret.getOrElse(throw new IllegalStateException("Password encoder secret not configured"))
- new PasswordEncoder(encoderSecret,
+ PasswordEncoder.encrypting(encoderSecret,
config.passwordEncoderKeyFactoryAlgorithm,
config.passwordEncoderCipherAlgorithm,
config.passwordEncoderKeyLength,
@@ -1518,7 +1554,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
servers.foreach { server => waitForConfigOnServer(server, propName, propValue, maxWaitMs) }
}
- private def waitForConfigOnServer(server: KafkaServer, propName: String, propValue: String, maxWaitMs: Long = 10000): Unit = {
+ private def waitForConfigOnServer(server: KafkaBroker, propName: String, propValue: String, maxWaitMs: Long = 10000): Unit = {
TestUtils.retry(maxWaitMs) {
assertEquals(propValue, server.config.originals.get(propName))
}
diff --git a/core/src/test/scala/unit/kafka/utils/PasswordEncoderTest.scala b/core/src/test/scala/unit/kafka/utils/PasswordEncoderTest.scala
index 0a5d5ac029..50cdceabbc 100755
--- a/core/src/test/scala/unit/kafka/utils/PasswordEncoderTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/PasswordEncoderTest.scala
@@ -30,7 +30,7 @@ class PasswordEncoderTest {
@Test
def testEncodeDecode(): Unit = {
- val encoder = new PasswordEncoder(new Password("password-encoder-secret"),
+ val encoder = PasswordEncoder.encrypting(new Password("password-encoder-secret"),
None,
Defaults.PasswordEncoderCipherAlgorithm,
Defaults.PasswordEncoderKeyLength,
@@ -54,7 +54,7 @@ class PasswordEncoderTest {
@Test
def testEncoderConfigChange(): Unit = {
- val encoder = new PasswordEncoder(new Password("password-encoder-secret"),
+ val encoder = PasswordEncoder.encrypting(new Password("password-encoder-secret"),
Some("PBKDF2WithHmacSHA1"),
"DES/CBC/PKCS5Padding",
64,
@@ -68,7 +68,7 @@ class PasswordEncoderTest {
assertEquals("DES/CBC/PKCS5Padding", encodedMap(PasswordEncoder.CipherAlgorithmProp))
// Test that decoding works even if PasswordEncoder algorithm, iterations etc. are altered
- val decoder = new PasswordEncoder(new Password("password-encoder-secret"),
+ val decoder = PasswordEncoder.encrypting(new Password("password-encoder-secret"),
Some("PBKDF2WithHmacSHA1"),
"AES/CBC/PKCS5Padding",
128,
@@ -76,7 +76,7 @@ class PasswordEncoderTest {
assertEquals(password, decoder.decode(encoded).value)
// Test that decoding fails if secret is altered
- val decoder2 = new PasswordEncoder(new Password("secret-2"),
+ val decoder2 = PasswordEncoder.encrypting(new Password("secret-2"),
Some("PBKDF2WithHmacSHA1"),
"AES/CBC/PKCS5Padding",
128,
@@ -92,7 +92,7 @@ class PasswordEncoderTest {
def testEncodeDecodeAlgorithms(): Unit = {
def verifyEncodeDecode(keyFactoryAlg: Option[String], cipherAlg: String, keyLength: Int): Unit = {
- val encoder = new PasswordEncoder(new Password("password-encoder-secret"),
+ val encoder = PasswordEncoder.encrypting(new Password("password-encoder-secret"),
keyFactoryAlg,
cipherAlg,
keyLength,
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 8d921e54f1..e388896c03 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -482,7 +482,7 @@ object TestUtils extends Logging {
topic: String,
numPartitions: Int = 1,
replicationFactor: Int = 1,
- servers: Seq[KafkaServer],
+ servers: Seq[KafkaBroker],
topicConfig: Properties = new Properties): scala.collection.immutable.Map[Int, Int] = {
val adminZkClient = new AdminZkClient(zkClient)
// create topic
@@ -514,7 +514,7 @@ object TestUtils extends Logging {
def createTopic(zkClient: KafkaZkClient,
topic: String,
partitionReplicaAssignment: collection.Map[Int, Seq[Int]],
- servers: Seq[KafkaServer]): scala.collection.immutable.Map[Int, Int] = {
+ servers: Seq[KafkaBroker]): scala.collection.immutable.Map[Int, Int] = {
createTopic(zkClient, topic, partitionReplicaAssignment, servers, new Properties())
}
@@ -526,7 +526,7 @@ object TestUtils extends Logging {
def createTopic(zkClient: KafkaZkClient,
topic: String,
partitionReplicaAssignment: collection.Map[Int, Seq[Int]],
- servers: Seq[KafkaServer],
+ servers: Seq[KafkaBroker],
topicConfig: Properties): scala.collection.immutable.Map[Int, Int] = {
val adminZkClient = new AdminZkClient(zkClient)
// create topic
@@ -554,7 +554,7 @@ object TestUtils extends Logging {
* Create the consumer offsets/group metadata topic and wait until the leader is elected and metadata is propagated
* to all brokers.
*/
- def createOffsetsTopic(zkClient: KafkaZkClient, servers: Seq[KafkaServer]): Unit = {
+ def createOffsetsTopic(zkClient: KafkaZkClient, servers: Seq[KafkaBroker]): Unit = {
val server = servers.head
createTopic(zkClient, Topic.GROUP_METADATA_TOPIC_NAME,
server.config.getInt(KafkaConfig.OffsetsTopicPartitionsProp),
@@ -1014,18 +1014,19 @@ object TestUtils extends Logging {
* otherwise difficult to poll for. `computeUntilTrue` and `waitUntilTrue` should be preferred in cases where we can
* easily wait on a condition before evaluating the assertions.
*/
- def tryUntilNoAssertionError(waitTime: Long = JTestUtils.DEFAULT_MAX_WAIT_MS, pause: Long = 100L)(assertions: => Unit) = {
- val (error, success) = TestUtils.computeUntilTrue({
+ def tryUntilNoAssertionError[T](waitTime: Long = JTestUtils.DEFAULT_MAX_WAIT_MS, pause: Long = 100L)(assertions: => T): T = {
+ val (either, success) = TestUtils.computeUntilTrue({
try {
- assertions
- None
+ val res = assertions
+ Left(res)
} catch {
- case ae: AssertionError => Some(ae)
+ case ae: AssertionError => Right(ae)
}
- }, waitTime = waitTime, pause = pause)(_.isEmpty)
+ }, waitTime = waitTime, pause = pause)(_.isLeft)
- if (!success) {
- throw error.get
+ either match {
+ case Left(res) => res
+ case Right(err) => throw err
}
}