You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2021/09/06 16:22:25 UTC
[kafka] branch 3.0 updated: KAFKA-13270: Set JUTE_MAXBUFFER to 4 MB
by default (#11295)
This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 2e819a7 KAFKA-13270: Set JUTE_MAXBUFFER to 4 MB by default (#11295)
2e819a7 is described below
commit 2e819a7208fa4e0933c7942d07c54130bd12c119
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Mon Sep 6 09:18:47 2021 -0700
KAFKA-13270: Set JUTE_MAXBUFFER to 4 MB by default (#11295)
ZooKeeper 3.6.0 lowered the default value of JUTE_MAXBUFFER from 4 MB to 1 MB. We restore the
3.4.x/3.5.x default of 4 MB unless the caller has set the property (note that ZKConfig
auto-configures itself if certain system properties have been set).
I added a unit test that fails without the change and passes with it.
I also refactored the code to streamline the way we handle parameters passed to
KafkaZkClient and ZooKeeperClient.
See https://github.com/apache/zookeeper/pull/1129 for the details on why the behavior
changed in 3.6.0.
Credit to @rondagostino for finding and reporting this issue.
Reviewers: David Jacot <dj...@confluent.io>
---
.../src/main/scala/kafka/admin/ConfigCommand.scala | 2 +-
.../scala/kafka/admin/ZkSecurityMigrator.scala | 2 +-
.../kafka/security/authorizer/AclAuthorizer.scala | 22 +++++-----
core/src/main/scala/kafka/server/KafkaConfig.scala | 49 ++++++++++------------
core/src/main/scala/kafka/server/KafkaServer.scala | 15 ++++---
core/src/main/scala/kafka/zk/KafkaZkClient.scala | 27 +++++++++---
.../scala/kafka/zookeeper/ZooKeeperClient.scala | 25 ++---------
.../scala/integration/kafka/api/SaslSetup.scala | 2 +-
.../kafka/security/auth/ZkAuthorizationTest.scala | 16 ++++---
.../security/authorizer/AclAuthorizerTest.scala | 36 ++++++++--------
.../AuthorizerInterfaceDefaultTest.scala | 5 ++-
.../scala/unit/kafka/server/KafkaServerTest.scala | 16 +++----
.../scala/unit/kafka/zk/KafkaZkClientTest.scala | 46 ++++++++++++++++----
.../scala/unit/kafka/zk/ZooKeeperTestHarness.scala | 5 ++-
.../unit/kafka/zookeeper/ZooKeeperClientTest.scala | 45 ++++++++++----------
15 files changed, 169 insertions(+), 144 deletions(-)
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 39a3698..5e5ccef 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -115,7 +115,7 @@ object ConfigCommand extends Config {
val zkClientConfig = ZkSecurityMigrator.createZkClientConfigFromOption(opts.options, opts.zkTlsConfigFile)
.getOrElse(new ZKClientConfig())
val zkClient = KafkaZkClient(zkConnectString, JaasUtils.isZkSaslEnabled || KafkaConfig.zkTlsClientAuthEnabled(zkClientConfig), 30000, 30000,
- Int.MaxValue, Time.SYSTEM, zkClientConfig = Some(zkClientConfig))
+ Int.MaxValue, Time.SYSTEM, zkClientConfig = zkClientConfig, name = "ConfigCommand")
val adminZkClient = new AdminZkClient(zkClient)
try {
if (opts.options.has(opts.alterOpt))
diff --git a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
index a8b799a..1263195 100644
--- a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
+++ b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
@@ -105,7 +105,7 @@ object ZkSecurityMigrator extends Logging {
val zkSessionTimeout = opts.options.valueOf(opts.zkSessionTimeoutOpt).intValue
val zkConnectionTimeout = opts.options.valueOf(opts.zkConnectionTimeoutOpt).intValue
val zkClient = KafkaZkClient(zkUrl, zkAcl, zkSessionTimeout, zkConnectionTimeout,
- Int.MaxValue, Time.SYSTEM, zkClientConfig = Some(zkClientConfig))
+ Int.MaxValue, Time.SYSTEM, zkClientConfig = zkClientConfig, name = "ZkSecurityMigrator")
val enablePathCheck = opts.options.has(opts.enablePathCheckOpt)
val migrator = new ZkSecurityMigrator(zkClient)
migrator.run(enablePathCheck)
diff --git a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
index 5f701d8..88648fd 100644
--- a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
@@ -95,25 +95,25 @@ object AclAuthorizer {
}
}
- private[authorizer] def zkClientConfigFromKafkaConfigAndMap(kafkaConfig: KafkaConfig, configMap: mutable.Map[String, _<:Any]): Option[ZKClientConfig] = {
+ private[authorizer] def zkClientConfigFromKafkaConfigAndMap(kafkaConfig: KafkaConfig, configMap: mutable.Map[String, _<:Any]): ZKClientConfig = {
val zkSslClientEnable = configMap.get(AclAuthorizer.configPrefix + KafkaConfig.ZkSslClientEnableProp).
map(_.toString).getOrElse(kafkaConfig.zkSslClientEnable.toString).toBoolean
if (!zkSslClientEnable)
- None
+ new ZKClientConfig
else {
// start with the base config from the Kafka configuration
// be sure to force creation since the zkSslClientEnable property in the kafkaConfig could be false
val zkClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(kafkaConfig, true)
// add in any prefixed overlays
- KafkaConfig.ZkSslConfigToSystemPropertyMap.foreach{ case (kafkaProp, sysProp) => {
- val prefixedValue = configMap.get(AclAuthorizer.configPrefix + kafkaProp)
- if (prefixedValue.isDefined)
- zkClientConfig.get.setProperty(sysProp,
+ KafkaConfig.ZkSslConfigToSystemPropertyMap.forKeyValue { (kafkaProp, sysProp) =>
+ configMap.get(AclAuthorizer.configPrefix + kafkaProp).foreach { prefixedValue =>
+ zkClientConfig.setProperty(sysProp,
if (kafkaProp == KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp)
- (prefixedValue.get.toString.toUpperCase == "HTTPS").toString
+ (prefixedValue.toString.toUpperCase == "HTTPS").toString
else
- prefixedValue.get.toString)
- }}
+ prefixedValue.toString)
+ }
+ }
zkClientConfig
}
}
@@ -178,8 +178,8 @@ class AclAuthorizer extends Authorizer with Logging {
// createChrootIfNecessary=true is necessary in case we are running in a KRaft cluster
// because such a cluster will not create any chroot path in ZooKeeper (it doesn't connect to ZooKeeper)
zkClient = KafkaZkClient(zkUrl, kafkaConfig.zkEnableSecureAcls, zkSessionTimeOutMs, zkConnectionTimeoutMs,
- zkMaxInFlightRequests, time, "kafka.security", "AclAuthorizer", name=Some("ACL authorizer"),
- zkClientConfig = zkClientConfig, createChrootIfNecessary = true)
+ zkMaxInFlightRequests, time, name = "ACL authorizer", zkClientConfig = zkClientConfig,
+ metricGroup = "kafka.security", metricType = "AclAuthorizer", createChrootIfNecessary = true)
zkClient.createAclPaths()
extendedAclSupport = kafkaConfig.interBrokerProtocolVersion >= KAFKA_2_0_IV1
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index a30d31d..c556d7a 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -336,7 +336,7 @@ object KafkaConfig {
ZkSslCrlEnableProp -> "zookeeper.ssl.crl",
ZkSslOcspEnableProp -> "zookeeper.ssl.ocsp")
- private[kafka] def getZooKeeperClientProperty(clientConfig: ZKClientConfig, kafkaPropName: String): Option[String] = {
+ private[kafka] def zooKeeperClientProperty(clientConfig: ZKClientConfig, kafkaPropName: String): Option[String] = {
Option(clientConfig.getProperty(ZkSslConfigToSystemPropertyMap(kafkaPropName)))
}
@@ -345,7 +345,7 @@ object KafkaConfig {
kafkaPropName match {
case ZkSslEndpointIdentificationAlgorithmProp => (kafkaPropValue.toString.toUpperCase == "HTTPS").toString
case ZkSslEnabledProtocolsProp | ZkSslCipherSuitesProp => kafkaPropValue match {
- case list: java.util.List[_] => list.asInstanceOf[java.util.List[_]].asScala.mkString(",")
+ case list: java.util.List[_] => list.asScala.mkString(",")
case _ => kafkaPropValue.toString
}
case _ => kafkaPropValue.toString
@@ -354,10 +354,10 @@ object KafkaConfig {
// For ZooKeeper TLS client authentication to be enabled the client must (at a minimum) configure itself as using TLS
// with both a client connection socket and a key store location explicitly set.
- private[kafka] def zkTlsClientAuthEnabled(zkClientConfig: ZKClientConfig) = {
- getZooKeeperClientProperty(zkClientConfig, ZkSslClientEnableProp).getOrElse("false") == "true" &&
- getZooKeeperClientProperty(zkClientConfig, ZkClientCnxnSocketProp).isDefined &&
- getZooKeeperClientProperty(zkClientConfig, ZkSslKeyStoreLocationProp).isDefined
+ private[kafka] def zkTlsClientAuthEnabled(zkClientConfig: ZKClientConfig): Boolean = {
+ zooKeeperClientProperty(zkClientConfig, ZkSslClientEnableProp).contains("true") &&
+ zooKeeperClientProperty(zkClientConfig, ZkClientCnxnSocketProp).isDefined &&
+ zooKeeperClientProperty(zkClientConfig, ZkSslKeyStoreLocationProp).isDefined
}
/** ********* General Configuration ***********/
@@ -1440,7 +1440,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
// Need to translate any system property value from true/false (String) to true/false (Boolean)
val actuallyProvided = originals.containsKey(propKey)
if (actuallyProvided) getBoolean(propKey) else {
- val sysPropValue = KafkaConfig.getZooKeeperClientProperty(zkClientConfigViaSystemProperties, propKey)
+ val sysPropValue = KafkaConfig.zooKeeperClientProperty(zkClientConfigViaSystemProperties, propKey)
sysPropValue match {
case Some("true") => true
case Some(_) => false
@@ -1453,35 +1453,27 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
// Use the system property if it exists and the Kafka config value was defaulted rather than actually provided
val actuallyProvided = originals.containsKey(propKey)
if (actuallyProvided) getString(propKey) else {
- val sysPropValue = KafkaConfig.getZooKeeperClientProperty(zkClientConfigViaSystemProperties, propKey)
- sysPropValue match {
- case Some(_) => sysPropValue.get
+ KafkaConfig.zooKeeperClientProperty(zkClientConfigViaSystemProperties, propKey) match {
+ case Some(v) => v
case _ => getString(propKey) // not specified so use the default value
}
}
}
private def zkOptionalStringConfigOrSystemProperty(propKey: String): Option[String] = {
- Option(getString(propKey)) match {
- case config: Some[String] => config
- case _ => KafkaConfig.getZooKeeperClientProperty(zkClientConfigViaSystemProperties, propKey)
+ Option(getString(propKey)).orElse {
+ KafkaConfig.zooKeeperClientProperty(zkClientConfigViaSystemProperties, propKey)
}
}
private def zkPasswordConfigOrSystemProperty(propKey: String): Option[Password] = {
- Option(getPassword(propKey)) match {
- case config: Some[Password] => config
- case _ => {
- val sysProp = KafkaConfig.getZooKeeperClientProperty (zkClientConfigViaSystemProperties, propKey)
- if (sysProp.isDefined) Some (new Password (sysProp.get) ) else None
- }
+ Option(getPassword(propKey)).orElse {
+ KafkaConfig.zooKeeperClientProperty(zkClientConfigViaSystemProperties, propKey).map(new Password(_))
}
}
private def zkListConfigOrSystemProperty(propKey: String): Option[util.List[String]] = {
- Option(getList(propKey)) match {
- case config: Some[util.List[String]] => config
- case _ => {
- val sysProp = KafkaConfig.getZooKeeperClientProperty(zkClientConfigViaSystemProperties, propKey)
- if (sysProp.isDefined) Some(sysProp.get.split("\\s*,\\s*").toList.asJava) else None
+ Option(getList(propKey)).orElse {
+ KafkaConfig.zooKeeperClientProperty(zkClientConfigViaSystemProperties, propKey).map { sysProp =>
+ sysProp.split("\\s*,\\s*").toBuffer.asJava
}
}
}
@@ -1502,12 +1494,13 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
// Need to translate any system property value from true/false to HTTPS/<blank>
val kafkaProp = KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp
val actuallyProvided = originals.containsKey(kafkaProp)
- if (actuallyProvided) getString(kafkaProp) else {
- val sysPropValue = KafkaConfig.getZooKeeperClientProperty(zkClientConfigViaSystemProperties, kafkaProp)
- sysPropValue match {
+ if (actuallyProvided)
+ getString(kafkaProp)
+ else {
+ KafkaConfig.zooKeeperClientProperty(zkClientConfigViaSystemProperties, kafkaProp) match {
case Some("true") => "HTTPS"
case Some(_) => ""
- case _ => getString(kafkaProp) // not specified so use the default value
+ case None => getString(kafkaProp) // not specified so use the default value
}
}
}
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index b3c66a6..c22eab2 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -56,11 +56,9 @@ import scala.jdk.CollectionConverters._
object KafkaServer {
- def zkClientConfigFromKafkaConfig(config: KafkaConfig, forceZkSslClientEnable: Boolean = false) =
- if (!config.zkSslClientEnable && !forceZkSslClientEnable)
- None
- else {
- val clientConfig = new ZKClientConfig()
+ def zkClientConfigFromKafkaConfig(config: KafkaConfig, forceZkSslClientEnable: Boolean = false): ZKClientConfig = {
+ val clientConfig = new ZKClientConfig
+ if (config.zkSslClientEnable || forceZkSslClientEnable) {
KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslClientEnableProp, "true")
config.zkClientCnxnSocketClassName.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkClientCnxnSocketProp, _))
config.zkSslKeyStoreLocation.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslKeyStoreLocationProp, _))
@@ -75,8 +73,9 @@ object KafkaServer {
KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp, config.ZkSslEndpointIdentificationAlgorithm)
KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslCrlEnableProp, config.ZkSslCrlEnable.toString)
KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslOcspEnableProp, config.ZkSslOcspEnable.toString)
- Some(clientConfig)
}
+ clientConfig
+ }
val MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS: Long = 120000
}
@@ -144,7 +143,7 @@ class KafkaServer(
var metadataCache: ZkMetadataCache = null
var quotaManagers: QuotaFactory.QuotaManagers = null
- val zkClientConfig: ZKClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(config).getOrElse(new ZKClientConfig())
+ val zkClientConfig: ZKClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(config)
private var _zkClient: KafkaZkClient = null
private var configRepository: ZkConfigRepository = null
@@ -454,7 +453,7 @@ class KafkaServer(
s"verification of the JAAS login file failed ${JaasUtils.zkSecuritySysConfigString}")
_zkClient = KafkaZkClient(config.zkConnect, secureAclsEnabled, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
- config.zkMaxInFlightRequests, time, name = Some("Kafka server"), zkClientConfig = Some(zkClientConfig),
+ config.zkMaxInFlightRequests, time, name = "Kafka server", zkClientConfig = zkClientConfig,
createChrootIfNecessary = true)
_zkClient.createTopLevelPaths()
}
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index bcc89de9..823d6e8 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -17,7 +17,6 @@
package kafka.zk
import java.util.Properties
-
import com.yammer.metrics.core.MetricName
import kafka.api.LeaderAndIsr
import kafka.cluster.Broker
@@ -38,6 +37,7 @@ import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
import org.apache.zookeeper.KeeperException.{Code, NodeExistsException}
import org.apache.zookeeper.OpResult.{CreateResult, ErrorResult, SetDataResult}
import org.apache.zookeeper.client.ZKClientConfig
+import org.apache.zookeeper.common.ZKConfig
import org.apache.zookeeper.data.{ACL, Stat}
import org.apache.zookeeper.{CreateMode, KeeperException, ZooKeeper}
@@ -1940,18 +1940,33 @@ object KafkaZkClient {
connectionTimeoutMs: Int,
maxInFlightRequests: Int,
time: Time,
+ name: String,
+ zkClientConfig: ZKClientConfig,
metricGroup: String = "kafka.server",
metricType: String = "SessionExpireListener",
- name: Option[String] = None,
- zkClientConfig: Option[ZKClientConfig] = None,
createChrootIfNecessary: Boolean = false
): KafkaZkClient = {
+
+ /* ZooKeeper 3.6.0 changed the default configuration for JUTE_MAXBUFFER from 4 MB to 1 MB.
+ * This causes a regression if Kafka tries to retrieve a large amount of data across many
+ * znodes – in such a case the ZooKeeper client will repeatedly emit a message of the form
+ * "java.io.IOException: Packet len <####> is out of range".
+ *
+ * We restore the 3.4.x/3.5.x behavior unless the caller has set the property (note that ZKConfig
+ * auto configures itself if certain system properties have been set).
+ *
+ * See https://github.com/apache/zookeeper/pull/1129 for the details on why the behavior
+ * changed in 3.6.0.
+ */
+ if (zkClientConfig.getProperty(ZKConfig.JUTE_MAXBUFFER) == null)
+ zkClientConfig.setProperty(ZKConfig.JUTE_MAXBUFFER, ((4096 * 1024).toString))
+
if (createChrootIfNecessary) {
val chrootIndex = connectString.indexOf("/")
if (chrootIndex > 0) {
val zkConnWithoutChrootForChrootCreation = connectString.substring(0, chrootIndex)
- val zkClientForChrootCreation = KafkaZkClient(zkConnWithoutChrootForChrootCreation, isSecure, sessionTimeoutMs,
- connectionTimeoutMs, maxInFlightRequests, time, metricGroup, metricType, name, zkClientConfig)
+ val zkClientForChrootCreation = apply(zkConnWithoutChrootForChrootCreation, isSecure, sessionTimeoutMs,
+ connectionTimeoutMs, maxInFlightRequests, time, name, zkClientConfig, metricGroup, metricType)
try {
val chroot = connectString.substring(chrootIndex)
if (!zkClientForChrootCreation.pathExists(chroot)) {
@@ -1963,7 +1978,7 @@ object KafkaZkClient {
}
}
val zooKeeperClient = new ZooKeeperClient(connectString, sessionTimeoutMs, connectionTimeoutMs, maxInFlightRequests,
- time, metricGroup, metricType, name, zkClientConfig)
+ time, metricGroup, metricType, zkClientConfig, name)
new KafkaZkClient(zooKeeperClient, isSecure, time)
}
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 96ef6d5..091b401 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -61,24 +61,10 @@ class ZooKeeperClient(connectString: String,
time: Time,
metricGroup: String,
metricType: String,
- name: Option[String],
- zkClientConfig: Option[ZKClientConfig]) extends Logging with KafkaMetricsGroup {
-
- def this(connectString: String,
- sessionTimeoutMs: Int,
- connectionTimeoutMs: Int,
- maxInFlightRequests: Int,
- time: Time,
- metricGroup: String,
- metricType: String) = {
- this(connectString, sessionTimeoutMs, connectionTimeoutMs, maxInFlightRequests, time, metricGroup, metricType, None,
- None)
- }
+ private[zookeeper] val clientConfig: ZKClientConfig,
+ name: String) extends Logging with KafkaMetricsGroup {
- this.logIdent = name match {
- case Some(n) => s"[ZooKeeperClient $n] "
- case _ => "[ZooKeeperClient] "
- }
+ this.logIdent = s"[ZooKeeperClient $name] "
private val initializationLock = new ReentrantReadWriteLock()
private val isConnectedOrExpiredLock = new ReentrantLock()
private val isConnectedOrExpiredCondition = isConnectedOrExpiredLock.newCondition()
@@ -109,13 +95,10 @@ class ZooKeeperClient(connectString: String,
}
}
- private val clientConfig = zkClientConfig getOrElse new ZKClientConfig()
-
info(s"Initializing a new session to $connectString.")
// Fail-fast if there's an error during construction (so don't call initialize, which retries forever)
@volatile private var zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher,
clientConfig)
- private[zookeeper] def getClientConfig = clientConfig
newGauge("SessionState", () => connectionState.toString)
@@ -436,7 +419,7 @@ class ZooKeeperClient(connectString: String,
}, delayMs, period = -1L, unit = TimeUnit.MILLISECONDS)
}
- private def threadPrefix: String = name.map(n => n.replaceAll("\\s", "") + "-").getOrElse("")
+ private def threadPrefix: String = name.replaceAll("\\s", "") + "-"
// package level visibility for testing only
private[zookeeper] object ZooKeeperClientWatcher extends Watcher {
diff --git a/core/src/test/scala/integration/kafka/api/SaslSetup.scala b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
index 9237011..d613b72 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSetup.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
@@ -195,7 +195,7 @@ trait SaslSetup {
val zkClientConfig = new ZKClientConfig()
val zkClient = KafkaZkClient(
zkConnect, JaasUtils.isZkSaslEnabled || KafkaConfig.zkTlsClientAuthEnabled(zkClientConfig), 30000, 30000,
- Int.MaxValue, Time.SYSTEM, zkClientConfig = Some(zkClientConfig))
+ Int.MaxValue, Time.SYSTEM, name = "SaslSetup", zkClientConfig = zkClientConfig)
val adminZkClient = new AdminZkClient(zkClient)
val entityType = ConfigType.User
diff --git a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
index 69105c3..74803ce 100644
--- a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
@@ -18,7 +18,6 @@
package kafka.security.auth
import java.nio.charset.StandardCharsets
-
import kafka.admin.ZkSecurityMigrator
import kafka.utils.{Logging, TestUtils}
import kafka.zk._
@@ -36,6 +35,7 @@ import kafka.controller.ReplicaAssignment
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.Time
+import org.apache.zookeeper.client.ZKClientConfig
import scala.jdk.CollectionConverters._
import scala.collection.Seq
@@ -137,13 +137,17 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
BrokerInfo(Broker(id, Seq(new EndPoint(host, port, ListenerName.forSecurityProtocol
(securityProtocol), securityProtocol)), rack = rack), ApiVersion.latestVersion, jmxPort = port + 10)
+ private def newKafkaZkClient(connectionString: String, isSecure: Boolean) =
+ KafkaZkClient(connectionString, isSecure, 6000, 6000, Int.MaxValue, Time.SYSTEM, "ZkAuthorizationTest",
+ new ZKClientConfig)
+
/**
* Tests the migration tool when making an unsecure
* cluster secure.
*/
@Test
def testZkMigration(): Unit = {
- val unsecureZkClient = KafkaZkClient(zkConnect, false, 6000, 6000, Int.MaxValue, Time.SYSTEM)
+ val unsecureZkClient = newKafkaZkClient(zkConnect, isSecure = false)
try {
testMigration(zkConnect, unsecureZkClient, zkClient)
} finally {
@@ -157,7 +161,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
*/
@Test
def testZkAntiMigration(): Unit = {
- val unsecureZkClient = KafkaZkClient(zkConnect, false, 6000, 6000, Int.MaxValue, Time.SYSTEM)
+ val unsecureZkClient = newKafkaZkClient(zkConnect, isSecure = false)
try {
testMigration(zkConnect, zkClient, unsecureZkClient)
} finally {
@@ -198,8 +202,8 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
def testChroot(): Unit = {
val zkUrl = zkConnect + "/kafka"
zkClient.createRecursive("/kafka")
- val unsecureZkClient = KafkaZkClient(zkUrl, false, 6000, 6000, Int.MaxValue, Time.SYSTEM)
- val secureZkClient = KafkaZkClient(zkUrl, true, 6000, 6000, Int.MaxValue, Time.SYSTEM)
+ val unsecureZkClient = newKafkaZkClient(zkUrl, isSecure = false)
+ val secureZkClient = newKafkaZkClient(zkUrl, isSecure = true)
try {
testMigration(zkUrl, unsecureZkClient, secureZkClient)
} finally {
@@ -284,7 +288,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
*/
private def deleteAllUnsecure(): Unit = {
System.setProperty(JaasUtils.ZK_SASL_CLIENT, "false")
- val unsecureZkClient = KafkaZkClient(zkConnect, false, 6000, 6000, Int.MaxValue, Time.SYSTEM)
+ val unsecureZkClient = newKafkaZkClient(zkConnect, isSecure = false)
val result: Try[Boolean] = {
deleteRecursive(unsecureZkClient, "/")
}
diff --git a/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala
index fa201db..a49b0d3 100644
--- a/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala
@@ -22,7 +22,6 @@ import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.Files
import java.util.{Collections, UUID}
import java.util.concurrent.{Executors, Semaphore, TimeUnit}
-
import kafka.Kafka
import kafka.api.{ApiVersion, KAFKA_2_0_IV0, KAFKA_2_0_IV1}
import kafka.security.authorizer.AclEntry.{WildcardHost, WildcardPrincipalString}
@@ -43,6 +42,7 @@ import org.apache.kafka.common.resource.PatternType.{LITERAL, MATCH, PREFIXED}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.server.authorizer._
import org.apache.kafka.common.utils.{Time, SecurityUtils => JSecurityUtils}
+import org.apache.zookeeper.client.ZKClientConfig
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@@ -86,7 +86,7 @@ class AclAuthorizerTest extends ZooKeeperTestHarness with BaseAuthorizerTest {
resource = new ResourcePattern(TOPIC, "foo-" + UUID.randomUUID(), LITERAL)
zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkMaxInFlightRequests,
- Time.SYSTEM, "kafka.test", "AclAuthorizerTest")
+ Time.SYSTEM, "kafka.test", "AclAuthorizerTest", new ZKClientConfig, "AclAuthorizerTest")
}
@AfterEach
@@ -779,9 +779,12 @@ class AclAuthorizerTest extends ZooKeeperTestHarness with BaseAuthorizerTest {
@Test
def testAuthorizerNoZkConfig(): Unit = {
val noTlsProps = Kafka.getPropsFromArgs(Array(prepareDefaultConfig))
- assertEquals(None, AclAuthorizer.zkClientConfigFromKafkaConfigAndMap(
+ val zkClientConfig = AclAuthorizer.zkClientConfigFromKafkaConfigAndMap(
KafkaConfig.fromProps(noTlsProps),
- mutable.Map(noTlsProps.asInstanceOf[java.util.Map[String, Any]].asScala.toSeq: _*)))
+ noTlsProps.asInstanceOf[java.util.Map[String, Any]].asScala)
+ KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach { propName =>
+ assertNull(zkClientConfig.getProperty(propName))
+ }
}
@Test
@@ -799,20 +802,19 @@ class AclAuthorizerTest extends ZooKeeperTestHarness with BaseAuthorizerTest {
KafkaConfig.ZkSslTrustStoreTypeProp -> kafkaValue,
KafkaConfig.ZkSslEnabledProtocolsProp -> kafkaValue,
KafkaConfig.ZkSslCipherSuitesProp -> kafkaValue)
- configs.foreach{case (key, value) => props.put(key, value.toString) }
+ configs.foreach { case (key, value) => props.put(key, value) }
val zkClientConfig = AclAuthorizer.zkClientConfigFromKafkaConfigAndMap(
KafkaConfig.fromProps(props), mutable.Map(configs.toSeq: _*))
- assertTrue(zkClientConfig.isDefined)
// confirm we get all the values we expect
KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(prop => prop match {
case KafkaConfig.ZkSslClientEnableProp | KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp =>
- assertEquals("true", KafkaConfig.getZooKeeperClientProperty(zkClientConfig.get, prop).getOrElse("<None>"))
+ assertEquals("true", KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
case KafkaConfig.ZkSslCrlEnableProp | KafkaConfig.ZkSslOcspEnableProp =>
- assertEquals("false", KafkaConfig.getZooKeeperClientProperty(zkClientConfig.get, prop).getOrElse("<None>"))
+ assertEquals("false", KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
case KafkaConfig.ZkSslProtocolProp =>
- assertEquals("TLSv1.2", KafkaConfig.getZooKeeperClientProperty(zkClientConfig.get, prop).getOrElse("<None>"))
- case _ => assertEquals(kafkaValue, KafkaConfig.getZooKeeperClientProperty(zkClientConfig.get, prop).getOrElse("<None>"))
+ assertEquals("TLSv1.2", KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
+ case _ => assertEquals(kafkaValue, KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
})
}
@@ -839,14 +841,13 @@ class AclAuthorizerTest extends ZooKeeperTestHarness with BaseAuthorizerTest {
val zkClientConfig = AclAuthorizer.zkClientConfigFromKafkaConfigAndMap(
KafkaConfig.fromProps(props), mutable.Map(configs.toSeq: _*))
- assertTrue(zkClientConfig.isDefined)
// confirm we get all the values we expect
KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(prop => prop match {
case KafkaConfig.ZkSslClientEnableProp | KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp =>
- assertEquals("true", KafkaConfig.getZooKeeperClientProperty(zkClientConfig.get, prop).getOrElse("<None>"))
+ assertEquals("true", KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
case KafkaConfig.ZkSslCrlEnableProp | KafkaConfig.ZkSslOcspEnableProp =>
- assertEquals("false", KafkaConfig.getZooKeeperClientProperty(zkClientConfig.get, prop).getOrElse("<None>"))
- case _ => assertEquals(kafkaValue, KafkaConfig.getZooKeeperClientProperty(zkClientConfig.get, prop).getOrElse("<None>"))
+ assertEquals("false", KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
+ case _ => assertEquals(kafkaValue, KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
})
}
@@ -889,14 +890,13 @@ class AclAuthorizerTest extends ZooKeeperTestHarness with BaseAuthorizerTest {
val zkClientConfig = AclAuthorizer.zkClientConfigFromKafkaConfigAndMap(
KafkaConfig.fromProps(props), mutable.Map(configs.toSeq: _*))
- assertTrue(zkClientConfig.isDefined)
// confirm we get all the values we expect
KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(prop => prop match {
case KafkaConfig.ZkSslClientEnableProp | KafkaConfig.ZkSslCrlEnableProp | KafkaConfig.ZkSslOcspEnableProp =>
- assertEquals("true", KafkaConfig.getZooKeeperClientProperty(zkClientConfig.get, prop).getOrElse("<None>"))
+ assertEquals("true", KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
case KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp =>
- assertEquals("false", KafkaConfig.getZooKeeperClientProperty(zkClientConfig.get, prop).getOrElse("<None>"))
- case _ => assertEquals(prefixedValue, KafkaConfig.getZooKeeperClientProperty(zkClientConfig.get, prop).getOrElse("<None>"))
+ assertEquals("false", KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
+ case _ => assertEquals(prefixedValue, KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
})
}
diff --git a/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerInterfaceDefaultTest.scala b/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerInterfaceDefaultTest.scala
index 86c7a14..bccd58a 100644
--- a/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerInterfaceDefaultTest.scala
+++ b/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerInterfaceDefaultTest.scala
@@ -18,7 +18,6 @@ package kafka.security.authorizer
import java.util.concurrent.CompletionStage
import java.{lang, util}
-
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import kafka.zk.ZooKeeperTestHarness
@@ -27,6 +26,7 @@ import org.apache.kafka.common.Endpoint
import org.apache.kafka.common.acl._
import org.apache.kafka.common.utils.Time
import org.apache.kafka.server.authorizer._
+import org.apache.zookeeper.client.ZKClientConfig
import org.junit.jupiter.api.{AfterEach, BeforeEach}
class AuthorizerInterfaceDefaultTest extends ZooKeeperTestHarness with BaseAuthorizerTest {
@@ -49,7 +49,8 @@ class AuthorizerInterfaceDefaultTest extends ZooKeeperTestHarness with BaseAutho
interfaceDefaultAuthorizer.authorizer.configure(config.originals)
zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkMaxInFlightRequests,
- Time.SYSTEM, "kafka.test", "AuthorizerInterfaceDefaultTest")
+ Time.SYSTEM, "kafka.test", "AuthorizerInterfaceDefaultTest", new ZKClientConfig,
+ "AuthorizerInterfaceDefaultTest")
}
@AfterEach
diff --git a/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala b/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala
index 35f3273..8056e23 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala
@@ -20,8 +20,7 @@ package kafka.server
import kafka.api.ApiVersion
import kafka.utils.TestUtils
import kafka.zk.ZooKeeperTestHarness
-import org.apache.zookeeper.client.ZKClientConfig
-import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, fail}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertThrows, fail}
import org.junit.jupiter.api.Test
import java.util.Properties
@@ -47,7 +46,10 @@ class KafkaServerTest extends ZooKeeperTestHarness {
val props = new Properties
props.put(KafkaConfig.ZkConnectProp, zkConnect) // required, otherwise we would leave it out
props.put(KafkaConfig.ZkSslClientEnableProp, "false")
- assertEquals(None, KafkaServer.zkClientConfigFromKafkaConfig(KafkaConfig.fromProps(props)))
+ val zkClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(KafkaConfig.fromProps(props))
+ KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach { propName =>
+ assertNull(zkClientConfig.getProperty(propName))
+ }
}
@Test
@@ -62,7 +64,7 @@ class KafkaServerTest extends ZooKeeperTestHarness {
case _ => someValue
}
KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(kafkaProp => props.put(kafkaProp, kafkaConfigValueToSet(kafkaProp)))
- val zkClientConfig: Option[ZKClientConfig] = KafkaServer.zkClientConfigFromKafkaConfig(KafkaConfig.fromProps(props))
+ val zkClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(KafkaConfig.fromProps(props))
// now check to make sure the values were set correctly
def zkClientValueToExpect(kafkaProp: String) : String = kafkaProp match {
case KafkaConfig.ZkSslClientEnableProp | KafkaConfig.ZkSslCrlEnableProp | KafkaConfig.ZkSslOcspEnableProp => "true"
@@ -70,7 +72,7 @@ class KafkaServerTest extends ZooKeeperTestHarness {
case _ => someValue
}
KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(kafkaProp =>
- assertEquals(zkClientValueToExpect(kafkaProp), zkClientConfig.get.getProperty(KafkaConfig.ZkSslConfigToSystemPropertyMap(kafkaProp))))
+ assertEquals(zkClientValueToExpect(kafkaProp), zkClientConfig.getProperty(KafkaConfig.ZkSslConfigToSystemPropertyMap(kafkaProp))))
}
@Test
@@ -87,7 +89,7 @@ class KafkaServerTest extends ZooKeeperTestHarness {
case _ => someValue
}
KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(kafkaProp => props.put(kafkaProp, kafkaConfigValueToSet(kafkaProp)))
- val zkClientConfig: Option[ZKClientConfig] = KafkaServer.zkClientConfigFromKafkaConfig(KafkaConfig.fromProps(props))
+ val zkClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(KafkaConfig.fromProps(props))
// now check to make sure the values were set correctly
def zkClientValueToExpect(kafkaProp: String) : String = kafkaProp match {
case KafkaConfig.ZkSslClientEnableProp => "true"
@@ -97,7 +99,7 @@ class KafkaServerTest extends ZooKeeperTestHarness {
case _ => someValue
}
KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(kafkaProp =>
- assertEquals(zkClientValueToExpect(kafkaProp), zkClientConfig.get.getProperty(KafkaConfig.ZkSslConfigToSystemPropertyMap(kafkaProp))))
+ assertEquals(zkClientValueToExpect(kafkaProp), zkClientConfig.getProperty(KafkaConfig.ZkSslConfigToSystemPropertyMap(kafkaProp))))
}
@Test
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index 0c1d860..2088e5f 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -51,6 +51,7 @@ import org.apache.kafka.common.resource.ResourceType.{GROUP, TOPIC}
import org.apache.kafka.common.security.JaasUtils
import org.apache.zookeeper.ZooDefs
import org.apache.zookeeper.client.ZKClientConfig
+import org.apache.zookeeper.common.ZKConfig
import org.apache.zookeeper.data.Stat
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
@@ -76,7 +77,8 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
super.setUp()
zkClient.createControllerEpochRaw(1)
otherZkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled), zkSessionTimeout,
- zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
+ zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM, name = "KafkaZkClient",
+ zkClientConfig = new ZKClientConfig)
expiredSessionZkClient = ExpiredKafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled),
zkSessionTimeout, zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
}
@@ -103,15 +105,15 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
val propVal = "org.apache.zookeeper.ClientCnxnSocketNetty"
KafkaConfig.setZooKeeperClientProperty(clientConfig, propKey, propVal)
val client = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled), zkSessionTimeout,
- zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM, zkClientConfig = Some(clientConfig))
+ zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM, name = "KafkaZkClient", zkClientConfig = clientConfig)
try {
- assertEquals(Some(propVal), KafkaConfig.getZooKeeperClientProperty(client.currentZooKeeper.getClientConfig, propKey))
+ assertEquals(Some(propVal), KafkaConfig.zooKeeperClientProperty(client.currentZooKeeper.getClientConfig, propKey))
// For a sanity check, make sure a bad client connection socket class name generates an exception
val badClientConfig = new ZKClientConfig()
KafkaConfig.setZooKeeperClientProperty(badClientConfig, propKey, propVal + "BadClassName")
assertThrows(classOf[Exception],
() => KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled), zkSessionTimeout,
- zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM, zkClientConfig = Some(badClientConfig)))
+ zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM, name = "KafkaZkClientTest", zkClientConfig = badClientConfig))
} finally {
client.close()
}
@@ -121,9 +123,9 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
@ValueSource(booleans = Array(true, false))
def testChroot(createChrootIfNecessary: Boolean): Unit = {
val chroot = "/chroot"
- val clientConfig = new ZKClientConfig()
val client = KafkaZkClient(zkConnect + chroot, zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled), zkSessionTimeout,
- zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM, zkClientConfig = Some(clientConfig), createChrootIfNecessary = createChrootIfNecessary)
+ zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM, name = "KafkaZkClientTest",
+ zkClientConfig = new ZKClientConfig, createChrootIfNecessary = createChrootIfNecessary)
try {
client.createTopLevelPaths()
if (!createChrootIfNecessary) {
@@ -158,7 +160,8 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
// this client doesn't have create permission to the root and chroot, but the chroot already exists
// Expect that no exception is thrown
val chrootClient = KafkaZkClient(zkConnect + chroot, zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled), zkSessionTimeout,
- zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM, createChrootIfNecessary = true)
+ zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM, name = "KafkaZkClientTest",
+ zkClientConfig = new ZKClientConfig, createChrootIfNecessary = true)
chrootClient.close()
}
@@ -1340,6 +1343,33 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
assertEquals(ZooDefs.Ids.READ_ACL_UNSAFE.asScala, zkClient.getAcl(mockPath))
}
+ @Test
+ def testJuteMaxBufffer(): Unit = {
+
+ def assertJuteMaxBufferConfig(clientConfig: ZKClientConfig, expectedValue: String): Unit = {
+ val client = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled), zkSessionTimeout,
+ zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM, name = "KafkaZkClient",
+ zkClientConfig = clientConfig)
+ try assertEquals(expectedValue, client.currentZooKeeper.getClientConfig.getProperty(ZKConfig.JUTE_MAXBUFFER))
+ finally client.close()
+ }
+
+ // default case
+ assertEquals("4194304", zkClient.currentZooKeeper.getClientConfig.getProperty(ZKConfig.JUTE_MAXBUFFER))
+
+ // Value set directly on ZKClientConfig takes precedence over system property
+ System.setProperty(ZKConfig.JUTE_MAXBUFFER, (3000 * 1024).toString)
+ try {
+ val clientConfig1 = new ZKClientConfig
+ clientConfig1.setProperty(ZKConfig.JUTE_MAXBUFFER, (2000 * 1024).toString)
+ assertJuteMaxBufferConfig(clientConfig1, expectedValue = "2048000")
+
+ // System property value is used if value is not set in ZKClientConfig
+ assertJuteMaxBufferConfig(new ZKClientConfig, expectedValue = "3072000")
+
+ } finally System.clearProperty(ZKConfig.JUTE_MAXBUFFER)
+ }
+
class ExpiredKafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean, time: Time)
extends KafkaZkClient(zooKeeperClient, isSecure, time) {
// Overriding this method from the parent class to force the client to re-register the Broker.
@@ -1365,7 +1395,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
metricGroup: String = "kafka.server",
metricType: String = "SessionExpireListener") = {
val zooKeeperClient = new ZooKeeperClient(connectString, sessionTimeoutMs, connectionTimeoutMs, maxInFlightRequests,
- time, metricGroup, metricType)
+ time, metricGroup, metricType, new ZKClientConfig, "ExpiredKafkaZkClient")
new ExpiredKafkaZkClient(zooKeeperClient, isSecure, time)
}
}
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index 6bc9d3a..8b61c0e 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -19,7 +19,7 @@ package kafka.zk
import javax.security.auth.login.Configuration
import kafka.utils.{CoreUtils, Logging, TestUtils}
-import org.junit.jupiter.api.{AfterEach, AfterAll, BeforeEach, BeforeAll, Tag}
+import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeAll, BeforeEach, Tag}
import org.junit.jupiter.api.Assertions._
import org.apache.kafka.common.security.JaasUtils
@@ -30,6 +30,7 @@ import org.apache.kafka.clients.consumer.internals.AbstractCoordinator
import kafka.controller.ControllerEventManager
import org.apache.kafka.clients.admin.AdminClientUnitTestEnv
import org.apache.kafka.common.utils.Time
+import org.apache.zookeeper.client.ZKClientConfig
import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}
@Tag("integration")
@@ -53,7 +54,7 @@ abstract class ZooKeeperTestHarness extends Logging {
def setUp(): Unit = {
zookeeper = new EmbeddedZookeeper()
zkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled), zkSessionTimeout,
- zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
+ zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM, name = "ZooKeeperTestHarness", new ZKClientConfig)
adminZkClient = new AdminZkClient(zkClient)
}
diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
index 0392386..a0eb1ea 100644
--- a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
@@ -49,8 +49,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
ZooKeeperTestHarness.verifyNoUnexpectedThreads("@BeforeEach")
cleanMetricsRegistry()
super.setUp()
- zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkMaxInFlightRequests,
- Time.SYSTEM, "testMetricGroup", "testMetricType")
+ zooKeeperClient = newZooKeeperClient()
}
@AfterEach
@@ -65,8 +64,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
@Test
def testUnresolvableConnectString(): Unit = {
try {
- new ZooKeeperClient("some.invalid.hostname.foo.bar.local", zkSessionTimeout, connectionTimeoutMs = 10,
- Int.MaxValue, time, "testMetricGroup", "testMetricType")
+ newZooKeeperClient("some.invalid.hostname.foo.bar.local", connectionTimeoutMs = 10)
} catch {
case e: ZooKeeperClientTimeoutException =>
assertEquals(Set.empty, runningZkSendThreads, "ZooKeeper client threads still running")
@@ -81,14 +79,13 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
@Test
def testConnectionTimeout(): Unit = {
zookeeper.shutdown()
- assertThrows(classOf[ZooKeeperClientTimeoutException], () => new ZooKeeperClient(zkConnect, zkSessionTimeout,
- connectionTimeoutMs = 10, Int.MaxValue, time, "testMetricGroup", "testMetricType").close())
+ assertThrows(classOf[ZooKeeperClientTimeoutException], () => newZooKeeperClient(
+ connectionTimeoutMs = 10).close())
}
@Test
def testConnection(): Unit = {
- val client = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, time, "testMetricGroup",
- "testMetricType")
+ val client = newZooKeeperClient()
try {
// Verify ZooKeeper event thread name. This is used in ZooKeeperTestHarness to verify that tests have closed ZK clients
val threads = Thread.getAllStackTraces.keySet.asScala.map(_.getName)
@@ -108,15 +105,13 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
val propKey = KafkaConfig.ZkClientCnxnSocketProp
val propVal = "org.apache.zookeeper.ClientCnxnSocketNetty"
KafkaConfig.setZooKeeperClientProperty(clientConfig, propKey, propVal)
- val client = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, time, "testMetricGroup",
- "testMetricType", None, Some(clientConfig))
+ val client = newZooKeeperClient(clientConfig = clientConfig)
try {
- assertEquals(Some(propVal), KafkaConfig.getZooKeeperClientProperty(client.getClientConfig, propKey))
+ assertEquals(Some(propVal), KafkaConfig.zooKeeperClientProperty(client.clientConfig, propKey))
// For a sanity check, make sure a bad client connection socket class name generates an exception
val badClientConfig = new ZKClientConfig()
KafkaConfig.setZooKeeperClientProperty(badClientConfig, propKey, propVal + "BadClassName")
- assertThrows(classOf[Exception], () => new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout,
- Int.MaxValue, time, "testMetricGroup", "testMetricType", None, Some(badClientConfig)))
+ assertThrows(classOf[Exception], () => newZooKeeperClient(clientConfig = badClientConfig))
} finally {
client.close()
}
@@ -350,8 +345,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
}
zooKeeperClient.close()
- zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, time,
- "testMetricGroup", "testMetricType")
+ zooKeeperClient = newZooKeeperClient()
zooKeeperClient.registerStateChangeHandler(stateChangeHandler)
val requestThread = new Thread() {
@@ -399,8 +393,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
}
zooKeeperClient.close()
- zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, time,
- "testMetricGroup", "testMetricType")
+ zooKeeperClient = newZooKeeperClient()
zooKeeperClient.registerStateChangeHandler(faultyHandler)
zooKeeperClient.registerStateChangeHandler(goodHandler)
@@ -476,8 +469,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
}
}
- val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, time,
- "testMetricGroup", "testMetricType")
+ val zooKeeperClient = newZooKeeperClient()
try {
zooKeeperClient.registerStateChangeHandler(stateChangeHandler)
zooKeeperClient.forceReinitialize()
@@ -489,8 +481,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
@Test
def testConnectionLossRequestTermination(): Unit = {
val batchSize = 10
- val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, 2, time,
- "testGroupType", "testGroupName")
+ val zooKeeperClient = newZooKeeperClient(maxInFlight = 2)
zookeeper.shutdown()
try {
val requests = (1 to batchSize).map(i => GetDataRequest(s"/$i"))
@@ -553,7 +544,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
@volatile var resultCodes: Seq[Code] = null
val stateChanges = new ConcurrentLinkedQueue[String]()
val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, maxInflightRequests,
- time, "testGroupType", "testGroupName") {
+ time, "testGroupType", "testGroupName", new ZKClientConfig, "ZooKeeperClientTest") {
override def send[Req <: AsyncRequest](request: Req)(processResponse: Req#Response => Unit): Unit = {
super.send(request)( response => {
responseExecutor.submit(new Runnable {
@@ -657,8 +648,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
}
zooKeeperClient.close()
- zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, time,
- "testMetricGroup", "testMetricType")
+ zooKeeperClient = newZooKeeperClient()
zooKeeperClient.registerStateChangeHandler(changeHandler)
zooKeeperClient.ZooKeeperClientWatcher.process(new WatchedEvent(EventType.None, KeeperState.AuthFailed, null))
@@ -708,6 +698,13 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
assertEquals(States.CLOSED, zooKeeperClient.connectionState)
}
+ private def newZooKeeperClient(connectionString: String = zkConnect,
+ connectionTimeoutMs: Int = zkConnectionTimeout,
+ maxInFlight: Int = zkMaxInFlightRequests,
+ clientConfig: ZKClientConfig = new ZKClientConfig) =
+ new ZooKeeperClient(connectionString, zkSessionTimeout, connectionTimeoutMs, maxInFlight, time,
+ "testMetricGroup", "testMetricType", clientConfig, "ZooKeeperClientTest")
+
private def cleanMetricsRegistry(): Unit = {
val metrics = KafkaYammerMetrics.defaultRegistry
metrics.allMetrics.keySet.forEach(metrics.removeMetric)