You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/04/17 00:28:20 UTC

[GitHub] [kafka] rondagostino opened a new pull request #10550: MINOR: Add support for ZK Authorizer with KRaft

rondagostino opened a new pull request #10550:
URL: https://github.com/apache/kafka/pull/10550


   This patch adds support for running the ZooKeeper-based `kafka.security.authorizer.AclAuthorizer` with KRaft clusters.  Set the `authorizer.class.name` config as well as the `zookeeper.connect` config while also setting the typical KRaft configs (`node.id`, `process.roles`, etc.), and the cluster will use KRaft for metadata and ZooKeeper for ACL storage.  A system test that exercises the authorizer is included.
   
   This patch also changes "Raft" to "KRaft" in several system test files.  It also fixes a bug where system test admin clients were unable to connect to a cluster with broker credentials via the SSL security protocol when the broker was using that for inter-broker communication and SASL for client communication.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rondagostino commented on pull request #10550: MINOR: Add support for ZK Authorizer with KRaft

Posted by GitBox <gi...@apache.org>.
rondagostino commented on pull request #10550:
URL: https://github.com/apache/kafka/pull/10550#issuecomment-843228313


   @cmccabe I merged in trunk and pushed a separate commit to fix the failing unit tests.  The way we test to ensure an API is always forwarded for KRaft is to send the request directly through into `KafkaApis` as though it wasn't forwarded and assert that we get `UnsupportedVersionException`.  You had inadvertently short-circuited our ability to send the request through to `KafkaApis` by invoking the new `AclApis` directly in the `request.header.apiKey match {` block.  I fixed the tests by re-adding the standard `handle...()` methods and have them check the metadata support status if necessary and then invoke `AclApis` -- that way we still have the check that we are using ZooKeeper and not KRaft, and the tests can get the expected exception and pass.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on pull request #10550: MINOR: Add support for ZK Authorizer with KRaft

Posted by GitBox <gi...@apache.org>.
cmccabe commented on pull request #10550:
URL: https://github.com/apache/kafka/pull/10550#issuecomment-839281487


   Thanks for working on this, @rondagostino , and sorry about the delays in reviewing.
   
   > We forward the Create/Remove operations to the controller, but this patch actually short-circuits that if we are using KRaft with the ZooKeeper-based `AclAuthorizer` via the changes to `RaftSupport.maybeForward()`. The reason for short-circuiting it is because the KRaft controller doesn't have the code to create or remove ACLs (`handle{Create,Delete}Acls` in `KafkaApis`). We could add it, of course, in which case the changes to the `maybeForward()` method would be unnecessary. Perhaps it would be simpler to do that instead of delaying it to an additional PR -- is that what you were suggesting?
   
   Yes, I think we should just do this in `ControllerApis.scala` and be done with it.  It's kind of annoying to do in a follow-on PR since we'd have to add a lot of special-case code which we'd later have to undo, which is not usually the right way to go.  We might even be able to move this code into `RequestHandlerHelper` or something since it would be the same between `BrokerApis` and `ControllerApis` (I think?)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #10550: MINOR: Add support for ZK Authorizer with KRaft

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #10550:
URL: https://github.com/apache/kafka/pull/10550#discussion_r628854509



##########
File path: core/src/main/scala/kafka/server/MetadataSupport.scala
##########
@@ -91,28 +93,42 @@ case class ZkSupport(adminManager: ZkAdminManager,
   override def controllerId: Option[Int] =  metadataCache.getControllerId
 }
 
