You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by de...@apache.org on 2023/05/12 05:46:16 UTC

[kafka] branch trunk updated: KAFKA-14291: KRaft controller should return right finalized features in ApiVersionResponse (#13679)

This is an automated email from the ASF dual-hosted git repository.

dengziming pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a7c9842f709 KAFKA-14291: KRaft controller should return right finalized features in ApiVersionResponse (#13679)
a7c9842f709 is described below

commit a7c9842f709c5985ea41a5811e479551ea0fb7f2
Author: dengziming <de...@gmail.com>
AuthorDate: Fri May 12 13:46:06 2023 +0800

    KAFKA-14291: KRaft controller should return right finalized features in ApiVersionResponse (#13679)
    
    The KRaft controller return empty finalized features in `ApiVersionResponse`, the brokers are not infected by this, so this problem doesn't have any impact currently, but it's worth fixing it to avoid unexpected problems.
    
    And there is a bunch of of confusing methods in `ApiVersionResponse` which are only used in test code, I moved them to TestUtils to make the code more clear, and force everyone to pass in the correct parameters instead of the default zero parameters, for example, empty supported features and empty finalized features.
    
    Reviewers: Luke Chen <sh...@gmail.com>
---
 .../kafka/common/requests/ApiVersionsResponse.java | 54 --------------------
 .../apache/kafka/clients/NetworkClientTest.java    |  4 +-
 .../apache/kafka/clients/NodeApiVersionsTest.java  |  5 +-
 .../clients/consumer/internals/FetcherTest.java    |  2 +-
 .../clients/producer/internals/SenderTest.java     |  2 +-
 .../apache/kafka/common/network/NioEchoServer.java |  3 +-
 .../common/network/SaslChannelBuilderTest.java     |  3 +-
 .../common/network/SslTransportLayerTest.java      |  2 +-
 .../common/requests/ApiVersionsResponseTest.java   |  3 +-
 .../kafka/common/requests/RequestResponseTest.java |  5 +-
 .../authenticator/SaslAuthenticatorTest.java       |  4 +-
 .../authenticator/SaslServerAuthenticatorTest.java |  3 +-
 .../test/java/org/apache/kafka/test/TestUtils.java | 59 ++++++++++++++++++++++
 .../scala/kafka/server/ApiVersionManager.scala     | 51 ++++++++++++++++++-
 .../main/scala/kafka/server/ControllerApis.scala   | 33 +++++++-----
 .../scala/kafka/tools/TestRaftRequestHandler.scala |  2 +-
 .../kafka/server/GssapiAuthenticationTest.scala    |  3 +-
 .../server/AbstractApiVersionsRequestTest.scala    | 20 ++++++--
 .../unit/kafka/server/ApiVersionsRequestTest.scala |  4 +-
 .../apache/kafka/controller/QuorumController.java  |  5 ++
 .../metadata/FinalizedControllerFeatures.java      |  4 ++
 21 files changed, 179 insertions(+), 92 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
index cd0a2fa219d..87f0277d196 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
@@ -34,7 +34,6 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.RecordVersion;
 
 import java.nio.ByteBuffer;
-import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -106,59 +105,6 @@ public class ApiVersionsResponse extends AbstractResponse {
         }
     }
 
-    public static ApiVersionsResponse defaultApiVersionsResponse(
-        ApiMessageType.ListenerType listenerType
-    ) {
-        return defaultApiVersionsResponse(0, listenerType);
-    }
-
-    public static ApiVersionsResponse defaultApiVersionsResponse(
-        int throttleTimeMs,
-        ApiMessageType.ListenerType listenerType
-    ) {
-        return createApiVersionsResponse(
-            throttleTimeMs,
-            filterApis(RecordVersion.current(), listenerType, true),
-            Features.emptySupportedFeatures(),
-            false
-        );
-    }
-
-    public static ApiVersionsResponse defaultApiVersionsResponse(
-        int throttleTimeMs,
-        ApiMessageType.ListenerType listenerType,
-        boolean enableUnstableLastVersion
-    ) {
-        return createApiVersionsResponse(
-            throttleTimeMs,
-            filterApis(RecordVersion.current(), listenerType, enableUnstableLastVersion),
-            Features.emptySupportedFeatures(),
-            false
-        );
-    }
-
-    public static ApiVersionsResponse createApiVersionsResponse(
-        int throttleTimeMs,
-        ApiVersionCollection apiVersions
-    ) {
-        return createApiVersionsResponse(throttleTimeMs, apiVersions, Features.emptySupportedFeatures(), false);
-    }
-
-    public static ApiVersionsResponse createApiVersionsResponse(
-        int throttleTimeMs,
-        ApiVersionCollection apiVersions,
-        Features<SupportedVersionRange> latestSupportedFeatures,
-        boolean zkMigrationEnabled
-    ) {
-        return createApiVersionsResponse(
-            throttleTimeMs,
-            apiVersions,
-            latestSupportedFeatures,
-            Collections.emptyMap(),
-            UNKNOWN_FINALIZED_FEATURES_EPOCH,
-            zkMigrationEnabled);
-    }
-
     public static ApiVersionsResponse createApiVersionsResponse(
         int throttleTimeMs,
         RecordVersion minRecordVersion,
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index 810448efc04..f5b85e5e787 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -250,7 +250,7 @@ public class NetworkClientTest {
 
     private void awaitReady(NetworkClient client, Node node) {
         if (client.discoverBrokerVersions()) {
-            setExpectedApiVersionsResponse(ApiVersionsResponse.defaultApiVersionsResponse(
+            setExpectedApiVersionsResponse(TestUtils.defaultApiVersionsResponse(
                 ApiMessageType.ListenerType.ZK_BROKER));
         }
         while (!client.ready(node, time.milliseconds()))
@@ -1183,7 +1183,7 @@ public class NetworkClientTest {
     }
 
     private ApiVersionsResponse defaultApiVersionsResponse() {
-        return ApiVersionsResponse.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER);
+        return TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER);
     }
 
     private static class TestCallbackHandler implements RequestCompletionHandler {
diff --git a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
index 6fa6d6242e2..c7849883b37 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
 import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
@@ -140,7 +141,7 @@ public class NodeApiVersionsTest {
     @ParameterizedTest
     @EnumSource(ApiMessageType.ListenerType.class)
     public void testUsableVersionLatestVersions(ApiMessageType.ListenerType scope) {
-        ApiVersionsResponse defaultResponse = ApiVersionsResponse.defaultApiVersionsResponse(scope);
+        ApiVersionsResponse defaultResponse = TestUtils.defaultApiVersionsResponse(scope);
         List<ApiVersion> versionList = new LinkedList<>(defaultResponse.data().apiKeys());
         // Add an API key that we don't know about.
         versionList.add(new ApiVersion()
@@ -156,7 +157,7 @@ public class NodeApiVersionsTest {
     @ParameterizedTest
     @EnumSource(ApiMessageType.ListenerType.class)
     public void testConstructionFromApiVersionsResponse(ApiMessageType.ListenerType scope) {
-        ApiVersionsResponse apiVersionsResponse = ApiVersionsResponse.defaultApiVersionsResponse(scope);
+        ApiVersionsResponse apiVersionsResponse = TestUtils.defaultApiVersionsResponse(scope);
         NodeApiVersions versions = new NodeApiVersions(apiVersionsResponse.data().apiKeys(), Collections.emptyList(), false);
 
         for (ApiVersion apiVersionKey : apiVersionsResponse.data().apiKeys()) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index e4ddce247db..c52c1587803 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -1909,7 +1909,7 @@ public class FetcherTest {
                 1000, 1000, 64 * 1024, 64 * 1024, 1000, 10 * 1000, 127 * 1000,
                 time, true, new ApiVersions(), metricsManager.throttleTimeSensor(), new LogContext());
 
-        ApiVersionsResponse apiVersionsResponse = ApiVersionsResponse.defaultApiVersionsResponse(
+        ApiVersionsResponse apiVersionsResponse = TestUtils.defaultApiVersionsResponse(
             400, ApiMessageType.ListenerType.ZK_BROKER);
         ByteBuffer buffer = RequestTestUtils.serializeResponseWithHeader(apiVersionsResponse, ApiKeys.API_VERSIONS.latestVersion(), 0);
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 7ee2dc56611..f6c91659356 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -291,7 +291,7 @@ public class SenderTest {
                 1000, 1000, 64 * 1024, 64 * 1024, 1000, 10 * 1000, 127 * 1000,
                 time, true, new ApiVersions(), throttleTimeSensor, logContext);
 
-        ApiVersionsResponse apiVersionsResponse = ApiVersionsResponse.defaultApiVersionsResponse(
+        ApiVersionsResponse apiVersionsResponse = TestUtils.defaultApiVersionsResponse(
             400, ApiMessageType.ListenerType.ZK_BROKER);
         ByteBuffer buffer = RequestTestUtils.serializeResponseWithHeader(apiVersionsResponse, ApiKeys.API_VERSIONS.latestVersion(), 0);
 
diff --git a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
index 0cd563355e5..5cf1bbca989 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.message.ApiMessageType;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.security.authenticator.CredentialCache;
 import org.apache.kafka.common.security.scram.ScramCredential;
@@ -125,7 +124,7 @@ public class NioEchoServer extends Thread {
         if (channelBuilder == null)
             channelBuilder = ChannelBuilders.serverChannelBuilder(listenerName, false,
                 securityProtocol, config, credentialCache, tokenCache, time, logContext,
-                () -> ApiVersionsResponse.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER));
+                () -> TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER));
         this.metrics = new Metrics();
         this.selector = new Selector(10000, failedAuthenticationDelayMs, metrics, time,
                 "MetricGroup", channelBuilder, logContext);
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
index 1697c627fc4..35ea36367f4 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.security.plain.PlainLoginModule;
 import org.apache.kafka.common.security.scram.ScramLoginModule;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.test.TestUtils;
 import org.ietf.jgss.GSSContext;
 import org.ietf.jgss.GSSCredential;
 import org.ietf.jgss.GSSManager;
@@ -177,7 +178,7 @@ public class SaslChannelBuilderTest {
     }
 
     private Supplier<ApiVersionsResponse> defaultApiVersionsSupplier() {
-        return () -> ApiVersionsResponse.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER);
+        return () -> TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER);
     }
 
     private SaslChannelBuilder createChannelBuilder(SecurityProtocol securityProtocol, String saslMechanism) {
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
index a391208faaf..26987e30da8 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
@@ -1326,7 +1326,7 @@ public class SslTransportLayerTest {
     }
 
     private Supplier<ApiVersionsResponse> defaultApiVersionsSupplier() {
-        return () -> ApiVersionsResponse.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER);
+        return () -> TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER);
     }
 
     static class TestSslChannelBuilder extends SslChannelBuilder {
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
index f1931bc5882..1e5f8493f60 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.RecordVersion;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
@@ -49,7 +50,7 @@ public class ApiVersionsResponseTest {
     @ParameterizedTest
     @EnumSource(ApiMessageType.ListenerType.class)
     public void shouldHaveCorrectDefaultApiVersionsResponse(ApiMessageType.ListenerType scope) {
-        ApiVersionsResponse defaultResponse = ApiVersionsResponse.defaultApiVersionsResponse(scope);
+        ApiVersionsResponse defaultResponse = TestUtils.defaultApiVersionsResponse(scope);
         assertEquals(ApiKeys.apisForListener(scope).size(), defaultResponse.data().apiKeys().size(),
             "API versions for all API keys must be maintained.");
 
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index cb84ff94841..64a7300935b 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -239,6 +239,7 @@ import org.apache.kafka.common.security.token.delegation.DelegationToken;
 import org.apache.kafka.common.security.token.delegation.TokenInformation;
 import org.apache.kafka.common.utils.SecurityUtils;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
@@ -388,7 +389,7 @@ public class RequestResponseTest {
     public void testApiVersionsSerialization() {
         for (short version : API_VERSIONS.allVersions()) {
             checkErrorResponse(createApiVersionRequest(version), new UnsupportedVersionException("Not Supported"));
-            checkResponse(ApiVersionsResponse.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER), version);
+            checkResponse(TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER), version);
         }
     }
 
@@ -874,7 +875,7 @@ public class RequestResponseTest {
     }
 
     private ApiVersionsResponse defaultApiVersionsResponse() {
-        return ApiVersionsResponse.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER);
+        return TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER);
     }
 
     @Test
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index 40a27935f3f..a633c7db1e3 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -1980,7 +1980,7 @@ public class SaslAuthenticatorTest {
             ScramCredentialUtils.createCache(credentialCache, Arrays.asList(saslMechanism));
 
         Supplier<ApiVersionsResponse> apiVersionSupplier = () -> {
-            ApiVersionsResponse defaultApiVersionResponse = ApiVersionsResponse.defaultApiVersionsResponse(
+            ApiVersionsResponse defaultApiVersionResponse = TestUtils.defaultApiVersionsResponse(
                 ApiMessageType.ListenerType.ZK_BROKER);
             ApiVersionCollection apiVersions = new ApiVersionCollection();
             for (ApiVersion apiVersion : defaultApiVersionResponse.data().apiKeys()) {
@@ -2578,7 +2578,7 @@ public class SaslAuthenticatorTest {
                 DelegationTokenCache tokenCache, Time time) {
             super(mode, jaasContexts, securityProtocol, listenerName, isInterBrokerListener, clientSaslMechanism,
                 handshakeRequestEnable, credentialCache, tokenCache, null, time, new LogContext(),
-                () -> ApiVersionsResponse.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER));
+                () -> TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.ZK_BROKER));
         }
 
         @Override
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
index b0dec3e522a..4bb2964d807 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
@@ -49,6 +49,7 @@ import org.apache.kafka.common.security.ssl.SslPrincipalMapper;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.Test;
 import org.mockito.Answers;
 import org.mockito.ArgumentCaptor;
@@ -374,7 +375,7 @@ public class SaslServerAuthenticatorTest {
         Map<String, Subject> subjects = Collections.singletonMap(mechanism, new Subject());
         Map<String, AuthenticateCallbackHandler> callbackHandlers = Collections.singletonMap(
                 mechanism, new SaslServerCallbackHandler());
-        ApiVersionsResponse apiVersionsResponse = ApiVersionsResponse.defaultApiVersionsResponse(
+        ApiVersionsResponse apiVersionsResponse = TestUtils.defaultApiVersionsResponse(
                 ApiMessageType.ListenerType.ZK_BROKER);
         Map<String, Long> connectionsMaxReauthMsByMechanism = maxReauth != null ?
                 Collections.singletonMap(mechanism, maxReauth) : Collections.emptyMap();
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index be13e8e7a62..3fc51f23a2b 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -22,10 +22,16 @@ import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.feature.Features;
+import org.apache.kafka.common.feature.SupportedVersionRange;
+import org.apache.kafka.common.message.ApiMessageType;
+import org.apache.kafka.common.message.ApiVersionsResponseData;
 import org.apache.kafka.common.network.NetworkReceive;
 import org.apache.kafka.common.network.Send;
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.record.RecordVersion;
 import org.apache.kafka.common.record.UnalignedRecords;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.requests.ByteBufferChannel;
 import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.utils.Exit;
@@ -581,4 +587,57 @@ public class TestUtils {
 
         return allSegmentsSet.equals(expectedSegmentsSet);
     }
+
+    public static ApiVersionsResponse defaultApiVersionsResponse(
+            ApiMessageType.ListenerType listenerType
+    ) {
+        return defaultApiVersionsResponse(0, listenerType);
+    }
+
+    public static ApiVersionsResponse defaultApiVersionsResponse(
+            int throttleTimeMs,
+            ApiMessageType.ListenerType listenerType
+    ) {
+        return createApiVersionsResponse(
+                throttleTimeMs,
+                ApiVersionsResponse.filterApis(RecordVersion.current(), listenerType, true),
+                Features.emptySupportedFeatures(),
+                false
+        );
+    }
+
+    public static ApiVersionsResponse defaultApiVersionsResponse(
+            int throttleTimeMs,
+            ApiMessageType.ListenerType listenerType,
+            boolean enableUnstableLastVersion
+    ) {
+        return createApiVersionsResponse(
+                throttleTimeMs,
+                ApiVersionsResponse.filterApis(RecordVersion.current(), listenerType, enableUnstableLastVersion),
+                Features.emptySupportedFeatures(),
+                false
+        );
+    }
+
+    public static ApiVersionsResponse createApiVersionsResponse(
+            int throttleTimeMs,
+            ApiVersionsResponseData.ApiVersionCollection apiVersions
+    ) {
+        return createApiVersionsResponse(throttleTimeMs, apiVersions, Features.emptySupportedFeatures(), false);
+    }
+
+    public static ApiVersionsResponse createApiVersionsResponse(
+            int throttleTimeMs,
+            ApiVersionsResponseData.ApiVersionCollection apiVersions,
+            Features<SupportedVersionRange> latestSupportedFeatures,
+            boolean zkMigrationEnabled
+    ) {
+        return ApiVersionsResponse.createApiVersionsResponse(
+                throttleTimeMs,
+                apiVersions,
+                latestSupportedFeatures,
+                Collections.emptyMap(),
+                ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
+                zkMigrationEnabled);
+    }
 }
diff --git a/core/src/main/scala/kafka/server/ApiVersionManager.scala b/core/src/main/scala/kafka/server/ApiVersionManager.scala
index 0204d94c976..c3346a5edb2 100644
--- a/core/src/main/scala/kafka/server/ApiVersionManager.scala
+++ b/core/src/main/scala/kafka/server/ApiVersionManager.scala
@@ -29,7 +29,19 @@ trait ApiVersionManager {
   def enableUnstableLastVersion: Boolean
   def listenerType: ListenerType
   def enabledApis: collection.Set[ApiKeys]
+
+  /**
+   * @see [[DefaultApiVersionManager.apiVersionResponse]]
+   * @see [[kafka.server.KafkaApis.handleApiVersionsRequest]]
+   */
   def apiVersionResponse(throttleTimeMs: Int): ApiVersionsResponse
+
+  /**
+   * @see [[SimpleApiVersionManager.apiVersionResponse]]
+   * @see [[kafka.server.ControllerApis.handleApiVersionsRequest]]
+   */
+  def apiVersionResponse(throttleTimeMs: Int, finalizedFeatures: Map[String, java.lang.Short], finalizedFeaturesEpoch: Long): ApiVersionsResponse
+
   def isApiEnabled(apiKey: ApiKeys, apiVersion: Short): Boolean = {
     apiKey != null && apiKey.inScope(listenerType) && apiKey.isVersionEnabled(apiVersion, enableUnstableLastVersion)
   }
@@ -55,6 +67,16 @@ object ApiVersionManager {
   }
 }
 
+/**
+ * A simple ApiVersionManager that does not support forwarding and does not have metadata cache, used in kraft controller.
+ * its enabled apis are determined by the listener type, its finalized features are dynamically determined by the controller.
+ *
+ * @param listenerType the listener type
+ * @param enabledApis the enabled apis, which are computed by the listener type
+ * @param brokerFeatures the broker features
+ * @param enableUnstableLastVersion whether to enable unstable last version, see [[KafkaConfig.unstableApiVersionsEnabled]]
+ * @param zkMigrationEnabled whether to enable zk migration, see [[KafkaConfig.migrationEnabled]]
+ */
 class SimpleApiVersionManager(
   val listenerType: ListenerType,
   val enabledApis: collection.Set[ApiKeys],
@@ -80,10 +102,33 @@ class SimpleApiVersionManager(
   private val apiVersions = ApiVersionsResponse.collectApis(enabledApis.asJava, enableUnstableLastVersion)
 
   override def apiVersionResponse(requestThrottleMs: Int): ApiVersionsResponse = {
-    ApiVersionsResponse.createApiVersionsResponse(requestThrottleMs, apiVersions, brokerFeatures, zkMigrationEnabled)
+    throw new UnsupportedOperationException("This method is not supported in SimpleApiVersionManager, use apiVersionResponse(throttleTimeMs, finalizedFeatures, epoch) instead")
+  }
+
+  override def apiVersionResponse(throttleTimeMs: Int, finalizedFeatures: Map[String, java.lang.Short], finalizedFeaturesEpoch: Long): ApiVersionsResponse = {
+    ApiVersionsResponse.createApiVersionsResponse(
+      throttleTimeMs,
+      apiVersions,
+      brokerFeatures,
+      finalizedFeatures.asJava,
+      finalizedFeaturesEpoch,
+      zkMigrationEnabled
+    )
   }
 }
 
+/**
+ * The default ApiVersionManager that supports forwarding and has metadata cache, used in broker and zk controller.
+ * When forwarding is enabled, the enabled apis are determined by the broker listener type and the controller apis,
+ * otherwise the enabled apis are determined by the broker listener type, which is the same with SimpleApiVersionManager.
+ *
+ * @param listenerType the listener type
+ * @param forwardingManager the forwarding manager,
+ * @param features the broker features
+ * @param metadataCache the metadata cache, used to get the finalized features and the metadata version
+ * @param enableUnstableLastVersion whether to enable unstable last version, see [[KafkaConfig.unstableApiVersionsEnabled]]
+ * @param zkMigrationEnabled whether to enable zk migration, see [[KafkaConfig.migrationEnabled]]
+ */
 class DefaultApiVersionManager(
   val listenerType: ListenerType,
   forwardingManager: Option[ForwardingManager],
@@ -112,4 +157,8 @@ class DefaultApiVersionManager(
       zkMigrationEnabled
     )
   }
+
+  override def apiVersionResponse(throttleTimeMs: Int, finalizedFeatures: Map[String, java.lang.Short], finalizedFeatureEpoch: Long): ApiVersionsResponse = {
+    throw new UnsupportedOperationException("This method is not supported in DefaultApiVersionManager, use apiVersionResponse(throttleTimeMs) instead")
+  }
 }
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala
index b8f233fa94f..ba3da5c47c7 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -431,24 +431,33 @@ class ControllerApis(val requestChannel: RequestChannel,
   }
 
   def handleApiVersionsRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
-    // Note that broker returns its full list of supported ApiKeys and versions regardless of current
+    // Note that controller returns its full list of supported ApiKeys and versions regardless of current
     // authentication state (e.g., before SASL authentication on an SASL listener, do note that no
     // Kafka protocol requests may take place on an SSL listener before the SSL handshake is finished).
-    // If this is considered to leak information about the broker version a workaround is to use SSL
+    // If this is considered to leak information about the controller version a workaround is to use SSL
     // with client authentication which is performed at an earlier stage of the connection where the
     // ApiVersionRequest is not available.
-    def createResponseCallback(requestThrottleMs: Int): ApiVersionsResponse = {
-      val apiVersionRequest = request.body[ApiVersionsRequest]
-      if (apiVersionRequest.hasUnsupportedRequestVersion) {
-        apiVersionRequest.getErrorResponse(requestThrottleMs, UNSUPPORTED_VERSION.exception)
-      } else if (!apiVersionRequest.isValid) {
-        apiVersionRequest.getErrorResponse(requestThrottleMs, INVALID_REQUEST.exception)
-      } else {
-        apiVersionManager.apiVersionResponse(requestThrottleMs)
+    val apiVersionRequest = request.body[ApiVersionsRequest]
+    if (apiVersionRequest.hasUnsupportedRequestVersion) {
+      requestHelper.sendResponseMaybeThrottle(request,
+        requestThrottleMs => apiVersionRequest.getErrorResponse(requestThrottleMs, UNSUPPORTED_VERSION.exception))
+      CompletableFuture.completedFuture[Unit](())
+    } else if (!apiVersionRequest.isValid) {
+      requestHelper.sendResponseMaybeThrottle(request,
+        requestThrottleMs => apiVersionRequest.getErrorResponse(requestThrottleMs, INVALID_REQUEST.exception))
+      CompletableFuture.completedFuture[Unit](())
+    } else {
+      val context = new ControllerRequestContext(request.context.header.data, request.context.principal, OptionalLong.empty())
+      controller.finalizedFeatures(context).handle { (result, exception) =>
+        requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
+          if (exception != null) {
+            apiVersionRequest.getErrorResponse(requestThrottleMs, exception)
+          } else {
+            apiVersionManager.apiVersionResponse(requestThrottleMs, result.featureMap().asScala.toMap, result.epoch())
+          }
+        })
       }
     }
-    requestHelper.sendResponseMaybeThrottle(request, createResponseCallback)
-    CompletableFuture.completedFuture[Unit](())
   }
 
   def authorizeAlterResource(requestContext: RequestContext,
diff --git a/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala b/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala
index a9b471b1622..b379efeb858 100644
--- a/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala
@@ -64,7 +64,7 @@ class TestRaftRequestHandler(
   }
 
   private def handleApiVersions(request: RequestChannel.Request): Unit = {
-    requestChannel.sendResponse(request, apiVersionManager.apiVersionResponse(throttleTimeMs = 0), None)
+    requestChannel.sendResponse(request, apiVersionManager.apiVersionResponse(throttleTimeMs = 0, Map.empty, -1), None)
   }
 
   private def handleVote(request: RequestChannel.Request): Unit = {
diff --git a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
index 2bda3aca1d2..89a354e7492 100644
--- a/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/GssapiAuthenticationTest.scala
@@ -32,7 +32,6 @@ import org.apache.kafka.common.config.SaslConfigs
 import org.apache.kafka.common.errors.SaslAuthenticationException
 import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import org.apache.kafka.common.network._
-import org.apache.kafka.common.requests.ApiVersionsResponse
 import org.apache.kafka.common.security.{JaasContext, TestSecurityConfig}
 import org.apache.kafka.common.security.auth.{Login, SecurityProtocol}
 import org.apache.kafka.common.security.kerberos.KerberosLogin
@@ -263,7 +262,7 @@ class GssapiAuthenticationTest extends IntegrationTestHarness with SaslSetup {
     val jaasContexts = Collections.singletonMap("GSSAPI", JaasContext.loadClientContext(config.values()))
     val channelBuilder = new SaslChannelBuilder(Mode.CLIENT, jaasContexts, securityProtocol,
       null, false, kafkaClientSaslMechanism, true, null, null, null, time, new LogContext(),
-      () => ApiVersionsResponse.defaultApiVersionsResponse(ListenerType.ZK_BROKER)) {
+      () => org.apache.kafka.test.TestUtils.defaultApiVersionsResponse(ListenerType.ZK_BROKER)) {
       override protected def defaultLoginClass(): Class[_ <: Login] = classOf[TestableKerberosLogin]
     }
     channelBuilder.configure(config.values())
diff --git a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
index 02ce2485974..5276d030cb8 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
@@ -27,6 +27,8 @@ import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.record.RecordVersion
 import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse, RequestUtils}
 import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.server.common.MetadataVersion
+import org.apache.kafka.test.TestUtils
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Tag
 
@@ -71,8 +73,18 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) {
   def validateApiVersionsResponse(
     apiVersionsResponse: ApiVersionsResponse,
     listenerName: ListenerName = cluster.clientListener(),
-    enableUnstableLastVersion: Boolean = false
+    enableUnstableLastVersion: Boolean = false,
+    apiVersion: Short = ApiKeys.API_VERSIONS.latestVersion
   ): Unit = {
+    if (cluster.isKRaftTest && apiVersion >= 3) {
+      assertEquals(1, apiVersionsResponse.data().finalizedFeatures().size())
+      assertEquals(MetadataVersion.latest().featureLevel(), apiVersionsResponse.data().finalizedFeatures().find(MetadataVersion.FEATURE_NAME).minVersionLevel())
+      assertEquals(MetadataVersion.latest().featureLevel(), apiVersionsResponse.data().finalizedFeatures().find(MetadataVersion.FEATURE_NAME).maxVersionLevel())
+
+      assertEquals(1, apiVersionsResponse.data().supportedFeatures().size())
+      assertEquals(MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(), apiVersionsResponse.data().supportedFeatures().find(MetadataVersion.FEATURE_NAME).minVersion())
+      assertEquals(MetadataVersion.latest().featureLevel(), apiVersionsResponse.data().supportedFeatures().find(MetadataVersion.FEATURE_NAME).maxVersion())
+    }
     val expectedApis = if (!cluster.isKRaftTest) {
       ApiVersionsResponse.collectApis(
         ApiKeys.apisForListener(ApiMessageType.ListenerType.ZK_BROKER),
@@ -96,11 +108,11 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) {
       "API keys in ApiVersionsResponse must match API keys supported by broker.")
 
     val defaultApiVersionsResponse = if (!cluster.isKRaftTest) {
-      ApiVersionsResponse.defaultApiVersionsResponse(0, ListenerType.ZK_BROKER, enableUnstableLastVersion)
+      TestUtils.defaultApiVersionsResponse(0, ListenerType.ZK_BROKER, enableUnstableLastVersion)
     } else if(cluster.controllerListenerName().asScala.contains(listenerName)) {
-      ApiVersionsResponse.defaultApiVersionsResponse(0, ListenerType.CONTROLLER, enableUnstableLastVersion)
+      TestUtils.defaultApiVersionsResponse(0, ListenerType.CONTROLLER, enableUnstableLastVersion)
     } else {
-      ApiVersionsResponse.createApiVersionsResponse(0, expectedApis)
+      TestUtils.createApiVersionsResponse(0, expectedApis)
     }
 
     for (expectedApiVersion: ApiVersion <- defaultApiVersionsResponse.data.apiKeys.asScala) {
diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
index cea87138e73..d041d68a843 100644
--- a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
@@ -81,7 +81,7 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio
   def testApiVersionsRequestValidationV0(): Unit = {
     val apiVersionsRequest = new ApiVersionsRequest.Builder().build(0.asInstanceOf[Short])
     val apiVersionsResponse = sendApiVersionsRequest(apiVersionsRequest, cluster.clientListener())
-    validateApiVersionsResponse(apiVersionsResponse)
+    validateApiVersionsResponse(apiVersionsResponse, apiVersion = 0)
   }
 
   @ClusterTest(clusterType = Type.ZK)
@@ -95,7 +95,7 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio
   def testApiVersionsRequestValidationV0ThroughControllerListener(): Unit = {
     val apiVersionsRequest = new ApiVersionsRequest.Builder().build(0.asInstanceOf[Short])
     val apiVersionsResponse = sendApiVersionsRequest(apiVersionsRequest, cluster.controllerListenerName.get())
-    validateApiVersionsResponse(apiVersionsResponse, cluster.controllerListenerName.get())
+    validateApiVersionsResponse(apiVersionsResponse, cluster.controllerListenerName.get(), apiVersion = 0)
   }
 
   @ClusterTest
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 66fc2611a3d..96b4b9ef584 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -2021,6 +2021,11 @@ public final class QuorumController implements Controller {
     public CompletableFuture<FinalizedControllerFeatures> finalizedFeatures(
         ControllerRequestContext context
     ) {
+        // It's possible that we call ApiVersionRequest before consuming the log since ApiVersionRequest is sent when
+        // initialize NetworkClient, we should not return an error since it would stop the NetworkClient from working correctly.
+        if (lastCommittedOffset == -1) {
+            return CompletableFuture.completedFuture(new FinalizedControllerFeatures(Collections.emptyMap(), -1));
+        }
         return appendReadEvent("getFinalizedFeatures", context.deadlineNs(),
             () -> featureControl.finalizedFeatures(lastCommittedOffset));
     }
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/FinalizedControllerFeatures.java b/metadata/src/main/java/org/apache/kafka/metadata/FinalizedControllerFeatures.java
index 2ebce9e3e6c..05ef45d1e96 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/FinalizedControllerFeatures.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/FinalizedControllerFeatures.java
@@ -44,6 +44,10 @@ public class FinalizedControllerFeatures {
         return featureMap.keySet();
     }
 
+    public Map<String, Short> featureMap() {
+        return featureMap;
+    }
+
     public long epoch() {
         return epoch;
     }