You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/05 04:32:03 UTC

[GitHub] [kafka] hachikuji opened a new pull request #10066: KAFKA-12278; Ensure exposed api versions are consistent within listener scopes

hachikuji opened a new pull request #10066:
URL: https://github.com/apache/kafka/pull/10066


   With KIP-500, we have more complex requirements on API accessibility. Previously all APIs were accessible on every listener exposed by the broker, but now that is no longer true. For example:
   
   - the controller exposes some APIs which are not accessible on the broker listener (e.g. quorum/registration/heartbeat APIs)
   - most of the client APIs are not exposed on the controller (e.g. consumer group apis)
   - there are some APIs which are not implemented by the KIP-500 broker (e.g. `LeaderAndIsr` and `UpdateMetadata`)
   - there are some APIs which are only implemented by the KIP-500 broker (e.g. `DecommissionBroker` and `DescribeQuorum`)
   
   All of this means that we need more sophistication in how we expose APIs and keep them consistent with the `ApiVersions` API. Up to now, we have been working around this using the `controllerOnly` flag inside `ApiKeys`, but this is not rich enough to support all of the cases listed above.
   
   In this patch, we address this by problem by introducing a new `scope` field to the request schema definitions. This field is an array of strings which indicate the scope in which the API should be exposed. We currently support the following scopes: 
   
   - `zkBroker`: old broker
   - `broker`: kip-500 broker
   - `controller`: kip-500 controller
   - `raft`: raft test server
   
   For example, the `DecommissionBroker` API has the following scope tag:
   ```json
     "scope": ["broker", "controller"]
   ```
   This indicates that the API is only on the KIP-500 broker and controller (both are needed because the request will be sent by clients and forwarded to the controller).
   
   The patch changes the generator so that the scope definitions are added to `ApiMessageType` and exposed through convenient helpers. At the same time, we have removed the `controllerOnly` flag from `ApiKeys` since now we can identify all controller APIs through the "controller" scope tag.
   
   The rest of the patch is dedicated to ensuring that the API scope is properly set. We have created a new `ApiVersionManager` which encapsulates the creation of the `ApiVersionsResponse` based on the scope. Additionally, `SocketServer` is modified to ensure the scope of received requests before forwarding them to the request handler.
   
   We have also fixed a bug in the handling of the `ApiVersionsResponse` prior to authentication. Previously a static response was sent, which means that changes to features would not get reflected. This also meant that the logic to ensure that only the intersection of version ranges supported by the controller would get exposed did not work. I think this is important because some clients rely on the initial pre-authenticated `ApiVersions` response rather than doing a second round after authentication as the Java client does.
   
   One final cleanup note: I have removed the expectation that envelope requests are only allowed on "privileged" listeners. This made sense initially because we expected to use forwarding before the KIP-500 controller was available. That is not the case anymore and we expect the `Envelope` API to only be exposed on the controller listener. I have nevertheless preserved the existing workarounds to allow this API to verify forwarding behavior in integration testing.
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on pull request #10066: KAFKA-12278; Ensure exposed api versions are consistent within listener scopes

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #10066:
URL: https://github.com/apache/kafka/pull/10066#issuecomment-781496083


   @cmccabe @ijuma This is ready for another look. I have changed `scope` to `listeners` and I removed the "raft" listener type.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10066: KAFKA-12278; Ensure exposed api versions are consistent within listener scopes

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10066:
URL: https://github.com/apache/kafka/pull/10066#discussion_r578064446