-case class RaftSupport(fwdMgr: ForwardingManager, metadataCache: RaftMetadataCache, quotaCache: ClientQuotaCache)
+case class RaftSupport(fwdMgr: ForwardingManager, metadataCache: RaftMetadataCache, quotaCache: ClientQuotaCache, config: KafkaConfig)
     extends MetadataSupport {
+  if (config.requiresZookeeper) {
+    throw new IllegalStateException("Config specifies ZooKeeper but metadata support instance is for Raft")
+  }
   override val forwardingManager: Option[ForwardingManager] = Some(fwdMgr)
   override def requireZkOrThrow(createException: => Exception): ZkSupport = throw createException
-  override def requireRaftOrThrow(createException: => Exception): RaftSupport = this
-
-  override def ensureConsistentWith(config: KafkaConfig): Unit = {
-    if (config.requiresZookeeper) {
-      throw new IllegalStateException("Config specifies ZooKeeper but metadata support instance is for Raft")
+  override def requireZkAuthorizerOrThrow(createException: => Exception) = {
+    if (!hasZkAuthorizer) {
+      throw createException
     }
   }
+  override def requireRaftOrThrow(createException: => Exception): RaftSupport = this
 
   override def maybeForward(request: RequestChannel.Request,
                             handler: RequestChannel.Request => Unit,
                             responseCallback: Option[AbstractResponse] => Unit): Unit = {
     if (!request.isForwarded) {
-      fwdMgr.forwardRequest(request, responseCallback)
+      request.header.apiKey match {
+        case ApiKeys.CREATE_ACLS | ApiKeys.DELETE_ACLS =>
+          if (hasZkAuthorizer) {
+            handler(request)

Review comment:
       Why don't we forward in this case?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #10550: MINOR: Add support for ZK Authorizer with KRaft

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #10550:
URL: https://github.com/apache/kafka/pull/10550#discussion_r628856179



##########
File path: tests/kafkatest/tests/core/zookeeper_authorizer_test.py
##########
@@ -0,0 +1,98 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.cluster.remoteaccount import RemoteCommandError
+from ducktape.mark import matrix
+from ducktape.mark.resource import cluster
+from ducktape.tests.test import Test
+
+from kafkatest.services.kafka import KafkaService, quorum
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.security.kafka_acls import ACLs
+
+class ZooKeeperAuthorizerTest(Test):

Review comment:
       We have a few system tests that exercise acls. Can you please describe why we need a new one? Not saying we don't, but it's good to be clear with regards to the gap we're trying to fill.
   
   Also, we may want to document that we intend to replace the ZK authorizer with a KRaft one. When that happens, I assume we would have a new class for testing the latter and this would be changed to only test zk mode?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe merged pull request #10550: MINOR: Add support for ZK Authorizer with KRaft

Posted by GitBox <gi...@apache.org>.
cmccabe merged pull request #10550:
URL: https://github.com/apache/kafka/pull/10550


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10550: MINOR: Add support for ZK Authorizer with KRaft

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10550:
URL: https://github.com/apache/kafka/pull/10550#discussion_r630603137



##########
File path: core/src/main/scala/kafka/server/MetadataSupport.scala
##########
@@ -91,28 +93,42 @@ case class ZkSupport(adminManager: ZkAdminManager,
   override def controllerId: Option[Int] =  metadataCache.getControllerId
 }
 
-case class RaftSupport(fwdMgr: ForwardingManager, metadataCache: RaftMetadataCache, quotaCache: ClientQuotaCache)
+case class RaftSupport(fwdMgr: ForwardingManager, metadataCache: RaftMetadataCache, quotaCache: ClientQuotaCache, config: KafkaConfig)
     extends MetadataSupport {
+  if (config.requiresZookeeper) {
+    throw new IllegalStateException("Config specifies ZooKeeper but metadata support instance is for Raft")
+  }
   override val forwardingManager: Option[ForwardingManager] = Some(fwdMgr)
   override def requireZkOrThrow(createException: => Exception): ZkSupport = throw createException
-  override def requireRaftOrThrow(createException: => Exception): RaftSupport = this
-
-  override def ensureConsistentWith(config: KafkaConfig): Unit = {
-    if (config.requiresZookeeper) {
-      throw new IllegalStateException("Config specifies ZooKeeper but metadata support instance is for Raft")
+  override def requireZkAuthorizerOrThrow(createException: => Exception) = {

Review comment:
       I don't think we need (or want) to special-case the ZK authorizer here.  There is a Confluent authorizer that doesn't depend on ZK, and also a Cloudera one.   We don't want to break them.  Just forward everything




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on pull request #10550: MINOR: Add support for ZK Authorizer with KRaft

Posted by GitBox <gi...@apache.org>.
cmccabe commented on pull request #10550:
URL: https://github.com/apache/kafka/pull/10550#issuecomment-822694203


   Thanks for tackling this.
   
   Please keep in mind that this is a plugin architecture.  The ZK-based authorizer shouldn't get any special treatment, nor should its setup be done outside its own code.  The code and logic for accessing ZooKeeper for ACLs needs to be contained just in the AclAuthorizer, not everywhere in the code.
   
   For example, KafkaRaftServer does not need to know anything about ZooKeeper or AclAuthorizer specifically.  Maybe KafkaRaftServer's constructor needs to take an Authorizer object as an argument.  (Although it's not even clear to me that that is true, since it seems like all authorization happens in KafkaApis and ControllerApis.)  But certainly KafkaRaftServer does not need to be looking at `config.zkConnect` or messing around with ZkClient.
   
   Whatever setup the zookeeper authorizer needs to do should be done in its `start` function.  So creating the relevant znodes, etc.  We probably need to continue doing that setup in KafkaServer.scala as well, since not all users will be using KafkaServer + AclAuthorizer (again, plugin architecture, they could use using one but not the other.)
   
   A separate issue is that we need to start forwarding the ACL operations to the controller.  You did one half of that work here (adding "controller" to the message JSONs) but not the other (supporting these calls on the controller-side).  If it's easier, we could probably do this in a follow-up JIRA.  However, we do need to do it before the bridge release.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on pull request #10550: MINOR: Add support for ZK Authorizer with KRaft

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #10550:
URL: https://github.com/apache/kafka/pull/10550#issuecomment-841855882


   @rondagostino did you see https://github.com/apache/kafka/pull/10699?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10550: MINOR: Add support for ZK Authorizer with KRaft

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10550:
URL: https://github.com/apache/kafka/pull/10550#discussion_r630601127



##########
File path: tests/kafkatest/services/security/kafka_acls.py
##########
@@ -66,17 +67,46 @@ def add_cluster_acl(self, kafka, principal, force_use_zk_connection=False, addit
                This is necessary for the case where we are bootstrapping ACLs before Kafka is started or before authorizer is enabled
         :param additional_cluster_operations_to_grant may be set to ['Alter', 'Create'] if the cluster is secured since these are required
                to create SCRAM credentials and topics, respectively
+        :param security_protocol set it to explicitly determine whether we use client or broker credentials, otherwise
+                we use the the client security protocol unless inter-broker security protocol is PLAINTEXT, in which case we use PLAINTEXT.
+                Then we use the broker's credentials if the selected security protocol matches the inter-broker security protocol,
+                otherwise we use the client's credentials.
         """
         node = kafka.nodes[0]
 
         for operation in ['ClusterAction'] + additional_cluster_operations_to_grant:
             cmd = "%(cmd_prefix)s --add --cluster --operation=%(operation)s --allow-principal=%(principal)s" % {
-                'cmd_prefix': kafka.kafka_acls_cmd_with_optional_security_settings(node, force_use_zk_connection),
+                'cmd_prefix': kafka.kafka_acls_cmd_with_optional_security_settings(node, force_use_zk_connection, security_protocol),
                 'operation': operation,
                 'principal': principal
             }
             kafka.run_cli_tool(node, cmd)
 
+    def remove_cluster_acl(self, kafka, principal, force_use_zk_connection=False, additional_cluster_operations_to_remove = [], security_protocol=None):

Review comment:
       Since this is new code, it would be really good to avoid introducing `force_use_zk_connection` here if possible.  I can't see anywhere in this PR where it is used, is this really necessary?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rondagostino commented on pull request #10550: MINOR: Add support for ZK Authorizer with KRaft

Posted by GitBox <gi...@apache.org>.
rondagostino commented on pull request #10550:
URL: https://github.com/apache/kafka/pull/10550#issuecomment-827040926


   > we need to start forwarding the ACL operations to the controller
   
   We forward the Create/Remove operations to the controller, BUT this patch actually short-circuits that if we are using KRaft with the ZooKeeper-based `AclAuthorizer` via the changes to `RaftSupport.maybeForward()`.  The reason for short-circuiting it is because the KRaft controller doesn't have the code to create or remove ACLs (`handle{Create,Delete}Acls` in `KafkaApis`).  We could add it, of course, in which case the changes to the `maybeForward()` method would be unnecessary.  Perhaps it would be simpler to do that instead of delaying it to an additional PR -- is that what you were suggesting?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rondagostino commented on a change in pull request #10550: MINOR: Add support for ZK Authorizer with KRaft

Posted by GitBox <gi...@apache.org>.
rondagostino commented on a change in pull request #10550:
URL: https://github.com/apache/kafka/pull/10550#discussion_r633107934



##########
File path: clients/src/main/resources/common/message/DeleteAclsRequest.json
##########
@@ -16,7 +16,7 @@
 {
   "apiKey": 31,
   "type": "request",
-  "listeners": ["zkBroker"],
+  "listeners": ["zkBroker", "broker", "controller"],

Review comment:
       As mentioned, I am wondering if this should be `listeners": ["zkBroker", "broker"],` since the KRaft broker never forward DescribeAcls to the controller.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe edited a comment on pull request #10550: MINOR: Add support for ZK Authorizer with KRaft

Posted by GitBox <gi...@apache.org>.
cmccabe edited a comment on pull request #10550:
URL: https://github.com/apache/kafka/pull/10550#issuecomment-822694203


   Thanks for tackling this, @rondagostino !
   
   We have to be a bit careful about the structure here.  This is a plugin architecture, which means that the ZK-based authorizer shouldn't get any special treatment, nor should its setup be done outside its own code.  The code and logic for accessing ZooKeeper for ACLs needs to be contained just in the AclAuthorizer itself.
   
   For example, KafkaRaftServer does not need to know anything about ZooKeeper or AclAuthorizer specifically.  Maybe KafkaRaftServer's constructor needs to take an Authorizer object as an argument.  (Although it's not even clear to me that that is true, since it seems like all authorization happens in KafkaApis and ControllerApis.)  But certainly KafkaRaftServer does not need to be looking at `config.zkConnect` or messing around with ZkClient.
   
   Whatever setup the zookeeper authorizer needs to do should be done in its `start` function.  So creating the relevant znodes, etc.  We probably need to continue doing that setup in KafkaServer.scala as well, since not all users will be using KafkaServer + AclAuthorizer (again, plugin architecture, they could use using one but not the other.)  It might be helpful to move this into `KafkaZkClient` so that it doesn't need to be duplicated.  But it's not a lot of code in any case, as far as I can see.
   
   A separate issue is that we need to start forwarding the ACL operations to the controller.  You did one half of that work here (adding "controller" to the message JSONs) but not the other (supporting these calls on the controller-side).  If it's easier, we could probably do this in a follow-up JIRA.  However, we do need to do it before the bridge release.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rondagostino commented on a change in pull request #10550: MINOR: Add support for ZK Authorizer with KRaft

Posted by GitBox <gi...@apache.org>.
rondagostino commented on a change in pull request #10550:
URL: https://github.com/apache/kafka/pull/10550#discussion_r633107724



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -99,6 +100,9 @@ class ControllerApis(val requestChannel: RequestChannel,
         case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
         case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
         case ApiKeys.CREATE_PARTITIONS => handleCreatePartitions(request)
+        case ApiKeys.DESCRIBE_ACLS => aclApis.handleDescribeAcls(request)

Review comment:
       When would the controller ever process `DescribeAcls`?  It seems to me that it never would because brokers don't forward that request.  And if that is correct, I think my change to `DescribeAclsRequest.json` to set `"listeners": ["zkBroker", "broker", "controller"],` should instead be setting it to `"listeners": ["zkBroker", "broker"],`.

##########
File path: clients/src/main/resources/common/message/DeleteAclsRequest.json
##########
@@ -16,7 +16,7 @@
 {
   "apiKey": 31,
   "type": "request",
-  "listeners": ["zkBroker"],
+  "listeners": ["zkBroker", "broker", "controller"],

Review comment:
       As mentioned, I am wondering if this should be `listeners": ["zkBroker", "broker"],` since the KRaft broker never forward DescribeAcls to the controller.

##########
File path: tests/kafkatest/tests/core/zookeeper_authorizer_test.py
##########
@@ -0,0 +1,98 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.cluster.remoteaccount import RemoteCommandError
+from ducktape.mark import matrix
+from ducktape.mark.resource import cluster
+from ducktape.tests.test import Test
+
+from kafkatest.services.kafka import KafkaService, quorum
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.security.kafka_acls import ACLs
+
+class ZooKeeperAuthorizerTest(Test):

Review comment:
       The only two system tests that leverage an authorizer are `ZooKeeperSecurityUpgradeTest` and `TestSecurityRollingUpgrade`.  The first is specific to ZooKeeper and does not apply to KRaft at all.  The second could be applied to KRaft because it tests changing the inter-broker security protocol across rolls rather than any broker version upgrade, but it has not yet been converted to the KRaft case, plus it does not remove ACLs (which we also want to test here).  Also, the authorizer piece at the end is not really the main part of that test.  It felt appropriate to create this simple, straightforward test for the specific use of the ZooKeeper-based authorizer.
   
   I've added a comment to this test (assuming we keep it) stating that we will test the KRaft replacement authorizer separately.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rondagostino commented on pull request #10550: MINOR: Add support for ZK Authorizer with KRaft

Posted by GitBox <gi...@apache.org>.
rondagostino commented on pull request #10550:
URL: https://github.com/apache/kafka/pull/10550#issuecomment-827032633


   > It might be helpful to move the znode setup code into KafkaZkClient
   
   @cmccabe One possibility is that the first time we connect to ZooKeeper we connect without the chroot path, create that path if it doesn't exist, and then disconnect -- then (and thereafter)we connect with the chroot path?  If we did that then I think it would have to happen in `kafka.zookeeper.ZooKeeperClient` rather than in `KafkaZkClient` since the former is where we provide the chroot path -- the latter seems to simply use the former with no concept of a chroot path.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe edited a comment on pull request #10550: MINOR: Add support for ZK Authorizer with KRaft

Posted by GitBox <gi...@apache.org>.
cmccabe edited a comment on pull request #10550:
URL: https://github.com/apache/kafka/pull/10550#issuecomment-822694203


   Thanks for tackling this, @rondagostino !
   
   We have to be a bit careful about the structure here.  This is a plugin architecture, which means that the ZK-based authorizer shouldn't get any special treatment, nor should its setup be done outside its own code.  The code and logic for accessing ZooKeeper for ACLs needs to be contained just in the AclAuthorizer itself.
   
   For example, KafkaRaftServer does not need to know anything about ZooKeeper or AclAuthorizer specifically.  Maybe KafkaRaftServer's constructor needs to take an Authorizer object as an argument.  (Although it's not even clear to me that that is true, since it seems like all authorization happens in KafkaApis and ControllerApis.)  But certainly KafkaRaftServer does not need to be looking at `config.zkConnect` or messing around with ZkClient.
   
   Whatever setup the zookeeper authorizer needs to do should be done in its `start` function.  So creating the relevant znodes, etc.  We probably need to continue doing that setup in KafkaServer.scala as well, since not all users will be using KafkaServer + AclAuthorizer (again, plugin architecture, they could use using one but not the other.)
   
   A separate issue is that we need to start forwarding the ACL operations to the controller.  You did one half of that work here (adding "controller" to the message JSONs) but not the other (supporting these calls on the controller-side).  If it's easier, we could probably do this in a follow-up JIRA.  However, we do need to do it before the bridge release.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rondagostino commented on a change in pull request #10550: MINOR: Add support for ZK Authorizer with KRaft

Posted by GitBox <gi...@apache.org>.
rondagostino commented on a change in pull request #10550:
URL: https://github.com/apache/kafka/pull/10550#discussion_r633107724



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -99,6 +100,9 @@ class ControllerApis(val requestChannel: RequestChannel,
         case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
         case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
         case ApiKeys.CREATE_PARTITIONS => handleCreatePartitions(request)
+        case ApiKeys.DESCRIBE_ACLS => aclApis.handleDescribeAcls(request)

Review comment:
       When would the controller ever process `DescribeAcls`?  It seems to me that it never would because brokers don't forward that request.  And if that is correct, I think my change to `DescribeAclsRequest.json` to set `"listeners": ["zkBroker", "broker", "controller"],` should instead be setting it to `"listeners": ["zkBroker", "broker"],`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rondagostino commented on a change in pull request #10550: MINOR: Add support for ZK Authorizer with KRaft

Posted by GitBox <gi...@apache.org>.
rondagostino commented on a change in pull request #10550:
URL: https://github.com/apache/kafka/pull/10550#discussion_r633110706



##########
File path: tests/kafkatest/tests/core/zookeeper_authorizer_test.py
##########
@@ -0,0 +1,98 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.cluster.remoteaccount import RemoteCommandError
+from ducktape.mark import matrix
+from ducktape.mark.resource import cluster
+from ducktape.tests.test import Test
+
+from kafkatest.services.kafka import KafkaService, quorum
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.security.kafka_acls import ACLs
+
+class ZooKeeperAuthorizerTest(Test):

Review comment:
       The only two system tests that leverage an authorizer are `ZooKeeperSecurityUpgradeTest` and `TestSecurityRollingUpgrade`.  The first is specific to ZooKeeper and does not apply to KRaft at all.  The second could be applied to KRaft because it tests changing the inter-broker security protocol across rolls rather than any broker version upgrade, but it has not yet been converted to the KRaft case, plus it does not remove ACLs (which we also want to test here).  Also, the authorizer piece at the end is not really the main part of that test.  It felt appropriate to create this simple, straightforward test for the specific use of the ZooKeeper-based authorizer.
   
   I've added a comment to this test (assuming we keep it) stating that we will test the KRaft replacement authorizer separately.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10550: MINOR: Add support for ZK Authorizer with KRaft

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10550:
URL: https://github.com/apache/kafka/pull/10550#discussion_r630598753



##########
File path: tests/kafkatest/services/kafka/kafka.py
##########
@@ -863,7 +878,7 @@ def kafka_configs_cmd_with_optional_security_settings(self, node, force_use_zk_c
                 # configure JAAS to provide the typical client credentials
                 jaas_conf_prop = KafkaService.JAAS_CONF_PROPERTY
                 use_inter_broker_mechanism_for_client = False
-            optional_jass_krb_system_props_prefix = "KAFKA_OPTS='-D%s -D%s' " % (jaas_conf_prop, KafkaService.KRB5_CONF)
+            optional_jass_krb_system_props_prefix = "KAFKA_OPTS='-D%s -D%s' " % (jaas_conf_prop, KafkaService.KRB5_CONF) if security_protocol_to_use != "SSL" else ""

Review comment:
       I'm confused by the logic here.  If we have security_protocol==SSL then we do not define the jaas properties in KAFKA_OPTS?  Seems a bit weird -- why define this when we're using PLAINTEXT or when we're using SASL_SSL, but not when using SSL?  Can you add a comment about how this works?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] webfrank commented on pull request #10550: MINOR: Add support for ZK Authorizer with KRaft

Posted by GitBox <gi...@apache.org>.
webfrank commented on pull request #10550:
URL: https://github.com/apache/kafka/pull/10550#issuecomment-1036433573


   Hi, know this is not the right place but I didn't find any sample configuration to use AclAuthorizer with KRaft enabled. If I add the line `authorizer.class.name=kafka.security.authorizer.AclAuthorizer` the server crashes. I read I have to configure zookeeper.connect property but of course I don't have zookeeper running. Any hint is really appreciated.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on pull request #10550: MINOR: Add support for ZK Authorizer with KRaft

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #10550:
URL: https://github.com/apache/kafka/pull/10550#issuecomment-841855882


   @rondagostino did you see https://github.com/apache/kafka/pull/10699?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10550: MINOR: Add support for ZK Authorizer with KRaft

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10550:
URL: https://github.com/apache/kafka/pull/10550#discussion_r630602650



##########
File path: core/src/main/scala/kafka/server/MetadataSupport.scala
##########
@@ -91,28 +93,42 @@ case class ZkSupport(adminManager: ZkAdminManager,
   override def controllerId: Option[Int] =  metadataCache.getControllerId
 }
 
-case class RaftSupport(fwdMgr: ForwardingManager, metadataCache: RaftMetadataCache, quotaCache: ClientQuotaCache)
+case class RaftSupport(fwdMgr: ForwardingManager, metadataCache: RaftMetadataCache, quotaCache: ClientQuotaCache, config: KafkaConfig)
     extends MetadataSupport {
+  if (config.requiresZookeeper) {
+    throw new IllegalStateException("Config specifies ZooKeeper but metadata support instance is for Raft")
+  }
   override val forwardingManager: Option[ForwardingManager] = Some(fwdMgr)
   override def requireZkOrThrow(createException: => Exception): ZkSupport = throw createException
-  override def requireRaftOrThrow(createException: => Exception): RaftSupport = this
-
-  override def ensureConsistentWith(config: KafkaConfig): Unit = {
-    if (config.requiresZookeeper) {
-      throw new IllegalStateException("Config specifies ZooKeeper but metadata support instance is for Raft")
+  override def requireZkAuthorizerOrThrow(createException: => Exception) = {
+    if (!hasZkAuthorizer) {
+      throw createException
     }
   }
+  override def requireRaftOrThrow(createException: => Exception): RaftSupport = this
 
   override def maybeForward(request: RequestChannel.Request,
                             handler: RequestChannel.Request => Unit,
                             responseCallback: Option[AbstractResponse] => Unit): Unit = {
     if (!request.isForwarded) {
-      fwdMgr.forwardRequest(request, responseCallback)
+      request.header.apiKey match {
+        case ApiKeys.CREATE_ACLS | ApiKeys.DELETE_ACLS =>
+          if (hasZkAuthorizer) {
+            handler(request)

Review comment:
       Yeah, I had the same question.  It seems like the code is already in place.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10550: MINOR: Add support for ZK Authorizer with KRaft

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10550:
URL: https://github.com/apache/kafka/pull/10550#discussion_r633749112



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -99,6 +100,9 @@ class ControllerApis(val requestChannel: RequestChannel,
         case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
         case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
         case ApiKeys.CREATE_PARTITIONS => handleCreatePartitions(request)
+        case ApiKeys.DESCRIBE_ACLS => aclApis.handleDescribeAcls(request)

Review comment:
       We might want the controller to process DescribeAcls for debug purposes.  There's no reason to artificially disable it from processing the RPC, although I agree that it will normally not be used.

##########
File path: clients/src/main/resources/common/message/DeleteAclsRequest.json
##########
@@ -16,7 +16,7 @@
 {
   "apiKey": 31,
   "type": "request",
-  "listeners": ["zkBroker"],
+  "listeners": ["zkBroker", "broker", "controller"],

Review comment:
       (See above comment)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe edited a comment on pull request #10550: MINOR: Add support for ZK Authorizer with KRaft

Posted by GitBox <gi...@apache.org>.
cmccabe edited a comment on pull request #10550:
URL: https://github.com/apache/kafka/pull/10550#issuecomment-822694203


   Thanks for tackling this, @rondagostino !
   
   We have to be a bit careful about the structure here.  This is a plugin architecture, which means that the ZK-based authorizer shouldn't get any special treatment, nor should its setup be done outside its own code.  The code and logic for accessing ZooKeeper for ACLs needs to be contained just in the AclAuthorizer itself.
   
   For example, KafkaRaftServer does not need to know anything about ZooKeeper or AclAuthorizer specifically.  Maybe KafkaRaftServer's constructor needs to take an Authorizer object as an argument.  (Although it's not even clear to me that that is true, since it seems like all authorization happens in KafkaApis and ControllerApis.)  But certainly KafkaRaftServer does not need to be looking at `config.zkConnect` or messing around with ZkClient.
   
   Whatever setup the zookeeper authorizer needs to do should be done in its `start` function.  So creating the relevant znodes, etc.  We probably need to continue doing that setup in KafkaServer.scala as well, since not all users will be using KafkaServer + AclAuthorizer (again, plugin architecture, they could use using one but not the other.)  
   
   For example, someone might be using ZK mode (KafkaServer.scala) but using a non-ZK authorizer such as Apache Ranger, or one of Confluent's authorizers.  In that case it's clear that KafkaServer.scala needs to set up whatever znodes it needs, and not rely on AclAuthorizer to do that.  It might be helpful to move the znode setup code into `KafkaZkClient` so that it doesn't need to be duplicated between AclAuthorizer and KafkaServer.scala.  But it's not a lot of code in any case, as far as I can see.
   
   A separate issue is that we need to start forwarding the ACL operations to the controller.  You did one half of that work here (adding "controller" to the message JSONs) but not the other (supporting these calls on the controller-side).  If it's easier, we could probably do this in a follow-up JIRA.  However, we do need to do it before the bridge release.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rondagostino commented on pull request #10550: MINOR: Add support for ZK Authorizer with KRaft

Posted by GitBox <gi...@apache.org>.
rondagostino commented on pull request #10550:
URL: https://github.com/apache/kafka/pull/10550#issuecomment-826903361


   @cmccabe Thanks for taking a look.  I agree in principle with your statement that `KafkaRaftServer` should know nothing about ZooKeeper or AclAuthorizer.  My reasoning for violating that idea was due to a specific issue: nothing will work with ZooKeeper when there is a chroot path in the `zk.connect` configuration value unless that chroot path exists.  Currently `KafkaServer` checks for and creates any non-existing chroot path upon startup, and the ZooKeeper initialization being done in `KafkaRaftServer` is the same thing -- the PR refactored that ability out into its own function so that it could be called from both `KafkaServer` and `KafkaRaftServer`.  It is conceivable that we could move this check into `AclAuthorizer`, but then the ZooKeeper-based broker won't run if there is no authorizer configured (and it might not work anyway due to race conditions -- who's to say that the authorizer will win the race and create the chroot path before anything else tries to connect?)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rondagostino commented on pull request #10550: MINOR: Add support for ZK Authorizer with KRaft

Posted by GitBox <gi...@apache.org>.
rondagostino commented on pull request #10550:
URL: https://github.com/apache/kafka/pull/10550#issuecomment-830858518


   @cmccabe The code now auto-creates any ZooKeeper chroot in `KafkaZkClient.apply()`.  I added a unit test for it in KafkaZkClientTest, which passes, and it also gets exercised in a few system tests that set a ZK chroot path (`delegation_token_test.py` is one such file, and it passed locally for me with this change).  `KafkaRaftServer` no longer has any reference to ZooKeeper.
   
   Assuming the above change is acceptable, I think the remaining issue from your initial review is the issue of forwarding.  Take a look at my comment above and let me know what you think -- keep what I have, which short-circuits the forwarding, or forward to the controller and pass the ACL requests to the registered authorizer instance.  I suspect the latter is better, but let me know your opinion since you suggested this could be done in a follow-on PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rondagostino edited a comment on pull request #10550: MINOR: Add support for ZK Authorizer with KRaft

Posted by GitBox <gi...@apache.org>.
rondagostino edited a comment on pull request #10550:
URL: https://github.com/apache/kafka/pull/10550#issuecomment-827040926


   > we need to start forwarding the ACL operations to the controller
   
   We forward the Create/Remove operations to the controller, but this patch actually short-circuits that if we are using KRaft with the ZooKeeper-based `AclAuthorizer` via the changes to `RaftSupport.maybeForward()`.  The reason for short-circuiting it is because the KRaft controller doesn't have the code to create or remove ACLs (`handle{Create,Delete}Acls` in `KafkaApis`).  We could add it, of course, in which case the changes to the `maybeForward()` method would be unnecessary.  Perhaps it would be simpler to do that instead of delaying it to an additional PR -- is that what you were suggesting?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org