Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/09/04 00:01:01 UTC

[GitHub] [kafka] ijuma opened a new pull request #11295: KAFKA-13270: Set JUTE_MAXBUFFER to 4 MB by default

ijuma opened a new pull request #11295:
URL: https://github.com/apache/kafka/pull/11295


   ZooKeeper 3.6.0 changed the default configuration for JUTE_MAXBUFFER from 4 MB to 1 MB.
   This causes a regression if Kafka tries to retrieve a large amount of data across many
   znodes – in such a case the ZooKeeper client will repeatedly emit a message of the form
   "java.io.IOException: Packet len <####> is out of range".
   
   We restore the 3.4.x/3.5.x behavior unless the caller has set the property (note that ZKConfig
   auto-configures itself if certain system properties have been set).
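
A minimal sketch of the defaulting rule described above: apply 4 MB only when the caller has not already set a value. To stay self-contained it uses a plain `java.util.Properties` as a stand-in for `ZKClientConfig`, and the names `JuteMaxBufferDefaultSketch` and `applyDefault` are hypothetical, not taken from the patch. The stand-in also does not model ZKConfig picking up system properties on construction.

```scala
import java.util.Properties

object JuteMaxBufferDefaultSketch {
  // Property name behind ZKConfig.JUTE_MAXBUFFER.
  val JuteMaxBufferProp = "jute.maxbuffer"

  // 4 MB, the value this change restores as the default.
  val DefaultJuteMaxBufferBytes: Int = 4 * 1024 * 1024

  // Assumption: Properties stands in for ZKClientConfig in this sketch.
  def applyDefault(clientConfig: Properties): Properties = {
    // Fill in the default only when no value is present, so that a value
    // supplied by the caller is left untouched.
    if (clientConfig.getProperty(JuteMaxBufferProp) == null)
      clientConfig.setProperty(JuteMaxBufferProp, DefaultJuteMaxBufferBytes.toString)
    clientConfig
  }
}
```

With an empty config the property ends up as 4194304 bytes; a config that already carries a value keeps it.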
   
   I added a unit test that fails without the change and passes with it.
    
   See https://github.com/apache/zookeeper/pull/1129 for the details on why the behavior
   changed in 3.6.0.
   
   Credit to @rondagostino for finding and reporting this issue.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma merged pull request #11295: KAFKA-13270: Set JUTE_MAXBUFFER to 4 MB by default

Posted by GitBox <gi...@apache.org>.
ijuma merged pull request #11295:
URL: https://github.com/apache/kafka/pull/11295


   





[GitHub] [kafka] ijuma commented on pull request #11295: KAFKA-13270: Set JUTE_MAXBUFFER to 4 MB by default

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #11295:
URL: https://github.com/apache/kafka/pull/11295#issuecomment-913681527


   @dajac Thanks for the review, I pushed a commit that addresses your feedback.





[GitHub] [kafka] ijuma commented on pull request #11295: KAFKA-13270: Set JUTE_MAXBUFFER to 4 MB by default

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #11295:
URL: https://github.com/apache/kafka/pull/11295#issuecomment-913389764


   @omkreddy if you have cycles, this is a 3.0 blocker.





[GitHub] [kafka] ijuma commented on a change in pull request #11295: KAFKA-13270: Set JUTE_MAXBUFFER to 4 MB by default

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #11295:
URL: https://github.com/apache/kafka/pull/11295#discussion_r702924352



##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -354,10 +354,10 @@ object KafkaConfig {
 
   // For ZooKeeper TLS client authentication to be enabled the client must (at a minimum) configure itself as using TLS
   // with both a client connection socket and a key store location explicitly set.
-  private[kafka] def zkTlsClientAuthEnabled(zkClientConfig: ZKClientConfig) = {
-    getZooKeeperClientProperty(zkClientConfig, ZkSslClientEnableProp).getOrElse("false") == "true" &&
-      getZooKeeperClientProperty(zkClientConfig, ZkClientCnxnSocketProp).isDefined &&
-      getZooKeeperClientProperty(zkClientConfig, ZkSslKeyStoreLocationProp).isDefined
+  private[kafka] def zkTlsClientAuthEnabled(zkClientConfig: ZKClientConfig): Boolean = {
+    zooKeeperClientProperty(zkClientConfig, ZkSslClientEnableProp).map(_ == "true").getOrElse(false) &&

Review comment:
       True, even better: `contains`. :)
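
For reference, a small sketch of the three spellings discussed in this thread, applied to an `Option[String]`. The object and method names are hypothetical and exist only for the comparison; all three return the same result for every input, so the choice is purely about readability.

```scala
object OptionComparisonSketch {
  // Form used in the diff: map to a Boolean, then fall back to false for None.
  def viaMap(value: Option[String]): Boolean =
    value.map(_ == "true").getOrElse(false)

  // `exists`: true only if a value is present and satisfies the predicate.
  def viaExists(value: Option[String]): Boolean =
    value.exists(_ == "true")

  // `contains`: true only if a value is present and equal to the argument.
  def viaContains(value: Option[String]): Boolean =
    value.contains("true")
}
```

`contains` is the shortest form when the check is plain equality; `exists` remains the better fit when the predicate is anything else.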







[GitHub] [kafka] dajac commented on a change in pull request #11295: KAFKA-13270: Set JUTE_MAXBUFFER to 4 MB by default

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #11295:
URL: https://github.com/apache/kafka/pull/11295#discussion_r702694677



##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -354,10 +354,10 @@ object KafkaConfig {
 
   // For ZooKeeper TLS client authentication to be enabled the client must (at a minimum) configure itself as using TLS
   // with both a client connection socket and a key store location explicitly set.
-  private[kafka] def zkTlsClientAuthEnabled(zkClientConfig: ZKClientConfig) = {
-    getZooKeeperClientProperty(zkClientConfig, ZkSslClientEnableProp).getOrElse("false") == "true" &&
-      getZooKeeperClientProperty(zkClientConfig, ZkClientCnxnSocketProp).isDefined &&
-      getZooKeeperClientProperty(zkClientConfig, ZkSslKeyStoreLocationProp).isDefined
+  private[kafka] def zkTlsClientAuthEnabled(zkClientConfig: ZKClientConfig): Boolean = {
+    zooKeeperClientProperty(zkClientConfig, ZkSslClientEnableProp).map(_ == "true").getOrElse(false) &&

Review comment:
       nit: We could use `exists` here.

##########
File path: core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala
##########
@@ -95,24 +95,24 @@ object AclAuthorizer {
     }
   }
 
-  private[authorizer] def zkClientConfigFromKafkaConfigAndMap(kafkaConfig: KafkaConfig, configMap: mutable.Map[String, _<:Any]): Option[ZKClientConfig] = {
+  private[authorizer] def zkClientConfigFromKafkaConfigAndMap(kafkaConfig: KafkaConfig, configMap: mutable.Map[String, _<:Any]): ZKClientConfig = {
     val zkSslClientEnable = configMap.get(AclAuthorizer.configPrefix + KafkaConfig.ZkSslClientEnableProp).
       map(_.toString).getOrElse(kafkaConfig.zkSslClientEnable.toString).toBoolean
     if (!zkSslClientEnable)
-      None
+      new ZKClientConfig
     else {
       // start with the base config from the Kafka configuration
       // be sure to force creation since the zkSslClientEnable property in the kafkaConfig could be false
       val zkClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(kafkaConfig, true)
       // add in any prefixed overlays
-      KafkaConfig.ZkSslConfigToSystemPropertyMap.foreach{ case (kafkaProp, sysProp) => {
-        val prefixedValue = configMap.get(AclAuthorizer.configPrefix + kafkaProp)
-        if (prefixedValue.isDefined)
-          zkClientConfig.get.setProperty(sysProp,
+      KafkaConfig.ZkSslConfigToSystemPropertyMap.forKeyValue { (kafkaProp, sysProp) => {

Review comment:
       nit: We could remove the extra block defined by the last `{` on this line.

##########
File path: core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
##########
@@ -1340,6 +1343,32 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     assertEquals(ZooDefs.Ids.READ_ACL_UNSAFE.asScala, zkClient.getAcl(mockPath))
   }
 
+  @Test
+  def testJuteMaxBufffer(): Unit = {
+    // default case
+    assertEquals("4194304", zkClient.currentZooKeeper.getClientConfig.getProperty(ZKConfig.JUTE_MAXBUFFER))
+
+    // Value set directly on ZKClientConfig takes precedence over system property
+    System.setProperty(ZKConfig.JUTE_MAXBUFFER, (3000 * 1024).toString)
+    try {
+      val clientConfig1 = new ZKClientConfig
+      clientConfig1.setProperty(ZKConfig.JUTE_MAXBUFFER, (2000 * 1024).toString)
+      val client1 = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled), zkSessionTimeout,
+        zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM, name = "KafkaZkClient",
+        zkClientConfig = clientConfig1)
+      try assertEquals("2048000", client1.currentZooKeeper.getClientConfig.getProperty(ZKConfig.JUTE_MAXBUFFER))
+      finally client1.close()

Review comment:
       Is it worth defining an inner helper method to avoid the repeated code here? Something like: `assertProperty(config: ZKClientConfig, property: String, expectedValue: String)`.
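
One possible shape for such a helper, as a hedged sketch only: it is not the change that was eventually pushed. `StubClient` is a hypothetical stand-in for the ZooKeeper-backed client, and a `Map[String, String]` stands in for `ZKClientConfig`, so that the example runs without Kafka or ZooKeeper on the classpath.

```scala
object AssertPropertySketch {
  // Hypothetical stand-in for a client built from a config; not KafkaZkClient.
  final class StubClient(config: Map[String, String]) extends AutoCloseable {
    def property(name: String): Option[String] = config.get(name)
    override def close(): Unit = ()
  }

  // Builds a client from the config, checks one property, and always closes
  // the client, so each test case collapses to a single call.
  def assertProperty(config: Map[String, String], property: String, expectedValue: String): Unit = {
    val client = new StubClient(config)
    try {
      val actual = client.property(property)
      assert(actual.contains(expectedValue),
        s"expected $property=$expectedValue but was $actual")
    } finally client.close()
  }
}
```

The try/finally lives in one place, which is the repetition the comment is pointing at.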







[GitHub] [kafka] ijuma commented on a change in pull request #11295: KAFKA-13270: Set JUTE_MAXBUFFER to 4 MB by default

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #11295:
URL: https://github.com/apache/kafka/pull/11295#discussion_r702929731



##########
File path: core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
##########
@@ -1340,6 +1343,32 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     assertEquals(ZooDefs.Ids.READ_ACL_UNSAFE.asScala, zkClient.getAcl(mockPath))
   }
 
+  @Test
+  def testJuteMaxBufffer(): Unit = {
+    // default case
+    assertEquals("4194304", zkClient.currentZooKeeper.getClientConfig.getProperty(ZKConfig.JUTE_MAXBUFFER))
+
+    // Value set directly on ZKClientConfig takes precedence over system property
+    System.setProperty(ZKConfig.JUTE_MAXBUFFER, (3000 * 1024).toString)
+    try {
+      val clientConfig1 = new ZKClientConfig
+      clientConfig1.setProperty(ZKConfig.JUTE_MAXBUFFER, (2000 * 1024).toString)
+      val client1 = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled), zkSessionTimeout,
+        zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM, name = "KafkaZkClient",
+        zkClientConfig = clientConfig1)
+      try assertEquals("2048000", client1.currentZooKeeper.getClientConfig.getProperty(ZKConfig.JUTE_MAXBUFFER))
+      finally client1.close()

Review comment:
       That's a fair suggestion, I pushed a change along these lines.