##########
File path: generator/src/main/java/org/apache/kafka/message/RequestApiScope.java
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.message;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public enum RequestApiScope {
+    @JsonProperty("zkBroker")
+    ZK_BROKER,
+
+    @JsonProperty("broker")
+    BROKER,
+
+    @JsonProperty("controller")
+    CONTROLLER,
+
+    @JsonProperty("raft")
+    RAFT;

Review comment:
       Ok, I will get rid of it.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10066: KAFKA-12278; Ensure exposed api versions are consistent within listener scopes

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10066:
URL: https://github.com/apache/kafka/pull/10066#discussion_r571501194



##########
File path: generator/src/main/java/org/apache/kafka/message/RequestApiScope.java
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.message;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public enum RequestApiScope {
+    @JsonProperty("zkBroker")
+    ZK_BROKER,
+
+    @JsonProperty("broker")
+    BROKER,
+
+    @JsonProperty("controller")
+    CONTROLLER,
+
+    @JsonProperty("raft")
+    RAFT;

Review comment:
       I had something like that originally, but I found it simpler to have a 1-1 mapping from listener to scope. All "CLIENT" APIs have to be present on both the zk and raft brokers, but the "CLIENT" scope alone doesn't give us a way to distinguish between the two. We could have "ZK_CLIENT" and "RAFT_CLIENT" as an alternative. Then I guess we would also still need "ZK_BROKER" and "RAFT_BROKER." In any case, then each listener might have multiple scopes, which is a little more complex.
   
   Perhaps this would be clearer if we had a name tying this back the listener more directly. Maybe we should get rid of the "scope" term and just call it "listeners"?
   
   ```json
   "listeners": ["broker", "controller"]
   ```
   
   By the way, you probably saw it already, but the "raft" scope was added here just for convenience in `TestRaftServer`. I could use the "controller" scope instead and remove this.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10066: KAFKA-12278; Ensure exposed api versions are consistent within listener scopes

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10066:
URL: https://github.com/apache/kafka/pull/10066#discussion_r571500369



##########
File path: checkstyle/import-control.xml
##########
@@ -99,6 +100,7 @@
       <allow pkg="org.apache.kafka.common.config" />
       <allow pkg="org.apache.kafka.common.metrics" />
       <allow pkg="org.apache.kafka.common.security" />
+      <allow class="org.apache.kafka.common.requests.ApiVersionsResponse" />

Review comment:
       Hmm.. Do you have any ideas? I guess I would say that the dependency is there regardless of how we choose to hide it since we have to go through `ChannelBuilders` to build the authenticator, and the authenticator does depend on the request APIs.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10066: KAFKA-12278; Ensure exposed api versions are consistent within listener scopes

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10066:
URL: https://github.com/apache/kafka/pull/10066#discussion_r578717018



##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
##########
@@ -246,10 +250,15 @@ public void visit(Type field) {
         return hasBuffer.get();
     }
 
-    public static List<ApiKeys> brokerApis() {
-        return Arrays.stream(values())
-            .filter(api -> !api.isControllerOnlyApi)
+    public static EnumSet<ApiKeys> zkBrokerApis() {
+        return apisForListener(ApiMessageType.ListenerType.ZK_BROKER);
+    }
+
+    public static EnumSet<ApiKeys> apisForListener(ApiMessageType.ListenerType listener) {

Review comment:
       Makes sense. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10066: KAFKA-12278; Ensure exposed api versions are consistent within listener scopes

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10066:
URL: https://github.com/apache/kafka/pull/10066#discussion_r578711547



##########
File path: checkstyle/import-control.xml
##########
@@ -99,6 +100,7 @@
       <allow pkg="org.apache.kafka.common.config" />
       <allow pkg="org.apache.kafka.common.metrics" />
       <allow pkg="org.apache.kafka.common.security" />
+      <allow class="org.apache.kafka.common.requests.ApiVersionsResponse" />

Review comment:
       Yeah, the API versions response is pretty special in the protocol. Fortunately or unfortunately, it's not treated as a generic response.  I tried to think of some ways to avoid this dependency but they all ended up being kind of like obfuscation.
   
   > Could we make it work by changing the supplier to something more generic?
   
   Maybe I'm misinterpreting the suggestion, but if we chose to make this a `Supplier<AbstractResponse>`, that's still in the requests package....




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10066: KAFKA-12278; Ensure exposed api versions are consistent within listener scopes

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10066:
URL: https://github.com/apache/kafka/pull/10066#discussion_r572443431



##########
File path: generator/src/main/java/org/apache/kafka/message/RequestApiScope.java
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.message;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public enum RequestApiScope {
+    @JsonProperty("zkBroker")
+    ZK_BROKER,
+
+    @JsonProperty("broker")
+    BROKER,
+
+    @JsonProperty("controller")
+    CONTROLLER,
+
+    @JsonProperty("raft")
+    RAFT;

Review comment:
       @ijuma : unfortunately we don't have a clear separation between clients and brokers at the protocol level.  As you know, if a random node connects to the broker and asks for ApiVersions, the broker doesn't (yet) know if the other node is another broker or if it's a client.  So it wouldn't help to label APIs as CLIENT vs. BROKER, since the broker has to send back both anyway.
   
   On the other hand, we can definitely put APIs into "zk broker", "kip-500 zk", and "kip500 controller" and use those buckets to figure out what to send in the ApiVersionsRequest.
   
   @hachikuji : The "raft" scope seems a little weird since all the other scopes map to listeners on servers.  The TestRaftServer is just an internal junit thing, right?  So it's probably fine if it just uses the Controller scope and returns `UnsupportedVersionException` for things it doesn't support.  I don't think we should have a separate scope for this unless it somehow impacts the real servers.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe closed pull request #10066: KAFKA-12278; Ensure exposed api versions are consistent within listener scopes

Posted by GitBox <gi...@apache.org>.
cmccabe closed pull request #10066:
URL: https://github.com/apache/kafka/pull/10066


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10066: KAFKA-12278; Ensure exposed api versions are consistent within listener scopes

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10066:
URL: https://github.com/apache/kafka/pull/10066#discussion_r578711547



##########
File path: checkstyle/import-control.xml
##########
@@ -99,6 +100,7 @@
       <allow pkg="org.apache.kafka.common.config" />
       <allow pkg="org.apache.kafka.common.metrics" />
       <allow pkg="org.apache.kafka.common.security" />
+      <allow class="org.apache.kafka.common.requests.ApiVersionsResponse" />

Review comment:
       > I guess I would say that the dependency is there regardless of how we choose to hide it since we have to go through ChannelBuilders to build the authenticator, and the authenticator does depend on the request APIs.
   
   Yeah, the API versions response is pretty special in the protocol. Fortunately or unfortunately, it's not treated as a generic response.  I tried to think of some ways to avoid this dependency but they all ended up being kind of like obfuscation.
   
   > Could we make it work by changing the supplier to something more generic?
   
   Maybe I'm misinterpreting the suggestion, but if we choose to make this a `Supplier<AbstractResponse>`, that's still in the requests package....




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10066: KAFKA-12278; Ensure exposed api versions are consistent within listener scopes

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10066:
URL: https://github.com/apache/kafka/pull/10066#discussion_r578701957



##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
##########
@@ -246,10 +250,15 @@ public void visit(Type field) {
         return hasBuffer.get();
     }
 
-    public static List<ApiKeys> brokerApis() {
-        return Arrays.stream(values())
-            .filter(api -> !api.isControllerOnlyApi)
+    public static EnumSet<ApiKeys> zkBrokerApis() {
+        return apisForListener(ApiMessageType.ListenerType.ZK_BROKER);
+    }
+
+    public static EnumSet<ApiKeys> apisForListener(ApiMessageType.ListenerType listener) {

Review comment:
       given that these `EnumSets` are used so much throughout the code, we should just calculate them once and reuse them, rather than recalculating each time.  There are only 3 listener types so it could just be in a static map or something.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #10066: KAFKA-12278; Ensure exposed api versions are consistent within listener scopes

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #10066:
URL: https://github.com/apache/kafka/pull/10066#discussion_r578742130



##########
File path: checkstyle/import-control.xml
##########
@@ -99,6 +100,7 @@
       <allow pkg="org.apache.kafka.common.config" />
       <allow pkg="org.apache.kafka.common.metrics" />
       <allow pkg="org.apache.kafka.common.security" />
+      <allow class="org.apache.kafka.common.requests.ApiVersionsResponse" />

Review comment:
       I meant that you could introduce an interface that the response implements. That interface would live in the network package or in the common package.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10066: KAFKA-12278; Ensure exposed api versions are consistent within listener scopes

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10066:
URL: https://github.com/apache/kafka/pull/10066#discussion_r578716719



##########
File path: core/src/main/scala/kafka/server/ApiVersionManager.scala
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.api.ApiVersion
+import kafka.network
+import kafka.network.RequestChannel
+import org.apache.kafka.common.message.ApiMessageType.ListenerType
+import org.apache.kafka.common.message.ApiVersionsResponseData
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.requests.ApiVersionsResponse
+
+import scala.jdk.CollectionConverters._
+
+trait ApiVersionManager {
+  def listenerType: ListenerType
+  def enabledApis: collection.Set[ApiKeys]
+  def apiVersionResponse(throttleTimeMs: Int): ApiVersionsResponse
+  def isApiEnabled(apiKey: ApiKeys): Boolean = enabledApis.contains(apiKey)
+  def newRequestMetrics: RequestChannel.Metrics = new network.RequestChannel.Metrics(enabledApis)
+}
+
+object ApiVersionManager {
+  def apply(
+    listenerType: ListenerType,
+    config: KafkaConfig,
+    forwardingManager: Option[ForwardingManager],

Review comment:
       The `ForwardingManager` is used by the broker. The reason we depend on it here is because of the need to intersect the controller api versions.
   
   I agree with you about the envelope handling. I was planning to submit a follow-up to remove the handling logic from `KafkaApis`. As you said, only the controller needs to be able to handle the Envelope API. Doing so here, however, would add a significant diff.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10066: KAFKA-12278; Ensure exposed api versions are consistent within listener scopes

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10066:
URL: https://github.com/apache/kafka/pull/10066#discussion_r578716719



##########
File path: core/src/main/scala/kafka/server/ApiVersionManager.scala
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.api.ApiVersion
+import kafka.network
+import kafka.network.RequestChannel
+import org.apache.kafka.common.message.ApiMessageType.ListenerType
+import org.apache.kafka.common.message.ApiVersionsResponseData
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.requests.ApiVersionsResponse
+
+import scala.jdk.CollectionConverters._
+
+trait ApiVersionManager {
+  def listenerType: ListenerType
+  def enabledApis: collection.Set[ApiKeys]
+  def apiVersionResponse(throttleTimeMs: Int): ApiVersionsResponse
+  def isApiEnabled(apiKey: ApiKeys): Boolean = enabledApis.contains(apiKey)
+  def newRequestMetrics: RequestChannel.Metrics = new network.RequestChannel.Metrics(enabledApis)
+}
+
+object ApiVersionManager {
+  def apply(
+    listenerType: ListenerType,
+    config: KafkaConfig,
+    forwardingManager: Option[ForwardingManager],

Review comment:
       The `ForwardingManager` is used by the broker. The reason we depend on it here is because of the need to intersect the controller api versions.
   
   I agree with you about the envelope handling. I was planning to submit a follow-up to remove this logic from `KafkaApis`. As you said, only the controller needs to be able to handle the Envelope API. Doing so here, however, would add a significant diff.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10066: KAFKA-12278; Ensure exposed api versions are consistent within listener scopes

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10066:
URL: https://github.com/apache/kafka/pull/10066#discussion_r578701957



##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
##########
@@ -246,10 +250,15 @@ public void visit(Type field) {
         return hasBuffer.get();
     }
 
-    public static List<ApiKeys> brokerApis() {
-        return Arrays.stream(values())
-            .filter(api -> !api.isControllerOnlyApi)
+    public static EnumSet<ApiKeys> zkBrokerApis() {
+        return apisForListener(ApiMessageType.ListenerType.ZK_BROKER);
+    }
+
+    public static EnumSet<ApiKeys> apisForListener(ApiMessageType.ListenerType listener) {

Review comment:
       given that these `EnumSets` are used so much throughout the code, we should just calculate them once and reuse them.  There are only 3 listener types so it could just be in a static map or something.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10066: KAFKA-12278; Ensure exposed api versions are consistent within listener scopes

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10066:
URL: https://github.com/apache/kafka/pull/10066#discussion_r571501194



##########
File path: generator/src/main/java/org/apache/kafka/message/RequestApiScope.java
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.message;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public enum RequestApiScope {
+    @JsonProperty("zkBroker")
+    ZK_BROKER,
+
+    @JsonProperty("broker")
+    BROKER,
+
+    @JsonProperty("controller")
+    CONTROLLER,
+
+    @JsonProperty("raft")
+    RAFT;

Review comment:
       I had something like that originally, but I found it simpler to have a 1-1 mapping from listener to scope. All "CLIENT" APIs have to be present on both the zk and raft brokers, but it alone doesn't give us a way to distinguish between the two. We could have "ZK_CLIENT" and "RAFT_CLIENT" as an alternative. Then I guess we would also still need "ZK_BROKER" and "RAFT_BROKER." In any case, then each listener might have multiple scopes, which is a little more complex.
   
   Perhaps this would be clearer if we had a name tying this back the listener more directly. Maybe we should get rid of the "scope" term and just call it "listeners"?
   
   ```json
   "listeners": ["broker", "controller"]
   ```
   
   By the way, you probably saw it already, but the "raft" scope was added here just for convenience in `TestRaftServer`. I could use the "controller" scope instead and remove this.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10066: KAFKA-12278; Ensure exposed api versions are consistent within listener scopes

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10066:
URL: https://github.com/apache/kafka/pull/10066#discussion_r578711547



##########
File path: checkstyle/import-control.xml
##########
@@ -99,6 +100,7 @@
       <allow pkg="org.apache.kafka.common.config" />
       <allow pkg="org.apache.kafka.common.metrics" />
       <allow pkg="org.apache.kafka.common.security" />
+      <allow class="org.apache.kafka.common.requests.ApiVersionsResponse" />

Review comment:
       Yeah, the API versions response is pretty special in the protocol. Fortunately or unfortunately, it's not treated as a generic response.  I tried to think of some ways to avoid this dependency but they all ended up being kind of like obfuscation.
   
   Even if we chose to make this a `Supplier<AbstractResponse>`, that's still in the requests package....




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on pull request #10066: KAFKA-12278; Ensure exposed api versions are consistent within listener scopes

Posted by GitBox <gi...@apache.org>.
cmccabe commented on pull request #10066:
URL: https://github.com/apache/kafka/pull/10066#issuecomment-781727318


   pushed


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10066: KAFKA-12278; Ensure exposed api versions are consistent within listener scopes

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10066:
URL: https://github.com/apache/kafka/pull/10066#discussion_r578705716



##########
File path: core/src/main/scala/kafka/server/ApiVersionManager.scala
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.api.ApiVersion
+import kafka.network
+import kafka.network.RequestChannel
+import org.apache.kafka.common.message.ApiMessageType.ListenerType
+import org.apache.kafka.common.message.ApiVersionsResponseData
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.requests.ApiVersionsResponse
+
+import scala.jdk.CollectionConverters._
+
+trait ApiVersionManager {
+  def listenerType: ListenerType
+  def enabledApis: collection.Set[ApiKeys]
+  def apiVersionResponse(throttleTimeMs: Int): ApiVersionsResponse
+  def isApiEnabled(apiKey: ApiKeys): Boolean = enabledApis.contains(apiKey)
+  def newRequestMetrics: RequestChannel.Metrics = new network.RequestChannel.Metrics(enabledApis)
+}
+
+object ApiVersionManager {
+  def apply(
+    listenerType: ListenerType,
+    config: KafkaConfig,
+    forwardingManager: Option[ForwardingManager],

Review comment:
       I would really prefer not to have all this `ForwardingManager` stuff in here.  The `ControllerServer` has been committed to trunk and does handle `ENVELOPE_REQUEST` so I think this is out of date.
   
   It should be as simple as controllers handle envelope requests, brokers don't, right?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10066: KAFKA-12278; Ensure exposed api versions are consistent within listener scopes

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10066:
URL: https://github.com/apache/kafka/pull/10066#discussion_r578825895



##########
File path: core/src/main/scala/kafka/server/ApiVersionManager.scala
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.api.ApiVersion
+import kafka.network
+import kafka.network.RequestChannel
+import org.apache.kafka.common.message.ApiMessageType.ListenerType
+import org.apache.kafka.common.message.ApiVersionsResponseData
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.requests.ApiVersionsResponse
+
+import scala.jdk.CollectionConverters._
+
+trait ApiVersionManager {
+  def listenerType: ListenerType
+  def enabledApis: collection.Set[ApiKeys]
+  def apiVersionResponse(throttleTimeMs: Int): ApiVersionsResponse
+  def isApiEnabled(apiKey: ApiKeys): Boolean = enabledApis.contains(apiKey)
+  def newRequestMetrics: RequestChannel.Metrics = new network.RequestChannel.Metrics(enabledApis)
+}
+
+object ApiVersionManager {
+  def apply(
+    listenerType: ListenerType,
+    config: KafkaConfig,
+    forwardingManager: Option[ForwardingManager],

Review comment:
       ok




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #10066: KAFKA-12278; Ensure exposed api versions are consistent within listener scopes

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #10066:
URL: https://github.com/apache/kafka/pull/10066#discussion_r571511300



##########
File path: generator/src/main/java/org/apache/kafka/message/RequestApiScope.java
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.message;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public enum RequestApiScope {
+    @JsonProperty("zkBroker")
+    ZK_BROKER,
+
+    @JsonProperty("broker")
+    BROKER,
+
+    @JsonProperty("controller")
+    CONTROLLER,
+
+    @JsonProperty("raft")
+    RAFT;

Review comment:
       These are good points. I think if we keep the approach suggested here, `listeners` may be better. However, I am still not sure if it would not be better to have `ZK_CLIENT` and `RAFT_CLIENT` (or something along those lines). Then, as we drop support for ZK, it would work pretty nicely. Generally, I think the json files would ideally be our protocol definition versus something where we capture implementation details.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #10066: KAFKA-12278; Ensure exposed api versions are consistent within listener scopes

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #10066:
URL: https://github.com/apache/kafka/pull/10066#discussion_r578742130



##########
File path: checkstyle/import-control.xml
##########
@@ -99,6 +100,7 @@
       <allow pkg="org.apache.kafka.common.config" />
       <allow pkg="org.apache.kafka.common.metrics" />
       <allow pkg="org.apache.kafka.common.security" />
+      <allow class="org.apache.kafka.common.requests.ApiVersionsResponse" />

Review comment:
       I meant that you could introduce an interface that the response implements. That interface would live in the network package.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10066: KAFKA-12278; Ensure exposed api versions are consistent within listener scopes

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10066:
URL: https://github.com/apache/kafka/pull/10066#discussion_r578716719



##########
File path: core/src/main/scala/kafka/server/ApiVersionManager.scala
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.api.ApiVersion
+import kafka.network
+import kafka.network.RequestChannel
+import org.apache.kafka.common.message.ApiMessageType.ListenerType
+import org.apache.kafka.common.message.ApiVersionsResponseData
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.requests.ApiVersionsResponse
+
+import scala.jdk.CollectionConverters._
+
+trait ApiVersionManager {
+  def listenerType: ListenerType
+  def enabledApis: collection.Set[ApiKeys]
+  def apiVersionResponse(throttleTimeMs: Int): ApiVersionsResponse
+  def isApiEnabled(apiKey: ApiKeys): Boolean = enabledApis.contains(apiKey)
+  def newRequestMetrics: RequestChannel.Metrics = new network.RequestChannel.Metrics(enabledApis)
+}
+
+object ApiVersionManager {
+  def apply(
+    listenerType: ListenerType,
+    config: KafkaConfig,
+    forwardingManager: Option[ForwardingManager],

Review comment:
       The `ForwardingManager` is used by the broker. The reason we depend on it here is because of the need to intersect the controller api versions.
   
   I agree with you about the envelope handling. I was planning to submit a follow-up to review the handling logic from `KafkaApis`. As you said, only the controller needs to be able to handle the Envelope API. Doing so here, however, would add a significant diff.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10066: KAFKA-12278; Ensure exposed api versions are consistent within listener scopes

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10066:
URL: https://github.com/apache/kafka/pull/10066#discussion_r578826004



##########
File path: checkstyle/import-control.xml
##########
@@ -99,6 +100,7 @@
       <allow pkg="org.apache.kafka.common.config" />
       <allow pkg="org.apache.kafka.common.metrics" />
       <allow pkg="org.apache.kafka.common.security" />
+      <allow class="org.apache.kafka.common.requests.ApiVersionsResponse" />

Review comment:
       Hmm.  Let's revisit this after 2.8.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #10066: KAFKA-12278; Ensure exposed api versions are consistent within listener scopes

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #10066:
URL: https://github.com/apache/kafka/pull/10066#discussion_r571491307



##########
File path: generator/src/main/java/org/apache/kafka/message/RequestApiScope.java
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.message;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public enum RequestApiScope {
+    @JsonProperty("zkBroker")
+    ZK_BROKER,
+
+    @JsonProperty("broker")
+    BROKER,
+
+    @JsonProperty("controller")
+    CONTROLLER,
+
+    @JsonProperty("raft")
+    RAFT;

Review comment:
       If we're talking about scopes, wouldn't it be more intuitive if one of them was `CLIENT`? Since these definitions go into the protocol definition files that are used by other clients, this could be used to differentiate between protocol apis that clients need to support versus protocol apis that are used for inter broker and broker -> controller communication.

##########
File path: checkstyle/import-control.xml
##########
@@ -99,6 +100,7 @@
       <allow pkg="org.apache.kafka.common.config" />
       <allow pkg="org.apache.kafka.common.metrics" />
       <allow pkg="org.apache.kafka.common.security" />
+      <allow class="org.apache.kafka.common.requests.ApiVersionsResponse" />

Review comment:
       The network package is not meant to access requests, right? Could we make it work by changing the supplier to something more generic?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org