You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2021/09/06 16:22:25 UTC

[kafka] branch 3.0 updated: KAFKA-13270: Set JUTE_MAXBUFFER to 4 MB by default (#11295)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 2e819a7  KAFKA-13270: Set JUTE_MAXBUFFER to 4 MB by default (#11295)
2e819a7 is described below

commit 2e819a7208fa4e0933c7942d07c54130bd12c119
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Mon Sep 6 09:18:47 2021 -0700

    KAFKA-13270: Set JUTE_MAXBUFFER to 4 MB by default (#11295)
    
    We restore the 3.4.x/3.5.x behavior unless the caller has set the property (note that ZKConfig
    auto-configures itself if certain system properties have been set).
    
    I added a unit test that fails without the change and passes with it.
    
    I also refactored the code to streamline the way we handle parameters passed to
    KafkaZkClient and ZooKeeperClient.
    
    See https://github.com/apache/zookeeper/pull/1129 for the details on why the behavior
    changed in 3.6.0.
    
    Credit to @rondagostino for finding and reporting this issue.
    
    Reviewers: David Jacot <dj...@confluent.io>
---
 .../src/main/scala/kafka/admin/ConfigCommand.scala |  2 +-
 .../scala/kafka/admin/ZkSecurityMigrator.scala     |  2 +-
 .../kafka/security/authorizer/AclAuthorizer.scala  | 22 +++++-----
 core/src/main/scala/kafka/server/KafkaConfig.scala | 49 ++++++++++------------
 core/src/main/scala/kafka/server/KafkaServer.scala | 15 ++++---
 core/src/main/scala/kafka/zk/KafkaZkClient.scala   | 27 +++++++++---
 .../scala/kafka/zookeeper/ZooKeeperClient.scala    | 25 ++---------
 .../scala/integration/kafka/api/SaslSetup.scala    |  2 +-
 .../kafka/security/auth/ZkAuthorizationTest.scala  | 16 ++++---
 .../security/authorizer/AclAuthorizerTest.scala    | 36 ++++++++--------
 .../AuthorizerInterfaceDefaultTest.scala           |  5 ++-
 .../scala/unit/kafka/server/KafkaServerTest.scala  | 16 +++----
 .../scala/unit/kafka/zk/KafkaZkClientTest.scala    | 46 ++++++++++++++++----
 .../scala/unit/kafka/zk/ZooKeeperTestHarness.scala |  5 ++-
 .../unit/kafka/zookeeper/ZooKeeperClientTest.scala | 45 ++++++++++----------
 15 files changed, 169 insertions(+), 144 deletions(-)

diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 39a3698..5e5ccef 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -115,7 +115,7 @@ object ConfigCommand extends Config {
     val zkClientConfig = ZkSecurityMigrator.createZkClientConfigFromOption(opts.options, opts.zkTlsConfigFile)
       .getOrElse(new ZKClientConfig())
     val zkClient = KafkaZkClient(zkConnectString, JaasUtils.isZkSaslEnabled || KafkaConfig.zkTlsClientAuthEnabled(zkClientConfig), 30000, 30000,
-      Int.MaxValue, Time.SYSTEM, zkClientConfig = Some(zkClientConfig))
+      Int.MaxValue, Time.SYSTEM, zkClientConfig = zkClientConfig, name = "ConfigCommand")
     val adminZkClient = new AdminZkClient(zkClient)
     try {
       if (opts.options.has(opts.alterOpt))
diff --git a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
index a8b799a..1263195 100644
--- a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
+++ b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
@@ -105,7 +105,7 @@ object ZkSecurityMigrator extends Logging {
     val zkSessionTimeout = opts.options.valueOf(opts.zkSessionTimeoutOpt).intValue
     val zkConnectionTimeout = opts.options.valueOf(opts.zkConnectionTimeoutOpt).intValue
     val zkClient = KafkaZkClient(zkUrl, zkAcl, zkSessionTimeout, zkConnectionTimeout,
-      Int.MaxValue, Time.SYSTEM, zkClientConfig = Some(zkClientConfig))
+      Int.MaxValue, Time.SYSTEM, zkClientConfig = zkClientConfig, name = "ZkSecurityMigrator")
     val enablePathCheck = opts.options.has(opts.enablePathCheckOpt)
     val migrator = new ZkSecurityMigrator(zkClient)
     migrator.run(enablePathCheck)
diff --git a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
index 5f701d8..88648fd 100644
--- a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
@@ -95,25 +95,25 @@ object AclAuthorizer {
     }
   }
 
-  private[authorizer] def zkClientConfigFromKafkaConfigAndMap(kafkaConfig: KafkaConfig, configMap: mutable.Map[String, _<:Any]): Option[ZKClientConfig] = {
+  private[authorizer] def zkClientConfigFromKafkaConfigAndMap(kafkaConfig: KafkaConfig, configMap: mutable.Map[String, _<:Any]): ZKClientConfig = {
     val zkSslClientEnable = configMap.get(AclAuthorizer.configPrefix + KafkaConfig.ZkSslClientEnableProp).
       map(_.toString).getOrElse(kafkaConfig.zkSslClientEnable.toString).toBoolean
     if (!zkSslClientEnable)
-      None
+      new ZKClientConfig
     else {
       // start with the base config from the Kafka configuration
       // be sure to force creation since the zkSslClientEnable property in the kafkaConfig could be false
       val zkClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(kafkaConfig, true)
       // add in any prefixed overlays
-      KafkaConfig.ZkSslConfigToSystemPropertyMap.foreach{ case (kafkaProp, sysProp) => {
-        val prefixedValue = configMap.get(AclAuthorizer.configPrefix + kafkaProp)
-        if (prefixedValue.isDefined)
-          zkClientConfig.get.setProperty(sysProp,
+      KafkaConfig.ZkSslConfigToSystemPropertyMap.forKeyValue { (kafkaProp, sysProp) =>
+        configMap.get(AclAuthorizer.configPrefix + kafkaProp).foreach { prefixedValue =>
+          zkClientConfig.setProperty(sysProp,
             if (kafkaProp == KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp)
-              (prefixedValue.get.toString.toUpperCase == "HTTPS").toString
+              (prefixedValue.toString.toUpperCase == "HTTPS").toString
             else
-              prefixedValue.get.toString)
-      }}
+              prefixedValue.toString)
+        }
+      }
       zkClientConfig
     }
   }
@@ -178,8 +178,8 @@ class AclAuthorizer extends Authorizer with Logging {
     // createChrootIfNecessary=true is necessary in case we are running in a KRaft cluster
     // because such a cluster will not create any chroot path in ZooKeeper (it doesn't connect to ZooKeeper)
     zkClient = KafkaZkClient(zkUrl, kafkaConfig.zkEnableSecureAcls, zkSessionTimeOutMs, zkConnectionTimeoutMs,
-      zkMaxInFlightRequests, time, "kafka.security", "AclAuthorizer", name=Some("ACL authorizer"),
-      zkClientConfig = zkClientConfig, createChrootIfNecessary = true)
+      zkMaxInFlightRequests, time, name = "ACL authorizer", zkClientConfig = zkClientConfig,
+      metricGroup = "kafka.security", metricType = "AclAuthorizer", createChrootIfNecessary = true)
     zkClient.createAclPaths()
 
     extendedAclSupport = kafkaConfig.interBrokerProtocolVersion >= KAFKA_2_0_IV1
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index a30d31d..c556d7a 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -336,7 +336,7 @@ object KafkaConfig {
     ZkSslCrlEnableProp -> "zookeeper.ssl.crl",
     ZkSslOcspEnableProp -> "zookeeper.ssl.ocsp")
 
-  private[kafka] def getZooKeeperClientProperty(clientConfig: ZKClientConfig, kafkaPropName: String): Option[String] = {
+  private[kafka] def zooKeeperClientProperty(clientConfig: ZKClientConfig, kafkaPropName: String): Option[String] = {
     Option(clientConfig.getProperty(ZkSslConfigToSystemPropertyMap(kafkaPropName)))
   }
 
@@ -345,7 +345,7 @@ object KafkaConfig {
       kafkaPropName match {
         case ZkSslEndpointIdentificationAlgorithmProp => (kafkaPropValue.toString.toUpperCase == "HTTPS").toString
         case ZkSslEnabledProtocolsProp | ZkSslCipherSuitesProp => kafkaPropValue match {
-          case list: java.util.List[_] => list.asInstanceOf[java.util.List[_]].asScala.mkString(",")
+          case list: java.util.List[_] => list.asScala.mkString(",")
           case _ => kafkaPropValue.toString
         }
         case _ => kafkaPropValue.toString
@@ -354,10 +354,10 @@ object KafkaConfig {
 
   // For ZooKeeper TLS client authentication to be enabled the client must (at a minimum) configure itself as using TLS
   // with both a client connection socket and a key store location explicitly set.
-  private[kafka] def zkTlsClientAuthEnabled(zkClientConfig: ZKClientConfig) = {
-    getZooKeeperClientProperty(zkClientConfig, ZkSslClientEnableProp).getOrElse("false") == "true" &&
-      getZooKeeperClientProperty(zkClientConfig, ZkClientCnxnSocketProp).isDefined &&
-      getZooKeeperClientProperty(zkClientConfig, ZkSslKeyStoreLocationProp).isDefined
+  private[kafka] def zkTlsClientAuthEnabled(zkClientConfig: ZKClientConfig): Boolean = {
+    zooKeeperClientProperty(zkClientConfig, ZkSslClientEnableProp).contains("true") &&
+      zooKeeperClientProperty(zkClientConfig, ZkClientCnxnSocketProp).isDefined &&
+      zooKeeperClientProperty(zkClientConfig, ZkSslKeyStoreLocationProp).isDefined
   }
 
   /** ********* General Configuration ***********/
@@ -1440,7 +1440,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
     // Need to translate any system property value from true/false (String) to true/false (Boolean)
     val actuallyProvided = originals.containsKey(propKey)
     if (actuallyProvided) getBoolean(propKey) else {
-      val sysPropValue = KafkaConfig.getZooKeeperClientProperty(zkClientConfigViaSystemProperties, propKey)
+      val sysPropValue = KafkaConfig.zooKeeperClientProperty(zkClientConfigViaSystemProperties, propKey)
       sysPropValue match {
         case Some("true") => true
         case Some(_) => false
@@ -1453,35 +1453,27 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
     // Use the system property if it exists and the Kafka config value was defaulted rather than actually provided
     val actuallyProvided = originals.containsKey(propKey)
     if (actuallyProvided) getString(propKey) else {
-      val sysPropValue = KafkaConfig.getZooKeeperClientProperty(zkClientConfigViaSystemProperties, propKey)
-      sysPropValue match {
-        case Some(_) => sysPropValue.get
+      KafkaConfig.zooKeeperClientProperty(zkClientConfigViaSystemProperties, propKey) match {
+        case Some(v) => v
         case _ => getString(propKey) // not specified so use the default value
       }
     }
   }
 
   private def zkOptionalStringConfigOrSystemProperty(propKey: String): Option[String] = {
-    Option(getString(propKey)) match {
-      case config: Some[String] => config
-      case _ => KafkaConfig.getZooKeeperClientProperty(zkClientConfigViaSystemProperties, propKey)
+    Option(getString(propKey)).orElse {
+      KafkaConfig.zooKeeperClientProperty(zkClientConfigViaSystemProperties, propKey)
     }
   }
   private def zkPasswordConfigOrSystemProperty(propKey: String): Option[Password] = {
-    Option(getPassword(propKey)) match {
-      case config: Some[Password] => config
-      case _ => {
-        val sysProp = KafkaConfig.getZooKeeperClientProperty (zkClientConfigViaSystemProperties, propKey)
-        if (sysProp.isDefined) Some (new Password (sysProp.get) ) else None
-      }
+    Option(getPassword(propKey)).orElse {
+      KafkaConfig.zooKeeperClientProperty(zkClientConfigViaSystemProperties, propKey).map(new Password(_))
     }
   }
   private def zkListConfigOrSystemProperty(propKey: String): Option[util.List[String]] = {
-    Option(getList(propKey)) match {
-      case config: Some[util.List[String]] => config
-      case _ => {
-        val sysProp = KafkaConfig.getZooKeeperClientProperty(zkClientConfigViaSystemProperties, propKey)
-        if (sysProp.isDefined) Some(sysProp.get.split("\\s*,\\s*").toList.asJava) else None
+    Option(getList(propKey)).orElse {
+      KafkaConfig.zooKeeperClientProperty(zkClientConfigViaSystemProperties, propKey).map { sysProp =>
+        sysProp.split("\\s*,\\s*").toBuffer.asJava
       }
     }
   }
@@ -1502,12 +1494,13 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
     // Need to translate any system property value from true/false to HTTPS/<blank>
     val kafkaProp = KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp
     val actuallyProvided = originals.containsKey(kafkaProp)
-    if (actuallyProvided) getString(kafkaProp) else {
-      val sysPropValue = KafkaConfig.getZooKeeperClientProperty(zkClientConfigViaSystemProperties, kafkaProp)
-      sysPropValue match {
+    if (actuallyProvided)
+      getString(kafkaProp)
+    else {
+      KafkaConfig.zooKeeperClientProperty(zkClientConfigViaSystemProperties, kafkaProp) match {
         case Some("true") => "HTTPS"
         case Some(_) => ""
-        case _ => getString(kafkaProp) // not specified so use the default value
+        case None => getString(kafkaProp) // not specified so use the default value
       }
     }
   }
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index b3c66a6..c22eab2 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -56,11 +56,9 @@ import scala.jdk.CollectionConverters._
 
 object KafkaServer {
 
-  def zkClientConfigFromKafkaConfig(config: KafkaConfig, forceZkSslClientEnable: Boolean = false) =
-    if (!config.zkSslClientEnable && !forceZkSslClientEnable)
-      None
-    else {
-      val clientConfig = new ZKClientConfig()
+  def zkClientConfigFromKafkaConfig(config: KafkaConfig, forceZkSslClientEnable: Boolean = false): ZKClientConfig = {
+    val clientConfig = new ZKClientConfig
+    if (config.zkSslClientEnable || forceZkSslClientEnable) {
       KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslClientEnableProp, "true")
       config.zkClientCnxnSocketClassName.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkClientCnxnSocketProp, _))
       config.zkSslKeyStoreLocation.foreach(KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslKeyStoreLocationProp, _))
@@ -75,8 +73,9 @@ object KafkaServer {
       KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp, config.ZkSslEndpointIdentificationAlgorithm)
       KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslCrlEnableProp, config.ZkSslCrlEnable.toString)
       KafkaConfig.setZooKeeperClientProperty(clientConfig, KafkaConfig.ZkSslOcspEnableProp, config.ZkSslOcspEnable.toString)
-      Some(clientConfig)
     }
+    clientConfig
+  }
 
   val MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS: Long = 120000
 }
@@ -144,7 +143,7 @@ class KafkaServer(
   var metadataCache: ZkMetadataCache = null
   var quotaManagers: QuotaFactory.QuotaManagers = null
 
-  val zkClientConfig: ZKClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(config).getOrElse(new ZKClientConfig())
+  val zkClientConfig: ZKClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(config)
   private var _zkClient: KafkaZkClient = null
   private var configRepository: ZkConfigRepository = null
 
@@ -454,7 +453,7 @@ class KafkaServer(
         s"verification of the JAAS login file failed ${JaasUtils.zkSecuritySysConfigString}")
 
     _zkClient = KafkaZkClient(config.zkConnect, secureAclsEnabled, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
-      config.zkMaxInFlightRequests, time, name = Some("Kafka server"), zkClientConfig = Some(zkClientConfig),
+      config.zkMaxInFlightRequests, time, name = "Kafka server", zkClientConfig = zkClientConfig,
       createChrootIfNecessary = true)
     _zkClient.createTopLevelPaths()
   }
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index bcc89de9..823d6e8 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -17,7 +17,6 @@
 package kafka.zk
 
 import java.util.Properties
-
 import com.yammer.metrics.core.MetricName
 import kafka.api.LeaderAndIsr
 import kafka.cluster.Broker
@@ -38,6 +37,7 @@ import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
 import org.apache.zookeeper.KeeperException.{Code, NodeExistsException}
 import org.apache.zookeeper.OpResult.{CreateResult, ErrorResult, SetDataResult}
 import org.apache.zookeeper.client.ZKClientConfig
+import org.apache.zookeeper.common.ZKConfig
 import org.apache.zookeeper.data.{ACL, Stat}
 import org.apache.zookeeper.{CreateMode, KeeperException, ZooKeeper}
 
@@ -1940,18 +1940,33 @@ object KafkaZkClient {
             connectionTimeoutMs: Int,
             maxInFlightRequests: Int,
             time: Time,
+            name: String,
+            zkClientConfig: ZKClientConfig,
             metricGroup: String = "kafka.server",
             metricType: String = "SessionExpireListener",
-            name: Option[String] = None,
-            zkClientConfig: Option[ZKClientConfig] = None,
             createChrootIfNecessary: Boolean = false
   ): KafkaZkClient = {
+
+    /* ZooKeeper 3.6.0 changed the default configuration for JUTE_MAXBUFFER from 4 MB to 1 MB.
+     * This causes a regression if Kafka tries to retrieve a large amount of data across many
+     * znodes – in such a case the ZooKeeper client will repeatedly emit a message of the form
+     * "java.io.IOException: Packet len <####> is out of range".
+     *
+     * We restore the 3.4.x/3.5.x behavior unless the caller has set the property (note that ZKConfig
+     * auto configures itself if certain system properties have been set).
+     *
+     * See https://github.com/apache/zookeeper/pull/1129 for the details on why the behavior
+     * changed in 3.6.0.
+     */
+    if (zkClientConfig.getProperty(ZKConfig.JUTE_MAXBUFFER) == null)
+      zkClientConfig.setProperty(ZKConfig.JUTE_MAXBUFFER, ((4096 * 1024).toString))
+
     if (createChrootIfNecessary) {
       val chrootIndex = connectString.indexOf("/")
       if (chrootIndex > 0) {
         val zkConnWithoutChrootForChrootCreation = connectString.substring(0, chrootIndex)
-        val zkClientForChrootCreation = KafkaZkClient(zkConnWithoutChrootForChrootCreation, isSecure, sessionTimeoutMs,
-          connectionTimeoutMs, maxInFlightRequests, time, metricGroup, metricType, name, zkClientConfig)
+        val zkClientForChrootCreation = apply(zkConnWithoutChrootForChrootCreation, isSecure, sessionTimeoutMs,
+          connectionTimeoutMs, maxInFlightRequests, time, name, zkClientConfig, metricGroup, metricType)
         try {
           val chroot = connectString.substring(chrootIndex)
           if (!zkClientForChrootCreation.pathExists(chroot)) {
@@ -1963,7 +1978,7 @@ object KafkaZkClient {
       }
     }
     val zooKeeperClient = new ZooKeeperClient(connectString, sessionTimeoutMs, connectionTimeoutMs, maxInFlightRequests,
-      time, metricGroup, metricType, name, zkClientConfig)
+      time, metricGroup, metricType, zkClientConfig, name)
     new KafkaZkClient(zooKeeperClient, isSecure, time)
   }
 
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 96ef6d5..091b401 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -61,24 +61,10 @@ class ZooKeeperClient(connectString: String,
                       time: Time,
                       metricGroup: String,
                       metricType: String,
-                      name: Option[String],
-                      zkClientConfig: Option[ZKClientConfig]) extends Logging with KafkaMetricsGroup {
-
-  def this(connectString: String,
-           sessionTimeoutMs: Int,
-           connectionTimeoutMs: Int,
-           maxInFlightRequests: Int,
-           time: Time,
-           metricGroup: String,
-           metricType: String) = {
-    this(connectString, sessionTimeoutMs, connectionTimeoutMs, maxInFlightRequests, time, metricGroup, metricType, None,
-      None)
-  }
+                      private[zookeeper] val clientConfig: ZKClientConfig,
+                      name: String) extends Logging with KafkaMetricsGroup {
 
-  this.logIdent = name match {
-    case Some(n) => s"[ZooKeeperClient $n] "
-    case _ => "[ZooKeeperClient] "
-  }
+  this.logIdent = s"[ZooKeeperClient $name] "
   private val initializationLock = new ReentrantReadWriteLock()
   private val isConnectedOrExpiredLock = new ReentrantLock()
   private val isConnectedOrExpiredCondition = isConnectedOrExpiredLock.newCondition()
@@ -109,13 +95,10 @@ class ZooKeeperClient(connectString: String,
     }
   }
 
-  private val clientConfig = zkClientConfig getOrElse new ZKClientConfig()
-
   info(s"Initializing a new session to $connectString.")
   // Fail-fast if there's an error during construction (so don't call initialize, which retries forever)
   @volatile private var zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher,
     clientConfig)
-  private[zookeeper] def getClientConfig = clientConfig
 
   newGauge("SessionState", () => connectionState.toString)
 
@@ -436,7 +419,7 @@ class ZooKeeperClient(connectString: String,
     }, delayMs, period = -1L, unit = TimeUnit.MILLISECONDS)
   }
 
-  private def threadPrefix: String = name.map(n => n.replaceAll("\\s", "") + "-").getOrElse("")
+  private def threadPrefix: String = name.replaceAll("\\s", "") + "-"
 
   // package level visibility for testing only
   private[zookeeper] object ZooKeeperClientWatcher extends Watcher {
diff --git a/core/src/test/scala/integration/kafka/api/SaslSetup.scala b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
index 9237011..d613b72 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSetup.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
@@ -195,7 +195,7 @@ trait SaslSetup {
     val zkClientConfig = new ZKClientConfig()
     val zkClient = KafkaZkClient(
       zkConnect, JaasUtils.isZkSaslEnabled || KafkaConfig.zkTlsClientAuthEnabled(zkClientConfig), 30000, 30000,
-      Int.MaxValue, Time.SYSTEM, zkClientConfig = Some(zkClientConfig))
+      Int.MaxValue, Time.SYSTEM, name = "SaslSetup", zkClientConfig = zkClientConfig)
     val adminZkClient = new AdminZkClient(zkClient)
 
     val entityType = ConfigType.User
diff --git a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
index 69105c3..74803ce 100644
--- a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala
@@ -18,7 +18,6 @@
 package kafka.security.auth
 
 import java.nio.charset.StandardCharsets
-
 import kafka.admin.ZkSecurityMigrator
 import kafka.utils.{Logging, TestUtils}
 import kafka.zk._
@@ -36,6 +35,7 @@ import kafka.controller.ReplicaAssignment
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.Time
+import org.apache.zookeeper.client.ZKClientConfig
 
 import scala.jdk.CollectionConverters._
 import scala.collection.Seq
@@ -137,13 +137,17 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
     BrokerInfo(Broker(id, Seq(new EndPoint(host, port, ListenerName.forSecurityProtocol
     (securityProtocol), securityProtocol)), rack = rack), ApiVersion.latestVersion, jmxPort = port + 10)
 
+  private def newKafkaZkClient(connectionString: String, isSecure: Boolean) =
+    KafkaZkClient(connectionString, isSecure, 6000, 6000, Int.MaxValue, Time.SYSTEM, "ZkAuthorizationTest",
+      new ZKClientConfig)
+
   /**
    * Tests the migration tool when making an unsecure
    * cluster secure.
    */
   @Test
   def testZkMigration(): Unit = {
-    val unsecureZkClient = KafkaZkClient(zkConnect, false, 6000, 6000, Int.MaxValue, Time.SYSTEM)
+    val unsecureZkClient = newKafkaZkClient(zkConnect, isSecure = false)
     try {
       testMigration(zkConnect, unsecureZkClient, zkClient)
     } finally {
@@ -157,7 +161,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
    */
   @Test
   def testZkAntiMigration(): Unit = {
-    val unsecureZkClient = KafkaZkClient(zkConnect, false, 6000, 6000, Int.MaxValue, Time.SYSTEM)
+    val unsecureZkClient = newKafkaZkClient(zkConnect, isSecure = false)
     try {
       testMigration(zkConnect, zkClient, unsecureZkClient)
     } finally {
@@ -198,8 +202,8 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
   def testChroot(): Unit = {
     val zkUrl = zkConnect + "/kafka"
     zkClient.createRecursive("/kafka")
-    val unsecureZkClient = KafkaZkClient(zkUrl, false, 6000, 6000, Int.MaxValue, Time.SYSTEM)
-    val secureZkClient = KafkaZkClient(zkUrl, true, 6000, 6000, Int.MaxValue, Time.SYSTEM)
+    val unsecureZkClient = newKafkaZkClient(zkUrl, isSecure = false)
+    val secureZkClient = newKafkaZkClient(zkUrl, isSecure = true)
     try {
       testMigration(zkUrl, unsecureZkClient, secureZkClient)
     } finally {
@@ -284,7 +288,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging {
    */
   private def deleteAllUnsecure(): Unit = {
     System.setProperty(JaasUtils.ZK_SASL_CLIENT, "false")
-    val unsecureZkClient = KafkaZkClient(zkConnect, false, 6000, 6000, Int.MaxValue, Time.SYSTEM)
+    val unsecureZkClient = newKafkaZkClient(zkConnect, isSecure = false)
     val result: Try[Boolean] = {
       deleteRecursive(unsecureZkClient, "/")
     }
diff --git a/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala
index fa201db..a49b0d3 100644
--- a/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerTest.scala
@@ -22,7 +22,6 @@ import java.nio.charset.StandardCharsets.UTF_8
 import java.nio.file.Files
 import java.util.{Collections, UUID}
 import java.util.concurrent.{Executors, Semaphore, TimeUnit}
-
 import kafka.Kafka
 import kafka.api.{ApiVersion, KAFKA_2_0_IV0, KAFKA_2_0_IV1}
 import kafka.security.authorizer.AclEntry.{WildcardHost, WildcardPrincipalString}
@@ -43,6 +42,7 @@ import org.apache.kafka.common.resource.PatternType.{LITERAL, MATCH, PREFIXED}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.server.authorizer._
 import org.apache.kafka.common.utils.{Time, SecurityUtils => JSecurityUtils}
+import org.apache.zookeeper.client.ZKClientConfig
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 
@@ -86,7 +86,7 @@ class AclAuthorizerTest extends ZooKeeperTestHarness with BaseAuthorizerTest {
     resource = new ResourcePattern(TOPIC, "foo-" + UUID.randomUUID(), LITERAL)
 
     zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkMaxInFlightRequests,
-      Time.SYSTEM, "kafka.test", "AclAuthorizerTest")
+      Time.SYSTEM, "kafka.test", "AclAuthorizerTest", new ZKClientConfig, "AclAuthorizerTest")
   }
 
   @AfterEach
@@ -779,9 +779,12 @@ class AclAuthorizerTest extends ZooKeeperTestHarness with BaseAuthorizerTest {
   @Test
   def testAuthorizerNoZkConfig(): Unit = {
     val noTlsProps = Kafka.getPropsFromArgs(Array(prepareDefaultConfig))
-    assertEquals(None, AclAuthorizer.zkClientConfigFromKafkaConfigAndMap(
+    val zkClientConfig = AclAuthorizer.zkClientConfigFromKafkaConfigAndMap(
       KafkaConfig.fromProps(noTlsProps),
-      mutable.Map(noTlsProps.asInstanceOf[java.util.Map[String, Any]].asScala.toSeq: _*)))
+      noTlsProps.asInstanceOf[java.util.Map[String, Any]].asScala)
+    KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach { propName =>
+      assertNull(zkClientConfig.getProperty(propName))
+    }
   }
 
   @Test
@@ -799,20 +802,19 @@ class AclAuthorizerTest extends ZooKeeperTestHarness with BaseAuthorizerTest {
       KafkaConfig.ZkSslTrustStoreTypeProp -> kafkaValue,
       KafkaConfig.ZkSslEnabledProtocolsProp -> kafkaValue,
       KafkaConfig.ZkSslCipherSuitesProp -> kafkaValue)
-    configs.foreach{case (key, value) => props.put(key, value.toString) }
+    configs.foreach { case (key, value) => props.put(key, value) }
 
     val zkClientConfig = AclAuthorizer.zkClientConfigFromKafkaConfigAndMap(
       KafkaConfig.fromProps(props), mutable.Map(configs.toSeq: _*))
-    assertTrue(zkClientConfig.isDefined)
     // confirm we get all the values we expect
     KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(prop => prop match {
       case KafkaConfig.ZkSslClientEnableProp | KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp =>
-        assertEquals("true", KafkaConfig.getZooKeeperClientProperty(zkClientConfig.get, prop).getOrElse("<None>"))
+        assertEquals("true", KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
       case KafkaConfig.ZkSslCrlEnableProp | KafkaConfig.ZkSslOcspEnableProp =>
-        assertEquals("false", KafkaConfig.getZooKeeperClientProperty(zkClientConfig.get, prop).getOrElse("<None>"))
+        assertEquals("false", KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
       case KafkaConfig.ZkSslProtocolProp =>
-        assertEquals("TLSv1.2", KafkaConfig.getZooKeeperClientProperty(zkClientConfig.get, prop).getOrElse("<None>"))
-      case _ => assertEquals(kafkaValue, KafkaConfig.getZooKeeperClientProperty(zkClientConfig.get, prop).getOrElse("<None>"))
+        assertEquals("TLSv1.2", KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
+      case _ => assertEquals(kafkaValue, KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
     })
   }
 
@@ -839,14 +841,13 @@ class AclAuthorizerTest extends ZooKeeperTestHarness with BaseAuthorizerTest {
 
     val zkClientConfig = AclAuthorizer.zkClientConfigFromKafkaConfigAndMap(
       KafkaConfig.fromProps(props), mutable.Map(configs.toSeq: _*))
-    assertTrue(zkClientConfig.isDefined)
     // confirm we get all the values we expect
     KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(prop => prop match {
         case KafkaConfig.ZkSslClientEnableProp | KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp =>
-          assertEquals("true", KafkaConfig.getZooKeeperClientProperty(zkClientConfig.get, prop).getOrElse("<None>"))
+          assertEquals("true", KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
         case KafkaConfig.ZkSslCrlEnableProp | KafkaConfig.ZkSslOcspEnableProp =>
-          assertEquals("false", KafkaConfig.getZooKeeperClientProperty(zkClientConfig.get, prop).getOrElse("<None>"))
-        case _ => assertEquals(kafkaValue, KafkaConfig.getZooKeeperClientProperty(zkClientConfig.get, prop).getOrElse("<None>"))
+          assertEquals("false", KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
+        case _ => assertEquals(kafkaValue, KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
       })
   }
 
@@ -889,14 +890,13 @@ class AclAuthorizerTest extends ZooKeeperTestHarness with BaseAuthorizerTest {
 
     val zkClientConfig = AclAuthorizer.zkClientConfigFromKafkaConfigAndMap(
       KafkaConfig.fromProps(props), mutable.Map(configs.toSeq: _*))
-    assertTrue(zkClientConfig.isDefined)
     // confirm we get all the values we expect
     KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(prop => prop match {
       case KafkaConfig.ZkSslClientEnableProp | KafkaConfig.ZkSslCrlEnableProp | KafkaConfig.ZkSslOcspEnableProp =>
-        assertEquals("true", KafkaConfig.getZooKeeperClientProperty(zkClientConfig.get, prop).getOrElse("<None>"))
+        assertEquals("true", KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
       case KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp =>
-        assertEquals("false", KafkaConfig.getZooKeeperClientProperty(zkClientConfig.get, prop).getOrElse("<None>"))
-      case _ => assertEquals(prefixedValue, KafkaConfig.getZooKeeperClientProperty(zkClientConfig.get, prop).getOrElse("<None>"))
+        assertEquals("false", KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
+      case _ => assertEquals(prefixedValue, KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
     })
   }
 
diff --git a/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerInterfaceDefaultTest.scala b/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerInterfaceDefaultTest.scala
index 86c7a14..bccd58a 100644
--- a/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerInterfaceDefaultTest.scala
+++ b/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerInterfaceDefaultTest.scala
@@ -18,7 +18,6 @@ package kafka.security.authorizer
 
 import java.util.concurrent.CompletionStage
 import java.{lang, util}
-
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 import kafka.zk.ZooKeeperTestHarness
@@ -27,6 +26,7 @@ import org.apache.kafka.common.Endpoint
 import org.apache.kafka.common.acl._
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.server.authorizer._
+import org.apache.zookeeper.client.ZKClientConfig
 import org.junit.jupiter.api.{AfterEach, BeforeEach}
 
 class AuthorizerInterfaceDefaultTest extends ZooKeeperTestHarness with BaseAuthorizerTest {
@@ -49,7 +49,8 @@ class AuthorizerInterfaceDefaultTest extends ZooKeeperTestHarness with BaseAutho
     interfaceDefaultAuthorizer.authorizer.configure(config.originals)
 
     zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkMaxInFlightRequests,
-      Time.SYSTEM, "kafka.test", "AuthorizerInterfaceDefaultTest")
+      Time.SYSTEM, "kafka.test", "AuthorizerInterfaceDefaultTest", new ZKClientConfig,
+      "AuthorizerInterfaceDefaultTest")
   }
 
   @AfterEach
diff --git a/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala b/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala
index 35f3273..8056e23 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala
@@ -20,8 +20,7 @@ package kafka.server
 import kafka.api.ApiVersion
 import kafka.utils.TestUtils
 import kafka.zk.ZooKeeperTestHarness
-import org.apache.zookeeper.client.ZKClientConfig
-import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, fail}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertThrows, fail}
 import org.junit.jupiter.api.Test
 
 import java.util.Properties
@@ -47,7 +46,10 @@ class KafkaServerTest extends ZooKeeperTestHarness {
     val props = new Properties
     props.put(KafkaConfig.ZkConnectProp, zkConnect) // required, otherwise we would leave it out
     props.put(KafkaConfig.ZkSslClientEnableProp, "false")
-    assertEquals(None, KafkaServer.zkClientConfigFromKafkaConfig(KafkaConfig.fromProps(props)))
+    val zkClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(KafkaConfig.fromProps(props))
+    KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach { propName =>
+      assertNull(zkClientConfig.getProperty(propName))
+    }
   }
 
   @Test
@@ -62,7 +64,7 @@ class KafkaServerTest extends ZooKeeperTestHarness {
       case _ => someValue
     }
     KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(kafkaProp => props.put(kafkaProp, kafkaConfigValueToSet(kafkaProp)))
-    val zkClientConfig: Option[ZKClientConfig] = KafkaServer.zkClientConfigFromKafkaConfig(KafkaConfig.fromProps(props))
+    val zkClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(KafkaConfig.fromProps(props))
     // now check to make sure the values were set correctly
     def zkClientValueToExpect(kafkaProp: String) : String = kafkaProp match {
       case KafkaConfig.ZkSslClientEnableProp | KafkaConfig.ZkSslCrlEnableProp | KafkaConfig.ZkSslOcspEnableProp => "true"
@@ -70,7 +72,7 @@ class KafkaServerTest extends ZooKeeperTestHarness {
       case _ => someValue
     }
     KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(kafkaProp =>
-      assertEquals(zkClientValueToExpect(kafkaProp), zkClientConfig.get.getProperty(KafkaConfig.ZkSslConfigToSystemPropertyMap(kafkaProp))))
+      assertEquals(zkClientValueToExpect(kafkaProp), zkClientConfig.getProperty(KafkaConfig.ZkSslConfigToSystemPropertyMap(kafkaProp))))
   }
 
   @Test
@@ -87,7 +89,7 @@ class KafkaServerTest extends ZooKeeperTestHarness {
       case _ => someValue
     }
     KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(kafkaProp => props.put(kafkaProp, kafkaConfigValueToSet(kafkaProp)))
-    val zkClientConfig: Option[ZKClientConfig] = KafkaServer.zkClientConfigFromKafkaConfig(KafkaConfig.fromProps(props))
+    val zkClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(KafkaConfig.fromProps(props))
     // now check to make sure the values were set correctly
     def zkClientValueToExpect(kafkaProp: String) : String = kafkaProp match {
       case KafkaConfig.ZkSslClientEnableProp => "true"
@@ -97,7 +99,7 @@ class KafkaServerTest extends ZooKeeperTestHarness {
       case _ => someValue
     }
     KafkaConfig.ZkSslConfigToSystemPropertyMap.keys.foreach(kafkaProp =>
-      assertEquals(zkClientValueToExpect(kafkaProp), zkClientConfig.get.getProperty(KafkaConfig.ZkSslConfigToSystemPropertyMap(kafkaProp))))
+      assertEquals(zkClientValueToExpect(kafkaProp), zkClientConfig.getProperty(KafkaConfig.ZkSslConfigToSystemPropertyMap(kafkaProp))))
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index 0c1d860..2088e5f 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -51,6 +51,7 @@ import org.apache.kafka.common.resource.ResourceType.{GROUP, TOPIC}
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.zookeeper.ZooDefs
 import org.apache.zookeeper.client.ZKClientConfig
+import org.apache.zookeeper.common.ZKConfig
 import org.apache.zookeeper.data.Stat
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
@@ -76,7 +77,8 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     super.setUp()
     zkClient.createControllerEpochRaw(1)
     otherZkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled), zkSessionTimeout,
-      zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
+      zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM, name = "KafkaZkClient",
+      zkClientConfig = new ZKClientConfig)
     expiredSessionZkClient = ExpiredKafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled),
       zkSessionTimeout, zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
   }
@@ -103,15 +105,15 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     val propVal = "org.apache.zookeeper.ClientCnxnSocketNetty"
     KafkaConfig.setZooKeeperClientProperty(clientConfig, propKey, propVal)
     val client = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled), zkSessionTimeout,
-      zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM, zkClientConfig = Some(clientConfig))
+      zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM, name = "KafkaZkClient", zkClientConfig = clientConfig)
     try {
-      assertEquals(Some(propVal), KafkaConfig.getZooKeeperClientProperty(client.currentZooKeeper.getClientConfig, propKey))
+      assertEquals(Some(propVal), KafkaConfig.zooKeeperClientProperty(client.currentZooKeeper.getClientConfig, propKey))
       // For a sanity check, make sure a bad client connection socket class name generates an exception
       val badClientConfig = new ZKClientConfig()
       KafkaConfig.setZooKeeperClientProperty(badClientConfig, propKey, propVal + "BadClassName")
       assertThrows(classOf[Exception],
         () => KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled), zkSessionTimeout,
-          zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM, zkClientConfig = Some(badClientConfig)))
+          zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM, name = "KafkaZkClientTest", zkClientConfig = badClientConfig))
     } finally {
       client.close()
     }
@@ -121,9 +123,9 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   @ValueSource(booleans = Array(true, false))
   def testChroot(createChrootIfNecessary: Boolean): Unit = {
     val chroot = "/chroot"
-    val clientConfig = new ZKClientConfig()
     val client = KafkaZkClient(zkConnect + chroot, zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled), zkSessionTimeout,
-      zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM, zkClientConfig = Some(clientConfig), createChrootIfNecessary = createChrootIfNecessary)
+      zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM, name = "KafkaZkClientTest",
+      zkClientConfig = new ZKClientConfig, createChrootIfNecessary = createChrootIfNecessary)
     try {
       client.createTopLevelPaths()
       if (!createChrootIfNecessary) {
@@ -158,7 +160,8 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     // this client doesn't have create permission to the root and chroot, but the chroot already exists
     // Expect that no exception thrown
     val chrootClient = KafkaZkClient(zkConnect + chroot, zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled), zkSessionTimeout,
-      zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM, createChrootIfNecessary = true)
+      zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM, name = "KafkaZkClientTest",
+      zkClientConfig = new ZKClientConfig, createChrootIfNecessary = true)
     chrootClient.close()
   }
 
@@ -1340,6 +1343,33 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     assertEquals(ZooDefs.Ids.READ_ACL_UNSAFE.asScala, zkClient.getAcl(mockPath))
   }
 
+  @Test
+  def testJuteMaxBufffer(): Unit = {
+
+    def assertJuteMaxBufferConfig(clientConfig: ZKClientConfig, expectedValue: String): Unit = {
+      val client = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled), zkSessionTimeout,
+        zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM, name = "KafkaZkClient",
+        zkClientConfig = clientConfig)
+      try assertEquals(expectedValue, client.currentZooKeeper.getClientConfig.getProperty(ZKConfig.JUTE_MAXBUFFER))
+      finally client.close()
+    }
+
+    // default case
+    assertEquals("4194304", zkClient.currentZooKeeper.getClientConfig.getProperty(ZKConfig.JUTE_MAXBUFFER))
+
+    // Value set directly on ZKClientConfig takes precedence over system property
+    System.setProperty(ZKConfig.JUTE_MAXBUFFER, (3000 * 1024).toString)
+    try {
+      val clientConfig1 = new ZKClientConfig
+      clientConfig1.setProperty(ZKConfig.JUTE_MAXBUFFER, (2000 * 1024).toString)
+      assertJuteMaxBufferConfig(clientConfig1, expectedValue = "2048000")
+
+      // System property value is used if value is not set in ZKClientConfig
+      assertJuteMaxBufferConfig(new ZKClientConfig, expectedValue = "3072000")
+
+    } finally System.clearProperty(ZKConfig.JUTE_MAXBUFFER)
+  }
+
   class ExpiredKafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean, time: Time)
     extends KafkaZkClient(zooKeeperClient, isSecure, time) {
     // Overwriting this method from the parent class to force the client to re-register the Broker.
@@ -1365,7 +1395,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
               metricGroup: String = "kafka.server",
               metricType: String = "SessionExpireListener") = {
       val zooKeeperClient = new ZooKeeperClient(connectString, sessionTimeoutMs, connectionTimeoutMs, maxInFlightRequests,
-        time, metricGroup, metricType)
+        time, metricGroup, metricType, new ZKClientConfig, "ExpiredKafkaZkClient")
       new ExpiredKafkaZkClient(zooKeeperClient, isSecure, time)
     }
   }
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index 6bc9d3a..8b61c0e 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -19,7 +19,7 @@ package kafka.zk
 
 import javax.security.auth.login.Configuration
 import kafka.utils.{CoreUtils, Logging, TestUtils}
-import org.junit.jupiter.api.{AfterEach, AfterAll, BeforeEach, BeforeAll, Tag}
+import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeAll, BeforeEach, Tag}
 import org.junit.jupiter.api.Assertions._
 import org.apache.kafka.common.security.JaasUtils
 
@@ -30,6 +30,7 @@ import org.apache.kafka.clients.consumer.internals.AbstractCoordinator
 import kafka.controller.ControllerEventManager
 import org.apache.kafka.clients.admin.AdminClientUnitTestEnv
 import org.apache.kafka.common.utils.Time
+import org.apache.zookeeper.client.ZKClientConfig
 import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}
 
 @Tag("integration")
@@ -53,7 +54,7 @@ abstract class ZooKeeperTestHarness extends Logging {
   def setUp(): Unit = {
     zookeeper = new EmbeddedZookeeper()
     zkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled), zkSessionTimeout,
-      zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
+      zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM, name = "ZooKeeperTestHarness", new ZKClientConfig)
     adminZkClient = new AdminZkClient(zkClient)
   }
 
diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
index 0392386..a0eb1ea 100644
--- a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
@@ -49,8 +49,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
     ZooKeeperTestHarness.verifyNoUnexpectedThreads("@BeforeEach")
     cleanMetricsRegistry()
     super.setUp()
-    zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkMaxInFlightRequests,
-      Time.SYSTEM, "testMetricGroup", "testMetricType")
+    zooKeeperClient = newZooKeeperClient()
   }
 
   @AfterEach
@@ -65,8 +64,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
   @Test
   def testUnresolvableConnectString(): Unit = {
     try {
-      new ZooKeeperClient("some.invalid.hostname.foo.bar.local", zkSessionTimeout, connectionTimeoutMs = 10,
-        Int.MaxValue, time, "testMetricGroup", "testMetricType")
+      newZooKeeperClient("some.invalid.hostname.foo.bar.local", connectionTimeoutMs = 10)
     } catch {
       case e: ZooKeeperClientTimeoutException =>
         assertEquals(Set.empty, runningZkSendThreads,  "ZooKeeper client threads still running")
@@ -81,14 +79,13 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
   @Test
   def testConnectionTimeout(): Unit = {
     zookeeper.shutdown()
-    assertThrows(classOf[ZooKeeperClientTimeoutException], () => new ZooKeeperClient(zkConnect, zkSessionTimeout,
-      connectionTimeoutMs = 10, Int.MaxValue, time, "testMetricGroup", "testMetricType").close())
+    assertThrows(classOf[ZooKeeperClientTimeoutException], () => newZooKeeperClient(
+      connectionTimeoutMs = 10).close())
   }
 
   @Test
   def testConnection(): Unit = {
-    val client = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, time, "testMetricGroup",
-      "testMetricType")
+    val client = newZooKeeperClient()
     try {
       // Verify ZooKeeper event thread name. This is used in ZooKeeperTestHarness to verify that tests have closed ZK clients
       val threads = Thread.getAllStackTraces.keySet.asScala.map(_.getName)
@@ -108,15 +105,13 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
     val propKey = KafkaConfig.ZkClientCnxnSocketProp
     val propVal = "org.apache.zookeeper.ClientCnxnSocketNetty"
     KafkaConfig.setZooKeeperClientProperty(clientConfig, propKey, propVal)
-    val client = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, time, "testMetricGroup",
-      "testMetricType", None, Some(clientConfig))
+    val client = newZooKeeperClient(clientConfig = clientConfig)
     try {
-      assertEquals(Some(propVal), KafkaConfig.getZooKeeperClientProperty(client.getClientConfig, propKey))
+      assertEquals(Some(propVal), KafkaConfig.zooKeeperClientProperty(client.clientConfig, propKey))
       // For a sanity check, make sure a bad client connection socket class name generates an exception
       val badClientConfig = new ZKClientConfig()
       KafkaConfig.setZooKeeperClientProperty(badClientConfig, propKey, propVal + "BadClassName")
-      assertThrows(classOf[Exception], () => new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout,
-        Int.MaxValue, time, "testMetricGroup", "testMetricType", None, Some(badClientConfig)))
+      assertThrows(classOf[Exception], () => newZooKeeperClient(clientConfig = badClientConfig))
     } finally {
       client.close()
     }
@@ -350,8 +345,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
     }
 
     zooKeeperClient.close()
