You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/07/29 17:57:29 UTC

[GitHub] [kafka] dielhennr commented on a change in pull request #11141: KAFKA-13142: Validate dynamic config alterations prior to forwarding them to the KRaft controller.

dielhennr commented on a change in pull request #11141:
URL: https://github.com/apache/kafka/pull/11141#discussion_r679367945



##########
File path: core/src/main/scala/kafka/server/ConfigHelper.scala
##########
@@ -22,20 +22,142 @@ import java.util.{Collections, Properties}
 import kafka.log.LogConfig
 import kafka.server.metadata.ConfigRepository
 import kafka.utils.{Log4jController, Logging}
-import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigResource}
-import org.apache.kafka.common.errors.{ApiException, InvalidRequestException}
+import org.apache.kafka.clients.admin.AlterConfigOp
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType
+import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, ConfigResource, LogLevelConfig}
+import org.apache.kafka.common.config.ConfigDef.ConfigKey
+import org.apache.kafka.common.errors.{ApiException, InvalidConfigurationException, InvalidRequestException}
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.message.DescribeConfigsRequestData.DescribeConfigsResource
 import org.apache.kafka.common.message.DescribeConfigsResponseData
+import org.apache.kafka.server.policy.AlterConfigPolicy
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{ApiError, DescribeConfigsResponse}
 import org.apache.kafka.common.requests.DescribeConfigsResponse.ConfigSource
 
-import scala.collection.{Map, mutable}
+import scala.collection.{Map, mutable, Seq}
 import scala.jdk.CollectionConverters._
 
 class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepository: ConfigRepository) extends Logging {
 
+  val alterConfigPolicy =
+    Option(config.getConfiguredInstance(KafkaConfig.AlterConfigPolicyClassNameProp, classOf[AlterConfigPolicy]))

Review comment:
       Not sure how to handle the `alterConfigPolicy` in KRaft. In the Zk case this is kept in `ZkAdminManager` which closes `alterConfigPolicy` when `KafkaServer` calls `adminManager.shutdown()`. Since KRaft does not have this `adminManager` I'm not sure where the `alterConfigPolicy` should be closed. @cmccabe Any thoughts about this?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org