You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2022/08/04 20:31:19 UTC
[kafka] branch 3.2 updated (89b2bf257b -> a7369bd52f)
This is an automated email from the ASF dual-hosted git repository.
davidarthur pushed a change to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
from 89b2bf257b MINOR: Update 3.2 branch to 3.2.2-SNAPSHOT
new 4e049c706f KAFKA-14111 Fix sensitive dynamic broker configs in KRaft (#12455)
new a7369bd52f KAFKA-14136 Generate ConfigRecord for brokers even if the value is unchanged (#12483)
The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
.../src/main/scala/kafka/admin/ConfigCommand.scala | 2 +-
.../scala/kafka/server/DynamicBrokerConfig.scala | 8 +-
core/src/main/scala/kafka/server/KafkaBroker.scala | 1 +
core/src/main/scala/kafka/server/KafkaConfig.scala | 2 +-
.../main/scala/kafka/utils/PasswordEncoder.scala | 45 ++++--
.../server/DynamicBrokerReconfigurationTest.scala | 174 +++++++++++++--------
.../unit/kafka/utils/PasswordEncoderTest.scala | 10 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 25 +--
.../controller/ConfigurationControlManager.java | 13 +-
9 files changed, 185 insertions(+), 95 deletions(-)
[kafka] 02/02: KAFKA-14136 Generate ConfigRecord for brokers even if the value is unchanged (#12483)
Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
davidarthur pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit a7369bd52ff1e91b3a56d5622ce49c7c515cb81e
Author: David Arthur <mu...@gmail.com>
AuthorDate: Thu Aug 4 15:09:08 2022 -0400
KAFKA-14136 Generate ConfigRecord for brokers even if the value is unchanged (#12483)
---
.../server/DynamicBrokerReconfigurationTest.scala | 54 ++++++++++++++--------
.../controller/ConfigurationControlManager.java | 13 ++++--
2 files changed, 44 insertions(+), 23 deletions(-)
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index c3d1c68c71..a76fa91381 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -67,6 +67,7 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
+import java.util.concurrent.atomic.AtomicInteger
import scala.annotation.nowarn
import scala.collection._
import scala.collection.mutable.ArrayBuffer
@@ -352,8 +353,9 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
}
}
- @Test // TODO KAFKA-14126 add KRaft support
- def testKeyStoreAlter(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testKeyStoreAlter(quorum: String): Unit = {
val topic2 = "testtopic2"
TestUtils.createTopicWithAdmin(adminClients.head, topic2, servers, numPartitions, replicationFactor = numServers)
@@ -419,8 +421,9 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
stopAndVerifyProduceConsume(producerThread, consumerThread)
}
- @Test // TODO KAFKA-14126 add KRaft support
- def testTrustStoreAlter(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testTrustStoreAlter(quorum: String): Unit = {
val producerBuilder = ProducerBuilder().listenerName(SecureInternal).securityProtocol(SecurityProtocol.SSL)
// Producer with new keystore should fail to connect before truststore update
@@ -467,9 +470,12 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
assertFalse(response.wasDisconnected(), "Request failed because broker is not available")
}
+ val group_id = new AtomicInteger(1)
+ def next_group_name(): String = s"alter-truststore-${group_id.getAndIncrement()}"
+
// Produce/consume should work with old as well as new client keystore
- verifySslProduceConsume(sslProperties1, "alter-truststore-1")
- verifySslProduceConsume(sslProperties2, "alter-truststore-2")
+ verifySslProduceConsume(sslProperties1, next_group_name())
+ verifySslProduceConsume(sslProperties2, next_group_name())
// Revert to old truststore with only one certificate and update. Clients should connect only with old keystore.
val oldTruststoreProps = new Properties
@@ -478,7 +484,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
reconfigureServers(oldTruststoreProps, perBrokerConfig = true,
(s"$prefix$SSL_TRUSTSTORE_LOCATION_CONFIG", sslProperties1.getProperty(SSL_TRUSTSTORE_LOCATION_CONFIG)))
verifyAuthenticationFailure(producerBuilder.keyStoreProps(sslProperties2).build())
- verifySslProduceConsume(sslProperties1, "alter-truststore-3")
+ verifySslProduceConsume(sslProperties1, next_group_name())
// Update same truststore file to contain both certificates without changing any configs.
// Clients should connect successfully with either keystore after admin client AlterConfigsRequest completes.
@@ -486,8 +492,14 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
Paths.get(sslProperties1.getProperty(SSL_TRUSTSTORE_LOCATION_CONFIG)),
StandardCopyOption.REPLACE_EXISTING)
TestUtils.incrementalAlterConfigs(servers, adminClients.head, oldTruststoreProps, perBrokerConfig = true).all.get()
- verifySslProduceConsume(sslProperties1, "alter-truststore-4")
- verifySslProduceConsume(sslProperties2, "alter-truststore-5")
+ TestUtils.retry(30000) {
+ try {
+ verifySslProduceConsume(sslProperties1, next_group_name())
+ verifySslProduceConsume(sslProperties2, next_group_name())
+ } catch {
+ case t: Throwable => throw new AssertionError(t)
+ }
+ }
// Update internal keystore/truststore and validate new client connections from broker (e.g. controller).
// Alter internal keystore from `sslProperties1` to `sslProperties2`, force disconnect of a controller connection
@@ -495,21 +507,23 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
val props2 = securityProps(sslProperties2, KEYSTORE_PROPS, prefix)
props2 ++= securityProps(combinedStoreProps, TRUSTSTORE_PROPS, prefix)
TestUtils.incrementalAlterConfigs(servers, adminClients.head, props2, perBrokerConfig = true).all.get(15, TimeUnit.SECONDS)
- verifySslProduceConsume(sslProperties2, "alter-truststore-6")
+ verifySslProduceConsume(sslProperties2, next_group_name())
props2 ++= securityProps(sslProperties2, TRUSTSTORE_PROPS, prefix)
TestUtils.incrementalAlterConfigs(servers, adminClients.head, props2, perBrokerConfig = true).all.get(15, TimeUnit.SECONDS)
- verifySslProduceConsume(sslProperties2, "alter-truststore-7")
+ verifySslProduceConsume(sslProperties2, next_group_name())
waitForAuthenticationFailure(producerBuilder.keyStoreProps(sslProperties1))
- val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get.asInstanceOf[KafkaServer]
- val controllerChannelManager = controller.kafkaController.controllerChannelManager
- val brokerStateInfo: mutable.HashMap[Int, ControllerBrokerStateInfo] =
- JTestUtils.fieldValue(controllerChannelManager, classOf[ControllerChannelManager], "brokerStateInfo")
- brokerStateInfo(0).networkClient.disconnect("0")
- TestUtils.createTopic(zkClient, "testtopic2", numPartitions, replicationFactor = numServers, servers)
-
- // validate that the brokerToController request works fine
- verifyBrokerToControllerCall(controller)
+ if (!isKRaftTest()) {
+ val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get.asInstanceOf[KafkaServer]
+ val controllerChannelManager = controller.kafkaController.controllerChannelManager
+ val brokerStateInfo: mutable.HashMap[Int, ControllerBrokerStateInfo] =
+ JTestUtils.fieldValue(controllerChannelManager, classOf[ControllerChannelManager], "brokerStateInfo")
+ brokerStateInfo(0).networkClient.disconnect("0")
+ TestUtils.createTopic(zkClient, "testtopic2", numPartitions, replicationFactor = numServers, servers)
+
+ // validate that the brokerToController request works fine
+ verifyBrokerToControllerCall(controller)
+ }
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
index a16361343b..746fdf1ffe 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigResource.Type;
import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.utils.LogContext;
@@ -146,7 +147,8 @@ public class ConfigurationControlManager {
}
break;
}
- if (!Objects.equals(currentValue, newValue)) {
+ if (!Objects.equals(currentValue, newValue) || configResource.type().equals(Type.BROKER)) {
+ // KAFKA-14136 We need to generate records even if the value is unchanged to trigger reloads on the brokers
newRecords.add(new ApiMessageAndVersion(new ConfigRecord().
setResourceType(configResource.type().id()).
setResourceName(configResource.name()).
@@ -233,7 +235,8 @@ public class ConfigurationControlManager {
String key = entry.getKey();
String newValue = entry.getValue();
String currentValue = currentConfigs.get(key);
- if (!Objects.equals(newValue, currentValue)) {
+ if (!Objects.equals(currentValue, newValue) || configResource.type().equals(Type.BROKER)) {
+ // KAFKA-14136 We need to generate records even if the value is unchanged to trigger reloads on the brokers
newRecords.add(new ApiMessageAndVersion(new ConfigRecord().
setResourceType(configResource.type().id()).
setResourceName(configResource.name()).
@@ -297,7 +300,11 @@ public class ConfigurationControlManager {
if (configs.isEmpty()) {
configData.remove(configResource);
}
- log.info("{}: set configuration {} to {}", configResource, record.name(), record.value());
+ if (configSchema.isSensitive(record)) {
+ log.info("{}: set configuration {} to {}", configResource, record.name(), Password.HIDDEN);
+ } else {
+ log.info("{}: set configuration {} to {}", configResource, record.name(), record.value());
+ }
}
// VisibleForTesting
[kafka] 01/02: KAFKA-14111 Fix sensitive dynamic broker configs in KRaft (#12455)
Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
davidarthur pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 4e049c706fa741164bc5ad65768236a48f75e64d
Author: David Arthur <mu...@gmail.com>
AuthorDate: Wed Aug 3 13:28:06 2022 -0400
KAFKA-14111 Fix sensitive dynamic broker configs in KRaft (#12455)
Enable some of the dynamic broker reconfiguration tests in KRaft mode
---
.../src/main/scala/kafka/admin/ConfigCommand.scala | 2 +-
.../scala/kafka/server/DynamicBrokerConfig.scala | 8 +-
core/src/main/scala/kafka/server/KafkaBroker.scala | 1 +
core/src/main/scala/kafka/server/KafkaConfig.scala | 2 +-
.../main/scala/kafka/utils/PasswordEncoder.scala | 45 ++++++--
.../server/DynamicBrokerReconfigurationTest.scala | 128 +++++++++++++--------
.../unit/kafka/utils/PasswordEncoderTest.scala | 10 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 25 ++--
8 files changed, 145 insertions(+), 76 deletions(-)
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 5e5ccefa45..126be3a81a 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -212,7 +212,7 @@ object ConfigCommand extends Config {
encoderConfigs.get(KafkaConfig.PasswordEncoderSecretProp)
val encoderSecret = encoderConfigs.getOrElse(KafkaConfig.PasswordEncoderSecretProp,
throw new IllegalArgumentException("Password encoder secret not specified"))
- new PasswordEncoder(new Password(encoderSecret),
+ PasswordEncoder.encrypting(new Password(encoderSecret),
None,
encoderConfigs.get(KafkaConfig.PasswordEncoderCipherAlgorithmProp).getOrElse(Defaults.PasswordEncoderCipherAlgorithm),
encoderConfigs.get(KafkaConfig.PasswordEncoderKeyLengthProp).map(_.toInt).getOrElse(Defaults.PasswordEncoderKeyLength),
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 2a4fd9501b..a40444507b 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -209,7 +209,11 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
private val brokerReconfigurables = new CopyOnWriteArrayList[BrokerReconfigurable]()
private val lock = new ReentrantReadWriteLock
private var currentConfig: KafkaConfig = null
- private val dynamicConfigPasswordEncoder = maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderSecret)
+ private val dynamicConfigPasswordEncoder = if (kafkaConfig.processRoles.isEmpty) {
+ maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderSecret)
+ } else {
+ Some(PasswordEncoder.noop())
+ }
private[server] def initialize(zkClientOpt: Option[KafkaZkClient]): Unit = {
currentConfig = new KafkaConfig(kafkaConfig.props, false, None)
@@ -338,7 +342,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
private def maybeCreatePasswordEncoder(secret: Option[Password]): Option[PasswordEncoder] = {
secret.map { secret =>
- new PasswordEncoder(secret,
+ PasswordEncoder.encrypting(secret,
kafkaConfig.passwordEncoderKeyFactoryAlgorithm,
kafkaConfig.passwordEncoderCipherAlgorithm,
kafkaConfig.passwordEncoderKeyLength,
diff --git a/core/src/main/scala/kafka/server/KafkaBroker.scala b/core/src/main/scala/kafka/server/KafkaBroker.scala
index f4c6abc306..d4210b55f4 100644
--- a/core/src/main/scala/kafka/server/KafkaBroker.scala
+++ b/core/src/main/scala/kafka/server/KafkaBroker.scala
@@ -88,6 +88,7 @@ trait KafkaBroker extends KafkaMetricsGroup {
def shutdown(): Unit
def brokerTopicStats: BrokerTopicStats
def credentialProvider: CredentialProvider
+ def clientToControllerChannelManager: BrokerToControllerChannelManager
// For backwards compatibility, we need to keep older metrics tied
// to their original name when this class was named `KafkaServer`
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 3282ef6c57..be2c8e72b9 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -1474,6 +1474,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
// Cache the current config to avoid acquiring read lock to access from dynamicConfig
@volatile private var currentConfig = this
+ val processRoles: Set[ProcessRole] = parseProcessRoles()
private[server] val dynamicConfig = dynamicConfigOverride.getOrElse(new DynamicBrokerConfig(this))
private[server] def updateCurrentConfig(newConfig: KafkaConfig): Unit = {
@@ -1593,7 +1594,6 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
val maxReservedBrokerId: Int = getInt(KafkaConfig.MaxReservedBrokerIdProp)
var brokerId: Int = getInt(KafkaConfig.BrokerIdProp)
val nodeId: Int = getInt(KafkaConfig.NodeIdProp)
- val processRoles: Set[ProcessRole] = parseProcessRoles()
val initialRegistrationTimeoutMs: Int = getInt(KafkaConfig.InitialBrokerRegistrationTimeoutMsProp)
val brokerHeartbeatIntervalMs: Int = getInt(KafkaConfig.BrokerHeartbeatIntervalMsProp)
val brokerSessionTimeoutMs: Int = getInt(KafkaConfig.BrokerSessionTimeoutMsProp)
diff --git a/core/src/main/scala/kafka/utils/PasswordEncoder.scala b/core/src/main/scala/kafka/utils/PasswordEncoder.scala
index f748a455c6..3373223e36 100644
--- a/core/src/main/scala/kafka/utils/PasswordEncoder.scala
+++ b/core/src/main/scala/kafka/utils/PasswordEncoder.scala
@@ -38,6 +38,33 @@ object PasswordEncoder {
val IterationsProp = "iterations"
val EncyrptedPasswordProp = "encryptedPassword"
val PasswordLengthProp = "passwordLength"
+
+ def encrypting(secret: Password,
+ keyFactoryAlgorithm: Option[String],
+ cipherAlgorithm: String,
+ keyLength: Int,
+ iterations: Int): EncryptingPasswordEncoder = {
+ new EncryptingPasswordEncoder(secret, keyFactoryAlgorithm, cipherAlgorithm, keyLength, iterations)
+ }
+
+ def noop(): NoOpPasswordEncoder = {
+ new NoOpPasswordEncoder()
+ }
+}
+
+trait PasswordEncoder {
+ def encode(password: Password): String
+ def decode(encodedPassword: String): Password
+
+ private[utils] def base64Decode(encoded: String): Array[Byte] = Base64.getDecoder.decode(encoded)
+}
+
+/**
+ * A password encoder that does not modify the given password. This is used in KRaft mode only.
+ */
+class NoOpPasswordEncoder extends PasswordEncoder {
+ override def encode(password: Password): String = password.value()
+ override def decode(encodedPassword: String): Password = new Password(encodedPassword)
}
/**
@@ -55,16 +82,18 @@ object PasswordEncoder {
* The values used for encoding are stored along with the encoded password and the stored values are used for decoding.
*
*/
-class PasswordEncoder(secret: Password,
- keyFactoryAlgorithm: Option[String],
- cipherAlgorithm: String,
- keyLength: Int,
- iterations: Int) extends Logging {
+class EncryptingPasswordEncoder(
+ secret: Password,
+ keyFactoryAlgorithm: Option[String],
+ cipherAlgorithm: String,
+ keyLength: Int,
+ iterations: Int
+) extends PasswordEncoder with Logging {
private val secureRandom = new SecureRandom
private val cipherParamsEncoder = cipherParamsInstance(cipherAlgorithm)
- def encode(password: Password): String = {
+ override def encode(password: Password): String = {
val salt = new Array[Byte](256)
secureRandom.nextBytes(salt)
val cipher = Cipher.getInstance(cipherAlgorithm)
@@ -84,7 +113,7 @@ class PasswordEncoder(secret: Password,
encryptedMap.map { case (k, v) => s"$k:$v" }.mkString(",")
}
- def decode(encodedPassword: String): Password = {
+ override def decode(encodedPassword: String): Password = {
val params = CoreUtils.parseCsvMap(encodedPassword)
val keyFactoryAlg = params(KeyFactoryAlgorithmProp)
val cipherAlg = params(CipherAlgorithmProp)
@@ -131,8 +160,6 @@ class PasswordEncoder(secret: Password,
private def base64Encode(bytes: Array[Byte]): String = Base64.getEncoder.encodeToString(bytes)
- private[utils] def base64Decode(encoded: String): Array[Byte] = Base64.getDecoder.decode(encoded)
-
private def cipherParamsInstance(cipherAlgorithm: String): CipherParamsEncoder = {
val aesPattern = "AES/(.*)/.*".r
cipherAlgorithm match {
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index dfff9075f8..c3d1c68c71 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -31,7 +31,7 @@ import com.yammer.metrics.core.MetricName
import kafka.admin.ConfigCommand
import kafka.api.{KafkaSasl, SaslSetup}
import kafka.controller.{ControllerBrokerStateInfo, ControllerChannelManager}
-import kafka.log.{CleanerConfig, LogConfig}
+import kafka.log.{CleanerConfig, LogConfig, UnifiedLog}
import kafka.message.ProducerCompressionCodec
import kafka.metrics.KafkaYammerMetrics
import kafka.network.{Processor, RequestChannel}
@@ -64,6 +64,8 @@ import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializ
import org.apache.kafka.test.{TestSslUtils, TestUtils => JTestUtils}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
import scala.annotation.nowarn
import scala.collection._
@@ -80,7 +82,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
import DynamicBrokerReconfigurationTest._
- private val servers = new ArrayBuffer[KafkaServer]
+ private val servers = new ArrayBuffer[KafkaBroker]
private val numServers = 3
private val numPartitions = 10
private val producers = new ArrayBuffer[KafkaProducer[String, String]]
@@ -111,15 +113,22 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
(0 until numServers).foreach { brokerId =>
- val props = TestUtils.createBrokerConfig(brokerId, zkConnect)
+ val props = if (isKRaftTest()) {
+ val properties = TestUtils.createBrokerConfig(brokerId, null)
+ properties.put(KafkaConfig.AdvertisedListenersProp, s"$SecureInternal://localhost:0, $SecureExternal://localhost:0")
+ properties
+ } else {
+ val properties = TestUtils.createBrokerConfig(brokerId, zkConnect)
+ properties.put(KafkaConfig.ZkEnableSecureAclsProp, "true")
+ properties
+ }
props ++= securityProps(sslProperties1, TRUSTSTORE_PROPS)
// Ensure that we can support multiple listeners per security protocol and multiple security protocols
props.put(KafkaConfig.ListenersProp, s"$SecureInternal://localhost:0, $SecureExternal://localhost:0")
- props.put(KafkaConfig.ListenerSecurityProtocolMapProp, s"$SecureInternal:SSL, $SecureExternal:SASL_SSL")
+ props.put(KafkaConfig.ListenerSecurityProtocolMapProp, s"$SecureInternal:SSL, $SecureExternal:SASL_SSL, CONTROLLER:$controllerListenerSecurityProtocol")
props.put(KafkaConfig.InterBrokerListenerNameProp, SecureInternal)
props.put(KafkaConfig.SslClientAuthProp, "requested")
props.put(KafkaConfig.SaslMechanismInterBrokerProtocolProp, "PLAIN")
- props.put(KafkaConfig.ZkEnableSecureAclsProp, "true")
props.put(KafkaConfig.SaslEnabledMechanismsProp, kafkaServerSaslMechanisms.mkString(","))
props.put(KafkaConfig.LogSegmentBytesProp, "2000") // low value to test log rolling on config update
props.put(KafkaConfig.NumReplicaFetchersProp, "2") // greater than one to test reducing threads
@@ -138,17 +147,21 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
props ++= securityProps(sslProperties1, KEYSTORE_PROPS, listenerPrefix(SecureExternal))
val kafkaConfig = KafkaConfig.fromProps(props)
- configureDynamicKeystoreInZooKeeper(kafkaConfig, sslProperties1)
+ if (!isKRaftTest()) {
+ configureDynamicKeystoreInZooKeeper(kafkaConfig, sslProperties1)
+ }
- servers += TestUtils.createServer(kafkaConfig)
+ servers += createBroker(kafkaConfig)
}
- TestUtils.createTopic(zkClient, topic, numPartitions, replicationFactor = numServers, servers)
- TestUtils.createTopic(zkClient, Topic.GROUP_METADATA_TOPIC_NAME, servers.head.config.offsetsTopicPartitions,
- replicationFactor = numServers, servers, servers.head.groupCoordinator.offsetsTopicConfigs)
-
createAdminClient(SecurityProtocol.SSL, SecureInternal)
+ TestUtils.createTopicWithAdmin(adminClients.head, topic, servers, numPartitions, replicationFactor = numServers)
+ TestUtils.createTopicWithAdmin(adminClients.head, Topic.GROUP_METADATA_TOPIC_NAME, servers,
+ numPartitions = servers.head.config.offsetsTopicPartitions,
+ replicationFactor = numServers,
+ topicConfig = servers.head.groupCoordinator.offsetsTopicConfigs)
+
TestMetricsReporter.testReporters.clear()
}
@@ -166,8 +179,9 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
closeSasl()
}
- @Test
- def testConfigDescribeUsingAdminClient(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testConfigDescribeUsingAdminClient(quorum: String): Unit = {
def verifyConfig(configName: String, configEntry: ConfigEntry, isSensitive: Boolean, isReadOnly: Boolean,
expectedProps: Properties): Unit = {
@@ -226,9 +240,12 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
val adminClient = adminClients.head
alterSslKeystoreUsingConfigCommand(sslProperties1, SecureExternal)
- val configDesc = describeConfig(adminClient)
- verifySslConfig("listener.name.external.", sslProperties1, configDesc)
- verifySslConfig("", invalidSslProperties, configDesc)
+ val configDesc = TestUtils.tryUntilNoAssertionError() {
+ val describeConfigsResult = describeConfig(adminClient)
+ verifySslConfig("listener.name.external.", sslProperties1, describeConfigsResult)
+ verifySslConfig("", invalidSslProperties, describeConfigsResult)
+ describeConfigsResult
+ }
// Verify a few log configs with and without synonyms
val expectedProps = new Properties
@@ -262,8 +279,9 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
assertEquals(List((KafkaConfig.LogCleanerThreadsProp, ConfigSource.DEFAULT_CONFIG)), synonymsList(logCleanerThreads))
}
- @Test
- def testUpdatesUsingConfigProvider(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testUpdatesUsingConfigProvider(quorum: String): Unit = {
val PollingIntervalVal = f"$${file:polling.interval:interval}"
val PollingIntervalUpdateVal = f"$${file:polling.interval:updinterval}"
val SslTruststoreTypeVal = f"$${file:ssl.truststore.type:storetype}"
@@ -309,11 +327,13 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
assertFalse(reporter.kafkaMetrics.isEmpty, "No metrics found")
}
- // fetch from ZK, values should be unresolved
- val props = fetchBrokerConfigsFromZooKeeper(servers.head)
- assertTrue(props.getProperty(TestMetricsReporter.PollingIntervalProp) == PollingIntervalVal, "polling interval is not updated in ZK")
- assertTrue(props.getProperty(configPrefix+KafkaConfig.SslTruststoreTypeProp) == SslTruststoreTypeVal, "store type is not updated in ZK")
- assertTrue(props.getProperty(configPrefix+KafkaConfig.SslKeystorePasswordProp) == SslKeystorePasswordVal, "keystore password is not updated in ZK")
+ if (!isKRaftTest()) {
+ // fetch from ZK, values should be unresolved
+ val props = fetchBrokerConfigsFromZooKeeper(servers.head)
+ assertTrue(props.getProperty(TestMetricsReporter.PollingIntervalProp) == PollingIntervalVal, "polling interval is not updated in ZK")
+ assertTrue(props.getProperty(configPrefix + KafkaConfig.SslTruststoreTypeProp) == SslTruststoreTypeVal, "store type is not updated in ZK")
+ assertTrue(props.getProperty(configPrefix + KafkaConfig.SslKeystorePasswordProp) == SslKeystorePasswordVal, "keystore password is not updated in ZK")
+ }
// verify the update
// 1. verify update not occurring if the value of property is same.
@@ -332,10 +352,10 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
}
}
- @Test
+ @Test // TODO KAFKA-14126 add KRaft support
def testKeyStoreAlter(): Unit = {
val topic2 = "testtopic2"
- TestUtils.createTopic(zkClient, topic2, numPartitions, replicationFactor = numServers, servers)
+ TestUtils.createTopicWithAdmin(adminClients.head, topic2, servers, numPartitions, replicationFactor = numServers)
// Start a producer and consumer that work with the current broker keystore.
// This should continue working while changes are made
@@ -399,7 +419,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
stopAndVerifyProduceConsume(producerThread, consumerThread)
}
- @Test
+ @Test // TODO KAFKA-14126 add KRaft support
def testTrustStoreAlter(): Unit = {
val producerBuilder = ProducerBuilder().listenerName(SecureInternal).securityProtocol(SecurityProtocol.SSL)
@@ -481,7 +501,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
verifySslProduceConsume(sslProperties2, "alter-truststore-7")
waitForAuthenticationFailure(producerBuilder.keyStoreProps(sslProperties1))
- val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get
+ val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get.asInstanceOf[KafkaServer]
val controllerChannelManager = controller.kafkaController.controllerChannelManager
val brokerStateInfo: mutable.HashMap[Int, ControllerBrokerStateInfo] =
JTestUtils.fieldValue(controllerChannelManager, classOf[ControllerChannelManager], "brokerStateInfo")
@@ -492,8 +512,9 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
verifyBrokerToControllerCall(controller)
}
- @Test
- def testLogCleanerConfig(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testLogCleanerConfig(quorum: String): Unit = {
val (producerThread, consumerThread) = startProduceConsume(retries = 0)
verifyThreads("kafka-log-cleaner-thread-", countPerBroker = 1)
@@ -537,13 +558,23 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
stopAndVerifyProduceConsume(producerThread, consumerThread)
}
- @Test
- def testConsecutiveConfigChange(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testConsecutiveConfigChange(quorum: String): Unit = {
val topic2 = "testtopic2"
val topicProps = new Properties
topicProps.put(KafkaConfig.MinInSyncReplicasProp, "2")
- TestUtils.createTopic(zkClient, topic2, 1, replicationFactor = numServers, servers, topicProps)
- var log = servers.head.logManager.getLog(new TopicPartition(topic2, 0)).getOrElse(throw new IllegalStateException("Log not found"))
+ TestUtils.createTopicWithAdmin(adminClients.head, topic2, servers, numPartitions = 1, replicationFactor = numServers, topicConfig = topicProps)
+
+ def getLogOrThrow(tp: TopicPartition): UnifiedLog = {
+ var (logOpt, found) = TestUtils.computeUntilTrue {
+ servers.head.logManager.getLog(tp)
+ }(_.isDefined)
+ assertTrue(found, "Log not found")
+ logOpt.get
+ }
+
+ var log = getLogOrThrow(new TopicPartition(topic2, 0))
assertTrue(log.config.overriddenConfigs.contains(KafkaConfig.MinInSyncReplicasProp))
assertEquals("2", log.config.originals().get(KafkaConfig.MinInSyncReplicasProp).toString)
@@ -558,7 +589,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
}
}
- log = servers.head.logManager.getLog(new TopicPartition(topic2, 0)).getOrElse(throw new IllegalStateException("Log not found"))
+ log = getLogOrThrow(new TopicPartition(topic2, 0))
assertTrue(log.config.overriddenConfigs.contains(KafkaConfig.MinInSyncReplicasProp))
assertEquals("2", log.config.originals().get(KafkaConfig.MinInSyncReplicasProp).toString) // Verify topic-level config survives
@@ -566,7 +597,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
props.clear()
props.put(KafkaConfig.LogRetentionTimeMillisProp, "604800000")
reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.LogRetentionTimeMillisProp, "604800000"))
- log = servers.head.logManager.getLog(new TopicPartition(topic2, 0)).getOrElse(throw new IllegalStateException("Log not found"))
+ log = getLogOrThrow(new TopicPartition(topic2, 0))
assertTrue(log.config.overriddenConfigs.contains(KafkaConfig.MinInSyncReplicasProp))
assertEquals("2", log.config.originals().get(KafkaConfig.MinInSyncReplicasProp).toString) // Verify topic-level config still survives
}
@@ -974,6 +1005,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
}
@Test
+ // Modifying advertised listeners is not supported in KRaft
def testAdvertisedListenerUpdate(): Unit = {
val adminClient = adminClients.head
val externalAdminClient = createAdminClient(SecurityProtocol.SASL_SSL, SecureExternal)
@@ -994,11 +1026,13 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
}
// Verify that endpoints have been updated in ZK for all brokers
- servers.foreach(validateEndpointsInZooKeeper(_, endpoints => endpoints.contains(invalidHost)))
+ servers.foreach { server =>
+ validateEndpointsInZooKeeper(server.asInstanceOf[KafkaServer], endpoints => endpoints.contains(invalidHost))
+ }
// Trigger session expiry and ensure that controller registers new advertised listener after expiry
val controllerEpoch = zkClient.getControllerEpoch
- val controllerServer = servers(zkClient.getControllerId.getOrElse(throw new IllegalStateException("No controller")))
+ val controllerServer = servers(zkClient.getControllerId.getOrElse(throw new IllegalStateException("No controller"))).asInstanceOf[KafkaServer]
val controllerZkClient = controllerServer.zkClient
val sessionExpiringClient = createZooKeeperClientToTriggerSessionExpiry(controllerZkClient.currentZooKeeper)
sessionExpiringClient.close()
@@ -1022,7 +1056,9 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
.getCause.isInstanceOf[org.apache.kafka.common.errors.TimeoutException])
alterAdvertisedListener(adminClient, externalAdminClient, invalidHost, "localhost")
- servers.foreach(validateEndpointsInZooKeeper(_, endpoints => !endpoints.contains(invalidHost)))
+ servers.foreach { server =>
+ validateEndpointsInZooKeeper(server.asInstanceOf[KafkaServer], endpoints => !endpoints.contains(invalidHost))
+ }
// Verify that produce/consume work now
val topic2 = "testtopic2"
@@ -1119,7 +1155,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
assertTrue(partitions.exists(_.leader == null), "Did not find partitions with no leader")
}
- private def addListener(servers: Seq[KafkaServer], listenerName: String, securityProtocol: SecurityProtocol,
+ private def addListener(servers: Seq[KafkaBroker], listenerName: String, securityProtocol: SecurityProtocol,
saslMechanisms: Seq[String]): Unit = {
val config = servers.head.config
val existingListenerCount = config.listeners.size
@@ -1264,11 +1300,11 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
verifyProduceConsume(producer, consumer, numRecords = 10, topic)
}
- private def hasListenerMetric(server: KafkaServer, listenerName: String): Boolean = {
+ private def hasListenerMetric(server: KafkaBroker, listenerName: String): Boolean = {
server.socketServer.metrics.metrics.keySet.asScala.exists(_.tags.get("listener") == listenerName)
}
- private def fetchBrokerConfigsFromZooKeeper(server: KafkaServer): Properties = {
+ private def fetchBrokerConfigsFromZooKeeper(server: KafkaBroker): Properties = {
val props = adminZkClient.fetchEntityConfig(ConfigType.Broker, server.config.brokerId.toString)
server.config.dynamicConfig.fromPersistentProps(props, perBrokerConfig = true)
}
@@ -1322,7 +1358,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
}, "Did not fail authentication with invalid config")
}
- private def describeConfig(adminClient: Admin, servers: Seq[KafkaServer] = this.servers): Config = {
+ private def describeConfig(adminClient: Admin, servers: Seq[KafkaBroker] = this.servers): Config = {
val configResources = servers.map { server =>
new ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString)
}
@@ -1419,7 +1455,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
}
@nowarn("cat=deprecation")
- private def alterConfigsOnServer(server: KafkaServer, props: Properties): Unit = {
+ private def alterConfigsOnServer(server: KafkaBroker, props: Properties): Unit = {
val configEntries = props.asScala.map { case (k, v) => new ConfigEntry(k, v) }.toList.asJava
val newConfig = new Config(configEntries)
val configs = Map(new ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString) -> newConfig).asJava
@@ -1428,7 +1464,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
}
@nowarn("cat=deprecation")
- private def alterConfigs(servers: Seq[KafkaServer], adminClient: Admin, props: Properties,
+ private def alterConfigs(servers: Seq[KafkaBroker], adminClient: Admin, props: Properties,
perBrokerConfig: Boolean): AlterConfigsResult = {
val configEntries = props.asScala.map { case (k, v) => new ConfigEntry(k, v) }.toList.asJava
val newConfig = new Config(configEntries)
@@ -1507,7 +1543,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
private def createPasswordEncoder(config: KafkaConfig, secret: Option[Password]): PasswordEncoder = {
val encoderSecret = secret.getOrElse(throw new IllegalStateException("Password encoder secret not configured"))
- new PasswordEncoder(encoderSecret,
+ PasswordEncoder.encrypting(encoderSecret,
config.passwordEncoderKeyFactoryAlgorithm,
config.passwordEncoderCipherAlgorithm,
config.passwordEncoderKeyLength,
@@ -1518,7 +1554,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
servers.foreach { server => waitForConfigOnServer(server, propName, propValue, maxWaitMs) }
}
- private def waitForConfigOnServer(server: KafkaServer, propName: String, propValue: String, maxWaitMs: Long = 10000): Unit = {
+ private def waitForConfigOnServer(server: KafkaBroker, propName: String, propValue: String, maxWaitMs: Long = 10000): Unit = {
TestUtils.retry(maxWaitMs) {
assertEquals(propValue, server.config.originals.get(propName))
}
diff --git a/core/src/test/scala/unit/kafka/utils/PasswordEncoderTest.scala b/core/src/test/scala/unit/kafka/utils/PasswordEncoderTest.scala
index 0a5d5ac029..50cdceabbc 100755
--- a/core/src/test/scala/unit/kafka/utils/PasswordEncoderTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/PasswordEncoderTest.scala
@@ -30,7 +30,7 @@ class PasswordEncoderTest {
@Test
def testEncodeDecode(): Unit = {
- val encoder = new PasswordEncoder(new Password("password-encoder-secret"),
+ val encoder = PasswordEncoder.encrypting(new Password("password-encoder-secret"),
None,
Defaults.PasswordEncoderCipherAlgorithm,
Defaults.PasswordEncoderKeyLength,
@@ -54,7 +54,7 @@ class PasswordEncoderTest {
@Test
def testEncoderConfigChange(): Unit = {
- val encoder = new PasswordEncoder(new Password("password-encoder-secret"),
+ val encoder = PasswordEncoder.encrypting(new Password("password-encoder-secret"),
Some("PBKDF2WithHmacSHA1"),
"DES/CBC/PKCS5Padding",
64,
@@ -68,7 +68,7 @@ class PasswordEncoderTest {
assertEquals("DES/CBC/PKCS5Padding", encodedMap(PasswordEncoder.CipherAlgorithmProp))
// Test that decoding works even if PasswordEncoder algorithm, iterations etc. are altered
- val decoder = new PasswordEncoder(new Password("password-encoder-secret"),
+ val decoder = PasswordEncoder.encrypting(new Password("password-encoder-secret"),
Some("PBKDF2WithHmacSHA1"),
"AES/CBC/PKCS5Padding",
128,
@@ -76,7 +76,7 @@ class PasswordEncoderTest {
assertEquals(password, decoder.decode(encoded).value)
// Test that decoding fails if secret is altered
- val decoder2 = new PasswordEncoder(new Password("secret-2"),
+ val decoder2 = PasswordEncoder.encrypting(new Password("secret-2"),
Some("PBKDF2WithHmacSHA1"),
"AES/CBC/PKCS5Padding",
128,
@@ -92,7 +92,7 @@ class PasswordEncoderTest {
def testEncodeDecodeAlgorithms(): Unit = {
def verifyEncodeDecode(keyFactoryAlg: Option[String], cipherAlg: String, keyLength: Int): Unit = {
- val encoder = new PasswordEncoder(new Password("password-encoder-secret"),
+ val encoder = PasswordEncoder.encrypting(new Password("password-encoder-secret"),
keyFactoryAlg,
cipherAlg,
keyLength,
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 8d921e54f1..e388896c03 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -482,7 +482,7 @@ object TestUtils extends Logging {
topic: String,
numPartitions: Int = 1,
replicationFactor: Int = 1,
- servers: Seq[KafkaServer],
+ servers: Seq[KafkaBroker],
topicConfig: Properties = new Properties): scala.collection.immutable.Map[Int, Int] = {
val adminZkClient = new AdminZkClient(zkClient)
// create topic
@@ -514,7 +514,7 @@ object TestUtils extends Logging {
def createTopic(zkClient: KafkaZkClient,
topic: String,
partitionReplicaAssignment: collection.Map[Int, Seq[Int]],
- servers: Seq[KafkaServer]): scala.collection.immutable.Map[Int, Int] = {
+ servers: Seq[KafkaBroker]): scala.collection.immutable.Map[Int, Int] = {
createTopic(zkClient, topic, partitionReplicaAssignment, servers, new Properties())
}
@@ -526,7 +526,7 @@ object TestUtils extends Logging {
def createTopic(zkClient: KafkaZkClient,
topic: String,
partitionReplicaAssignment: collection.Map[Int, Seq[Int]],
- servers: Seq[KafkaServer],
+ servers: Seq[KafkaBroker],
topicConfig: Properties): scala.collection.immutable.Map[Int, Int] = {
val adminZkClient = new AdminZkClient(zkClient)
// create topic
@@ -554,7 +554,7 @@ object TestUtils extends Logging {
* Create the consumer offsets/group metadata topic and wait until the leader is elected and metadata is propagated
* to all brokers.
*/
- def createOffsetsTopic(zkClient: KafkaZkClient, servers: Seq[KafkaServer]): Unit = {
+ def createOffsetsTopic(zkClient: KafkaZkClient, servers: Seq[KafkaBroker]): Unit = {
val server = servers.head
createTopic(zkClient, Topic.GROUP_METADATA_TOPIC_NAME,
server.config.getInt(KafkaConfig.OffsetsTopicPartitionsProp),
@@ -1014,18 +1014,19 @@ object TestUtils extends Logging {
* otherwise difficult to poll for. `computeUntilTrue` and `waitUntilTrue` should be preferred in cases where we can
* easily wait on a condition before evaluating the assertions.
*/
- def tryUntilNoAssertionError(waitTime: Long = JTestUtils.DEFAULT_MAX_WAIT_MS, pause: Long = 100L)(assertions: => Unit) = {
- val (error, success) = TestUtils.computeUntilTrue({
+ def tryUntilNoAssertionError[T](waitTime: Long = JTestUtils.DEFAULT_MAX_WAIT_MS, pause: Long = 100L)(assertions: => T): T = {
+ val (either, success) = TestUtils.computeUntilTrue({
try {
- assertions
- None
+ val res = assertions
+ Left(res)
} catch {
- case ae: AssertionError => Some(ae)
+ case ae: AssertionError => Right(ae)
}
- }, waitTime = waitTime, pause = pause)(_.isEmpty)
+ }, waitTime = waitTime, pause = pause)(_.isLeft)
- if (!success) {
- throw error.get
+ either match {
+ case Left(res) => res
+ case Right(err) => throw err
}
}