You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2022/08/31 18:43:54 UTC

[kafka] 02/02: KAFKA-13990: KRaft controller should return right features in ApiVersionResponse (#12294)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 562c30f1876a2c3cbecbc736df6babc751dc8204
Author: dengziming <de...@gmail.com>
AuthorDate: Thu Sep 1 02:35:58 2022 +0800

    KAFKA-13990: KRaft controller should return right features in ApiVersionResponse (#12294)
    
    Previously, the KRaft controller incorrectly reported an empty feature set in its
    ApiVersionsResponse. This prevented multi-node clusters from being upgraded via
    kafka-features.sh, because the empty feature set made metadata.version appear to be an
    unsupported feature. This PR fixes the response and adds a regression test for the bug,
    KRaftClusterTest.testUpdateMetadataVersion.
    
    Reviewers: José Armando García Sancio <js...@gmail.com>, Colin P. McCabe <cm...@apache.org>
---
 .../kafka/common/requests/ApiVersionsResponse.java | 12 +++++++--
 .../scala/kafka/server/ApiVersionManager.scala     |  8 +++---
 .../main/scala/kafka/server/BrokerFeatures.scala   |  8 ++++--
 .../network/DynamicNumNetworkThreadsTest.scala     |  2 +-
 .../FetchRequestBetweenDifferentIbpTest.scala      |  3 +--
 .../kafka/server/FetchRequestTestDowngrade.scala   |  3 +--
 .../kafka/server/KRaftClusterTest.scala            | 30 +++++++++++++++++++++-
 .../server/MetadataVersionIntegrationTest.scala    |  2 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  2 +-
 .../metadata/BrokerMetadataPublisherTest.scala     |  3 +--
 10 files changed, 56 insertions(+), 17 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
index 7c98eb2679..28c9d613bb 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
@@ -107,17 +107,25 @@ public class ApiVersionsResponse extends AbstractResponse {
         int throttleTimeMs,
         ApiMessageType.ListenerType listenerType
     ) {
-        return createApiVersionsResponse(throttleTimeMs, filterApis(RecordVersion.current(), listenerType));
+        return createApiVersionsResponse(throttleTimeMs, filterApis(RecordVersion.current(), listenerType), Features.emptySupportedFeatures());
     }
 
     public static ApiVersionsResponse createApiVersionsResponse(
         int throttleTimeMs,
         ApiVersionCollection apiVersions
+    ) {
+        return createApiVersionsResponse(throttleTimeMs, apiVersions, Features.emptySupportedFeatures());
+    }
+
+    public static ApiVersionsResponse createApiVersionsResponse(
+        int throttleTimeMs,
+        ApiVersionCollection apiVersions,
+        Features<SupportedVersionRange> latestSupportedFeatures
     ) {
         return createApiVersionsResponse(
             throttleTimeMs,
             apiVersions,
-            Features.emptySupportedFeatures(),
+            latestSupportedFeatures,
             Collections.emptyMap(),
             UNKNOWN_FINALIZED_FEATURES_EPOCH);
     }
diff --git a/core/src/main/scala/kafka/server/ApiVersionManager.scala b/core/src/main/scala/kafka/server/ApiVersionManager.scala
index 6d329673a8..20dc043563 100644
--- a/core/src/main/scala/kafka/server/ApiVersionManager.scala
+++ b/core/src/main/scala/kafka/server/ApiVersionManager.scala
@@ -18,6 +18,7 @@ package kafka.server
 
 import kafka.network
 import kafka.network.RequestChannel
+import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
 import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.requests.ApiVersionsResponse
@@ -51,17 +52,18 @@ object ApiVersionManager {
 
 class SimpleApiVersionManager(
   val listenerType: ListenerType,
-  val enabledApis: collection.Set[ApiKeys]
+  val enabledApis: collection.Set[ApiKeys],
+  brokerFeatures: Features[SupportedVersionRange]
 ) extends ApiVersionManager {
 
   def this(listenerType: ListenerType) = {
-    this(listenerType, ApiKeys.apisForListener(listenerType).asScala)
+    this(listenerType, ApiKeys.apisForListener(listenerType).asScala, BrokerFeatures.defaultSupportedFeatures())
   }
 
   private val apiVersions = ApiVersionsResponse.collectApis(enabledApis.asJava)
 
   override def apiVersionResponse(requestThrottleMs: Int): ApiVersionsResponse = {
-    ApiVersionsResponse.createApiVersionsResponse(0, apiVersions)
+    ApiVersionsResponse.createApiVersionsResponse(requestThrottleMs, apiVersions, brokerFeatures)
   }
 }
 
diff --git a/core/src/main/scala/kafka/server/BrokerFeatures.scala b/core/src/main/scala/kafka/server/BrokerFeatures.scala
index 70ef7c71cb..040529dde5 100644
--- a/core/src/main/scala/kafka/server/BrokerFeatures.scala
+++ b/core/src/main/scala/kafka/server/BrokerFeatures.scala
@@ -71,9 +71,13 @@ class BrokerFeatures private (@volatile var supportedFeatures: Features[Supporte
 object BrokerFeatures extends Logging {
 
   def createDefault(): BrokerFeatures = {
-    new BrokerFeatures(Features.supportedFeatures(
+    new BrokerFeatures(defaultSupportedFeatures())
+  }
+
+  def defaultSupportedFeatures(): Features[SupportedVersionRange] = {
+    Features.supportedFeatures(
       java.util.Collections.singletonMap(MetadataVersion.FEATURE_NAME,
-        new SupportedVersionRange(MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(), MetadataVersion.latest().featureLevel()))))
+        new SupportedVersionRange(MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(), MetadataVersion.latest().featureLevel())))
   }
 
   def createEmpty(): BrokerFeatures = {
diff --git a/core/src/test/scala/integration/kafka/network/DynamicNumNetworkThreadsTest.scala b/core/src/test/scala/integration/kafka/network/DynamicNumNetworkThreadsTest.scala
index f8893bd1da..b9f05d5aa8 100644
--- a/core/src/test/scala/integration/kafka/network/DynamicNumNetworkThreadsTest.scala
+++ b/core/src/test/scala/integration/kafka/network/DynamicNumNetworkThreadsTest.scala
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package integration.kafka.network
+package kafka.network
 
 import kafka.server.{BaseRequestTest, Defaults, KafkaConfig}
 import kafka.utils.TestUtils
diff --git a/core/src/test/scala/integration/kafka/server/FetchRequestBetweenDifferentIbpTest.scala b/core/src/test/scala/integration/kafka/server/FetchRequestBetweenDifferentIbpTest.scala
index 36d9c00bfd..f4fd79b509 100644
--- a/core/src/test/scala/integration/kafka/server/FetchRequestBetweenDifferentIbpTest.scala
+++ b/core/src/test/scala/integration/kafka/server/FetchRequestBetweenDifferentIbpTest.scala
@@ -15,12 +15,11 @@
  * limitations under the License.
  */
 
-package integration.kafka.server
+package kafka.server
 
 import java.time.Duration
 import java.util.Arrays.asList
 
-import kafka.server.{BaseRequestTest, KafkaConfig}
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.TopicPartition
diff --git a/core/src/test/scala/integration/kafka/server/FetchRequestTestDowngrade.scala b/core/src/test/scala/integration/kafka/server/FetchRequestTestDowngrade.scala
index c714b8cc36..25867cda7e 100644
--- a/core/src/test/scala/integration/kafka/server/FetchRequestTestDowngrade.scala
+++ b/core/src/test/scala/integration/kafka/server/FetchRequestTestDowngrade.scala
@@ -15,12 +15,11 @@
  * limitations under the License.
  */
 
-package integration.kafka.server
+package kafka.server
 
 import java.time.Duration
 import java.util.Arrays.asList
 
-import kafka.server.{BaseRequestTest, KafkaConfig}
 import kafka.utils.TestUtils
 import kafka.zk.ZkVersion
 import org.apache.kafka.clients.producer.ProducerRecord
diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index a360ee15a2..ff16d03e80 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -21,7 +21,7 @@ import kafka.network.SocketServer
 import kafka.server.IntegrationTestUtils.connectAndReceive
 import kafka.testkit.{BrokerNode, KafkaClusterTestKit, TestKitNodes}
 import kafka.utils.TestUtils
-import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigOp, Config, ConfigEntry, DescribeMetadataQuorumOptions, NewPartitionReassignment, NewTopic}
+import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigOp, Config, ConfigEntry, DescribeMetadataQuorumOptions, FeatureUpdate, NewPartitionReassignment, NewTopic, UpdateFeaturesOptions}
 import org.apache.kafka.common.{TopicPartition, TopicPartitionInfo}
 import org.apache.kafka.common.message.DescribeClusterRequestData
 import org.apache.kafka.common.network.ListenerName
@@ -781,6 +781,7 @@ class KRaftClusterTest {
       cluster.close()
     }
   }
+
   def createAdminClient(cluster: KafkaClusterTestKit): Admin = {
     var props: Properties = null
     props = cluster.clientProperties()
@@ -832,4 +833,31 @@ class KRaftClusterTest {
     }
   }
 
+
+  @Test
+  def testUpdateMetadataVersion(): Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setBootstrapMetadataVersion(MetadataVersion.MINIMUM_BOOTSTRAP_VERSION).
+        setNumBrokerNodes(4).
+        setNumControllerNodes(3).build()).build()
+    try {
+      cluster.format()
+      cluster.startup()
+      cluster.waitForReadyBrokers()
+      val admin = Admin.create(cluster.clientProperties())
+      try {
+        admin.updateFeatures(
+          Map(MetadataVersion.FEATURE_NAME ->
+            new FeatureUpdate(MetadataVersion.latest().featureLevel(), FeatureUpdate.UpgradeType.UPGRADE)).asJava, new UpdateFeaturesOptions
+        )
+      } finally {
+        admin.close()
+      }
+      TestUtils.waitUntilTrue(() => cluster.brokers().get(1).metadataCache.currentImage().features().metadataVersion().equals(MetadataVersion.latest()),
+        "Timed out waiting for metadata version update.")
+    } finally {
+      cluster.close()
+    }
+  }
 }
diff --git a/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala b/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala
index 5270f627bf..81810c61da 100644
--- a/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package integration.kafka.server
+package kafka.server
 
 import kafka.test.ClusterInstance
 import kafka.test.annotation.{ClusterTest, ClusterTests, Type}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index dada6187be..aefb1a5348 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -181,7 +181,7 @@ class KafkaApisTest {
     } else {
       ApiKeys.apisForListener(listenerType).asScala.toSet
     }
-    val apiVersionManager = new SimpleApiVersionManager(listenerType, enabledApis)
+    val apiVersionManager = new SimpleApiVersionManager(listenerType, enabledApis, BrokerFeatures.defaultSupportedFeatures())
 
     new KafkaApis(
       metadataSupport = metadataSupport,
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
index 652b8b3a0c..b571e2d0ab 100644
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
@@ -15,14 +15,13 @@
  * limitations under the License.
  */
 
-package unit.kafka.server.metadata
+package kafka.server.metadata
 
 import java.util.Collections.{singleton, singletonList, singletonMap}
 import java.util.Properties
 import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
 import kafka.log.UnifiedLog
 import kafka.server.{BrokerServer, KafkaConfig}
-import kafka.server.metadata.BrokerMetadataPublisher
 import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET