Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/05/25 20:30:58 UTC

[GitHub] [kafka] mumrah opened a new pull request, #12214: MINOR: Consolidate FinalizedFeatureCache into MetadataCache

mumrah opened a new pull request, #12214:
URL: https://github.com/apache/kafka/pull/12214

   This patch rearranges some code to remove the FinalizedFeatureCache and replace it with MetadataCache. Since features are included in the MetadataImage in KRaft mode (where feature flags are actually used), it makes sense to expose them through MetadataCache.
   
   One catch was that the ZK controller needed access to the feature flags cache, and I didn't want to pass MetadataCache into KafkaController. For this case I pulled out an interface containing only the two accessors the controller used.
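
The interface extraction described above can be sketched roughly as follows. This is a minimal illustration, not the code from the patch: the trait name `FeatureCacheView`, the class `InMemoryMetadataCache`, the `Controller` stand-in, and the parameter lists are all assumptions. Only the accessor names `getFeatureOption` and `waitUntilFeatureEpochOrThrow` appear elsewhere in this thread.

```scala
// Hypothetical, simplified stand-in for Kafka's FinalizedFeaturesAndEpoch.
final case class FinalizedFeaturesAndEpoch(features: Map[String, Short], epoch: Long)

// Narrow view handed to the controller: only the two accessors it uses.
// The trait name and signatures are assumptions made for this sketch.
trait FeatureCacheView {
  def getFeatureOption: Option[FinalizedFeaturesAndEpoch]
  def waitUntilFeatureEpochOrThrow(minExpectedEpoch: Long, timeoutMs: Long): Unit
}

// The full cache implements the narrow trait alongside its wider API.
class InMemoryMetadataCache extends FeatureCacheView {
  @volatile private var current: Option[FinalizedFeaturesAndEpoch] = None

  def update(features: Map[String, Short], epoch: Long): Unit =
    current = Some(FinalizedFeaturesAndEpoch(features, epoch))

  override def getFeatureOption: Option[FinalizedFeaturesAndEpoch] = current

  // Simplified: checks once instead of blocking until the timeout expires.
  override def waitUntilFeatureEpochOrThrow(minExpectedEpoch: Long, timeoutMs: Long): Unit =
    if (!current.exists(_.epoch >= minExpectedEpoch))
      throw new java.util.concurrent.TimeoutException(
        s"Epoch $minExpectedEpoch not reached within $timeoutMs ms")
}

// The controller depends on the trait, not on the whole cache.
class Controller(featureCache: FeatureCacheView) {
  def existingFeatureCount: Int =
    featureCache.getFeatureOption.map(_.features.size).getOrElse(0)
}
```

The point of the design is that `Controller` cannot reach any other part of the cache, so the ZK controller's dependency surface stays as small as it was before the consolidation.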
   
   This patch also adds a MetadataVersion getter on the MetadataCache interface.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cmccabe commented on a diff in pull request #12214: MINOR: Consolidate FinalizedFeatureCache into MetadataCache

Posted by GitBox <gi...@apache.org>.
cmccabe commented on code in PR #12214:
URL: https://github.com/apache/kafka/pull/12214#discussion_r882909889


##########
core/src/main/scala/kafka/controller/KafkaController.scala:
##########
@@ -2036,7 +2037,7 @@ class KafkaController(val config: KafkaConfig,
   private def processFeatureUpdatesWithActiveController(request: UpdateFeaturesRequest,
                                                         callback: UpdateFeaturesCallback): Unit = {
     val updates = request.featureUpdates
-    val existingFeatures = featureCache.get
+    val existingFeatures = featureCache.getFeatureOption

Review Comment:
   I guess let's just file a follow-up JIRA and move on, given that we won't be using this any time soon.





[GitHub] [kafka] cmccabe commented on a diff in pull request #12214: MINOR: Consolidate FinalizedFeatureCache into MetadataCache

Posted by GitBox <gi...@apache.org>.
cmccabe commented on code in PR #12214:
URL: https://github.com/apache/kafka/pull/12214#discussion_r882890450


##########
core/src/main/scala/kafka/server/ApiVersionManager.scala:
##########
@@ -70,35 +67,24 @@ class SimpleApiVersionManager(
 
 class DefaultApiVersionManager(
   val listenerType: ListenerType,
-  interBrokerProtocolVersion: MetadataVersion,
   forwardingManager: Option[ForwardingManager],
   features: BrokerFeatures,
-  featureCache: FinalizedFeatureCache
+  metadataCache: MetadataCache
 ) extends ApiVersionManager {
 
   override def apiVersionResponse(throttleTimeMs: Int): ApiVersionsResponse = {
     val supportedFeatures = features.supportedFeatures
-    val finalizedFeaturesOpt = featureCache.get
+    val finalizedFeatures = metadataCache.features()
     val controllerApiVersions = forwardingManager.flatMap(_.controllerApiVersions)
 
-    finalizedFeaturesOpt match {
-      case Some(finalizedFeatures) => ApiVersionsResponse.createApiVersionsResponse(
+    ApiVersionsResponse.createApiVersionsResponse(
         throttleTimeMs,
-        interBrokerProtocolVersion.highestSupportedRecordVersion,
+        metadataCache.metadataVersion().highestSupportedRecordVersion,
         supportedFeatures,
         finalizedFeatures.features.map(kv => (kv._1, kv._2.asInstanceOf[java.lang.Short])).asJava,
         finalizedFeatures.epoch,
         controllerApiVersions.orNull,
         listenerType)
-      case None => ApiVersionsResponse.createApiVersionsResponse(
-        throttleTimeMs,
-        interBrokerProtocolVersion.highestSupportedRecordVersion,
-        supportedFeatures,
-        Collections.emptyMap(),
-        ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,

Review Comment:
   Excellent. Nice to not have this weird special case.





[GitHub] [kafka] mumrah commented on a diff in pull request #12214: MINOR: Consolidate FinalizedFeatureCache into MetadataCache

Posted by GitBox <gi...@apache.org>.
mumrah commented on code in PR #12214:
URL: https://github.com/apache/kafka/pull/12214#discussion_r882092143


##########
core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala:
##########
@@ -58,6 +74,11 @@ class ZkMetadataCache(brokerId: Int) extends MetadataCache with Logging {
   this.logIdent = s"[MetadataCache brokerId=$brokerId] "
   private val stateChangeLogger = new StateChangeLogger(brokerId, inControllerContext = false, None)
 
+  // Features are updated via ZK notification (see FinalizedFeatureChangeListener)
+  @volatile private var featuresAndEpoch: Option[FinalizedFeaturesAndEpoch] = Option.empty
+  private val featureLock = new ReentrantLock()
+  private val featureCond = featureLock.newCondition()

Review Comment:
   FinalizedFeatureCache used the same volatile field shown here, together with `synchronized` blocks and wait/notify. I opted for an explicit lock and condition to avoid locking on the whole object.
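
The lock-and-condition pattern mentioned in this comment might look roughly like the sketch below. It is an illustration only: the class `EpochCache` and its method names are hypothetical, and the real cache stores features as well as the epoch. The `java.util.concurrent.locks` calls are standard JDK API.

```scala
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import java.util.concurrent.locks.ReentrantLock

class EpochCache {
  // Readers see the latest value without taking the lock.
  @volatile private var epoch: Option[Long] = None
  // A dedicated lock, so waiting here never contends with other users of `this`.
  private val lock = new ReentrantLock()
  private val cond = lock.newCondition()

  def currentEpoch: Option[Long] = epoch

  // Writers publish the new epoch and wake up any waiters.
  def update(newEpoch: Long): Unit = {
    lock.lock()
    try {
      epoch = Some(newEpoch)
      cond.signalAll()
    } finally lock.unlock()
  }

  // Blocks until the cached epoch reaches minEpoch or the timeout elapses.
  def waitUntilEpochOrThrow(minEpoch: Long, timeoutMs: Long): Unit = {
    lock.lock()
    try {
      var remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs)
      while (!epoch.exists(_ >= minEpoch)) {
        if (remainingNanos <= 0L)
          throw new TimeoutException(s"Timed out waiting for epoch >= $minEpoch")
        // awaitNanos returns an estimate of the time left; the loop guards
        // against spurious wakeups.
        remainingNanos = cond.awaitNanos(remainingNanos)
      }
    } finally lock.unlock()
  }
}
```

With `synchronized` and wait/notify, the monitor is the cache object itself, so any other code synchronizing on that object shares the same lock. A private `ReentrantLock` keeps the feature wait path isolated.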





[GitHub] [kafka] cmccabe commented on a diff in pull request #12214: MINOR: Consolidate FinalizedFeatureCache into MetadataCache

Posted by GitBox <gi...@apache.org>.
cmccabe commented on code in PR #12214:
URL: https://github.com/apache/kafka/pull/12214#discussion_r882906680


##########
core/src/main/scala/kafka/controller/KafkaController.scala:
##########
@@ -2036,7 +2037,7 @@ class KafkaController(val config: KafkaConfig,
   private def processFeatureUpdatesWithActiveController(request: UpdateFeaturesRequest,
                                                         callback: UpdateFeaturesCallback): Unit = {
     val updates = request.featureUpdates
-    val existingFeatures = featureCache.get
+    val existingFeatures = featureCache.getFeatureOption

Review Comment:
   I realize this is existing code, but this seems incorrect. If someone changed the features in ZK recently, our cache is probably out of date. So we might lose an update by writing back the cache + whatever update is coming in.
   
   I guess he is trying to avoid this with all the calls to `waitUntilFeatureEpochOrThrow` (bizarre pattern in my opinion...) but that doesn't really work because changes to the cache are async with regard to the ZK controller thread.
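
The lost-update concern can be reproduced with a toy model. In the sketch below, `FakeZk` is a hypothetical in-memory stand-in for the features znode, not a real ZooKeeper client, and the feature names are made up. It only shows why merging a request into a stale cache snapshot and writing the result back can drop a concurrent change.

```scala
// Hypothetical stand-in for the features znode: a map that is replaced wholesale.
final class FakeZk {
  private var data: Map[String, Short] = Map.empty
  def read: Map[String, Short] = synchronized(data)
  // Unconditional write: replaces whatever is currently stored.
  def write(newData: Map[String, Short]): Unit = synchronized { data = newData }
}

object LostUpdateDemo {
  def run(): Map[String, Short] = {
    val zk = new FakeZk
    zk.write(Map("a" -> 1.toShort))

    // The broker-side cache holds a snapshot of the znode.
    val staleCache = zk.read

    // Someone else changes the znode; the cache has not caught up yet.
    zk.write(Map("a" -> 1.toShort, "b" -> 2.toShort))

    // The controller merges the incoming request into the stale snapshot
    // and writes the result back, which silently drops "b".
    val merged = staleCache.updated("c", 3.toShort)
    zk.write(merged)
    zk.read
  }
}
```

After `LostUpdateDemo.run()`, the stored map contains `a` and `c` but no longer `b`.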





[GitHub] [kafka] cmccabe commented on a diff in pull request #12214: MINOR: Consolidate FinalizedFeatureCache into MetadataCache

Posted by GitBox <gi...@apache.org>.
cmccabe commented on code in PR #12214:
URL: https://github.com/apache/kafka/pull/12214#discussion_r882951256


##########
core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala:
##########
@@ -430,4 +451,103 @@ class ZkMetadataCache(brokerId: Int) extends MetadataCache with Logging {
                               aliveNodes: mutable.LongMap[collection.Map[ListenerName, Node]]) {
     val topicNames: Map[Uuid, String] = topicIds.map { case (topicName, topicId) => (topicId, topicName) }
   }
+
+  override def metadataVersion(): MetadataVersion = metadataVersion
+
+  override def features(): FinalizedFeaturesAndEpoch = {
+    featuresAndEpoch match {
+      case Some(features) => features
+      case None => FinalizedFeaturesAndEpoch(Map.empty, ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH)
+    }
+  }
+
+  /**
+   * Updates the cache to the latestFeatures, and updates the existing epoch to latestEpoch.
+   * Expects that the latestEpoch should be always greater than the existing epoch (when the
+   * existing epoch is defined).
+   *
+   * @param latestFeatures   the latest finalized features to be set in the cache
+   * @param latestEpoch      the latest epoch value to be set in the cache
+   *
+   * @throws                 FeatureCacheUpdateException if the cache update operation fails
+   *                         due to invalid parameters or incompatibilities with the broker's
+   *                         supported features. In such a case, the existing cache contents are
+   *                         not modified.
+   */
+  def updateFeaturesOrThrow(latestFeatures: Map[String, Short], latestEpoch: Long): Unit = {
+    val latest = FinalizedFeaturesAndEpoch(latestFeatures, latestEpoch)
+    val existing = featuresAndEpoch.map(item => item.toString()).getOrElse("<empty>")
+    if (featuresAndEpoch.isDefined && featuresAndEpoch.get.epoch > latest.epoch) {
+      val errorMsg = s"FinalizedFeatureCache update failed due to invalid epoch in new $latest." +

Review Comment:
   Yeah, that's fair. We can follow up on this later if needed.





[GitHub] [kafka] cmccabe commented on a diff in pull request #12214: MINOR: Consolidate FinalizedFeatureCache into MetadataCache

Posted by GitBox <gi...@apache.org>.
cmccabe commented on code in PR #12214:
URL: https://github.com/apache/kafka/pull/12214#discussion_r882899482


##########
core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala:
##########
@@ -430,4 +451,103 @@ class ZkMetadataCache(brokerId: Int) extends MetadataCache with Logging {
                               aliveNodes: mutable.LongMap[collection.Map[ListenerName, Node]]) {
     val topicNames: Map[Uuid, String] = topicIds.map { case (topicName, topicId) => (topicId, topicName) }
   }
+
+  override def metadataVersion(): MetadataVersion = metadataVersion
+
+  override def features(): FinalizedFeaturesAndEpoch = {
+    featuresAndEpoch match {
+      case Some(features) => features
+      case None => FinalizedFeaturesAndEpoch(Map.empty, ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH)
+    }
+  }
+
+  /**
+   * Updates the cache to the latestFeatures, and updates the existing epoch to latestEpoch.
+   * Expects that the latestEpoch should be always greater than the existing epoch (when the
+   * existing epoch is defined).
+   *
+   * @param latestFeatures   the latest finalized features to be set in the cache
+   * @param latestEpoch      the latest epoch value to be set in the cache
+   *
+   * @throws                 FeatureCacheUpdateException if the cache update operation fails
+   *                         due to invalid parameters or incompatibilities with the broker's
+   *                         supported features. In such a case, the existing cache contents are
+   *                         not modified.
+   */
+  def updateFeaturesOrThrow(latestFeatures: Map[String, Short], latestEpoch: Long): Unit = {
+    val latest = FinalizedFeaturesAndEpoch(latestFeatures, latestEpoch)
+    val existing = featuresAndEpoch.map(item => item.toString()).getOrElse("<empty>")
+    if (featuresAndEpoch.isDefined && featuresAndEpoch.get.epoch > latest.epoch) {
+      val errorMsg = s"FinalizedFeatureCache update failed due to invalid epoch in new $latest." +

Review Comment:
   Do we really need to have this here? The epoch check just seems like another example of overdesign.
   
   It's basically validating that ZK is working as expected...
   
   I could see this getting annoying if we wanted to manually go in and fix the features znode or something. I don't know.
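
To make the behaviour under discussion concrete, here is a reduced sketch of an epoch-guarded update. `FeaturesAndEpoch`, `FeatureHolder` and the exception class defined here are simplified, hypothetical stand-ins; only the comparison mirrors the one in the quoted diff. Note that the diff compares with `>`, so an update carrying the same epoch is accepted even though the doc comment says the new epoch should always be greater.

```scala
final case class FeaturesAndEpoch(features: Map[String, Short], epoch: Long)

// Local stand-in for the exception type named in the diff's doc comment.
final class FeatureCacheUpdateException(message: String) extends RuntimeException(message)

class FeatureHolder {
  @volatile private var current: Option[FeaturesAndEpoch] = None

  def get: Option[FeaturesAndEpoch] = current

  def updateOrThrow(latestFeatures: Map[String, Short], latestEpoch: Long): Unit = {
    val latest = FeaturesAndEpoch(latestFeatures, latestEpoch)
    // Same comparison as the diff: reject only when the cached epoch is strictly newer.
    if (current.exists(_.epoch > latest.epoch))
      throw new FeatureCacheUpdateException(
        s"Update failed due to invalid epoch in new $latest; existing: $current")
    // On rejection the cache is left untouched; otherwise the new value replaces it.
    current = Some(latest)
  }
}
```

Under this check, a manual repair that recreates the features znode with a lower version than the cached epoch would be rejected until the cache is reset, which is the kind of annoyance the comment anticipates.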





[GitHub] [kafka] mumrah merged pull request #12214: MINOR: Consolidate FinalizedFeatureCache into MetadataCache

Posted by GitBox <gi...@apache.org>.
mumrah merged PR #12214:
URL: https://github.com/apache/kafka/pull/12214




[GitHub] [kafka] mumrah closed pull request #12214: MINOR: Consolidate FinalizedFeatureCache into MetadataCache

Posted by GitBox <gi...@apache.org>.
mumrah closed pull request #12214: MINOR: Consolidate FinalizedFeatureCache into MetadataCache
URL: https://github.com/apache/kafka/pull/12214




[GitHub] [kafka] cmccabe commented on a diff in pull request #12214: MINOR: Consolidate FinalizedFeatureCache into MetadataCache

Posted by GitBox <gi...@apache.org>.
cmccabe commented on code in PR #12214:
URL: https://github.com/apache/kafka/pull/12214#discussion_r882893885


##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala:
##########
@@ -149,9 +147,7 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
         debug(s"Publishing metadata at offset $highestOffsetAndEpoch with $metadataVersionLogMsg.")
       }
 
-      // Apply feature deltas.
       Option(delta.featuresDelta()).foreach { featuresDelta =>
-        featureCache.update(featuresDelta, highestOffsetAndEpoch.offset)
         featuresDelta.metadataVersionChange().ifPresent{ metadataVersion =>
           info(s"Updating metadata.version to ${metadataVersion.featureLevel()} at offset $highestOffsetAndEpoch.")

Review Comment:
   It seems like we should be logging changes to the other feature flags as well.
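
One possible shape for such logging is sketched below. It does not use the real `FeaturesDelta` API: the input is a plain map from feature name to an optional new level, which is an assumed shape chosen for illustration, and the message wording simply follows the existing `metadata.version` log line.

```scala
object FeatureChangeLog {
  // changes: feature name -> new level, or None when the feature is removed
  // (assumed shape, not the actual FeaturesDelta representation).
  def describe(changes: Map[String, Option[Short]], offset: Long): Seq[String] =
    changes.toSeq.sortBy(_._1).map {
      case (name, Some(level)) => s"Updating $name to $level at offset $offset."
      case (name, None)        => s"Removing $name at offset $offset."
    }
}
```

Each returned line could then be passed to `info(...)` in the publisher, next to the existing `metadata.version` message.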





[GitHub] [kafka] mumrah commented on a diff in pull request #12214: MINOR: Consolidate FinalizedFeatureCache into MetadataCache

Posted by GitBox <gi...@apache.org>.
mumrah commented on code in PR #12214:
URL: https://github.com/apache/kafka/pull/12214#discussion_r882090750


##########
core/src/main/scala/kafka/server/ApiVersionManager.scala:
##########
@@ -70,35 +67,24 @@ class SimpleApiVersionManager(
 
 class DefaultApiVersionManager(
   val listenerType: ListenerType,
-  interBrokerProtocolVersion: MetadataVersion,
   forwardingManager: Option[ForwardingManager],
   features: BrokerFeatures,
-  featureCache: FinalizedFeatureCache
+  metadataCache: MetadataCache
 ) extends ApiVersionManager {
 
   override def apiVersionResponse(throttleTimeMs: Int): ApiVersionsResponse = {
     val supportedFeatures = features.supportedFeatures
-    val finalizedFeaturesOpt = featureCache.get
+    val finalizedFeatures = metadataCache.features()
     val controllerApiVersions = forwardingManager.flatMap(_.controllerApiVersions)
 
-    finalizedFeaturesOpt match {
-      case Some(finalizedFeatures) => ApiVersionsResponse.createApiVersionsResponse(
+    ApiVersionsResponse.createApiVersionsResponse(
         throttleTimeMs,
-        interBrokerProtocolVersion.highestSupportedRecordVersion,
+        metadataCache.metadataVersion().highestSupportedRecordVersion,
         supportedFeatures,
         finalizedFeatures.features.map(kv => (kv._1, kv._2.asInstanceOf[java.lang.Short])).asJava,
         finalizedFeatures.epoch,
         controllerApiVersions.orNull,
         listenerType)
-      case None => ApiVersionsResponse.createApiVersionsResponse(
-        throttleTimeMs,
-        interBrokerProtocolVersion.highestSupportedRecordVersion,
-        supportedFeatures,
-        Collections.emptyMap(),
-        ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,

Review Comment:
   This empty case is handled by the `features()` method on MetadataCache
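
The effect on callers can be shown with a small sketch. The types below are simplified stand-ins, and `UnknownEpoch` is a placeholder for `ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH`; its value of -1 is an assumption made for this example.

```scala
final case class FeaturesAndEpoch(features: Map[String, Short], epoch: Long)

object FeatureDefaults {
  // Assumed sentinel meaning "no finalized features known yet".
  val UnknownEpoch: Long = -1L

  // The cache resolves the missing case once, so callers never see an Option.
  def orEmpty(cached: Option[FeaturesAndEpoch]): FeaturesAndEpoch =
    cached.getOrElse(FeaturesAndEpoch(Map.empty, UnknownEpoch))
}

object ResponseBuilder {
  // A caller that previously needed a Some/None match now has a single path.
  def summary(cached: Option[FeaturesAndEpoch]): String = {
    val f = FeatureDefaults.orEmpty(cached)
    s"features=${f.features.size} epoch=${f.epoch}"
  }
}
```

Both branches of the old `match` collapse into one call, which is why the `case None` block could be deleted from `DefaultApiVersionManager`.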





[GitHub] [kafka] mumrah commented on a diff in pull request #12214: MINOR: Consolidate FinalizedFeatureCache into MetadataCache

Posted by GitBox <gi...@apache.org>.
mumrah commented on code in PR #12214:
URL: https://github.com/apache/kafka/pull/12214#discussion_r882906015


##########
core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala:
##########
@@ -430,4 +451,103 @@ class ZkMetadataCache(brokerId: Int) extends MetadataCache with Logging {
                               aliveNodes: mutable.LongMap[collection.Map[ListenerName, Node]]) {
     val topicNames: Map[Uuid, String] = topicIds.map { case (topicName, topicId) => (topicId, topicName) }
   }
+
+  override def metadataVersion(): MetadataVersion = metadataVersion
+
+  override def features(): FinalizedFeaturesAndEpoch = {
+    featuresAndEpoch match {
+      case Some(features) => features
+      case None => FinalizedFeaturesAndEpoch(Map.empty, ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH)
+    }
+  }
+
+  /**
+   * Updates the cache to the latestFeatures, and updates the existing epoch to latestEpoch.
+   * Expects that the latestEpoch should be always greater than the existing epoch (when the
+   * existing epoch is defined).
+   *
+   * @param latestFeatures   the latest finalized features to be set in the cache
+   * @param latestEpoch      the latest epoch value to be set in the cache
+   *
+   * @throws                 FeatureCacheUpdateException if the cache update operation fails
+   *                         due to invalid parameters or incompatibilities with the broker's
+   *                         supported features. In such a case, the existing cache contents are
+   *                         not modified.
+   */
+  def updateFeaturesOrThrow(latestFeatures: Map[String, Short], latestEpoch: Long): Unit = {
+    val latest = FinalizedFeaturesAndEpoch(latestFeatures, latestEpoch)
+    val existing = featuresAndEpoch.map(item => item.toString()).getOrElse("<empty>")
+    if (featuresAndEpoch.isDefined && featuresAndEpoch.get.epoch > latest.epoch) {
+      val errorMsg = s"FinalizedFeatureCache update failed due to invalid epoch in new $latest." +

Review Comment:
   For this PR I'm trying to minimize functional changes to the ZK controller. If we end up actually using feature flags in ZK mode, we can re-evaluate some of these things, but for now I'm trying to reduce the impact.


