You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "cmccabe (via GitHub)" <gi...@apache.org> on 2023/06/07 21:42:50 UTC

[GitHub] [kafka] cmccabe opened a new pull request, #13826: KAFKA-15060: fix the ApiVersionManager interface

cmccabe opened a new pull request, #13826:
URL: https://github.com/apache/kafka/pull/13826

   This PR expands the scope of ApiVersionManager a bit to include returning the current MetadataVersion and features that are in effect. This is useful in general because that information needs to be returned in an ApiVersionsResponse. It also allows us to fix the ApiVersionManager interface so that all subclasses implement all methods of the interface. Having subclasses that don't implement some methods is dangerous because they could cause exceptions at runtime in unexpected scenarios.
   
   On the KRaft controller, we were previously performing a read operation in the QuorumController thread to get the current metadata version and features. With this PR, we now read a volatile variable maintained by a separate MetadataVersionContextPublisher object. This will improve performance and simplify the code. It should not change the guarantees we are providing; in both the old and new scenarios, we need to be robust against version skew scenarios during updates.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cmccabe closed pull request #13826: KAFKA-15060: fix the ApiVersionManager interface

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe closed pull request #13826: KAFKA-15060: fix the ApiVersionManager interface
URL: https://github.com/apache/kafka/pull/13826


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cmccabe commented on a diff in pull request #13826: KAFKA-15060: fix the ApiVersionManager interface

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on code in PR #13826:
URL: https://github.com/apache/kafka/pull/13826#discussion_r1224806791


##########
core/src/main/scala/kafka/server/ControllerApis.scala:
##########
@@ -447,16 +447,9 @@ class ControllerApis(val requestChannel: RequestChannel,
         requestThrottleMs => apiVersionRequest.getErrorResponse(requestThrottleMs, INVALID_REQUEST.exception))
       CompletableFuture.completedFuture[Unit](())
     } else {
-      val context = new ControllerRequestContext(request.context.header.data, request.context.principal, OptionalLong.empty())
-      controller.finalizedFeatures(context).handle { (result, exception) =>
-        requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
-          if (exception != null) {
-            apiVersionRequest.getErrorResponse(requestThrottleMs, exception)
-          } else {
-            apiVersionManager.apiVersionResponse(requestThrottleMs, result.featureMap().asScala.toMap, result.epoch())
-          }
-        })
-      }
+      requestHelper.sendResponseMaybeThrottle(request,

Review Comment:
   Hmm. It cannot be the same as KafkaApis because this code returns a future, which the KafkApis code does not. And the comment about authentication seems good to keep. We can move the future to the end, however, since it's the same for every case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dengziming commented on a diff in pull request #13826: KAFKA-15060: fix the ApiVersionManager interface

Posted by "dengziming (via GitHub)" <gi...@apache.org>.
dengziming commented on code in PR #13826:
URL: https://github.com/apache/kafka/pull/13826#discussion_r1226040921


##########
core/src/main/scala/kafka/server/ApiVersionManager.scala:
##########
@@ -141,24 +136,22 @@ class DefaultApiVersionManager(
   val enabledApis = ApiKeys.apisForListener(listenerType).asScala
 
   override def apiVersionResponse(throttleTimeMs: Int): ApiVersionsResponse = {
-    val supportedFeatures = features.supportedFeatures
+    val supportedFeatures = brokerFeatures.supportedFeatures
     val finalizedFeatures = metadataCache.features()
     val controllerApiVersions = forwardingManager.flatMap(_.controllerApiVersions)
 
     ApiVersionsResponse.createApiVersionsResponse(
       throttleTimeMs,
-      metadataCache.metadataVersion().highestSupportedRecordVersion,
+      finalizedFeatures.metadataVersion().highestSupportedRecordVersion,
       supportedFeatures,
-      finalizedFeatures.features.map(kv => (kv._1, kv._2.asInstanceOf[java.lang.Short])).asJava,
-      finalizedFeatures.epoch,
+      finalizedFeatures.finalizedFeatures(),
+      finalizedFeatures.finalizedFeaturesEpoch(),
       controllerApiVersions.orNull,
       listenerType,
       enableUnstableLastVersion,
       zkMigrationEnabled
     )
   }
 
-  override def apiVersionResponse(throttleTimeMs: Int, finalizedFeatures: Map[String, java.lang.Short], finalizedFeatureEpoch: Long): ApiVersionsResponse = {
-    throw new UnsupportedOperationException("This method is not supported in DefaultApiVersionManager, use apiVersionResponse(throttleTimeMs) instead")
-  }
+  override def features: Features = metadataCache.features()

Review Comment:
   It seems this method is not used.



##########
core/src/main/scala/kafka/server/ControllerServer.scala:
##########
@@ -326,6 +329,9 @@ class ControllerServer(
       // register this instance for dynamic config changes to the KafkaConfig
       config.dynamicConfig.addReconfigurables(this)
 
+      // Set up the metadata version publisher.

Review Comment:
   nit: metadata version publisher -> feature publisher



##########
core/src/main/scala/kafka/server/ApiVersionManager.scala:
##########
@@ -132,7 +127,7 @@ class SimpleApiVersionManager(
 class DefaultApiVersionManager(
   val listenerType: ListenerType,
   forwardingManager: Option[ForwardingManager],
-  features: BrokerFeatures,
+  brokerFeatures: BrokerFeatures,

Review Comment:
   We should update the class docs to make it consistent with the field name.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cmccabe commented on pull request #13826: KAFKA-15060: fix the ApiVersionManager interface

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on PR #13826:
URL: https://github.com/apache/kafka/pull/13826#issuecomment-1597881564

   merged


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dengziming commented on a diff in pull request #13826: KAFKA-15060: fix the ApiVersionManager interface

Posted by "dengziming (via GitHub)" <gi...@apache.org>.
dengziming commented on code in PR #13826:
URL: https://github.com/apache/kafka/pull/13826#discussion_r1227582802


##########
core/src/main/scala/kafka/server/ControllerApis.scala:
##########
@@ -441,23 +441,14 @@ class ControllerApis(val requestChannel: RequestChannel,
     if (apiVersionRequest.hasUnsupportedRequestVersion) {
       requestHelper.sendResponseMaybeThrottle(request,
         requestThrottleMs => apiVersionRequest.getErrorResponse(requestThrottleMs, UNSUPPORTED_VERSION.exception))
-      CompletableFuture.completedFuture[Unit](())
     } else if (!apiVersionRequest.isValid) {
       requestHelper.sendResponseMaybeThrottle(request,
         requestThrottleMs => apiVersionRequest.getErrorResponse(requestThrottleMs, INVALID_REQUEST.exception))
-      CompletableFuture.completedFuture[Unit](())
     } else {
-      val context = new ControllerRequestContext(request.context.header.data, request.context.principal, OptionalLong.empty())
-      controller.finalizedFeatures(context).handle { (result, exception) =>
-        requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
-          if (exception != null) {
-            apiVersionRequest.getErrorResponse(requestThrottleMs, exception)
-          } else {
-            apiVersionManager.apiVersionResponse(requestThrottleMs, result.featureMap().asScala.toMap, result.epoch())
-          }
-        })
-      }
+      requestHelper.sendResponseMaybeThrottle(request,
+        requestThrottleMs => apiVersionManager.apiVersionResponse(requestThrottleMs))

Review Comment:
   I think this is inevitable, the broker also get the feature data from MetadataCache,  which may also be stale data.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cmccabe commented on a diff in pull request #13826: KAFKA-15060: fix the ApiVersionManager interface

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on code in PR #13826:
URL: https://github.com/apache/kafka/pull/13826#discussion_r1230241309


##########
core/src/main/scala/kafka/server/ControllerApis.scala:
##########
@@ -441,23 +441,14 @@ class ControllerApis(val requestChannel: RequestChannel,
     if (apiVersionRequest.hasUnsupportedRequestVersion) {
       requestHelper.sendResponseMaybeThrottle(request,
         requestThrottleMs => apiVersionRequest.getErrorResponse(requestThrottleMs, UNSUPPORTED_VERSION.exception))
-      CompletableFuture.completedFuture[Unit](())
     } else if (!apiVersionRequest.isValid) {
       requestHelper.sendResponseMaybeThrottle(request,
         requestThrottleMs => apiVersionRequest.getErrorResponse(requestThrottleMs, INVALID_REQUEST.exception))
-      CompletableFuture.completedFuture[Unit](())
     } else {
-      val context = new ControllerRequestContext(request.context.header.data, request.context.principal, OptionalLong.empty())
-      controller.finalizedFeatures(context).handle { (result, exception) =>
-        requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
-          if (exception != null) {
-            apiVersionRequest.getErrorResponse(requestThrottleMs, exception)
-          } else {
-            apiVersionManager.apiVersionResponse(requestThrottleMs, result.featureMap().asScala.toMap, result.epoch())
-          }
-        })
-      }
+      requestHelper.sendResponseMaybeThrottle(request,
+        requestThrottleMs => apiVersionManager.apiVersionResponse(requestThrottleMs))

Review Comment:
   I will change this not to start the listener until we've caught up to the local HWM. That will avoid this and some other gotchas.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cmccabe commented on a diff in pull request #13826: KAFKA-15060: fix the ApiVersionManager interface

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on code in PR #13826:
URL: https://github.com/apache/kafka/pull/13826#discussion_r1224806791


##########
core/src/main/scala/kafka/server/ControllerApis.scala:
##########
@@ -447,16 +447,9 @@ class ControllerApis(val requestChannel: RequestChannel,
         requestThrottleMs => apiVersionRequest.getErrorResponse(requestThrottleMs, INVALID_REQUEST.exception))
       CompletableFuture.completedFuture[Unit](())
     } else {
-      val context = new ControllerRequestContext(request.context.header.data, request.context.principal, OptionalLong.empty())
-      controller.finalizedFeatures(context).handle { (result, exception) =>
-        requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
-          if (exception != null) {
-            apiVersionRequest.getErrorResponse(requestThrottleMs, exception)
-          } else {
-            apiVersionManager.apiVersionResponse(requestThrottleMs, result.featureMap().asScala.toMap, result.epoch())
-          }
-        })
-      }
+      requestHelper.sendResponseMaybeThrottle(request,

Review Comment:
   It cannot be the same as KafkaApis because this code returns a future, which the KafkApis code does not. And the comment about authentication seems good to keep.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dengziming commented on a diff in pull request #13826: KAFKA-15060: fix the ApiVersionManager interface

Posted by "dengziming (via GitHub)" <gi...@apache.org>.
dengziming commented on code in PR #13826:
URL: https://github.com/apache/kafka/pull/13826#discussion_r1223802381


##########
core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala:
##########
@@ -584,4 +584,12 @@ class ZkMetadataCache(
   def getFeatureOption: Option[FinalizedFeaturesAndEpoch] = {
     featuresAndEpoch
   }
+
+  override def versionContext(): MetadataVersionContext = {

Review Comment:
   The name is confusing, we include features and metadata version in the context, it's better to use FeaturesContext since metadata version is also a feature.



##########
core/src/main/scala/kafka/server/ControllerApis.scala:
##########
@@ -447,16 +447,9 @@ class ControllerApis(val requestChannel: RequestChannel,
         requestThrottleMs => apiVersionRequest.getErrorResponse(requestThrottleMs, INVALID_REQUEST.exception))
       CompletableFuture.completedFuture[Unit](())
     } else {
-      val context = new ControllerRequestContext(request.context.header.data, request.context.principal, OptionalLong.empty())
-      controller.finalizedFeatures(context).handle { (result, exception) =>
-        requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
-          if (exception != null) {
-            apiVersionRequest.getErrorResponse(requestThrottleMs, exception)
-          } else {
-            apiVersionManager.apiVersionResponse(requestThrottleMs, result.featureMap().asScala.toMap, result.epoch())
-          }
-        })
-      }
+      requestHelper.sendResponseMaybeThrottle(request,

Review Comment:
   Since we are changing back to the original code, we can make it the same as KafkaApis, like this:
   ```
       def createResponseCallback(requestThrottleMs: Int): ApiVersionsResponse = {
         val apiVersionRequest = request.body[ApiVersionsRequest]
         if (apiVersionRequest.hasUnsupportedRequestVersion) {
           apiVersionRequest.getErrorResponse(requestThrottleMs, Errors.UNSUPPORTED_VERSION.exception)
         } else if (!apiVersionRequest.isValid) {
           apiVersionRequest.getErrorResponse(requestThrottleMs, Errors.INVALID_REQUEST.exception)
         } else {
           apiVersionManager.apiVersionResponse(requestThrottleMs)
         }
       }
       requestHelper.sendResponseMaybeThrottle(request, createResponseCallback)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cmccabe commented on a diff in pull request #13826: KAFKA-15060: fix the ApiVersionManager interface

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on code in PR #13826:
URL: https://github.com/apache/kafka/pull/13826#discussion_r1230241309


##########
core/src/main/scala/kafka/server/ControllerApis.scala:
##########
@@ -441,23 +441,14 @@ class ControllerApis(val requestChannel: RequestChannel,
     if (apiVersionRequest.hasUnsupportedRequestVersion) {
       requestHelper.sendResponseMaybeThrottle(request,
         requestThrottleMs => apiVersionRequest.getErrorResponse(requestThrottleMs, UNSUPPORTED_VERSION.exception))
-      CompletableFuture.completedFuture[Unit](())
     } else if (!apiVersionRequest.isValid) {
       requestHelper.sendResponseMaybeThrottle(request,
         requestThrottleMs => apiVersionRequest.getErrorResponse(requestThrottleMs, INVALID_REQUEST.exception))
-      CompletableFuture.completedFuture[Unit](())
     } else {
-      val context = new ControllerRequestContext(request.context.header.data, request.context.principal, OptionalLong.empty())
-      controller.finalizedFeatures(context).handle { (result, exception) =>
-        requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
-          if (exception != null) {
-            apiVersionRequest.getErrorResponse(requestThrottleMs, exception)
-          } else {
-            apiVersionManager.apiVersionResponse(requestThrottleMs, result.featureMap().asScala.toMap, result.epoch())
-          }
-        })
-      }
+      requestHelper.sendResponseMaybeThrottle(request,
+        requestThrottleMs => apiVersionManager.apiVersionResponse(requestThrottleMs))

Review Comment:
   I will change this not to start the listener until we've caught up to the local HWM. That will avoid this and some other gotchas.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cmccabe commented on a diff in pull request #13826: KAFKA-15060: fix the ApiVersionManager interface

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on code in PR #13826:
URL: https://github.com/apache/kafka/pull/13826#discussion_r1227246079


##########
metadata/src/main/java/org/apache/kafka/metadata/publisher/FeaturesPublisher.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.publisher;
+
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.loader.LoaderManifest;
+import org.apache.kafka.image.publisher.MetadataPublisher;
+import org.apache.kafka.server.common.Features;
+import static org.apache.kafka.server.common.MetadataVersion.MINIMUM_KRAFT_VERSION;
+
+
+public class FeaturesPublisher implements MetadataPublisher {
+    private volatile Features features = Features.fromKRaftVersion(MINIMUM_KRAFT_VERSION);
+
+    public Features features() {
+        return features;
+    }
+
+    @Override
+    public String name() {
+        return "FeaturesPublisher";
+    }
+
+    @Override
+    public void onMetadataUpdate(
+        MetadataDelta delta,
+        MetadataImage newImage,
+        LoaderManifest manifest
+    ) {
+        if (delta.featuresDelta() != null) {

Review Comment:
   The features delta should be present if this changed, whether by snapshot or not.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cmccabe commented on a diff in pull request #13826: KAFKA-15060: fix the ApiVersionManager interface

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on code in PR #13826:
URL: https://github.com/apache/kafka/pull/13826#discussion_r1227243709


##########
core/src/main/scala/kafka/server/ApiVersionManager.scala:
##########
@@ -141,24 +136,22 @@ class DefaultApiVersionManager(
   val enabledApis = ApiKeys.apisForListener(listenerType).asScala
 
   override def apiVersionResponse(throttleTimeMs: Int): ApiVersionsResponse = {
-    val supportedFeatures = features.supportedFeatures
+    val supportedFeatures = brokerFeatures.supportedFeatures
     val finalizedFeatures = metadataCache.features()
     val controllerApiVersions = forwardingManager.flatMap(_.controllerApiVersions)
 
     ApiVersionsResponse.createApiVersionsResponse(
       throttleTimeMs,
-      metadataCache.metadataVersion().highestSupportedRecordVersion,
+      finalizedFeatures.metadataVersion().highestSupportedRecordVersion,
       supportedFeatures,
-      finalizedFeatures.features.map(kv => (kv._1, kv._2.asInstanceOf[java.lang.Short])).asJava,
-      finalizedFeatures.epoch,
+      finalizedFeatures.finalizedFeatures(),
+      finalizedFeatures.finalizedFeaturesEpoch(),
       controllerApiVersions.orNull,
       listenerType,
       enableUnstableLastVersion,
       zkMigrationEnabled
     )
   }
 
-  override def apiVersionResponse(throttleTimeMs: Int, finalizedFeatures: Map[String, java.lang.Short], finalizedFeatureEpoch: Long): ApiVersionsResponse = {
-    throw new UnsupportedOperationException("This method is not supported in DefaultApiVersionManager, use apiVersionResponse(throttleTimeMs) instead")
-  }
+  override def features: Features = metadataCache.features()

Review Comment:
   It will be used in a few follow-on changes. We need to start obtaining metadata version from here, and remove all references to the static config.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cmccabe commented on a diff in pull request #13826: KAFKA-15060: fix the ApiVersionManager interface

Posted by "cmccabe (via GitHub)" <gi...@apache.org>.
cmccabe commented on code in PR #13826:
URL: https://github.com/apache/kafka/pull/13826#discussion_r1224811537


##########
core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala:
##########
@@ -584,4 +584,12 @@ class ZkMetadataCache(
   def getFeatureOption: Option[FinalizedFeaturesAndEpoch] = {
     featuresAndEpoch
   }
+
+  override def versionContext(): MetadataVersionContext = {

Review Comment:
   That's a fair point. I will rename it to "Features"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org