Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/23 18:26:26 UTC

[GitHub] [kafka] hachikuji opened a new pull request #10194: KAFKA-12365; Disable transactional APIs on KIP-500 broker

hachikuji opened a new pull request #10194:
URL: https://github.com/apache/kafka/pull/10194


   Since the KIP-500 broker does not yet support producerId generation, this patch disables support for the transactional APIs. Removing the "broker" listener from their schema definitions means these APIs will not be advertised in `ApiVersions`, and any requests for them will be rejected by the `SocketServer`.
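
The mechanism described above can be sketched roughly as follows. This is a minimal, self-contained illustration and not Kafka's actual code: the class `ListenerFilterSketch`, its `Listener` enum, the `SCHEMAS` table, and both methods are hypothetical names, and the listener sets shown are illustrative entries only. The point is that a single per-API listener declaration can drive both what is advertised and what is accepted.

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: all names here are assumptions made for illustration.
final class ListenerFilterSketch {
    // Stand-ins for the "zkBroker", "broker" and "controller" listener names.
    enum Listener { ZK_BROKER, BROKER, CONTROLLER }

    // Stand-in for the "listeners" field of each request schema.
    static final Map<String, Set<Listener>> SCHEMAS = new LinkedHashMap<>();
    static {
        SCHEMAS.put("Produce", EnumSet.of(Listener.ZK_BROKER, Listener.BROKER));
        // "broker" removed from these entries, as the patch does for unsupported APIs.
        SCHEMAS.put("InitProducerId", EnumSet.of(Listener.ZK_BROKER));
        SCHEMAS.put("AddPartitionsToTxn", EnumSet.of(Listener.ZK_BROKER));
    }

    // APIs that would be advertised to clients connecting to the given listener.
    static List<String> advertisedApis(Listener listener) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Set<Listener>> entry : SCHEMAS.entrySet()) {
            if (entry.getValue().contains(listener)) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    // Whether a request for the named API would be accepted on the given listener.
    static boolean isEnabled(Listener listener, String apiName) {
        Set<Listener> listeners = SCHEMAS.get(apiName);
        return listeners != null && listeners.contains(listener);
    }

    public static void main(String[] args) {
        System.out.println(advertisedApis(Listener.ZK_BROKER)); // [Produce, InitProducerId, AddPartitionsToTxn]
        System.out.println(advertisedApis(Listener.BROKER));    // [Produce]
    }
}
```

Because advertisement and request acceptance read the same table in this sketch, dropping a listener from one schema entry disables the API in both places at once.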
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on pull request #10194: KAFKA-12365; Disable transactional APIs on KIP-500 broker

Posted by GitBox <gi...@apache.org>.
cmccabe commented on pull request #10194:
URL: https://github.com/apache/kafka/pull/10194#issuecomment-786257147


   The approach LGTM.  Can you also disable the relevant system test(s) in this PR?
   
   Agree that we need to disable a few more things; reassignment is one.





[GitHub] [kafka] cmccabe commented on a change in pull request #10194: KAFKA-12365; Disable APIs not supported by KIP-500 broker/controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10194:
URL: https://github.com/apache/kafka/pull/10194#discussion_r583294273



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -251,45 +210,17 @@ class ControllerApis(val requestChannel: RequestChannel,
     // If this is considered to leak information about the broker version a workaround is to use SSL
     // with client authentication which is performed at an earlier stage of the connection where the
     // ApiVersionRequest is not available.
-    def createResponseCallback(features: FeatureMapAndEpoch,
-                               requestThrottleMs: Int): ApiVersionsResponse = {
+    def createResponseCallback(requestThrottleMs: Int): ApiVersionsResponse = {
       val apiVersionRequest = request.body[ApiVersionsRequest]
-      if (apiVersionRequest.hasUnsupportedRequestVersion)
+      if (apiVersionRequest.hasUnsupportedRequestVersion) {
         apiVersionRequest.getErrorResponse(requestThrottleMs, Errors.UNSUPPORTED_VERSION.exception)
-      else if (!apiVersionRequest.isValid)
+      } else if (!apiVersionRequest.isValid) {
         apiVersionRequest.getErrorResponse(requestThrottleMs, Errors.INVALID_REQUEST.exception)
-      else {
-        val data = new ApiVersionsResponseData().
-          setErrorCode(0.toShort).
-          setThrottleTimeMs(requestThrottleMs).
-          setFinalizedFeaturesEpoch(features.epoch())
-        supportedFeatures.foreach {
-          case (k, v) => data.supportedFeatures().add(new SupportedFeatureKey().
-            setName(k).setMaxVersion(v.max()).setMinVersion(v.min()))
-        }
-        //        features.finalizedFeatures().asScala.foreach {
-        //          case (k, v) => data.finalizedFeatures().add(new FinalizedFeatureKey().
-        //            setName(k).setMaxVersionLevel(v.max()).setMinVersionLevel(v.min()))
-        //        }
-        ApiKeys.values().foreach {
-          key =>
-            if (supportedApiKeys.contains(key)) {
-              data.apiKeys().add(new ApiVersion().
-                setApiKey(key.id).
-                setMaxVersion(key.latestVersion()).
-                setMinVersion(key.oldestVersion()))
-            }
-        }
-        new ApiVersionsResponse(data)
+      } else {
+        apiVersionManager.apiVersionResponse(requestThrottleMs)

Review comment:
       Agreed. We will need some way for the controller to update those fields in `ApiVersionManager` (possibly an atomic or volatile field).
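
One way the "atomic / volatile field" idea could look is sketched below. This is only an illustration under stated assumptions: `FeatureSnapshotSketch`, `Features`, `update`, and `snapshot` are hypothetical names and not part of Kafka, the initial epoch of -1 is an assumed placeholder for "nothing known yet", and rejecting updates that do not carry a newer epoch is a design choice of this sketch that the thread does not discuss.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: all names here are assumptions made for illustration.
final class FeatureSnapshotSketch {
    // Immutable snapshot, so readers never observe a half-updated state.
    static final class Features {
        final long epoch;
        final Map<String, Short> finalizedLevels;

        Features(long epoch, Map<String, Short> finalizedLevels) {
            this.epoch = epoch;
            this.finalizedLevels = Map.copyOf(finalizedLevels);
        }
    }

    // Assumed placeholder state: epoch -1 and no finalized features.
    private final AtomicReference<Features> current =
        new AtomicReference<>(new Features(-1L, Map.of()));

    // Called by the writer (for example the controller); keeps the existing
    // snapshot unless the new one has a strictly greater epoch.
    void update(Features next) {
        current.updateAndGet(prev -> next.epoch > prev.epoch ? next : prev);
    }

    // Called by request handler threads while building a response.
    Features snapshot() {
        return current.get();
    }
}
```

Swapping one immutable snapshot through a single reference means the epoch and the feature map are always read together, which two separate volatile fields would not guarantee.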







[GitHub] [kafka] cmccabe commented on a change in pull request #10194: KAFKA-12365; Disable APIs not supported by KIP-500 broker/controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10194:
URL: https://github.com/apache/kafka/pull/10194#discussion_r583293910



##########
File path: clients/src/main/resources/common/message/DescribeClientQuotasRequest.json
##########
@@ -16,7 +16,7 @@
 {
   "apiKey": 48,
   "type": "request",
-  "listeners": ["zkBroker", "broker"],
+  "listeners": ["zkBroker"],

Review comment:
       Hmm... looking at trunk, it looks like some changes from the KIP-500 branch didn't make it over. `handleDescribeClientQuotasRequest` did work in the KIP-500 branch. Anyway, at this point I think we should just disable it and put client quotas support on the TODO list for the next release (sigh).







[GitHub] [kafka] hachikuji commented on a change in pull request #10194: KAFKA-12365; Disable APIs not supported by KIP-500 broker/controller

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10194:
URL: https://github.com/apache/kafka/pull/10194#discussion_r583291501



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -251,45 +210,17 @@ class ControllerApis(val requestChannel: RequestChannel,
     // If this is considered to leak information about the broker version a workaround is to use SSL
     // with client authentication which is performed at an earlier stage of the connection where the
     // ApiVersionRequest is not available.
-    def createResponseCallback(features: FeatureMapAndEpoch,
-                               requestThrottleMs: Int): ApiVersionsResponse = {
+    def createResponseCallback(requestThrottleMs: Int): ApiVersionsResponse = {
       val apiVersionRequest = request.body[ApiVersionsRequest]
-      if (apiVersionRequest.hasUnsupportedRequestVersion)
+      if (apiVersionRequest.hasUnsupportedRequestVersion) {
         apiVersionRequest.getErrorResponse(requestThrottleMs, Errors.UNSUPPORTED_VERSION.exception)
-      else if (!apiVersionRequest.isValid)
+      } else if (!apiVersionRequest.isValid) {
         apiVersionRequest.getErrorResponse(requestThrottleMs, Errors.INVALID_REQUEST.exception)
-      else {
-        val data = new ApiVersionsResponseData().
-          setErrorCode(0.toShort).
-          setThrottleTimeMs(requestThrottleMs).
-          setFinalizedFeaturesEpoch(features.epoch())
-        supportedFeatures.foreach {
-          case (k, v) => data.supportedFeatures().add(new SupportedFeatureKey().
-            setName(k).setMaxVersion(v.max()).setMinVersion(v.min()))
-        }
-        //        features.finalizedFeatures().asScala.foreach {
-        //          case (k, v) => data.finalizedFeatures().add(new FinalizedFeatureKey().
-        //            setName(k).setMaxVersionLevel(v.max()).setMinVersionLevel(v.min()))
-        //        }
-        ApiKeys.values().foreach {
-          key =>
-            if (supportedApiKeys.contains(key)) {
-              data.apiKeys().add(new ApiVersion().
-                setApiKey(key.id).
-                setMaxVersion(key.latestVersion()).
-                setMinVersion(key.oldestVersion()))
-            }
-        }
-        new ApiVersionsResponse(data)
+      } else {
+        apiVersionManager.apiVersionResponse(requestThrottleMs)

Review comment:
       Right. I think the change makes sense in any case since the feature logic will end up getting handled by `ApiVersionManager` (as we do with the broker).







[GitHub] [kafka] hachikuji merged pull request #10194: KAFKA-12365; Disable APIs not supported by KIP-500 broker/controller

Posted by GitBox <gi...@apache.org>.
hachikuji merged pull request #10194:
URL: https://github.com/apache/kafka/pull/10194


   





[GitHub] [kafka] cmccabe commented on a change in pull request #10194: KAFKA-12365; Disable APIs not supported by KIP-500 broker/controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10194:
URL: https://github.com/apache/kafka/pull/10194#discussion_r583284033



##########
File path: clients/src/main/resources/common/message/DeleteTopicsRequest.json
##########
@@ -16,7 +16,7 @@
 {
   "apiKey": 20,
   "type": "request",
-  "listeners": ["zkBroker", "broker", "controller"],
+  "listeners": ["zkBroker"],

Review comment:
       We are fixing this one, so don't disable it.







[GitHub] [kafka] cmccabe commented on a change in pull request #10194: KAFKA-12365; Disable APIs not supported by KIP-500 broker/controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10194:
URL: https://github.com/apache/kafka/pull/10194#discussion_r583283950



##########
File path: clients/src/main/resources/common/message/DescribeClientQuotasRequest.json
##########
@@ -16,7 +16,7 @@
 {
   "apiKey": 48,
   "type": "request",
-  "listeners": ["zkBroker", "broker"],
+  "listeners": ["zkBroker"],

Review comment:
       `describeClientQuotas` is already implemented for KIP-500 (on the broker).







[GitHub] [kafka] cmccabe commented on a change in pull request #10194: KAFKA-12365; Disable APIs not supported by KIP-500 broker/controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10194:
URL: https://github.com/apache/kafka/pull/10194#discussion_r583287372



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -251,45 +210,17 @@ class ControllerApis(val requestChannel: RequestChannel,
     // If this is considered to leak information about the broker version a workaround is to use SSL
     // with client authentication which is performed at an earlier stage of the connection where the
     // ApiVersionRequest is not available.
-    def createResponseCallback(features: FeatureMapAndEpoch,
-                               requestThrottleMs: Int): ApiVersionsResponse = {
+    def createResponseCallback(requestThrottleMs: Int): ApiVersionsResponse = {
       val apiVersionRequest = request.body[ApiVersionsRequest]
-      if (apiVersionRequest.hasUnsupportedRequestVersion)
+      if (apiVersionRequest.hasUnsupportedRequestVersion) {
         apiVersionRequest.getErrorResponse(requestThrottleMs, Errors.UNSUPPORTED_VERSION.exception)
-      else if (!apiVersionRequest.isValid)
+      } else if (!apiVersionRequest.isValid) {
         apiVersionRequest.getErrorResponse(requestThrottleMs, Errors.INVALID_REQUEST.exception)
-      else {
-        val data = new ApiVersionsResponseData().
-          setErrorCode(0.toShort).
-          setThrottleTimeMs(requestThrottleMs).
-          setFinalizedFeaturesEpoch(features.epoch())
-        supportedFeatures.foreach {
-          case (k, v) => data.supportedFeatures().add(new SupportedFeatureKey().
-            setName(k).setMaxVersion(v.max()).setMinVersion(v.min()))
-        }
-        //        features.finalizedFeatures().asScala.foreach {
-        //          case (k, v) => data.finalizedFeatures().add(new FinalizedFeatureKey().
-        //            setName(k).setMaxVersionLevel(v.max()).setMinVersionLevel(v.min()))
-        //        }
-        ApiKeys.values().foreach {
-          key =>
-            if (supportedApiKeys.contains(key)) {
-              data.apiKeys().add(new ApiVersion().
-                setApiKey(key.id).
-                setMaxVersion(key.latestVersion()).
-                setMinVersion(key.oldestVersion()))
-            }
-        }
-        new ApiVersionsResponse(data)
+      } else {
+        apiVersionManager.apiVersionResponse(requestThrottleMs)

Review comment:
       I think we do need to fill in the feature fields eventually. But for 2.8, this is OK.







[GitHub] [kafka] hachikuji commented on pull request #10194: KAFKA-12365; Disable transactional APIs on KIP-500 broker

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #10194:
URL: https://github.com/apache/kafka/pull/10194#issuecomment-784430912


   Note I'm doing a pass over the other APIs that should be disabled for now. For example, the reassignment APIs are not yet supported. I will probably just generalize this patch to cover all of them.





[GitHub] [kafka] hachikuji commented on a change in pull request #10194: KAFKA-12365; Disable APIs not supported by KIP-500 broker/controller

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10194:
URL: https://github.com/apache/kafka/pull/10194#discussion_r583290497



##########
File path: clients/src/main/resources/common/message/DescribeClientQuotasRequest.json
##########
@@ -16,7 +16,7 @@
 {
   "apiKey": 48,
   "type": "request",
-  "listeners": ["zkBroker", "broker"],
+  "listeners": ["zkBroker"],

Review comment:
       I was going by this: https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L3026.







[GitHub] [kafka] cmccabe commented on a change in pull request #10194: KAFKA-12365; Disable APIs not supported by KIP-500 broker/controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10194:
URL: https://github.com/apache/kafka/pull/10194#discussion_r583294273



##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -251,45 +210,17 @@ class ControllerApis(val requestChannel: RequestChannel,
     // If this is considered to leak information about the broker version a workaround is to use SSL
     // with client authentication which is performed at an earlier stage of the connection where the
     // ApiVersionRequest is not available.
-    def createResponseCallback(features: FeatureMapAndEpoch,
-                               requestThrottleMs: Int): ApiVersionsResponse = {
+    def createResponseCallback(requestThrottleMs: Int): ApiVersionsResponse = {
       val apiVersionRequest = request.body[ApiVersionsRequest]
-      if (apiVersionRequest.hasUnsupportedRequestVersion)
+      if (apiVersionRequest.hasUnsupportedRequestVersion) {
         apiVersionRequest.getErrorResponse(requestThrottleMs, Errors.UNSUPPORTED_VERSION.exception)
-      else if (!apiVersionRequest.isValid)
+      } else if (!apiVersionRequest.isValid) {
         apiVersionRequest.getErrorResponse(requestThrottleMs, Errors.INVALID_REQUEST.exception)
-      else {
-        val data = new ApiVersionsResponseData().
-          setErrorCode(0.toShort).
-          setThrottleTimeMs(requestThrottleMs).
-          setFinalizedFeaturesEpoch(features.epoch())
-        supportedFeatures.foreach {
-          case (k, v) => data.supportedFeatures().add(new SupportedFeatureKey().
-            setName(k).setMaxVersion(v.max()).setMinVersion(v.min()))
-        }
-        //        features.finalizedFeatures().asScala.foreach {
-        //          case (k, v) => data.finalizedFeatures().add(new FinalizedFeatureKey().
-        //            setName(k).setMaxVersionLevel(v.max()).setMinVersionLevel(v.min()))
-        //        }
-        ApiKeys.values().foreach {
-          key =>
-            if (supportedApiKeys.contains(key)) {
-              data.apiKeys().add(new ApiVersion().
-                setApiKey(key.id).
-                setMaxVersion(key.latestVersion()).
-                setMinVersion(key.oldestVersion()))
-            }
-        }
-        new ApiVersionsResponse(data)
+      } else {
+        apiVersionManager.apiVersionResponse(requestThrottleMs)

Review comment:
       agreed



