Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/01/13 18:27:59 UTC

[GitHub] [kafka] mumrah opened a new pull request #11677: KAFKA-13410 Add KRaft metadata.version for KIP-778

mumrah opened a new pull request #11677:
URL: https://github.com/apache/kafka/pull/11677


   This is part of the implementation of KIP-778.
   
   More description TBD


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mumrah commented on a change in pull request #11677: KAFKA-13410 Add KRaft metadata.version for KIP-778

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #11677:
URL: https://github.com/apache/kafka/pull/11677#discussion_r796836599



##########
File path: core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
##########
@@ -73,7 +74,7 @@ class BrokerMetadataListener(
   /**
    * The current metadata delta. Accessed only from the event queue thread.
    */
-  private var _delta = new MetadataDelta(_image)
+  private var _delta = new MetadataDelta(_image, metadataVersionProvider)

Review comment:
       The idea with this provider was to have an object, accessible outside of the metadata update path, that could be used to get the current metadata version. Replica fetchers were one consideration: they will need to get the current metadata.version when deciding which RPC to use (that is, until we switch them over to using ApiVersions). However, I suppose ReplicaManager can push the current metadata.version up to ReplicaFetcherManager as updates are processed.
   
   Come to think of it, it's probably safer that we _don't_ have some out-of-band way to access the metadata.version and instead force components to use MetadataImage (or subscribe to metadata updates themselves). I'll back out this code.







[GitHub] [kafka] cmccabe commented on a change in pull request #11677: KAFKA-13410 Add KRaft metadata.version for KIP-778

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #11677:
URL: https://github.com/apache/kafka/pull/11677#discussion_r784439265



##########
File path: core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
##########
@@ -19,14 +19,15 @@ package kafka.server.metadata
 
 import kafka.coordinator.group.GroupCoordinator
 import kafka.coordinator.transaction.TransactionCoordinator
-import kafka.log.{UnifiedLog, LogManager}
+import kafka.log.{LogManager, UnifiedLog}

Review comment:
       do we need to reorder these? :)







[GitHub] [kafka] cmccabe commented on a change in pull request #11677: KAFKA-13410 Add KRaft metadata.version for KIP-778

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #11677:
URL: https://github.com/apache/kafka/pull/11677#discussion_r784438707



##########
File path: core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
##########
@@ -73,7 +74,7 @@ class BrokerMetadataListener(
   /**
    * The current metadata delta. Accessed only from the event queue thread.
    */
-  private var _delta = new MetadataDelta(_image)
+  private var _delta = new MetadataDelta(_image, metadataVersionProvider)

Review comment:
       The `MetadataImage` should have a metadata version associated with it. So there is no need for the delta to have a "metadata version provider" since the delta already knows what metadata version the image it is modifying is based on.
   
   If the version changes the interpretation of records, we would certainly want to make sure that we were using the correct one for the image we're modifying, not something sourced from a global manager object somewhere.







[GitHub] [kafka] cmccabe commented on a change in pull request #11677: KAFKA-13410 Add KRaft metadata.version for KIP-778

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #11677:
URL: https://github.com/apache/kafka/pull/11677#discussion_r784439106



##########
File path: core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
##########
@@ -118,9 +119,14 @@ class BrokerMetadataListener(
       }
       _publisher.foreach(publish)
 
+      // If we detected a change in metadata.version, generate a local snapshot
+      val metadataVersionChanged = Option(_delta.featuresDelta()).exists { featuresDelta =>

Review comment:
       It would be good to add an accessor for MetadataDelta like `MetadataDelta#metadataVersionChanged` (which I suppose could return an `Optional<MetadataVersion>` for simplicity)
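
A rough sketch of the accessor suggested here, assuming the delta tracks changed feature levels by name. `SketchMetadataDelta`, `replayFeatureLevel`, and the use of a bare `Short` in place of a `MetadataVersion` type are hypothetical simplifications, not the real Kafka classes:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified stand-in for MetadataDelta; not the real Kafka class.
final class SketchMetadataDelta {
    static final String METADATA_VERSION_FEATURE = "metadata.version";

    // Feature name -> new finalized level, for features changed in this delta only.
    private final Map<String, Short> changedFeatureLevels = new HashMap<>();

    // Records a feature level change seen while replaying records into this delta.
    void replayFeatureLevel(String featureName, short newLevel) {
        changedFeatureLevels.put(featureName, newLevel);
    }

    // Empty if metadata.version was not touched by this delta,
    // otherwise the new level (a Short stands in for MetadataVersion here).
    Optional<Short> metadataVersionChanged() {
        return Optional.ofNullable(changedFeatureLevels.get(METADATA_VERSION_FEATURE));
    }
}
```

With something like this, the listener would only need to check whether the returned `Optional` is present before generating the local snapshot.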







[GitHub] [kafka] dengziming commented on pull request #11677: KAFKA-13410 Add KRaft metadata.version for KIP-778

Posted by GitBox <gi...@apache.org>.
dengziming commented on pull request #11677:
URL: https://github.com/apache/kafka/pull/11677#issuecomment-1020850019


   I have some questions concerning the `VersionRange` entity. In ZK-related classes we use `FinalizedVersionRange` and `SupportedVersionRange`, whereas in KRaft-related code we use `VersionRange`. When I tried to convert UpdateFeatureTest to support KRaft based on this PR, I found it confusing, since the constructors of `FinalizedVersionRange` and `SupportedVersionRange` are complicated.
   So can we use a unified class and remove the others to make it simpler? For example, use `VersionRange` to denote `SupportedVersionRange` and use a short to denote `FinalizedVersionRange`.
   I'm not sure whether this is viable. @cmccabe @kowshik, what's your opinion?
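
To make the proposal concrete, here is a minimal sketch, assuming a single range type for supported versions and a plain `short` for the finalized level. `SketchVersionRange` and its `contains` method are hypothetical names for illustration and do not claim to match the real `VersionRange` API:

```java
// Hypothetical unified range type: supported versions are a [min, max] range,
// while a finalized feature level is just a single short checked against it.
final class SketchVersionRange {
    private final short min;
    private final short max;

    private SketchVersionRange(short min, short max) {
        if (min > max) {
            throw new IllegalArgumentException("min must not exceed max");
        }
        this.min = min;
        this.max = max;
    }

    // Factory method in the style of the `VersionRange.of` change discussed in this PR.
    static SketchVersionRange of(int min, int max) {
        return new SketchVersionRange((short) min, (short) max);
    }

    // True if the finalized level falls inside the supported range.
    boolean contains(short finalizedLevel) {
        return finalizedLevel >= min && finalizedLevel <= max;
    }
}
```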





[GitHub] [kafka] cmccabe commented on a change in pull request #11677: KAFKA-13410 Add KRaft metadata.version for KIP-778

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #11677:
URL: https://github.com/apache/kafka/pull/11677#discussion_r784426548



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/FeatureUpdate.java
##########
@@ -24,6 +24,24 @@
 public class FeatureUpdate {
     private final short maxVersionLevel;
     private final boolean allowDowngrade;
+    private final DowngradeType downgradeType;
+
+    public enum DowngradeType {
+        UNSET(-1), // Used for backwards compatibility with allowDowngrade

Review comment:
       We shouldn't need a fourth enum value, right? The KIP just says that passing `allowDowngrade = true` to the old constructor is equivalent to setting `DowngradeType.SAFE`.







[GitHub] [kafka] cmccabe commented on a change in pull request #11677: KAFKA-13410 Add KRaft metadata.version for KIP-778

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #11677:
URL: https://github.com/apache/kafka/pull/11677#discussion_r784440277



##########
File path: core/src/main/scala/kafka/server/metadata/MetadataVersionManager.scala
##########
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import kafka.utils.Logging
+import org.apache.kafka.common.utils.LogContext
+import org.apache.kafka.metadata.{MetadataVersionProvider, MetadataVersions}
+
+import scala.collection.mutable
+
+
+/**
+ * This class holds the broker's view of the current metadata.version. Listeners can be registered with this class to
+ * get a synchronous callback when version has changed.
+ */
+class MetadataVersionManager extends MetadataVersionProvider with Logging {

Review comment:
       I don't think this should be a Scala class because non-Scala code may need to use it.
   
   I also think we'll need some way to integrate this with IBP so that only one thing has to be checked. We don't want two "if statements" every time we're checking for IBP.
   
   Another question is whether we want some kind of callback framework where objects can register to know when IBP changes. It's more complicated but the performance could be higher than checking a volatile all the time.







[GitHub] [kafka] cmccabe commented on a change in pull request #11677: KAFKA-13410 Add KRaft metadata.version for KIP-778

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #11677:
URL: https://github.com/apache/kafka/pull/11677#discussion_r784437102



##########
File path: metadata/src/main/java/org/apache/kafka/metadata/MetadataVersion.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata;
+
+import org.apache.kafka.common.metadata.MetadataRecordType;
+
+import java.util.Optional;
+
+public interface MetadataVersion extends Comparable<MetadataVersions> {

Review comment:
       Why do we need both `MetadataVersion` and `MetadataVersions` (plural)? It seems like there will only ever be one implementation of this, so just get rid of the interface.







[GitHub] [kafka] cmccabe commented on a change in pull request #11677: KAFKA-13410 Add KRaft metadata.version for KIP-778

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #11677:
URL: https://github.com/apache/kafka/pull/11677#discussion_r784437638



##########
File path: metadata/src/main/java/org/apache/kafka/metadata/MetadataRecordVersionResolver.java
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata;
+
+import org.apache.kafka.common.metadata.MetadataRecordType;
+
+@FunctionalInterface
+public interface MetadataRecordVersionResolver {

Review comment:
       I'm a bit confused about why we would need this.







[GitHub] [kafka] cmccabe commented on pull request #11677: KAFKA-13410 Add KRaft metadata.version for KIP-778

Posted by GitBox <gi...@apache.org>.
cmccabe commented on pull request #11677:
URL: https://github.com/apache/kafka/pull/11677#issuecomment-1012661747


   Do you want to split the `new VersionRange` -> `VersionRange.of` thing off into a separate PR? I like the change but it's kind of noisy (plus this would let us make some incremental progress)





[GitHub] [kafka] cmccabe commented on a change in pull request #11677: KAFKA-13410 Add KRaft metadata.version for KIP-778

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #11677:
URL: https://github.com/apache/kafka/pull/11677#discussion_r784428620



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/UpdateFeaturesOptions.java
##########
@@ -26,4 +26,14 @@
  */
 @InterfaceStability.Evolving
 public class UpdateFeaturesOptions extends AbstractOptions<UpdateFeaturesOptions> {
+    private boolean dryRun = false;

Review comment:
       Looking at the other admin client things, it seems like we should probably name this `validateOnly` to be consistent with everything else. AlterClientQuotas, AlterConfigs, CreatePartitions, CreateTopics, all use `validateOnly` and not `dryRun`.
   
   (I guess this is a tiny KIP revision... but not too bad, right?)
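
For illustration, the renamed option might look roughly like the sketch below. `SketchUpdateFeaturesOptions` is a hypothetical stand-in that does not extend `AbstractOptions`, and the `shouldValidateOnly` getter name is an assumption based on the naming of the other admin option classes:

```java
// Hypothetical stand-in for UpdateFeaturesOptions; the real class extends AbstractOptions.
final class SketchUpdateFeaturesOptions {
    private boolean validateOnly = false;

    // Fluent setter so calls can be chained when building the options.
    SketchUpdateFeaturesOptions validateOnly(boolean validateOnly) {
        this.validateOnly = validateOnly;
        return this;
    }

    // Whether the request should only be validated and not applied.
    boolean shouldValidateOnly() {
        return validateOnly;
    }
}
```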







[GitHub] [kafka] cmccabe commented on a change in pull request #11677: KAFKA-13410 Add KRaft metadata.version for KIP-778

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #11677:
URL: https://github.com/apache/kafka/pull/11677#discussion_r784434978



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -2107,7 +2107,7 @@ class ReplicaManager(val config: KafkaConfig,
    * @param delta           The delta to apply.
    * @param newImage        The new metadata image.
    */
-  def applyDelta(delta: TopicsDelta, newImage: MetadataImage): Unit = {
+  def applyDelta(delta: TopicsDelta, newImage: MetadataImage, metadataVersion: MetadataVersionDelta): Unit = {

Review comment:
       Do we need to add this here in this PR? We're not using it here...







[GitHub] [kafka] mumrah commented on a change in pull request #11677: KAFKA-13410 Add KRaft metadata.version for KIP-778

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #11677:
URL: https://github.com/apache/kafka/pull/11677#discussion_r796852603



##########
File path: core/src/main/scala/kafka/server/metadata/MetadataVersionManager.scala
##########
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import kafka.utils.Logging
+import org.apache.kafka.common.utils.LogContext
+import org.apache.kafka.metadata.{MetadataVersionProvider, MetadataVersions}
+
+import scala.collection.mutable
+
+
+/**
+ * This class holds the broker's view of the current metadata.version. Listeners can be registered with this class to
+ * get a synchronous callback when version has changed.
+ */
+class MetadataVersionManager extends MetadataVersionProvider with Logging {

Review comment:
       ~If we used a callback thing, wouldn't each component then need to keep its own copy of the current metadata version? Also, would we need a volatile for this? We only have one thread doing writes, and it seems okay if reads are stale. If a component needs synchronization on metadata version updates, I think they would need to subscribe to updates and have their own lock (like ReplicaManager does).~
   
   Edit: Actually, if we don't provide the data in the callback, but just notify listeners that a metadata version has changed, they can then get the latest value from MetadataImage. We just need to run the callbacks after building the new MetadataImage.
   
   Related to this, I have some uncommitted code that introduces an interface:
   
   ```java
   public interface ApiVersionResolver {
       short fetchRequestVersion();
       short offsetForLeaderEpochVersion();
       boolean isTruncationSupported();
       boolean isOffsetForLeaderEpochSupport();
       boolean isAlterIsrSupported();
   }
   ```
   
   This is intended to abstract the things provided by IBP/metadata.version.
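
The notify-only callback idea from the edit above could be sketched as follows. All names here (`SketchImage`, `SketchImagePublisher`, `addMetadataVersionListener`) are hypothetical; the point is only the ordering, where the new image is installed first and listeners then read the version from it rather than receiving it as an argument:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical, minimal image that carries only the metadata version.
final class SketchImage {
    final short metadataVersion;

    SketchImage(short metadataVersion) {
        this.metadataVersion = metadataVersion;
    }
}

// Hypothetical publisher: a single writer thread calls publish(), any thread may read image().
final class SketchImagePublisher {
    private volatile SketchImage image = new SketchImage((short) 1);
    private final List<Runnable> listeners = new CopyOnWriteArrayList<>();

    SketchImage image() {
        return image;
    }

    void addMetadataVersionListener(Runnable listener) {
        listeners.add(listener);
    }

    void publish(SketchImage newImage) {
        SketchImage oldImage = image;
        // Install the new image before notifying, so listeners observe the new version.
        image = newImage;
        if (oldImage.metadataVersion != newImage.metadataVersion) {
            // Callbacks carry no data; each listener re-reads what it needs from image().
            listeners.forEach(Runnable::run);
        }
    }
}
```

Because the callbacks carry no payload, components do not have to keep their own copy of the version unless they want to cache it.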







[GitHub] [kafka] cmccabe commented on a change in pull request #11677: KAFKA-13410 Add KRaft metadata.version for KIP-778

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #11677:
URL: https://github.com/apache/kafka/pull/11677#discussion_r784435597



##########
File path: raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java
##########
@@ -240,6 +240,7 @@ private T readRecord(Readable input) {
             throw new IllegalArgumentException();
         }
 
+        // Here is where actual metadata records are read from the file input reader

Review comment:
       The "here is..." phrasing suggests some frustration, I hope I'm mistaken :)
   
   Perhaps we could put this comment on the same line and make it something like `// read the metadata record body`







[GitHub] [kafka] mumrah commented on a change in pull request #11677: KAFKA-13410 Add KRaft metadata.version for KIP-778

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #11677:
URL: https://github.com/apache/kafka/pull/11677#discussion_r784223928



##########
File path: clients/src/main/resources/common/message/UpdateFeaturesRequest.json
##########
@@ -30,7 +30,11 @@
       {"name": "MaxVersionLevel", "type": "int16", "versions": "0+",
         "about": "The new maximum version level for the finalized feature. A value >= 1 is valid. A value < 1, is special, and can be used to request the deletion of the finalized feature."},
       {"name": "AllowDowngrade", "type": "bool", "versions": "0+",
-        "about": "When set to true, the finalized feature version level is allowed to be downgraded/deleted. The downgrade request will fail if the new maximum version level is a value that's not lower than the existing maximum finalized version level."}
-    ]}

Review comment:
       Can we actually remove the AllowDowngrade field in version 1, i.e. by setting `"versions": "0"`?







[GitHub] [kafka] cmccabe commented on a change in pull request #11677: KAFKA-13410 Add KRaft metadata.version for KIP-778

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #11677:
URL: https://github.com/apache/kafka/pull/11677#discussion_r784427654



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -4245,12 +4245,24 @@ public UpdateFeaturesResult updateFeatures(final Map<String, FeatureUpdate> feat
                         new UpdateFeaturesRequestData.FeatureUpdateKey();
                     requestItem.setFeature(feature);
                     requestItem.setMaxVersionLevel(update.maxVersionLevel());
-                    requestItem.setAllowDowngrade(update.allowDowngrade());
+                    switch (update.downgradeType()) {
+                        case UNSET:
+                            // v0 behavior
+                            requestItem.setAllowDowngrade(update.allowDowngrade());

Review comment:
       As described above, we can just condense this into `DowngradeType.SAFE`







[GitHub] [kafka] cmccabe commented on a change in pull request #11677: KAFKA-13410 Add KRaft metadata.version for KIP-778

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #11677:
URL: https://github.com/apache/kafka/pull/11677#discussion_r784429730



##########
File path: clients/src/main/resources/common/message/UpdateFeaturesRequest.json
##########
@@ -30,7 +30,11 @@
       {"name": "MaxVersionLevel", "type": "int16", "versions": "0+",
         "about": "The new maximum version level for the finalized feature. A value >= 1 is valid. A value < 1, is special, and can be used to request the deletion of the finalized feature."},
       {"name": "AllowDowngrade", "type": "bool", "versions": "0+",
-        "about": "When set to true, the finalized feature version level is allowed to be downgraded/deleted. The downgrade request will fail if the new maximum version level is a value that's not lower than the existing maximum finalized version level."}
-    ]}

Review comment:
       Yes. Just remove it if you are not using it any more.
   
   The only awkward thing is that you need to think about how the defaults work. I suppose it should be something like:
   
   If AllowDowngrade = true (which would only be true for RPC version 0) then force DowngradeType = SAFE. Otherwise default DowngradeType to NONE.
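
The defaulting rule described above can be sketched like this. `DowngradeDefaults` and `effectiveDowngradeType` are hypothetical names, `UNSAFE` is assumed to be the third enum value, and a `null` argument stands in for a request that did not carry a downgrade type:

```java
// Hypothetical helper showing how the two request fields could be reconciled.
final class DowngradeDefaults {
    // NONE and SAFE appear in the discussion; UNSAFE is an assumption.
    enum DowngradeType { NONE, SAFE, UNSAFE }

    // allowDowngrade can only be true for RPC version 0, where no downgrade type is sent.
    // requested may be null when the request did not specify a downgrade type.
    static DowngradeType effectiveDowngradeType(boolean allowDowngrade, DowngradeType requested) {
        if (allowDowngrade) {
            // Legacy flag set: force SAFE.
            return DowngradeType.SAFE;
        }
        // Otherwise fall back to NONE unless a type was given explicitly.
        return requested == null ? DowngradeType.NONE : requested;
    }
}
```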







[GitHub] [kafka] cmccabe commented on a change in pull request #11677: KAFKA-13410 Add KRaft metadata.version for KIP-778

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #11677:
URL: https://github.com/apache/kafka/pull/11677#discussion_r784434196



##########
File path: core/src/main/scala/kafka/server/ControllerServer.scala
##########
@@ -160,11 +160,24 @@ class ControllerServer(
       alterConfigPolicy = Option(config.
         getConfiguredInstance(AlterConfigPolicyClassNameProp, classOf[AlterConfigPolicy]))
 
+      if (config.interBrokerProtocolVersionString != Defaults.InterBrokerProtocolVersion) {
+        // TODO can we have a stronger check here?

Review comment:
       I think this check should be in KafkaConfig. It would also be stronger to look at `KafkaConfig#originals` to verify that we weren't trying to set it to anything (i.e. IBP doesn't appear as a key in that map).
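
A minimal sketch of the stronger check, assuming the user-supplied settings are available as a plain map, as `KafkaConfig#originals` would provide. `IbpConfigCheck` and `rejectExplicitIbp` are hypothetical names, and the error message is illustrative only:

```java
import java.util.Map;

// Hypothetical validation helper; in practice this logic would live in KafkaConfig.
final class IbpConfigCheck {
    static final String IBP_KEY = "inter.broker.protocol.version";

    // Rejects any configuration where the user set the IBP explicitly,
    // even if the value they chose happens to equal the default.
    static void rejectExplicitIbp(Map<String, ?> originals) {
        if (originals.containsKey(IBP_KEY)) {
            throw new IllegalArgumentException(
                IBP_KEY + " must not be set when running in KRaft mode");
        }
    }
}
```

Checking for the key rather than comparing against the default also catches the case where the configured value happens to match the default string.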







[GitHub] [kafka] mumrah commented on a change in pull request #11677: KAFKA-13410 Add KRaft metadata.version for KIP-778

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #11677:
URL: https://github.com/apache/kafka/pull/11677#discussion_r784226775



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
##########
@@ -251,7 +267,7 @@ public void deactivate() {
 
         List<ApiMessageAndVersion> records = new ArrayList<>();
         records.add(new ApiMessageAndVersion(record,
-            REGISTER_BROKER_RECORD.highestSupportedVersion()));
+            metadataVersionProvider.activeVersion().recordVersion(REGISTER_BROKER_RECORD)));

Review comment:
       Assuming we are fine with this usage pattern, we will need to make this change any time we serialize a record







[GitHub] [kafka] mumrah commented on a change in pull request #11677: KAFKA-13410 Add KRaft metadata.version for KIP-778

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #11677:
URL: https://github.com/apache/kafka/pull/11677#discussion_r784225274



##########
File path: core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
##########
@@ -94,11 +103,16 @@ class RawMetaProperties(val props: Properties = new Properties()) {
 }
 
 object MetaProperties {
+  def apply(clusterId: String, nodeId: Int): MetaProperties = {
+    MetaProperties(clusterId, nodeId, MetadataVersions.latest().version())

Review comment:
       If a user is using the format tool to initialize a cluster, we default to the "latest" metadata.version







[GitHub] [kafka] hachikuji commented on a change in pull request #11677: KAFKA-13410 Add KRaft metadata.version for KIP-778

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11677:
URL: https://github.com/apache/kafka/pull/11677#discussion_r802068290



##########
File path: metadata/src/main/resources/common/metadata/FeatureLevelRecord.json
##########
@@ -22,9 +22,7 @@
   "fields": [
     { "name": "Name", "type": "string", "versions": "0+",
       "about": "The feature name." },
-    { "name": "MinFeatureLevel", "type": "int16", "versions": "0+",
-      "about": "The current finalized minimum feature level of this feature for the cluster." },
-    { "name": "MaxFeatureLevel", "type": "int16", "versions": "0+",
-      "about": "The current finalized maximum feature level of this feature for the cluster." }
+    { "name": "FeatureLevel", "type": "int16", "versions": "0+",

Review comment:
       I think we also need the "controller" listener to be added above.







[GitHub] [kafka] mumrah commented on a change in pull request #11677: KAFKA-13410 Add KRaft metadata.version for KIP-778

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #11677:
URL: https://github.com/apache/kafka/pull/11677#discussion_r796852603



##########
File path: core/src/main/scala/kafka/server/metadata/MetadataVersionManager.scala
##########
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import kafka.utils.Logging
+import org.apache.kafka.common.utils.LogContext
+import org.apache.kafka.metadata.{MetadataVersionProvider, MetadataVersions}
+
+import scala.collection.mutable
+
+
+/**
+ * This class holds the broker's view of the current metadata.version. Listeners can be registered with this class to
+ * get a synchronous callback when version has changed.
+ */
+class MetadataVersionManager extends MetadataVersionProvider with Logging {

Review comment:
       If we used callbacks, wouldn't each component then need to keep its own copy of the current metadata version? Also, do we need a volatile for this? Only one thread writes, and it seems okay if reads are stale. If a component needs synchronization on metadata version updates, I think it would need to subscribe to updates and hold its own lock (like ReplicaManager does).
   
   Related to this, I have some uncommitted code that introduces an interface:
   
   ```java
   public interface ApiVersionResolver {
       short fetchRequestVersion();
       short offsetForLeaderEpochVersion();
       boolean isTruncationSupported();
       boolean isOffsetForLeaderEpochSupport();
       boolean isAlterIsrSupported();
   }
   ```
   
   This is intended to abstract the things provided by IBP/metadata.version.
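
   On the volatile question, here is one possible shape, assuming a single writer thread and readers that tolerate slightly stale values. In Java a plain `short` field is written atomically, but without `volatile` the memory model does not guarantee that other threads ever observe the write. `VersionHolderSketch` and `VersionListener` are hypothetical names, not classes from this PR.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical callback interface for components that want to react to changes.
interface VersionListener {
    void onVersionChange(short oldVersion, short newVersion);
}

// Hypothetical holder: one writer thread, any number of reader threads.
final class VersionHolderSketch {
    // volatile gives readers visibility of the latest write without a lock.
    private volatile short current;
    private final List<VersionListener> listeners = new CopyOnWriteArrayList<>();

    VersionHolderSketch(short initial) {
        this.current = initial;
    }

    // Safe to call from any thread; may briefly lag behind an in-flight update.
    short activeVersion() {
        return current;
    }

    void addListener(VersionListener listener) {
        listeners.add(listener);
    }

    // Assumed to be called only from the single metadata-update thread.
    void update(short newVersion) {
        short old = current;
        if (old == newVersion) {
            return;
        }
        current = newVersion;
        // Synchronous callbacks, run on the writer thread.
        for (VersionListener listener : listeners) {
            listener.onVersionChange(old, newVersion);
        }
    }
}
```

   With this shape, components that only need the current value call `activeVersion()` and keep no copy of their own, while a component that needs to coordinate other state with the change registers a listener and takes its own lock inside the callback.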







[GitHub] [kafka] mumrah commented on a change in pull request #11677: KAFKA-13410 Add KRaft metadata.version for KIP-778

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #11677:
URL: https://github.com/apache/kafka/pull/11677#discussion_r784224825



##########
File path: core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
##########
@@ -94,11 +103,16 @@ class RawMetaProperties(val props: Properties = new Properties()) {
 }
 
 object MetaProperties {
+  def apply(clusterId: String, nodeId: Int): MetaProperties = {
+    MetaProperties(clusterId, nodeId, MetadataVersions.latest().version())
+  }
+
   def parse(properties: RawMetaProperties): MetaProperties = {
     properties.requireVersion(expectedVersion = 1)
     val clusterId = require(ClusterIdKey, properties.clusterId)
     val nodeId = require(NodeIdKey, properties.nodeId)
-    new MetaProperties(clusterId, nodeId)
+    val metadataVersion = properties.initialMetadataVersion.getOrElse(MetadataVersions.V1.version())

Review comment:
       If `meta.properties` does not have `initial.metadata.version` set (as would be the case with KRaft preview), we treat it as V1.
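
   The same fallback, sketched in Java against `java.util.Properties`. The key name is taken from the comment above; `MetaPropertiesSketch` is a hypothetical helper and the numeric value of V1 is an assumption.

```java
import java.util.Optional;
import java.util.Properties;

// Hypothetical helper, not the PR's MetaProperties class.
final class MetaPropertiesSketch {
    static final String INITIAL_METADATA_VERSION_KEY = "initial.metadata.version";
    // Assumed numeric value for V1.
    static final short V1 = 1;

    // Returns the stored initial metadata.version, or V1 when the key is
    // absent (for example, a meta.properties written before the key existed).
    static short initialMetadataVersion(Properties props) {
        return Optional.ofNullable(props.getProperty(INITIAL_METADATA_VERSION_KEY))
            .map(String::trim)
            .map(Short::parseShort)
            .orElse(V1);
    }
}
```

   A malformed value still fails loudly with `NumberFormatException`; only a missing key falls back to V1.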







[GitHub] [kafka] dengziming commented on pull request #11677: KAFKA-13410 Add KRaft metadata.version for KIP-778

Posted by GitBox <gi...@apache.org>.
dengziming commented on pull request #11677:
URL: https://github.com/apache/kafka/pull/11677#issuecomment-1020850019


   I have some questions about the `VersionRange` entity. The ZK-related classes use `FinalizedVersionRange` and `SupportedVersionRange`, whereas the KRaft-related code uses `VersionRange`. When I tried to convert UpdateFeatureTest to support KRaft based on this PR, I found it confusing because the constructors of `FinalizedVersionRange` and `SupportedVersionRange` are complicated.
   Can we use a unified class and remove the others to simplify this? For example, use `VersionRange` to denote `SupportedVersionRange` and a short to denote `FinalizedVersionRange`.
   I'm not sure whether this is viable. @cmccabe @kowshik, what's your opinion?
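
   A minimal sketch of what that unification could look like, assuming a supported feature is a closed range and a finalized feature is a single level. `VersionRangeSketch` and `FeaturesSketch` are hypothetical and do not mirror the existing Kafka classes or their validation rules.

```java
import java.util.Map;

// Hypothetical unified range type for *supported* versions.
final class VersionRangeSketch {
    private final short min;
    private final short max;

    VersionRangeSketch(short min, short max) {
        if (max < min) {
            throw new IllegalArgumentException("max " + max + " is below min " + min);
        }
        this.min = min;
        this.max = max;
    }

    boolean contains(short level) {
        return level >= min && level <= max;
    }
}

final class FeaturesSketch {
    // A *finalized* feature is just one short level; it is acceptable only
    // if the level falls inside the supported range for that feature.
    static boolean canFinalize(Map<String, VersionRangeSketch> supported,
                               String feature, short level) {
        VersionRangeSketch range = supported.get(feature);
        return range != null && range.contains(level);
    }
}
```

   For example, with `metadata.version` supported over a hypothetical range of 1 to 3, `canFinalize` accepts level 2 and rejects level 4 or any unknown feature name.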





[GitHub] [kafka] cmccabe commented on a change in pull request #11677: KAFKA-13410 Add KRaft metadata.version for KIP-778

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #11677:
URL: https://github.com/apache/kafka/pull/11677#discussion_r784426931



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/FeatureUpdate.java
##########
@@ -42,6 +60,23 @@ public FeatureUpdate(final short maxVersionLevel, final boolean allowDowngrade)
         }
         this.maxVersionLevel = maxVersionLevel;
         this.allowDowngrade = allowDowngrade;
+        this.downgradeType = DowngradeType.UNSET;
+    }
+
+    /**
+     * TODO

Review comment:
       This needs a description, right?







[GitHub] [kafka] cmccabe commented on a change in pull request #11677: KAFKA-13410 Add KRaft metadata.version for KIP-778

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #11677:
URL: https://github.com/apache/kafka/pull/11677#discussion_r784426742



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/FeatureUpdate.java
##########
@@ -24,6 +24,24 @@
 public class FeatureUpdate {
     private final short maxVersionLevel;
     private final boolean allowDowngrade;
+    private final DowngradeType downgradeType;

Review comment:
       Can get rid of this (as per comment below...)