-    zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, time,
-      "testMetricGroup", "testMetricType")
+    zooKeeperClient = newZooKeeperClient()
     zooKeeperClient.registerStateChangeHandler(stateChangeHandler)
 
     val requestThread = new Thread() {
@@ -399,8 +393,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
     }
 
     zooKeeperClient.close()
-    zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, time,
-      "testMetricGroup", "testMetricType")
+    zooKeeperClient = newZooKeeperClient()
     zooKeeperClient.registerStateChangeHandler(faultyHandler)
     zooKeeperClient.registerStateChangeHandler(goodHandler)
 
@@ -476,8 +469,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
       }
     }
 
-    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, time,
-      "testMetricGroup", "testMetricType")
+    val zooKeeperClient = newZooKeeperClient()
     try {
       zooKeeperClient.registerStateChangeHandler(stateChangeHandler)
       zooKeeperClient.forceReinitialize()
@@ -489,8 +481,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
   @Test
   def testConnectionLossRequestTermination(): Unit = {
     val batchSize = 10
-    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, 2, time,
-      "testGroupType", "testGroupName")
+    val zooKeeperClient = newZooKeeperClient(maxInFlight = 2)
     zookeeper.shutdown()
     try {
       val requests = (1 to batchSize).map(i => GetDataRequest(s"/$i"))
@@ -553,7 +544,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
     @volatile var resultCodes: Seq[Code] = null
     val stateChanges = new ConcurrentLinkedQueue[String]()
     val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, maxInflightRequests,
-      time, "testGroupType", "testGroupName") {
+      time, "testGroupType", "testGroupName", new ZKClientConfig, "ZooKeeperClientTest") {
       override def send[Req <: AsyncRequest](request: Req)(processResponse: Req#Response => Unit): Unit = {
         super.send(request)( response => {
           responseExecutor.submit(new Runnable {
@@ -657,8 +648,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
     }
 
     zooKeeperClient.close()
-    zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, time,
-      "testMetricGroup", "testMetricType")
+    zooKeeperClient = newZooKeeperClient()
     zooKeeperClient.registerStateChangeHandler(changeHandler)
 
     zooKeeperClient.ZooKeeperClientWatcher.process(new WatchedEvent(EventType.None, KeeperState.AuthFailed, null))
@@ -708,6 +698,13 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
     assertEquals(States.CLOSED, zooKeeperClient.connectionState)
   }
 
+  private def newZooKeeperClient(connectionString: String = zkConnect,
+                                 connectionTimeoutMs: Int = zkConnectionTimeout,
+                                 maxInFlight: Int = zkMaxInFlightRequests,
+                                 clientConfig: ZKClientConfig = new ZKClientConfig) =
+    new ZooKeeperClient(connectionString, zkSessionTimeout, connectionTimeoutMs, maxInFlight, time,
+      "testMetricGroup", "testMetricType", clientConfig, "ZooKeeperClientTest")
+
   private def cleanMetricsRegistry(): Unit = {
     val metrics = KafkaYammerMetrics.defaultRegistry
     metrics.allMetrics.keySet.forEach(metrics.removeMetric)