You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2022/08/31 18:43:52 UTC

[kafka] branch 3.3 updated (26a884cc30 -> 562c30f187)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a change to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


    from 26a884cc30 KAFKA-14187: kafka-features.sh: add support for --metadata (#12571)
     new c1c7f2a9c7 MINOR: Enable testUpdateFeaturesWithForwarding (#12059)
     new 562c30f187 KAFKA-13990: KRaft controller should return right features in ApiVersionResponse (#12294)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../kafka/common/requests/ApiVersionsResponse.java | 12 +++++++--
 .../scala/kafka/server/ApiVersionManager.scala     |  8 +++---
 .../main/scala/kafka/server/BrokerFeatures.scala   |  8 ++++--
 .../network/DynamicNumNetworkThreadsTest.scala     |  2 +-
 .../FetchRequestBetweenDifferentIbpTest.scala      |  3 +--
 .../kafka/server/FetchRequestTestDowngrade.scala   |  3 +--
 .../kafka/server/KRaftClusterTest.scala            | 30 +++++++++++++++++++++-
 .../server/MetadataVersionIntegrationTest.scala    |  2 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  8 +++++-
 .../metadata/BrokerMetadataPublisherTest.scala     |  3 +--
 10 files changed, 62 insertions(+), 17 deletions(-)


[kafka] 01/02: MINOR: Enable testUpdateFeaturesWithForwarding (#12059)

Posted by cm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit c1c7f2a9c78ef37648efb61569b4b811d09d09fe
Author: dengziming <de...@gmail.com>
AuthorDate: Wed Aug 31 15:06:21 2022 +0800

    MINOR: Enable testUpdateFeaturesWithForwarding (#12059)
    
    This test was removed in #11667 since UpdateFeatures is not properly handled in KRaft mode, now we can bring it back since UpdateFeatures is properly handled after #12036.
    
    Reviewers: Luke Chen <sh...@gmail.com>
---
 core/src/test/scala/unit/kafka/server/KafkaApisTest.scala | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index d176f369f8..dada6187be 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -923,6 +923,12 @@ class KafkaApisTest {
     testForwardableApi(ApiKeys.CREATE_PARTITIONS, requestBuilder)
   }
 
+  @Test
+  def testUpdateFeaturesWithForwarding(): Unit = {
+    val requestBuilder = new UpdateFeaturesRequest.Builder(new UpdateFeaturesRequestData())
+    testForwardableApi(ApiKeys.UPDATE_FEATURES, requestBuilder)
+  }
+
   @Test
   def testDeleteTopicsWithForwarding(): Unit = {
     val requestBuilder = new DeleteTopicsRequest.Builder(new DeleteTopicsRequestData())


[kafka] 02/02: KAFKA-13990: KRaft controller should return right features in ApiVersionResponse (#12294)

Posted by cm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 562c30f1876a2c3cbecbc736df6babc751dc8204
Author: dengziming <de...@gmail.com>
AuthorDate: Thu Sep 1 02:35:58 2022 +0800

    KAFKA-13990: KRaft controller should return right features in ApiVersionResponse (#12294)
    
    Previously, the KRaft controller was incorrectly reporting an empty feature set in
    ApiVersionResponse. This was preventing any multi-node clusters from being upgraded via
    kafka-features.sh, since they would incorrectly believe that metadata.version was not a supported
    feature. This PR adds a regression test for this bug, KRaftClusterTest.testUpdateMetadataVersion.
    
    Reviewers: José Armando García Sancio <js...@gmail.com>, Colin P. McCabe <cm...@apache.org>
---
 .../kafka/common/requests/ApiVersionsResponse.java | 12 +++++++--
 .../scala/kafka/server/ApiVersionManager.scala     |  8 +++---
 .../main/scala/kafka/server/BrokerFeatures.scala   |  8 ++++--
 .../network/DynamicNumNetworkThreadsTest.scala     |  2 +-
 .../FetchRequestBetweenDifferentIbpTest.scala      |  3 +--
 .../kafka/server/FetchRequestTestDowngrade.scala   |  3 +--
 .../kafka/server/KRaftClusterTest.scala            | 30 +++++++++++++++++++++-
 .../server/MetadataVersionIntegrationTest.scala    |  2 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  2 +-
 .../metadata/BrokerMetadataPublisherTest.scala     |  3 +--
 10 files changed, 56 insertions(+), 17 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
index 7c98eb2679..28c9d613bb 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
@@ -107,17 +107,25 @@ public class ApiVersionsResponse extends AbstractResponse {
         int throttleTimeMs,
         ApiMessageType.ListenerType listenerType
     ) {
-        return createApiVersionsResponse(throttleTimeMs, filterApis(RecordVersion.current(), listenerType));
+        return createApiVersionsResponse(throttleTimeMs, filterApis(RecordVersion.current(), listenerType), Features.emptySupportedFeatures());
     }
 
     public static ApiVersionsResponse createApiVersionsResponse(
         int throttleTimeMs,
         ApiVersionCollection apiVersions
+    ) {
+        return createApiVersionsResponse(throttleTimeMs, apiVersions, Features.emptySupportedFeatures());
+    }
+
+    public static ApiVersionsResponse createApiVersionsResponse(
+        int throttleTimeMs,
+        ApiVersionCollection apiVersions,
+        Features<SupportedVersionRange> latestSupportedFeatures
     ) {
         return createApiVersionsResponse(
             throttleTimeMs,
             apiVersions,
-            Features.emptySupportedFeatures(),
+            latestSupportedFeatures,
             Collections.emptyMap(),
             UNKNOWN_FINALIZED_FEATURES_EPOCH);
     }
diff --git a/core/src/main/scala/kafka/server/ApiVersionManager.scala b/core/src/main/scala/kafka/server/ApiVersionManager.scala
index 6d329673a8..20dc043563 100644
--- a/core/src/main/scala/kafka/server/ApiVersionManager.scala
+++ b/core/src/main/scala/kafka/server/ApiVersionManager.scala
@@ -18,6 +18,7 @@ package kafka.server
 
 import kafka.network
 import kafka.network.RequestChannel
+import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
 import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.requests.ApiVersionsResponse
@@ -51,17 +52,18 @@ object ApiVersionManager {
 
 class SimpleApiVersionManager(
   val listenerType: ListenerType,
-  val enabledApis: collection.Set[ApiKeys]
+  val enabledApis: collection.Set[ApiKeys],
+  brokerFeatures: Features[SupportedVersionRange]
 ) extends ApiVersionManager {
 
   def this(listenerType: ListenerType) = {
-    this(listenerType, ApiKeys.apisForListener(listenerType).asScala)
+    this(listenerType, ApiKeys.apisForListener(listenerType).asScala, BrokerFeatures.defaultSupportedFeatures())
   }
 
   private val apiVersions = ApiVersionsResponse.collectApis(enabledApis.asJava)
 
   override def apiVersionResponse(requestThrottleMs: Int): ApiVersionsResponse = {
-    ApiVersionsResponse.createApiVersionsResponse(0, apiVersions)
+    ApiVersionsResponse.createApiVersionsResponse(requestThrottleMs, apiVersions, brokerFeatures)
   }
 }
 
diff --git a/core/src/main/scala/kafka/server/BrokerFeatures.scala b/core/src/main/scala/kafka/server/BrokerFeatures.scala
index 70ef7c71cb..040529dde5 100644
--- a/core/src/main/scala/kafka/server/BrokerFeatures.scala
+++ b/core/src/main/scala/kafka/server/BrokerFeatures.scala
@@ -71,9 +71,13 @@ class BrokerFeatures private (@volatile var supportedFeatures: Features[Supporte
 object BrokerFeatures extends Logging {
 
   def createDefault(): BrokerFeatures = {
-    new BrokerFeatures(Features.supportedFeatures(
+    new BrokerFeatures(defaultSupportedFeatures())
+  }
+
+  def defaultSupportedFeatures(): Features[SupportedVersionRange] = {
+    Features.supportedFeatures(
       java.util.Collections.singletonMap(MetadataVersion.FEATURE_NAME,
-        new SupportedVersionRange(MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(), MetadataVersion.latest().featureLevel()))))
+        new SupportedVersionRange(MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(), MetadataVersion.latest().featureLevel())))
   }
 
   def createEmpty(): BrokerFeatures = {
diff --git a/core/src/test/scala/integration/kafka/network/DynamicNumNetworkThreadsTest.scala b/core/src/test/scala/integration/kafka/network/DynamicNumNetworkThreadsTest.scala
index f8893bd1da..b9f05d5aa8 100644
--- a/core/src/test/scala/integration/kafka/network/DynamicNumNetworkThreadsTest.scala
+++ b/core/src/test/scala/integration/kafka/network/DynamicNumNetworkThreadsTest.scala
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package integration.kafka.network
+package kafka.network
 
 import kafka.server.{BaseRequestTest, Defaults, KafkaConfig}
 import kafka.utils.TestUtils
diff --git a/core/src/test/scala/integration/kafka/server/FetchRequestBetweenDifferentIbpTest.scala b/core/src/test/scala/integration/kafka/server/FetchRequestBetweenDifferentIbpTest.scala
index 36d9c00bfd..f4fd79b509 100644
--- a/core/src/test/scala/integration/kafka/server/FetchRequestBetweenDifferentIbpTest.scala
+++ b/core/src/test/scala/integration/kafka/server/FetchRequestBetweenDifferentIbpTest.scala
@@ -15,12 +15,11 @@
  * limitations under the License.
  */
 
-package integration.kafka.server
+package kafka.server
 
 import java.time.Duration
 import java.util.Arrays.asList
 
-import kafka.server.{BaseRequestTest, KafkaConfig}
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.TopicPartition
diff --git a/core/src/test/scala/integration/kafka/server/FetchRequestTestDowngrade.scala b/core/src/test/scala/integration/kafka/server/FetchRequestTestDowngrade.scala
index c714b8cc36..25867cda7e 100644
--- a/core/src/test/scala/integration/kafka/server/FetchRequestTestDowngrade.scala
+++ b/core/src/test/scala/integration/kafka/server/FetchRequestTestDowngrade.scala
@@ -15,12 +15,11 @@
  * limitations under the License.
  */
 
-package integration.kafka.server
+package kafka.server
 
 import java.time.Duration
 import java.util.Arrays.asList
 
-import kafka.server.{BaseRequestTest, KafkaConfig}
 import kafka.utils.TestUtils
 import kafka.zk.ZkVersion
 import org.apache.kafka.clients.producer.ProducerRecord
diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index a360ee15a2..ff16d03e80 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -21,7 +21,7 @@ import kafka.network.SocketServer
 import kafka.server.IntegrationTestUtils.connectAndReceive
 import kafka.testkit.{BrokerNode, KafkaClusterTestKit, TestKitNodes}
 import kafka.utils.TestUtils
-import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigOp, Config, ConfigEntry, DescribeMetadataQuorumOptions, NewPartitionReassignment, NewTopic}
+import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigOp, Config, ConfigEntry, DescribeMetadataQuorumOptions, FeatureUpdate, NewPartitionReassignment, NewTopic, UpdateFeaturesOptions}
 import org.apache.kafka.common.{TopicPartition, TopicPartitionInfo}
 import org.apache.kafka.common.message.DescribeClusterRequestData
 import org.apache.kafka.common.network.ListenerName
@@ -781,6 +781,7 @@ class KRaftClusterTest {
       cluster.close()
     }
   }
+
   def createAdminClient(cluster: KafkaClusterTestKit): Admin = {
     var props: Properties = null
     props = cluster.clientProperties()
@@ -832,4 +833,31 @@ class KRaftClusterTest {
     }
   }
 
+
+  @Test
+  def testUpdateMetadataVersion(): Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setBootstrapMetadataVersion(MetadataVersion.MINIMUM_BOOTSTRAP_VERSION).
+        setNumBrokerNodes(4).
+        setNumControllerNodes(3).build()).build()
+    try {
+      cluster.format()
+      cluster.startup()
+      cluster.waitForReadyBrokers()
+      val admin = Admin.create(cluster.clientProperties())
+      try {
+        admin.updateFeatures(
+          Map(MetadataVersion.FEATURE_NAME ->
+            new FeatureUpdate(MetadataVersion.latest().featureLevel(), FeatureUpdate.UpgradeType.UPGRADE)).asJava, new UpdateFeaturesOptions
+        )
+      } finally {
+        admin.close()
+      }
+      TestUtils.waitUntilTrue(() => cluster.brokers().get(1).metadataCache.currentImage().features().metadataVersion().equals(MetadataVersion.latest()),
+        "Timed out waiting for metadata version update.")
+    } finally {
+      cluster.close()
+    }
+  }
 }
diff --git a/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala b/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala
index 5270f627bf..81810c61da 100644
--- a/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package integration.kafka.server
+package kafka.server
 
 import kafka.test.ClusterInstance
 import kafka.test.annotation.{ClusterTest, ClusterTests, Type}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index dada6187be..aefb1a5348 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -181,7 +181,7 @@ class KafkaApisTest {
     } else {
       ApiKeys.apisForListener(listenerType).asScala.toSet
     }
-    val apiVersionManager = new SimpleApiVersionManager(listenerType, enabledApis)
+    val apiVersionManager = new SimpleApiVersionManager(listenerType, enabledApis, BrokerFeatures.defaultSupportedFeatures())
 
     new KafkaApis(
       metadataSupport = metadataSupport,
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
index 652b8b3a0c..b571e2d0ab 100644
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
@@ -15,14 +15,13 @@
  * limitations under the License.
  */
 
-package unit.kafka.server.metadata
+package kafka.server.metadata
 
 import java.util.Collections.{singleton, singletonList, singletonMap}
 import java.util.Properties
 import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
 import kafka.log.UnifiedLog
 import kafka.server.{BrokerServer, KafkaConfig}
-import kafka.server.metadata.BrokerMetadataPublisher
 import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET