You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/05/09 08:21:08 UTC

[GitHub] [kafka] ijuma commented on a change in pull request #10550: MINOR: Add support for ZK Authorizer with KRaft

ijuma commented on a change in pull request #10550:
URL: https://github.com/apache/kafka/pull/10550#discussion_r628854509



##########
File path: core/src/main/scala/kafka/server/MetadataSupport.scala
##########
@@ -91,28 +93,42 @@ case class ZkSupport(adminManager: ZkAdminManager,
   override def controllerId: Option[Int] =  metadataCache.getControllerId
 }
 
-case class RaftSupport(fwdMgr: ForwardingManager, metadataCache: RaftMetadataCache, quotaCache: ClientQuotaCache)
+case class RaftSupport(fwdMgr: ForwardingManager, metadataCache: RaftMetadataCache, quotaCache: ClientQuotaCache, config: KafkaConfig)
     extends MetadataSupport {
+  if (config.requiresZookeeper) {
+    throw new IllegalStateException("Config specifies ZooKeeper but metadata support instance is for Raft")
+  }
   override val forwardingManager: Option[ForwardingManager] = Some(fwdMgr)
   override def requireZkOrThrow(createException: => Exception): ZkSupport = throw createException
-  override def requireRaftOrThrow(createException: => Exception): RaftSupport = this
-
-  override def ensureConsistentWith(config: KafkaConfig): Unit = {
-    if (config.requiresZookeeper) {
-      throw new IllegalStateException("Config specifies ZooKeeper but metadata support instance is for Raft")
+  override def requireZkAuthorizerOrThrow(createException: => Exception) = {
+    if (!hasZkAuthorizer) {
+      throw createException
     }
   }
+  override def requireRaftOrThrow(createException: => Exception): RaftSupport = this
 
   override def maybeForward(request: RequestChannel.Request,
                             handler: RequestChannel.Request => Unit,
                             responseCallback: Option[AbstractResponse] => Unit): Unit = {
     if (!request.isForwarded) {
-      fwdMgr.forwardRequest(request, responseCallback)
+      request.header.apiKey match {
+        case ApiKeys.CREATE_ACLS | ApiKeys.DELETE_ACLS =>
+          if (hasZkAuthorizer) {
+            handler(request)

Review comment:
       Why don't we forward in this case?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org