Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/07/09 23:50:52 UTC

[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement KIP-584 write path

kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r452398464



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeFeaturesOptions.java
##########
@@ -0,0 +1,26 @@
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+@InterfaceStability.Evolving
+public class DescribeFeaturesOptions extends AbstractOptions<DescribeFeaturesOptions> {
+    private boolean shouldUseControllerAsDestination = false;
+
+    public DescribeFeaturesOptions shouldUseControllerAsDestination(boolean shouldUse) {
+        shouldUseControllerAsDestination = shouldUse;
+        return this;
+    }
+
+    public boolean shouldUseControllerAsDestination() {

Review comment:
       Remove the word 'should' from the method name.
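   e.g. the getter could read as follows (a sketch only; the backing field could be renamed to match):

```java
public boolean useControllerAsDestination() {
    return shouldUseControllerAsDestination;
}
```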

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##########
@@ -1214,6 +1215,10 @@ default AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlterati
      */
     AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteration> entries, AlterClientQuotasOptions options);
 
+    DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);
+
+    UpdateFinalizedFeaturesResult updateFinalizedFeatures(Set<FeatureUpdate> featureUpdates, UpdateFinalizedFeaturesOptions options);

Review comment:
       add doc
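   A possible shape for the Javadoc, based on how the two methods are wired up in `KafkaAdminClient` (the wording is only a suggestion, not the final text):

```java
/**
 * Describes the cluster-wide finalized features and the features supported by
 * the queried broker. By default the request is sent to the least loaded
 * broker; see {@link DescribeFeaturesOptions}.
 *
 * @param options   the options to use
 * @return          the {@link DescribeFeaturesResult} containing the result
 */
DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);

/**
 * Applies the given set of finalized feature updates. The request is handled
 * by the controller.
 *
 * @param featureUpdates   the set of finalized feature updates to apply
 * @param options          the options to use
 * @return                 the {@link UpdateFinalizedFeaturesResult} containing the result
 */
UpdateFinalizedFeaturesResult updateFinalizedFeatures(Set<FeatureUpdate> featureUpdates, UpdateFinalizedFeaturesOptions options);
```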

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeFeaturesOptions.java
##########
@@ -0,0 +1,26 @@
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+@InterfaceStability.Evolving
+public class DescribeFeaturesOptions extends AbstractOptions<DescribeFeaturesOptions> {
+    private boolean shouldUseControllerAsDestination = false;
+
+    public DescribeFeaturesOptions shouldUseControllerAsDestination(boolean shouldUse) {

Review comment:
       add doc
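   One possible wording, based on how the option is used in `KafkaAdminClient` (illustrative only):

```java
/**
 * Sets whether the describeFeatures request should be issued to the
 * controller, instead of the least loaded broker (the default).
 */
public DescribeFeaturesOptions shouldUseControllerAsDestination(boolean shouldUse) {
    shouldUseControllerAsDestination = shouldUse;
    return this;
}
```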

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -3979,6 +3986,98 @@ void handleFailure(Throwable throwable) {
         return new AlterClientQuotasResult(Collections.unmodifiableMap(futures));
     }
 
+    @Override
+    public DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options) {

Review comment:
       1. Add test code in `KafkaAdminClientTest`.
   2. Declare variables as final.
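   A rough sketch of what such a test might look like, assuming the existing `AdminClientUnitTestEnv`/`MockClient` harness in `KafkaAdminClientTest` (the `mockClientEnv()` helper and the `featureMetadata()` result accessor are assumptions):

```java
@Test
public void testDescribeFeaturesSuccess() throws Exception {
    try (final AdminClientUnitTestEnv env = mockClientEnv()) {
        env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
        // Answer the ApiVersions request issued by describeFeatures().
        env.kafkaClient().prepareResponse(
            body -> body instanceof ApiVersionsRequest,
            ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE);
        final DescribeFeaturesResult result =
            env.adminClient().describeFeatures(new DescribeFeaturesOptions());
        // featureMetadata() is the assumed accessor on DescribeFeaturesResult.
        final FeatureMetadata metadata = result.featureMetadata().get();
        assertNotNull(metadata);
    }
}
```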

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -3979,6 +3986,98 @@ void handleFailure(Throwable throwable) {
         return new AlterClientQuotasResult(Collections.unmodifiableMap(futures));
     }
 
+    @Override
+    public DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options) {
+        final KafkaFutureImpl<FeatureMetadata> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        Call callViaLeastLoadedNode = new Call("describeFeatures", calcDeadlineMs(now, options.timeoutMs()),
+            new LeastLoadedNodeProvider()) {
+
+            @Override
+            ApiVersionsRequest.Builder createRequest(int timeoutMs) {
+                return new ApiVersionsRequest.Builder();
+            }
+
+            @Override
+            void handleResponse(AbstractResponse response) {
+                ApiVersionsResponse apiVersionsResponse = (ApiVersionsResponse) response;
+                if (apiVersionsResponse.data.errorCode() == Errors.NONE.code()) {
+                    future.complete(
+                        new FeatureMetadata(
+                            apiVersionsResponse.finalizedFeatures(),
+                            apiVersionsResponse.finalizedFeaturesEpoch(),
+                            apiVersionsResponse.supportedFeatures()));
+                } else {
+                    future.completeExceptionally(
+                        Errors.forCode(apiVersionsResponse.data.errorCode()).exception());
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                completeAllExceptionally(Collections.singletonList(future), throwable);
+            }
+        };
+
+        Call call = callViaLeastLoadedNode;
+        if (options.shouldUseControllerAsDestination()) {
+            call = new Call("describeFeatures", calcDeadlineMs(now, options.timeoutMs()),
+                new ControllerNodeProvider()) {
+
+                @Override
+                ApiVersionsRequest.Builder createRequest(int timeoutMs) {
+                    return (ApiVersionsRequest.Builder) callViaLeastLoadedNode.createRequest(timeoutMs);
+                }
+
+                @Override
+                void handleResponse(AbstractResponse response) {
+                    callViaLeastLoadedNode.handleResponse(response);
+                }
+
+                @Override
+                void handleFailure(Throwable throwable) {
+                    callViaLeastLoadedNode.handleFailure(throwable);
+                }
+            };
+        }
+        runnable.call(call, now);
+        return new DescribeFeaturesResult(future);
+    }
+
+    @Override
+    public UpdateFinalizedFeaturesResult updateFinalizedFeatures(
+        Set<FeatureUpdate> featureUpdates, UpdateFinalizedFeaturesOptions options) {
+        final KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        Call call = new Call("updateFinalizedFeatures", calcDeadlineMs(now, options.timeoutMs()),

Review comment:
       Add a 1-line gap before `call`.

##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
##########
@@ -319,7 +322,10 @@
     GROUP_SUBSCRIBED_TO_TOPIC(86, "Deleting offsets of a topic is forbidden while the consumer group is actively subscribed to it.",
         GroupSubscribedToTopicException::new),
     INVALID_RECORD(87, "This record has failed the validation on broker and hence will be rejected.", InvalidRecordException::new),
-    UNSTABLE_OFFSET_COMMIT(88, "There are unstable offsets that need to be cleared.", UnstableOffsetCommitException::new);
+    UNSTABLE_OFFSET_COMMIT(88, "There are unstable offsets that need to be cleared.", UnstableOffsetCommitException::new),
+    INCOMPATIBLE_FEATURES(89, "Could not apply finalized feature updates due to incompatible features.", IncompatibleFeaturesException::new),

Review comment:
       Eliminate this error code and use INVALID_REQUEST instead.

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -266,6 +277,24 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  private def setupFeatureZNode(newNode: FeatureZNode): Unit = {

Review comment:
       Rename the variable to `nodeContents`?

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/UpdateFinalizedFeaturesResponse.java
##########
@@ -0,0 +1,54 @@
+package org.apache.kafka.common.requests;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
+import org.apache.kafka.common.message.UpdateFinalizedFeaturesResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+
+public class UpdateFinalizedFeaturesResponse extends AbstractResponse {
+
+    public final UpdateFinalizedFeaturesResponseData data;
+
+    public UpdateFinalizedFeaturesResponse(UpdateFinalizedFeaturesResponseData data) {
+        this.data = data;
+    }
+
+    public UpdateFinalizedFeaturesResponse(Struct struct) {
+        short latestVersion = (short) (UpdateFinalizedFeaturesResponseData.SCHEMAS.length - 1);
+        this.data = new UpdateFinalizedFeaturesResponseData(struct, latestVersion);
+    }
+
+    public UpdateFinalizedFeaturesResponse(Struct struct, short version) {
+        this.data = new UpdateFinalizedFeaturesResponseData(struct, version);
+    }
+
+    public Errors error() {
+        return Errors.forCode(data.errorCode());
+    }
+
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        return errorCounts(Errors.forCode(data.errorCode()));
+    }
+
+    @Override
+    protected Struct toStruct(short version) {
+        return data.toStruct(version);
+    }
+
+    @Override
+    public String toString() {
+        return data.toString();
+    }
+
+    public UpdateFinalizedFeaturesResponseData data() {
+        return data;
+    }
+
+    public static UpdateFinalizedFeaturesResponse parse(ByteBuffer buffer, short version) {
+        return new UpdateFinalizedFeaturesResponse(ApiKeys.SYNC_GROUP.parseResponse(version, buffer), version);

Review comment:
       Fix ApiKeys
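   Presumably along these lines, assuming the new API is registered under an `ApiKeys.UPDATE_FINALIZED_FEATURES` constant (the exact enum name is an assumption):

```java
public static UpdateFinalizedFeaturesResponse parse(ByteBuffer buffer, short version) {
    return new UpdateFinalizedFeaturesResponse(
        ApiKeys.UPDATE_FINALIZED_FEATURES.parseResponse(version, buffer), version);
}
```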

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1647,6 +1693,33 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  private def processUpdateFinalizedFeatures(newFeatures: Features[FinalizedVersionRange],
+                                             callback: UpdateFinalizedFeaturesCallback): Unit = {
+    if (isActive) {
+      val numIncompatibleBrokers = controllerContext.liveOrShuttingDownBrokers.count(broker => {
+        BrokerFeatures.hasIncompatibleFeatures(broker.features, newFeatures)
+      })
+      if (numIncompatibleBrokers > 0) {
+        callback(
+          Errors.INCOMPATIBLE_FEATURES,
+          Some(
+            s"Could not apply finalized feature updates because $numIncompatibleBrokers brokers" +

Review comment:
       Perhaps add info about `newFeatures` and the incompatible brokers to the error message.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/FeatureMetadata.java
##########
@@ -0,0 +1,70 @@
+package org.apache.kafka.clients.admin;
+
+import static java.util.stream.Collectors.joining;
+
+import java.util.Objects;
+import org.apache.kafka.common.feature.Features;
+import org.apache.kafka.common.feature.FinalizedVersionRange;
+import org.apache.kafka.common.feature.SupportedVersionRange;
+
+public class FeatureMetadata {

Review comment:
       add doc to entire class
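   A possible class-level doc, based on the constructor arguments (wording is only a suggestion):

```java
/**
 * Encapsulates the feature metadata carried in an ApiVersions response: the
 * cluster-wide finalized features, the epoch at which they were finalized,
 * and the features supported by the responding broker. Returned by
 * {@link Admin#describeFeatures(DescribeFeaturesOptions)}.
 */
public class FeatureMetadata {
```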

##########
File path: core/src/test/resources/log4j.properties
##########
@@ -12,14 +12,80 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

Review comment:
       revert the file eventually

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/FeatureUpdate.java
##########
@@ -0,0 +1,51 @@
+package org.apache.kafka.clients.admin;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.kafka.common.message.UpdateFinalizedFeaturesRequestData;
+
+public class FeatureUpdate {

Review comment:
       add doc to entire class

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##########
@@ -1214,6 +1215,10 @@ default AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlterati
      */
     AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteration> entries, AlterClientQuotasOptions options);
 
+    DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);

Review comment:
       add doc

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/FeatureUpdate.java
##########
@@ -0,0 +1,51 @@
+package org.apache.kafka.clients.admin;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.kafka.common.message.UpdateFinalizedFeaturesRequestData;
+
+public class FeatureUpdate {
+    private String name;

Review comment:
       attributes can be final
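   For example (the field names beyond `name` are assumptions about the rest of the class):

```java
public class FeatureUpdate {
    private final String name;
    private final short maxVersionLevel;
    private final boolean allowDowngrade;

    public FeatureUpdate(final String name, final short maxVersionLevel, final boolean allowDowngrade) {
        this.name = name;
        this.maxVersionLevel = maxVersionLevel;
        this.allowDowngrade = allowDowngrade;
    }
}
```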

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/UpdateFinalizedFeaturesRequest.java
##########
@@ -0,0 +1,72 @@
+package org.apache.kafka.common.requests;
+
+import java.nio.ByteBuffer;
+import org.apache.kafka.common.message.UpdateFinalizedFeaturesResponseData;
+import org.apache.kafka.common.message.UpdateFinalizedFeaturesRequestData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.types.Struct;
+
+public class UpdateFinalizedFeaturesRequest extends AbstractRequest {

Review comment:
       Make variables final throughout the class.
   Add doc.
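   For instance, the request data could be held in a final field (the `data` field name, the constructor shape, and the `ApiKeys.UPDATE_FINALIZED_FEATURES` constant are assumptions):

```java
// Suggested shape: hold the request data in a final field.
private final UpdateFinalizedFeaturesRequestData data;

public UpdateFinalizedFeaturesRequest(final UpdateFinalizedFeaturesRequestData data, final short version) {
    super(ApiKeys.UPDATE_FINALIZED_FEATURES, version);
    this.data = data;
}
```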

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -3979,6 +3986,98 @@ void handleFailure(Throwable throwable) {
         return new AlterClientQuotasResult(Collections.unmodifiableMap(futures));
     }
 
+    @Override
+    public DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options) {
+        final KafkaFutureImpl<FeatureMetadata> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        Call callViaLeastLoadedNode = new Call("describeFeatures", calcDeadlineMs(now, options.timeoutMs()),
+            new LeastLoadedNodeProvider()) {
+
+            @Override
+            ApiVersionsRequest.Builder createRequest(int timeoutMs) {
+                return new ApiVersionsRequest.Builder();
+            }
+
+            @Override
+            void handleResponse(AbstractResponse response) {
+                ApiVersionsResponse apiVersionsResponse = (ApiVersionsResponse) response;
+                if (apiVersionsResponse.data.errorCode() == Errors.NONE.code()) {
+                    future.complete(
+                        new FeatureMetadata(
+                            apiVersionsResponse.finalizedFeatures(),
+                            apiVersionsResponse.finalizedFeaturesEpoch(),
+                            apiVersionsResponse.supportedFeatures()));
+                } else {
+                    future.completeExceptionally(
+                        Errors.forCode(apiVersionsResponse.data.errorCode()).exception());
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                completeAllExceptionally(Collections.singletonList(future), throwable);
+            }
+        };
+
+        Call call = callViaLeastLoadedNode;
+        if (options.shouldUseControllerAsDestination()) {
+            call = new Call("describeFeatures", calcDeadlineMs(now, options.timeoutMs()),
+                new ControllerNodeProvider()) {
+
+                @Override
+                ApiVersionsRequest.Builder createRequest(int timeoutMs) {
+                    return (ApiVersionsRequest.Builder) callViaLeastLoadedNode.createRequest(timeoutMs);
+                }
+
+                @Override
+                void handleResponse(AbstractResponse response) {
+                    callViaLeastLoadedNode.handleResponse(response);
+                }
+
+                @Override
+                void handleFailure(Throwable throwable) {
+                    callViaLeastLoadedNode.handleFailure(throwable);
+                }
+            };
+        }
+        runnable.call(call, now);
+        return new DescribeFeaturesResult(future);
+    }
+
+    @Override
+    public UpdateFinalizedFeaturesResult updateFinalizedFeatures(

Review comment:
       add test code in `KafkaAdminClientTest`
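   A rough sketch of a companion test, again assuming the `AdminClientUnitTestEnv`/`MockClient` harness; the `FeatureUpdate` constructor arguments and the `all()` result accessor are assumptions about the new API surface:

```java
@Test
public void testUpdateFinalizedFeaturesSuccess() throws Exception {
    try (final AdminClientUnitTestEnv env = mockClientEnv()) {
        env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
        // Answer the UpdateFinalizedFeatures request with a successful response.
        env.kafkaClient().prepareResponse(
            body -> body instanceof UpdateFinalizedFeaturesRequest,
            new UpdateFinalizedFeaturesResponse(
                new UpdateFinalizedFeaturesResponseData().setErrorCode(Errors.NONE.code())));
        final UpdateFinalizedFeaturesResult result = env.adminClient().updateFinalizedFeatures(
            Collections.singleton(new FeatureUpdate("feature_1", (short) 2, false)),
            new UpdateFinalizedFeaturesOptions());
        result.all().get();  // should complete without error
    }
}
```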

##########
File path: core/src/main/scala/kafka/server/BrokerFeatures.scala
##########
@@ -71,9 +90,37 @@ object SupportedFeatures extends Logging {
    *                    is empty, it means there were no feature incompatibilities found.
    */
   def incompatibleFeatures(finalized: Features[FinalizedVersionRange]): Features[FinalizedVersionRange] = {
+    BrokerFeatures.incompatibleFeatures(getSupportedFeatures, finalized, true)
+  }
+
+}
+
+object BrokerFeatures extends Logging {
+  /**
+   * Returns true if any of the provided finalized features are incompatible with the provided
+   * supported features.
+   *
+   * @param supported   The supported features to be compared
+   * @param finalized   The finalized features to be compared
+   *
+   * @return            - True if there are any incompatibilities.

Review comment:
       say "if there are any feature incompatibilities found."

##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
##########
@@ -319,7 +322,10 @@
     GROUP_SUBSCRIBED_TO_TOPIC(86, "Deleting offsets of a topic is forbidden while the consumer group is actively subscribed to it.",
         GroupSubscribedToTopicException::new),
     INVALID_RECORD(87, "This record has failed the validation on broker and hence will be rejected.", InvalidRecordException::new),
-    UNSTABLE_OFFSET_COMMIT(88, "There are unstable offsets that need to be cleared.", UnstableOffsetCommitException::new);
+    UNSTABLE_OFFSET_COMMIT(88, "There are unstable offsets that need to be cleared.", UnstableOffsetCommitException::new),
+    INCOMPATIBLE_FEATURES(89, "Could not apply finalized feature updates due to incompatible features.", IncompatibleFeaturesException::new),
+    FEATURE_VERSIONING_DISABLED(90, "Feature versioning system is disabled.", FeatureVersioningDisabledException::new),

Review comment:
       Eliminate this error code and use INVALID_REQUEST instead.

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
##########
@@ -68,6 +69,36 @@ public ApiVersionsResponse(Struct struct, short version) {
         this(new ApiVersionsResponseData(struct, version));
     }
 
+    public ApiVersionsResponseData data() {
+        return data;
+    }
+
+    public Features<SupportedVersionRange> supportedFeatures() {
+        Map<String, SupportedVersionRange> features = new HashMap<>();

Review comment:
       final

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
##########
@@ -68,6 +69,36 @@ public ApiVersionsResponse(Struct struct, short version) {
         this(new ApiVersionsResponseData(struct, version));
     }
 
+    public ApiVersionsResponseData data() {
+        return data;
+    }
+
+    public Features<SupportedVersionRange> supportedFeatures() {
+        Map<String, SupportedVersionRange> features = new HashMap<>();
+
+        for (SupportedFeatureKey key : data.supportedFeatures().valuesSet()) {
+            features.put(key.name(), new SupportedVersionRange(key.minVersion(),key.maxVersion()));
+        }
+
+        return Features.supportedFeatures(features);
+    }
+
+    public Features<FinalizedVersionRange> finalizedFeatures() {
+        Map<String, FinalizedVersionRange> features = new HashMap<>();

Review comment:
       final

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
##########
@@ -68,6 +69,36 @@ public ApiVersionsResponse(Struct struct, short version) {
         this(new ApiVersionsResponseData(struct, version));
     }
 
+    public ApiVersionsResponseData data() {
+        return data;
+    }
+
+    public Features<SupportedVersionRange> supportedFeatures() {
+        Map<String, SupportedVersionRange> features = new HashMap<>();
+
+        for (SupportedFeatureKey key : data.supportedFeatures().valuesSet()) {
+            features.put(key.name(), new SupportedVersionRange(key.minVersion(),key.maxVersion()));

Review comment:
       Add a space between "," and "key".

##########
File path: core/src/main/scala/kafka/server/BrokerFeatures.scala
##########
@@ -71,9 +90,37 @@ object SupportedFeatures extends Logging {
    *                    is empty, it means there were no feature incompatibilities found.
    */
   def incompatibleFeatures(finalized: Features[FinalizedVersionRange]): Features[FinalizedVersionRange] = {
+    BrokerFeatures.incompatibleFeatures(getSupportedFeatures, finalized, true)
+  }
+
+}
+
+object BrokerFeatures extends Logging {
+  /**
+   * Returns true if any of the provided finalized features are incompatible with the provided
+   * supported features.
+   *
+   * @param supported   The supported features to be compared
+   * @param finalized   The finalized features to be compared
+   *
+   * @return            - True if there are any incompatibilities.
+   *                    - False otherwise.
+   */
+  def hasIncompatibleFeatures(
+    supported: Features[SupportedVersionRange],

Review comment:
       s/supported/supportedFeatures
   Same for the other parameter.

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -266,6 +277,24 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  private def setupFeatureZNode(newNode: FeatureZNode): Unit = {

Review comment:
       add doc and explain various cases

##########
File path: core/src/main/scala/kafka/server/BrokerFeatures.scala
##########
@@ -24,13 +24,13 @@ import org.apache.kafka.common.feature.Features._
 import scala.jdk.CollectionConverters._
 
 /**
- * A common immutable object used in the Broker to define the latest features supported by the
- * Broker. Also provides API to check for incompatibilities between the latest features supported
- * by the Broker and cluster-wide finalized features.
+ * A class that defines the latest features supported by the Broker, and the finalized cluster-wide

Review comment:
       This can be improved by splitting it into a few lines.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2945,6 +2950,113 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  def handleUpdateFinalizedFeatures(request: RequestChannel.Request): Unit = {
+    val updateFinalizedFeaturesRequest = request.body[UpdateFinalizedFeaturesRequest]
+    def sendResponseCallback(error: Errors, msgOverride: Option[String]): Unit = {
+      sendResponseExemptThrottle(request, new UpdateFinalizedFeaturesResponse(
+        new UpdateFinalizedFeaturesResponseData()
+          .setErrorCode(error.code())
+          .setErrorMessage(msgOverride.getOrElse(error.message()))))
+    }
+    if (!authorize(request.context, ALTER, CLUSTER, CLUSTER_NAME)) {
+      sendResponseCallback(Errors.CLUSTER_AUTHORIZATION_FAILED, Option.empty)
+    } else if (!controller.isActive) {
+      sendResponseCallback(Errors.NOT_CONTROLLER, Option.empty)
+    } else if (!config.isFeatureVersioningEnabled) {
+      sendResponseCallback(Errors.FEATURE_VERSIONING_DISABLED, Option.empty)
+    } else {
+      val targetFeaturesOrError = getTargetFinalizedFeaturesOrError(updateFinalizedFeaturesRequest)
+      targetFeaturesOrError match {
+        case Left(targetFeatures) =>
+          controller.updateFinalizedFeatures(targetFeatures, sendResponseCallback)
+        case Right(error) =>
+          sendResponseCallback(error.error, Some(error.message))
+      }
+    }
+  }
+
+  def getTargetFinalizedFeaturesOrError(request: UpdateFinalizedFeaturesRequest):

Review comment:
       add doc

##########
File path: core/src/main/scala/kafka/server/BrokerFeatures.scala
##########
@@ -39,21 +39,40 @@ object SupportedFeatures extends Logging {
    */
   @volatile private var supportedFeatures = emptySupportedFeatures
 
+  /**
+   * This is the cluster-wide finalized minimum version levels.
+   * This is currently empty, but in the future as we define supported features, this map can be
+   * populated in cases where minimum version level of a finalized feature is advanced beyond 1.
+   */
+  @volatile private var finalizedFeatureMinVersionLevels = Map[String, Short]()
+
   /**
    * Returns a reference to the latest features supported by the Broker.
    */
-  def get: Features[SupportedVersionRange] = {
+  def getSupportedFeatures: Features[SupportedVersionRange] = {
     supportedFeatures
   }
 
   // For testing only.
-  def update(newFeatures: Features[SupportedVersionRange]): Unit = {
+  def updateSupportedFeatures(newFeatures: Features[SupportedVersionRange]): Unit = {
     supportedFeatures = newFeatures
   }
 
+  def getFinalizedMinVersionLevel(feature: String): Short = {
+    finalizedFeatureMinVersionLevels.getOrElse(feature, 1)
+  }
+
   // For testing only.
-  def clear(): Unit = {
-    supportedFeatures = emptySupportedFeatures
+  def updateMinVersionLevels(newMinVersionLevels: Map[String, Short]): Unit = {
+    finalizedFeatureMinVersionLevels = newMinVersionLevels
+  }
+
+  def getDefaultFinalizedFeatures: Features[FinalizedVersionRange] = {

Review comment:
       Check the parentheses `()` here.

##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -1480,6 +1480,8 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
   def logMessageTimestampDifferenceMaxMs: Long = getLong(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp)
   def logMessageDownConversionEnable: Boolean = getBoolean(KafkaConfig.LogMessageDownConversionEnableProp)
 
+  /** Feature  configuration*/

Review comment:
       remove these 2 lines

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1647,6 +1693,33 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  private def processUpdateFinalizedFeatures(newFeatures: Features[FinalizedVersionRange],
+                                             callback: UpdateFinalizedFeaturesCallback): Unit = {
+    if (isActive) {
+      val numIncompatibleBrokers = controllerContext.liveOrShuttingDownBrokers.count(broker => {
+        BrokerFeatures.hasIncompatibleFeatures(broker.features, newFeatures)
+      })
+      if (numIncompatibleBrokers > 0) {
+        callback(
+          Errors.INCOMPATIBLE_FEATURES,
+          Some(
+            s"Could not apply finalized feature updates because $numIncompatibleBrokers brokers" +
+            " were found to have incompatible features."))
+      } else {
+        try {
+          zkClient.updateFeatureZNode(new FeatureZNode(FeatureZNodeStatus.Enabled, newFeatures))

Review comment:
       Shouldn't the code be waiting here?

##########
File path: clients/src/main/resources/common/message/UpdateFinalizedFeaturesRequest.json
##########
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 50,
+  "type": "request",
+  "name": "UpdateFinalizedFeaturesRequest",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+    { "name": "timeoutMs", "type": "int32", "versions": "0+", "default": "60000",

Review comment:
       Eliminate the timeout field?

##########
File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
##########
@@ -53,6 +56,26 @@ object FinalizedFeatureCache extends Logging {
     featuresAndEpoch.isEmpty
   }
 
+  def waitUntilEpochOrThrow(expectedMinEpoch: Int, timeoutMs: Long): Unit = {

Review comment:
       Add unit test
   Add doc




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org