Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/06/08 03:45:25 UTC

[GitHub] [kafka] showuon opened a new pull request #10838: MINOR: Refactor DynamicConfigManager

showuon opened a new pull request #10838:
URL: https://github.com/apache/kafka/pull/10838


   Some refactoring of the DynamicConfigManager class.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on a change in pull request #10838: MINOR: small fix and clean up for DynamicConfigManager

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #10838:
URL: https://github.com/apache/kafka/pull/10838#discussion_r651801212



##########
File path: core/src/main/scala/kafka/server/DynamicConfigManager.scala
##########
@@ -154,7 +156,7 @@ class DynamicConfigManager(private val zkClient: KafkaZkClient,
   }
 
   private val configChangeListener = new ZkNodeChangeNotificationListener(zkClient, ConfigEntityChangeNotificationZNode.path,
-    ConfigEntityChangeNotificationSequenceZNode.SequenceNumberPrefix, ConfigChangedNotificationHandler)
+    ConfigEntityChangeNotificationSequenceZNode.SequenceNumberPrefix, ConfigChangedNotificationHandler, changeExpirationMs, time)

Review comment:
       You're right. Will do. Thanks.







[GitHub] [kafka] showuon commented on pull request #10838: MINOR: small fix and clean up for DynamicConfigManager

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #10838:
URL: https://github.com/apache/kafka/pull/10838#issuecomment-861238094


   @fpj @Parth-Brahmbhatt @ijuma, please help review this small PR. Thanks.





[GitHub] [kafka] showuon commented on a change in pull request #10838: MINOR: small fix and clean up for DynamicConfigManager

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #10838:
URL: https://github.com/apache/kafka/pull/10838#discussion_r660557813



##########
File path: core/src/main/scala/kafka/server/DynamicConfigManager.scala
##########
@@ -86,25 +88,30 @@ object ConfigEntityName {
  */
 class DynamicConfigManager(private val zkClient: KafkaZkClient,
                            private val configHandlers: Map[String, ConfigHandler],
-                           private val changeExpirationMs: Long = 15*60*1000,
+                           private val changeExpirationMs: Long = 15 * 60 * 1000,
                            private val time: Time = Time.SYSTEM) extends Logging {
   val adminZkClient = new AdminZkClient(zkClient)
 
   object ConfigChangedNotificationHandler extends NotificationHandler {
     override def processNotification(jsonBytes: Array[Byte]) = {
-      // Ignore non-json notifications because they can be from the deprecated TopicConfigManager
-      Json.parseBytes(jsonBytes).foreach { js =>
-        val jsObject = js.asJsonObjectOption.getOrElse {
-          throw new IllegalArgumentException("Config change notification has an unexpected value. The format is:" +
-            """{"version" : 1, "entity_type":"topics/clients", "entity_name" : "topic_name/client_id"} or """ +
-            """{"version" : 2, "entity_path":"entity_type/entity_name"}. """ +
-            s"Received: ${new String(jsonBytes, StandardCharsets.UTF_8)}")
-        }
-        jsObject("version").to[Int] match {
-          case 1 => processEntityConfigChangeVersion1(jsonBytes, jsObject)
-          case 2 => processEntityConfigChangeVersion2(jsonBytes, jsObject)
-          case version => throw new IllegalArgumentException("Config change notification has unsupported version " +
-            s"'$version', supported versions are 1 and 2.")
+      val jsonValue = Json.parseBytes(jsonBytes)
+      if (jsonValue.isEmpty) {
+        // Ignore non-json notifications because they can be from the deprecated TopicConfigManager
+        warn(s"The non-json notifications are ignored.")

Review comment:
       > What is the user expected to do with this information?
   I think you're right. This change is unnecessary.
   
   > Also, when would we get a "not json" parameter?
   Actually, I'm not quite sure. I just got it from the comment and the test:
   `non-json is from the deprecated TopicConfigManager`. 
   
   I'll revert this change. Thank you.







[GitHub] [kafka] ijuma commented on a change in pull request #10838: MINOR: small fix and clean up for DynamicConfigManager

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #10838:
URL: https://github.com/apache/kafka/pull/10838#discussion_r660562587



##########
File path: core/src/main/scala/kafka/server/DynamicConfigManager.scala
##########
@@ -154,7 +156,7 @@ class DynamicConfigManager(private val zkClient: KafkaZkClient,
   }
 
   private val configChangeListener = new ZkNodeChangeNotificationListener(zkClient, ConfigEntityChangeNotificationZNode.path,
-    ConfigEntityChangeNotificationSequenceZNode.SequenceNumberPrefix, ConfigChangedNotificationHandler)
+    ConfigEntityChangeNotificationSequenceZNode.SequenceNumberPrefix, ConfigChangedNotificationHandler, changeExpirationMs, time)

Review comment:
       Default arguments are dangerous for things like this, I would remove them.







[GitHub] [kafka] showuon commented on pull request #10838: MINOR: small fix and clean up for DynamicConfigManager

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #10838:
URL: https://github.com/apache/kafka/pull/10838#issuecomment-869655003


   @ijuma , please help review. Thank you.





[GitHub] [kafka] showuon commented on a change in pull request #10838: MINOR: Refactor DynamicConfigManager

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #10838:
URL: https://github.com/apache/kafka/pull/10838#discussion_r647088680



##########
File path: core/src/main/scala/kafka/server/DynamicConfigManager.scala
##########
@@ -154,7 +156,7 @@ class DynamicConfigManager(private val zkClient: KafkaZkClient,
   }
 
   private val configChangeListener = new ZkNodeChangeNotificationListener(zkClient, ConfigEntityChangeNotificationZNode.path,
-    ConfigEntityChangeNotificationSequenceZNode.SequenceNumberPrefix, ConfigChangedNotificationHandler)
+    ConfigEntityChangeNotificationSequenceZNode.SequenceNumberPrefix, ConfigChangedNotificationHandler, changeExpirationMs, time)

Review comment:
       We should pass `changeExpirationMs` and `time` into `ZkNodeChangeNotificationListener`; otherwise, it will always use the default values.
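
A minimal sketch of the problem, assuming hypothetical stand-in classes (`SketchListener`, `ForgetfulManager`, and `ForwardingManager` are illustrative names, not Kafka classes). It shows how a defaulted constructor parameter lets a caller drop a value without any compile error:

```scala
object DefaultArgSketch {
  // Hypothetical stand-in for a listener whose constructor has a defaulted parameter.
  final class SketchListener(val path: String,
                             val changeExpirationMs: Long = 15 * 60 * 1000L)

  // Accepts an expiration but never forwards it: this still compiles,
  // and the listener silently falls back to the 15-minute default.
  final class ForgetfulManager(changeExpirationMs: Long) {
    val listener = new SketchListener("/config/changes")
  }

  // Forwards the caller's value, so the listener honours it.
  final class ForwardingManager(changeExpirationMs: Long) {
    val listener = new SketchListener("/config/changes", changeExpirationMs)
  }
}
```

If the default were removed from `SketchListener`, the `ForgetfulManager` variant would no longer compile, so the omission could not go unnoticed.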







[GitHub] [kafka] showuon commented on a change in pull request #10838: MINOR: small fix and clean up for DynamicConfigManager

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #10838:
URL: https://github.com/apache/kafka/pull/10838#discussion_r660645680



##########
File path: core/src/main/scala/kafka/server/DynamicConfigManager.scala
##########
@@ -154,7 +156,7 @@ class DynamicConfigManager(private val zkClient: KafkaZkClient,
   }
 
   private val configChangeListener = new ZkNodeChangeNotificationListener(zkClient, ConfigEntityChangeNotificationZNode.path,
-    ConfigEntityChangeNotificationSequenceZNode.SequenceNumberPrefix, ConfigChangedNotificationHandler)
+    ConfigEntityChangeNotificationSequenceZNode.SequenceNumberPrefix, ConfigChangedNotificationHandler, changeExpirationMs, time)

Review comment:
       Agree! Removed. Thanks.







[GitHub] [kafka] showuon commented on pull request #10838: MINOR: small fix and clean up for DynamicConfigManager

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #10838:
URL: https://github.com/apache/kafka/pull/10838#issuecomment-871028414


   Failed tests are unrelated. Thank you.
   ```
       Build / JDK 16 and Scala 2.13 / kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopics()
       Build / JDK 11 and Scala 2.13 / kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopics()
       Build / JDK 15 and Scala 2.13 / kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions()
       Build / JDK 15 and Scala 2.13 / kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopics()
   ```





[GitHub] [kafka] showuon commented on a change in pull request #10838: MINOR: Refactor DynamicConfigManager

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #10838:
URL: https://github.com/apache/kafka/pull/10838#discussion_r647088430



##########
File path: core/src/main/scala/kafka/server/DynamicConfigManager.scala
##########
@@ -71,9 +71,11 @@ object ConfigEntityName {
  *
  * This will fire a watcher on all brokers. This watcher works as follows. It reads all the config change notifications.
  * It keeps track of the highest config change suffix number it has applied previously. For any previously applied change it finds
- * it checks if this notification is larger than a static expiration time (say 10mins) and if so it deletes this notification.
+ * it checks if this notification is larger than a static expiration time (default to 15 mins) and if so it deletes this notification.
  * For any new changes it reads the new configuration, combines it with the defaults, and updates the existing config.
  *
+ * See [[kafka.common.ZkNodeChangeNotificationListener]] for details

Review comment:
       In https://github.com/apache/kafka/pull/679, we made DynamicConfigManager use the ZkNodeChangeNotificationListener, so this adds a link to it here.
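
The watcher behaviour described in the doc comment above can be sketched roughly as follows. This is a simplified model based only on that comment, not the actual `ZkNodeChangeNotificationListener` code; `Notification`, `Outcome`, and `process` are hypothetical names:

```scala
object NotificationSketch {
  // Hypothetical model: a sequence-numbered notification and its creation time.
  final case class Notification(seq: Long, createdMs: Long)

  // What one pass decided: which sequence numbers to apply, which to delete,
  // and the new high-water mark.
  final case class Outcome(applied: Seq[Long], purged: Seq[Long], lastApplied: Long)

  def process(notifications: Seq[Notification],
              lastApplied: Long,
              nowMs: Long,
              expirationMs: Long): Outcome = {
    val (seen, fresh) = notifications.sortBy(_.seq).partition(_.seq <= lastApplied)
    // Already-applied notifications are deleted only once they are older than the expiration.
    val purged = seen.filter(n => nowMs - n.createdMs > expirationMs).map(_.seq)
    // New notifications are applied in sequence order.
    val applied = fresh.map(_.seq)
    Outcome(applied, purged, applied.lastOption.getOrElse(lastApplied))
  }
}
```

In this model the expiration only controls clean-up of old notifications, which is why a wrong expiration value would not affect whether new changes are applied.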







[GitHub] [kafka] ijuma commented on a change in pull request #10838: MINOR: small fix and clean up for DynamicConfigManager

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #10838:
URL: https://github.com/apache/kafka/pull/10838#discussion_r659764053



##########
File path: core/src/main/scala/kafka/server/DynamicConfigManager.scala
##########
@@ -86,25 +88,30 @@ object ConfigEntityName {
  */
 class DynamicConfigManager(private val zkClient: KafkaZkClient,
                            private val configHandlers: Map[String, ConfigHandler],
-                           private val changeExpirationMs: Long = 15*60*1000,
+                           private val changeExpirationMs: Long = 15 * 60 * 1000,
                            private val time: Time = Time.SYSTEM) extends Logging {
   val adminZkClient = new AdminZkClient(zkClient)
 
   object ConfigChangedNotificationHandler extends NotificationHandler {
     override def processNotification(jsonBytes: Array[Byte]) = {
-      // Ignore non-json notifications because they can be from the deprecated TopicConfigManager
-      Json.parseBytes(jsonBytes).foreach { js =>
-        val jsObject = js.asJsonObjectOption.getOrElse {
-          throw new IllegalArgumentException("Config change notification has an unexpected value. The format is:" +
-            """{"version" : 1, "entity_type":"topics/clients", "entity_name" : "topic_name/client_id"} or """ +
-            """{"version" : 2, "entity_path":"entity_type/entity_name"}. """ +
-            s"Received: ${new String(jsonBytes, StandardCharsets.UTF_8)}")
-        }
-        jsObject("version").to[Int] match {
-          case 1 => processEntityConfigChangeVersion1(jsonBytes, jsObject)
-          case 2 => processEntityConfigChangeVersion2(jsonBytes, jsObject)
-          case version => throw new IllegalArgumentException("Config change notification has unsupported version " +
-            s"'$version', supported versions are 1 and 2.")
+      val jsonValue = Json.parseBytes(jsonBytes)
+      if (jsonValue.isEmpty) {
+        // Ignore non-json notifications because they can be from the deprecated TopicConfigManager
+        warn(s"The non-json notifications are ignored.")

Review comment:
       What is the user expected to do with this information? Also, when would we get a "not json" parameter?

##########
File path: core/src/main/scala/kafka/server/DynamicConfigManager.scala
##########
@@ -154,7 +156,7 @@ class DynamicConfigManager(private val zkClient: KafkaZkClient,
   }
 
   private val configChangeListener = new ZkNodeChangeNotificationListener(zkClient, ConfigEntityChangeNotificationZNode.path,
-    ConfigEntityChangeNotificationSequenceZNode.SequenceNumberPrefix, ConfigChangedNotificationHandler)
+    ConfigEntityChangeNotificationSequenceZNode.SequenceNumberPrefix, ConfigChangedNotificationHandler, changeExpirationMs, time)

Review comment:
       Default arguments are dangerous for things like this, I would remove them.







[GitHub] [kafka] ijuma commented on a change in pull request #10838: MINOR: small fix and clean up for DynamicConfigManager

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #10838:
URL: https://github.com/apache/kafka/pull/10838#discussion_r654484159



##########
File path: core/src/main/scala/kafka/server/DynamicConfigManager.scala
##########
@@ -86,25 +88,30 @@ object ConfigEntityName {
  */
 class DynamicConfigManager(private val zkClient: KafkaZkClient,
                            private val configHandlers: Map[String, ConfigHandler],
-                           private val changeExpirationMs: Long = 15*60*1000,
+                           private val changeExpirationMs: Long = 15 * 60 * 1000,
                            private val time: Time = Time.SYSTEM) extends Logging {
   val adminZkClient = new AdminZkClient(zkClient)
 
   object ConfigChangedNotificationHandler extends NotificationHandler {
     override def processNotification(jsonBytes: Array[Byte]) = {
-      // Ignore non-json notifications because they can be from the deprecated TopicConfigManager
-      Json.parseBytes(jsonBytes).foreach { js =>
-        val jsObject = js.asJsonObjectOption.getOrElse {
-          throw new IllegalArgumentException("Config change notification has an unexpected value. The format is:" +
-            """{"version" : 1, "entity_type":"topics/clients", "entity_name" : "topic_name/client_id"} or """ +
-            """{"version" : 2, "entity_path":"entity_type/entity_name"}. """ +
-            s"Received: ${new String(jsonBytes, StandardCharsets.UTF_8)}")
-        }
-        jsObject("version").to[Int] match {
-          case 1 => processEntityConfigChangeVersion1(jsonBytes, jsObject)
-          case 2 => processEntityConfigChangeVersion2(jsonBytes, jsObject)
-          case version => throw new IllegalArgumentException("Config change notification has unsupported version " +
-            s"'$version', supported versions are 1 and 2.")
+      val jsonValue = Json.parseBytes(jsonBytes)
+      if (jsonValue.isEmpty) {
+        // Ignore non-json notifications because they can be from the deprecated TopicConfigManager
+        warn(s"The non-json notifications are ignored.")

Review comment:
       Why are we doing this?

##########
File path: core/src/main/scala/kafka/server/DynamicConfigManager.scala
##########
@@ -154,7 +156,7 @@ class DynamicConfigManager(private val zkClient: KafkaZkClient,
   }
 
   private val configChangeListener = new ZkNodeChangeNotificationListener(zkClient, ConfigEntityChangeNotificationZNode.path,
-    ConfigEntityChangeNotificationSequenceZNode.SequenceNumberPrefix, ConfigChangedNotificationHandler)
+    ConfigEntityChangeNotificationSequenceZNode.SequenceNumberPrefix, ConfigChangedNotificationHandler, changeExpirationMs, time)

Review comment:
       Also, shall we change the code so that we are forced to pass this?







[GitHub] [kafka] showuon commented on a change in pull request #10838: MINOR: small fix and clean up for DynamicConfigManager

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #10838:
URL: https://github.com/apache/kafka/pull/10838#discussion_r654720550



##########
File path: core/src/main/scala/kafka/server/DynamicConfigManager.scala
##########
@@ -154,7 +156,7 @@ class DynamicConfigManager(private val zkClient: KafkaZkClient,
   }
 
   private val configChangeListener = new ZkNodeChangeNotificationListener(zkClient, ConfigEntityChangeNotificationZNode.path,
-    ConfigEntityChangeNotificationSequenceZNode.SequenceNumberPrefix, ConfigChangedNotificationHandler)
+    ConfigEntityChangeNotificationSequenceZNode.SequenceNumberPrefix, ConfigChangedNotificationHandler, changeExpirationMs, time)

Review comment:
       So far, we only use DynamicConfigManager during broker startup, so we could keep the code as before and rely on the defaults. But passing these parameters to make it more flexible is not bad, either. I don't have a preference here. What do you think?
   Thank you.







[GitHub] [kafka] ijuma commented on a change in pull request #10838: MINOR: small fix and clean up for DynamicConfigManager

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #10838:
URL: https://github.com/apache/kafka/pull/10838#discussion_r651760535



##########
File path: core/src/main/scala/kafka/server/DynamicConfigManager.scala
##########
@@ -154,7 +156,7 @@ class DynamicConfigManager(private val zkClient: KafkaZkClient,
   }
 
   private val configChangeListener = new ZkNodeChangeNotificationListener(zkClient, ConfigEntityChangeNotificationZNode.path,
-    ConfigEntityChangeNotificationSequenceZNode.SequenceNumberPrefix, ConfigChangedNotificationHandler)
+    ConfigEntityChangeNotificationSequenceZNode.SequenceNumberPrefix, ConfigChangedNotificationHandler, changeExpirationMs, time)

Review comment:
       Can we please add a test?
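
One possible shape for such a test, sketched with hypothetical stand-ins (`Clock`, `ManualClock`, and `ExpiryChecker` are illustrative and are not the Kafka test utilities): inject a controllable clock and a short expiration, then check that both are actually honoured.

```scala
object InjectedTimeSketch {
  // Hypothetical minimal clock abstraction.
  trait Clock { def milliseconds: Long }

  // A clock the test advances by hand instead of sleeping.
  final class ManualClock(private var nowMs: Long) extends Clock {
    def milliseconds: Long = nowMs
    def sleep(ms: Long): Unit = nowMs += ms
  }

  // Stand-in for the component under test: it must use the injected values,
  // not built-in defaults.
  final class ExpiryChecker(expirationMs: Long, clock: Clock) {
    def isExpired(createdMs: Long): Boolean =
      clock.milliseconds - createdMs > expirationMs
  }
}
```

A test along these lines would fail if the component ignored the injected expiration or clock and fell back to its defaults.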







[GitHub] [kafka] showuon commented on pull request #10838: MINOR: small fix and clean up for DynamicConfigManager

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #10838:
URL: https://github.com/apache/kafka/pull/10838#issuecomment-864014850


   Failed tests are unrelated. Thanks.
   ```
       Build / JDK 11 and Scala 2.13 / org.apache.kafka.clients.consumer.StickyAssignorTest.testLargeAssignmentAndGroupWithNonEqualSubscription()
       Build / JDK 11 and Scala 2.13 / org.apache.kafka.common.network.SslTransportLayerTest.[1] tlsProtocol=TLSv1.2, useInlinePem=false
       Build / JDK 11 and Scala 2.13 / org.apache.kafka.common.network.SslTransportLayerTest.[2] tlsProtocol=TLSv1.2, useInlinePem=true
       Build / JDK 11 and Scala 2.13 / org.apache.kafka.common.network.SslTransportLayerTest.[3] tlsProtocol=TLSv1.3, useInlinePem=false
       Build / JDK 11 and Scala 2.13 / org.apache.kafka.common.network.SslTransportLayerTest.[1] tlsProtocol=TLSv1.2, useInlinePem=false
       Build / JDK 11 and Scala 2.13 / org.apache.kafka.common.network.SslTransportLayerTest.[2] tlsProtocol=TLSv1.2, useInlinePem=true
       Build / JDK 11 and Scala 2.13 / org.apache.kafka.common.network.SslTransportLayerTest.[3] tlsProtocol=TLSv1.3, useInlinePem=false
   ```





[GitHub] [kafka] showuon commented on a change in pull request #10838: MINOR: small fix and clean up for DynamicConfigManager

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #10838:
URL: https://github.com/apache/kafka/pull/10838#discussion_r654719694



##########
File path: core/src/main/scala/kafka/server/DynamicConfigManager.scala
##########
@@ -86,25 +88,30 @@ object ConfigEntityName {
  */
 class DynamicConfigManager(private val zkClient: KafkaZkClient,
                            private val configHandlers: Map[String, ConfigHandler],
-                           private val changeExpirationMs: Long = 15*60*1000,
+                           private val changeExpirationMs: Long = 15 * 60 * 1000,
                            private val time: Time = Time.SYSTEM) extends Logging {
   val adminZkClient = new AdminZkClient(zkClient)
 
   object ConfigChangedNotificationHandler extends NotificationHandler {
     override def processNotification(jsonBytes: Array[Byte]) = {
-      // Ignore non-json notifications because they can be from the deprecated TopicConfigManager
-      Json.parseBytes(jsonBytes).foreach { js =>
-        val jsObject = js.asJsonObjectOption.getOrElse {
-          throw new IllegalArgumentException("Config change notification has an unexpected value. The format is:" +
-            """{"version" : 1, "entity_type":"topics/clients", "entity_name" : "topic_name/client_id"} or """ +
-            """{"version" : 2, "entity_path":"entity_type/entity_name"}. """ +
-            s"Received: ${new String(jsonBytes, StandardCharsets.UTF_8)}")
-        }
-        jsObject("version").to[Int] match {
-          case 1 => processEntityConfigChangeVersion1(jsonBytes, jsObject)
-          case 2 => processEntityConfigChangeVersion2(jsonBytes, jsObject)
-          case version => throw new IllegalArgumentException("Config change notification has unsupported version " +
-            s"'$version', supported versions are 1 and 2.")
+      val jsonValue = Json.parseBytes(jsonBytes)
+      if (jsonValue.isEmpty) {
+        // Ignore non-json notifications because they can be from the deprecated TopicConfigManager
+        warn(s"The non-json notifications are ignored.")

Review comment:
       Before my change, when a "not json" parameter was passed, we just ignored it without doing anything. I think we should at least log it to let the user know. Maybe we can log at INFO? What do you think?
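
A rough sketch of the pre-change behaviour, assuming the parser yields an empty `Option` for non-JSON input (as the `foreach` / `isEmpty` usage in the diff suggests). `parseVersion` is a hypothetical regex-based stand-in for the real JSON parser, and `handle` returns a label instead of applying a config change:

```scala
import java.nio.charset.StandardCharsets

object DispatchSketch {
  private val VersionPattern = """\{\s*"version"\s*:\s*(\d+).*\}""".r

  // Hypothetical stand-in for a JSON parser: None for anything that is not a
  // JSON object starting with a numeric "version" field.
  def parseVersion(bytes: Array[Byte]): Option[Int] =
    new String(bytes, StandardCharsets.UTF_8).trim match {
      case VersionPattern(v) => Some(v.toInt)
      case _                 => None
    }

  def handle(bytes: Array[Byte]): String =
    parseVersion(bytes) match {
      case None    => "ignored" // non-JSON input is dropped without any logging
      case Some(1) => "v1"
      case Some(2) => "v2"
      case Some(v) => throw new IllegalArgumentException(
        s"Config change notification has unsupported version '$v', supported versions are 1 and 2.")
    }
}
```

The `None` branch is the silent path discussed above: adding a log statement there changes only what the operator sees, not how notifications are handled.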



