You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2022/08/31 18:43:54 UTC
[kafka] 02/02: KAFKA-13990: KRaft controller should return right features in ApiVersionResponse (#12294)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 562c30f1876a2c3cbecbc736df6babc751dc8204
Author: dengziming <de...@gmail.com>
AuthorDate: Thu Sep 1 02:35:58 2022 +0800
KAFKA-13990: KRaft controller should return right features in ApiVersionResponse (#12294)
Previously, the KRaft controller incorrectly reported an empty feature set in its
ApiVersionsResponse. This prevented any multi-node cluster from being upgraded via
kafka-features.sh, since the empty set made it appear that metadata.version was not a supported
feature. This PR adds a regression test for this bug, KRaftClusterTest.testUpdateMetadataVersion.
Reviewers: José Armando García Sancio <js...@gmail.com>, Colin P. McCabe <cm...@apache.org>
---
.../kafka/common/requests/ApiVersionsResponse.java | 12 +++++++--
.../scala/kafka/server/ApiVersionManager.scala | 8 +++---
.../main/scala/kafka/server/BrokerFeatures.scala | 8 ++++--
.../network/DynamicNumNetworkThreadsTest.scala | 2 +-
.../FetchRequestBetweenDifferentIbpTest.scala | 3 +--
.../kafka/server/FetchRequestTestDowngrade.scala | 3 +--
.../kafka/server/KRaftClusterTest.scala | 30 +++++++++++++++++++++-
.../server/MetadataVersionIntegrationTest.scala | 2 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 2 +-
.../metadata/BrokerMetadataPublisherTest.scala | 3 +--
10 files changed, 56 insertions(+), 17 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
index 7c98eb2679..28c9d613bb 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
@@ -107,17 +107,25 @@ public class ApiVersionsResponse extends AbstractResponse {
int throttleTimeMs,
ApiMessageType.ListenerType listenerType
) {
- return createApiVersionsResponse(throttleTimeMs, filterApis(RecordVersion.current(), listenerType));
+ return createApiVersionsResponse(throttleTimeMs, filterApis(RecordVersion.current(), listenerType), Features.emptySupportedFeatures());
}
public static ApiVersionsResponse createApiVersionsResponse(
int throttleTimeMs,
ApiVersionCollection apiVersions
+ ) {
+ return createApiVersionsResponse(throttleTimeMs, apiVersions, Features.emptySupportedFeatures());
+ }
+
+ public static ApiVersionsResponse createApiVersionsResponse(
+ int throttleTimeMs,
+ ApiVersionCollection apiVersions,
+ Features<SupportedVersionRange> latestSupportedFeatures
) {
return createApiVersionsResponse(
throttleTimeMs,
apiVersions,
- Features.emptySupportedFeatures(),
+ latestSupportedFeatures,
Collections.emptyMap(),
UNKNOWN_FINALIZED_FEATURES_EPOCH);
}
diff --git a/core/src/main/scala/kafka/server/ApiVersionManager.scala b/core/src/main/scala/kafka/server/ApiVersionManager.scala
index 6d329673a8..20dc043563 100644
--- a/core/src/main/scala/kafka/server/ApiVersionManager.scala
+++ b/core/src/main/scala/kafka/server/ApiVersionManager.scala
@@ -18,6 +18,7 @@ package kafka.server
import kafka.network
import kafka.network.RequestChannel
+import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.ApiVersionsResponse
@@ -51,17 +52,18 @@ object ApiVersionManager {
class SimpleApiVersionManager(
val listenerType: ListenerType,
- val enabledApis: collection.Set[ApiKeys]
+ val enabledApis: collection.Set[ApiKeys],
+ brokerFeatures: Features[SupportedVersionRange]
) extends ApiVersionManager {
def this(listenerType: ListenerType) = {
- this(listenerType, ApiKeys.apisForListener(listenerType).asScala)
+ this(listenerType, ApiKeys.apisForListener(listenerType).asScala, BrokerFeatures.defaultSupportedFeatures())
}
private val apiVersions = ApiVersionsResponse.collectApis(enabledApis.asJava)
override def apiVersionResponse(requestThrottleMs: Int): ApiVersionsResponse = {
- ApiVersionsResponse.createApiVersionsResponse(0, apiVersions)
+ ApiVersionsResponse.createApiVersionsResponse(requestThrottleMs, apiVersions, brokerFeatures)
}
}
diff --git a/core/src/main/scala/kafka/server/BrokerFeatures.scala b/core/src/main/scala/kafka/server/BrokerFeatures.scala
index 70ef7c71cb..040529dde5 100644
--- a/core/src/main/scala/kafka/server/BrokerFeatures.scala
+++ b/core/src/main/scala/kafka/server/BrokerFeatures.scala
@@ -71,9 +71,13 @@ class BrokerFeatures private (@volatile var supportedFeatures: Features[Supporte
object BrokerFeatures extends Logging {
def createDefault(): BrokerFeatures = {
- new BrokerFeatures(Features.supportedFeatures(
+ new BrokerFeatures(defaultSupportedFeatures())
+ }
+
+ def defaultSupportedFeatures(): Features[SupportedVersionRange] = {
+ Features.supportedFeatures(
java.util.Collections.singletonMap(MetadataVersion.FEATURE_NAME,
- new SupportedVersionRange(MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(), MetadataVersion.latest().featureLevel()))))
+ new SupportedVersionRange(MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(), MetadataVersion.latest().featureLevel())))
}
def createEmpty(): BrokerFeatures = {
diff --git a/core/src/test/scala/integration/kafka/network/DynamicNumNetworkThreadsTest.scala b/core/src/test/scala/integration/kafka/network/DynamicNumNetworkThreadsTest.scala
index f8893bd1da..b9f05d5aa8 100644
--- a/core/src/test/scala/integration/kafka/network/DynamicNumNetworkThreadsTest.scala
+++ b/core/src/test/scala/integration/kafka/network/DynamicNumNetworkThreadsTest.scala
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package integration.kafka.network
+package kafka.network
import kafka.server.{BaseRequestTest, Defaults, KafkaConfig}
import kafka.utils.TestUtils
diff --git a/core/src/test/scala/integration/kafka/server/FetchRequestBetweenDifferentIbpTest.scala b/core/src/test/scala/integration/kafka/server/FetchRequestBetweenDifferentIbpTest.scala
index 36d9c00bfd..f4fd79b509 100644
--- a/core/src/test/scala/integration/kafka/server/FetchRequestBetweenDifferentIbpTest.scala
+++ b/core/src/test/scala/integration/kafka/server/FetchRequestBetweenDifferentIbpTest.scala
@@ -15,12 +15,11 @@
* limitations under the License.
*/
-package integration.kafka.server
+package kafka.server
import java.time.Duration
import java.util.Arrays.asList
-import kafka.server.{BaseRequestTest, KafkaConfig}
import kafka.utils.TestUtils
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
diff --git a/core/src/test/scala/integration/kafka/server/FetchRequestTestDowngrade.scala b/core/src/test/scala/integration/kafka/server/FetchRequestTestDowngrade.scala
index c714b8cc36..25867cda7e 100644
--- a/core/src/test/scala/integration/kafka/server/FetchRequestTestDowngrade.scala
+++ b/core/src/test/scala/integration/kafka/server/FetchRequestTestDowngrade.scala
@@ -15,12 +15,11 @@
* limitations under the License.
*/
-package integration.kafka.server
+package kafka.server
import java.time.Duration
import java.util.Arrays.asList
-import kafka.server.{BaseRequestTest, KafkaConfig}
import kafka.utils.TestUtils
import kafka.zk.ZkVersion
import org.apache.kafka.clients.producer.ProducerRecord
diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index a360ee15a2..ff16d03e80 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -21,7 +21,7 @@ import kafka.network.SocketServer
import kafka.server.IntegrationTestUtils.connectAndReceive
import kafka.testkit.{BrokerNode, KafkaClusterTestKit, TestKitNodes}
import kafka.utils.TestUtils
-import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigOp, Config, ConfigEntry, DescribeMetadataQuorumOptions, NewPartitionReassignment, NewTopic}
+import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigOp, Config, ConfigEntry, DescribeMetadataQuorumOptions, FeatureUpdate, NewPartitionReassignment, NewTopic, UpdateFeaturesOptions}
import org.apache.kafka.common.{TopicPartition, TopicPartitionInfo}
import org.apache.kafka.common.message.DescribeClusterRequestData
import org.apache.kafka.common.network.ListenerName
@@ -781,6 +781,7 @@ class KRaftClusterTest {
cluster.close()
}
}
+
def createAdminClient(cluster: KafkaClusterTestKit): Admin = {
var props: Properties = null
props = cluster.clientProperties()
@@ -832,4 +833,31 @@ class KRaftClusterTest {
}
}
+
+ @Test
+ def testUpdateMetadataVersion(): Unit = {
+ val cluster = new KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder().
+ setBootstrapMetadataVersion(MetadataVersion.MINIMUM_BOOTSTRAP_VERSION).
+ setNumBrokerNodes(4).
+ setNumControllerNodes(3).build()).build()
+ try {
+ cluster.format()
+ cluster.startup()
+ cluster.waitForReadyBrokers()
+ val admin = Admin.create(cluster.clientProperties())
+ try {
+ admin.updateFeatures(
+ Map(MetadataVersion.FEATURE_NAME ->
+ new FeatureUpdate(MetadataVersion.latest().featureLevel(), FeatureUpdate.UpgradeType.UPGRADE)).asJava, new UpdateFeaturesOptions
+ )
+ } finally {
+ admin.close()
+ }
+ TestUtils.waitUntilTrue(() => cluster.brokers().get(1).metadataCache.currentImage().features().metadataVersion().equals(MetadataVersion.latest()),
+ "Timed out waiting for metadata version update.")
+ } finally {
+ cluster.close()
+ }
+ }
}
diff --git a/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala b/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala
index 5270f627bf..81810c61da 100644
--- a/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package integration.kafka.server
+package kafka.server
import kafka.test.ClusterInstance
import kafka.test.annotation.{ClusterTest, ClusterTests, Type}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index dada6187be..aefb1a5348 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -181,7 +181,7 @@ class KafkaApisTest {
} else {
ApiKeys.apisForListener(listenerType).asScala.toSet
}
- val apiVersionManager = new SimpleApiVersionManager(listenerType, enabledApis)
+ val apiVersionManager = new SimpleApiVersionManager(listenerType, enabledApis, BrokerFeatures.defaultSupportedFeatures())
new KafkaApis(
metadataSupport = metadataSupport,
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
index 652b8b3a0c..b571e2d0ab 100644
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
@@ -15,14 +15,13 @@
* limitations under the License.
*/
-package unit.kafka.server.metadata
+package kafka.server.metadata
import java.util.Collections.{singleton, singletonList, singletonMap}
import java.util.Properties
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
import kafka.log.UnifiedLog
import kafka.server.{BrokerServer, KafkaConfig}
-import kafka.server.metadata.BrokerMetadataPublisher
import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET