You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2022/06/13 18:23:38 UTC
[kafka] branch trunk updated: KAFKA-13935 Fix static usages of IBP in KRaft mode (#12250)
This is an automated email from the ASF dual-hosted git repository.
davidarthur pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new cc384054c6 KAFKA-13935 Fix static usages of IBP in KRaft mode (#12250)
cc384054c6 is described below
commit cc384054c6e63abd011f7687523b1292062b049d
Author: David Arthur <mu...@gmail.com>
AuthorDate: Mon Jun 13 14:23:28 2022 -0400
KAFKA-13935 Fix static usages of IBP in KRaft mode (#12250)
* Set the minimum supported MetadataVersion to 3.0-IV1
* Remove MetadataVersion.UNINITIALIZED
* Relocate RPC version mapping for fetch protocols into MetadataVersion
* Replace static IBP calls with dynamic calls to MetadataCache
A side effect of removing the UNINITIALIZED metadata version is that the FeatureControlManager and FeaturesImage will initialize themselves with the minimum KRaft version (3.0-IV1).
The minimum version is set to 3.0-IV1 so that we can avoid any cases of KRaft mode running with an old log message format (KIP-724 was introduced in 3.0-IV1). As a side effect of increasing this minimum version, the feature level values decreased by one.
Reviewers: Jason Gustafson <ja...@confluent.io>, Jun Rao <ju...@gmail.com>
---
.../main/scala/kafka/server/BrokerFeatures.scala | 2 +-
core/src/main/scala/kafka/server/KafkaConfig.scala | 51 +++++-------
.../main/scala/kafka/server/KafkaRaftServer.scala | 16 ++--
.../scala/kafka/server/RemoteLeaderEndPoint.scala | 22 ++++--
.../scala/kafka/server/ReplicaFetcherManager.scala | 9 ++-
.../scala/kafka/server/ReplicaFetcherThread.scala | 8 +-
.../main/scala/kafka/server/ReplicaManager.scala | 6 +-
.../server/metadata/BrokerMetadataPublisher.scala | 6 +-
core/src/main/scala/kafka/tools/StorageTool.scala | 4 +-
core/src/test/java/kafka/test/ClusterConfig.java | 16 ++--
.../java/kafka/test/ClusterTestExtensionsTest.java | 6 ++
.../java/kafka/test/annotation/ClusterTest.java | 2 +-
.../kafka/test/junit/ClusterTestExtensions.java | 8 +-
.../test/junit/RaftClusterInvocationContext.java | 3 +-
.../test/junit/ZkClusterInvocationContext.java | 2 +-
.../java/kafka/testkit/KafkaClusterTestKit.java | 2 +-
.../transaction/ProducerIdsIntegrationTest.scala | 2 +-
.../server/MetadataVersionIntegrationTest.scala | 9 ++-
.../scala/unit/kafka/server/KafkaConfigTest.scala | 28 +++++++
.../unit/kafka/server/KafkaRaftServerTest.scala | 55 ++++++++++++-
.../kafka/server/ReplicaFetcherThreadTest.scala | 24 +++---
.../unit/kafka/server/ReplicaManagerTest.scala | 13 +++-
.../metadata/BrokerMetadataListenerTest.scala | 2 +-
.../jmh/fetcher/ReplicaFetcherThreadBenchmark.java | 6 +-
.../apache/kafka/controller/BootstrapMetadata.java | 27 ++++---
.../kafka/controller/ClusterControlManager.java | 6 +-
.../kafka/controller/FeatureControlManager.java | 24 ++++--
.../apache/kafka/controller/QuorumController.java | 17 ++--
.../apache/kafka/controller/QuorumFeatures.java | 4 +-
.../java/org/apache/kafka/image/FeaturesImage.java | 11 ++-
.../java/org/apache/kafka/image/MetadataImage.java | 5 --
.../kafka/controller/BootstrapMetadataTest.java | 16 ++--
.../controller/ClusterControlManagerTest.java | 2 +-
.../controller/FeatureControlManagerTest.java | 19 +++--
.../kafka/controller/QuorumControllerTest.java | 24 +++++-
.../kafka/controller/QuorumControllerTestEnv.java | 6 +-
.../kafka/server/common/MetadataVersion.java | 90 ++++++++++++++++++----
.../kafka/server/common/MetadataVersionTest.java | 8 +-
38 files changed, 379 insertions(+), 182 deletions(-)
diff --git a/core/src/main/scala/kafka/server/BrokerFeatures.scala b/core/src/main/scala/kafka/server/BrokerFeatures.scala
index d385f1eb07..70ef7c71cb 100644
--- a/core/src/main/scala/kafka/server/BrokerFeatures.scala
+++ b/core/src/main/scala/kafka/server/BrokerFeatures.scala
@@ -73,7 +73,7 @@ object BrokerFeatures extends Logging {
def createDefault(): BrokerFeatures = {
new BrokerFeatures(Features.supportedFeatures(
java.util.Collections.singletonMap(MetadataVersion.FEATURE_NAME,
- new SupportedVersionRange(MetadataVersion.IBP_3_0_IV0.featureLevel(), MetadataVersion.latest().featureLevel()))))
+ new SupportedVersionRange(MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(), MetadataVersion.latest().featureLevel()))))
}
def createEmpty(): BrokerFeatures = {
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 3d7df18cbb..b4e0b9449c 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -1790,38 +1790,25 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
// We keep the user-provided String as `MetadataVersion.fromVersionString` can choose a slightly different version (eg if `0.10.0`
// is passed, `0.10.0-IV0` may be picked)
val interBrokerProtocolVersionString = getString(KafkaConfig.InterBrokerProtocolVersionProp)
- val interBrokerProtocolVersion = MetadataVersion.fromVersionString(interBrokerProtocolVersionString)
-
- val fetchRequestVersion: Short =
- if (interBrokerProtocolVersion.isAtLeast(IBP_3_1_IV0)) 13
- else if (interBrokerProtocolVersion.isAtLeast(IBP_2_7_IV1)) 12
- else if (interBrokerProtocolVersion.isAtLeast(IBP_2_3_IV1)) 11
- else if (interBrokerProtocolVersion.isAtLeast(IBP_2_1_IV2)) 10
- else if (interBrokerProtocolVersion.isAtLeast(IBP_2_0_IV1)) 8
- else if (interBrokerProtocolVersion.isAtLeast(IBP_1_1_IV0)) 7
- else if (interBrokerProtocolVersion.isAtLeast(IBP_0_11_0_IV1)) 5
- else if (interBrokerProtocolVersion.isAtLeast(IBP_0_11_0_IV0)) 4
- else if (interBrokerProtocolVersion.isAtLeast(IBP_0_10_1_IV1)) 3
- else if (interBrokerProtocolVersion.isAtLeast(IBP_0_10_0_IV0)) 2
- else if (interBrokerProtocolVersion.isAtLeast(IBP_0_9_0)) 1
- else 0
-
- val offsetForLeaderEpochRequestVersion: Short =
- if (interBrokerProtocolVersion.isAtLeast(IBP_2_8_IV0)) 4
- else if (interBrokerProtocolVersion.isAtLeast(IBP_2_3_IV1)) 3
- else if (interBrokerProtocolVersion.isAtLeast(IBP_2_1_IV1)) 2
- else if (interBrokerProtocolVersion.isAtLeast(IBP_2_0_IV0)) 1
- else 0
-
- val listOffsetRequestVersion: Short =
- if (interBrokerProtocolVersion.isAtLeast(IBP_3_0_IV1)) 7
- else if (interBrokerProtocolVersion.isAtLeast(IBP_2_8_IV0)) 6
- else if (interBrokerProtocolVersion.isAtLeast(IBP_2_2_IV1)) 5
- else if (interBrokerProtocolVersion.isAtLeast(IBP_2_1_IV1)) 4
- else if (interBrokerProtocolVersion.isAtLeast(IBP_2_0_IV1)) 3
- else if (interBrokerProtocolVersion.isAtLeast(IBP_0_11_0_IV0)) 2
- else if (interBrokerProtocolVersion.isAtLeast(IBP_0_10_1_IV2)) 1
- else 0
+ val interBrokerProtocolVersion = if (processRoles.isEmpty) {
+ MetadataVersion.fromVersionString(interBrokerProtocolVersionString)
+ } else {
+ if (originals.containsKey(KafkaConfig.InterBrokerProtocolVersionProp)) {
+ // A user-supplied IBP was given
+ val configuredVersion = MetadataVersion.fromVersionString(interBrokerProtocolVersionString)
+ if (!configuredVersion.isKRaftSupported) {
+ throw new ConfigException(s"A non-KRaft version ${interBrokerProtocolVersionString} given for ${KafkaConfig.InterBrokerProtocolVersionProp}. " +
+ s"The minimum version is ${MetadataVersion.MINIMUM_KRAFT_VERSION}")
+ } else {
+ warn(s"${KafkaConfig.InterBrokerProtocolVersionProp} is deprecated in KRaft mode as of 3.3 and will only " +
+ s"be read when first upgrading from a KRaft prior to 3.3. See kafka-storage.sh help for details on setting " +
+ s"the metadata version for a new KRaft cluster.")
+ }
+ }
+ // In KRaft mode, we pin this value to the minimum KRaft-supported version. This prevents inadvertent usage of
+ // the static IBP config in broker components running in KRaft mode
+ MetadataVersion.MINIMUM_KRAFT_VERSION
+ }
/** ********* Controlled shutdown configuration ***********/
val controlledShutdownMaxRetries = getInt(KafkaConfig.ControlledShutdownMaxRetriesProp)
diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
index 5a1c3087d3..f1474430b9 100644
--- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
@@ -35,6 +35,7 @@ import org.apache.kafka.server.metrics.KafkaYammerMetrics
import java.nio.file.Paths
import scala.collection.Seq
+import scala.compat.java8.FunctionConverters.asJavaSupplier
import scala.jdk.CollectionConverters._
/**
@@ -180,13 +181,16 @@ object KafkaRaftServer {
"If you intend to create a new broker, you should remove all data in your data directories (log.dirs).")
}
- // Load the bootstrap metadata file or, in the case of an upgrade from KRaft preview, bootstrap the
- // metadata.version corresponding to a user-configured IBP.
- val bootstrapMetadata = if (config.originals.containsKey(KafkaConfig.InterBrokerProtocolVersionProp)) {
- BootstrapMetadata.load(Paths.get(config.metadataLogDir), config.interBrokerProtocolVersion)
- } else {
- BootstrapMetadata.load(Paths.get(config.metadataLogDir), MetadataVersion.IBP_3_0_IV0)
+ // Load the bootstrap metadata file. In the case of an upgrade from older KRaft where there is no bootstrap metadata,
+ // read the IBP from config in order to bootstrap the equivalent metadata version.
+ def getUserDefinedIBPVersionOrThrow(): MetadataVersion = {
+ if (config.originals.containsKey(KafkaConfig.InterBrokerProtocolVersionProp)) {
+ MetadataVersion.fromVersionString(config.interBrokerProtocolVersionString)
+ } else {
+ throw new KafkaException(s"Cannot upgrade from KRaft version prior to 3.3 without first setting ${KafkaConfig.InterBrokerProtocolVersionProp} on each broker.")
+ }
}
+ val bootstrapMetadata = BootstrapMetadata.load(Paths.get(config.metadataLogDir), asJavaSupplier(() => getUserDefinedIBPVersionOrThrow()))
(metaProperties, bootstrapMetadata, offlineDirs.toSeq)
}
diff --git a/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala b/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala
index a9ac51315a..826643a0f5 100644
--- a/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala
+++ b/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala
@@ -31,6 +31,7 @@ import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetFo
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, ListOffsetsRequest, ListOffsetsResponse, OffsetsForLeaderEpochRequest, OffsetsForLeaderEpochResponse}
+import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_1_IV2
import scala.jdk.CollectionConverters._
@@ -46,13 +47,16 @@ import scala.compat.java8.OptionConverters.RichOptionForJava8
* @param brokerConfig Broker configuration
* @param replicaManager A ReplicaManager
* @param quota The quota, used when building a fetch request
+ * @param metadataVersionSupplier A supplier that returns the current MetadataVersion. This can change during
+ * runtime in KRaft mode.
*/
class RemoteLeaderEndPoint(logPrefix: String,
blockingSender: BlockingSend,
private[server] val fetchSessionHandler: FetchSessionHandler, // visible for testing
brokerConfig: KafkaConfig,
replicaManager: ReplicaManager,
- quota: ReplicaQuota) extends LeaderEndPoint with Logging {
+ quota: ReplicaQuota,
+ metadataVersionSupplier: () => MetadataVersion) extends LeaderEndPoint with Logging {
this.logIdent = logPrefix
@@ -61,7 +65,7 @@ class RemoteLeaderEndPoint(logPrefix: String,
private val maxBytes = brokerConfig.replicaFetchResponseMaxBytes
private val fetchSize = brokerConfig.replicaFetchMaxBytes
- override val isTruncationOnFetchSupported = brokerConfig.interBrokerProtocolVersion.isTruncationOnFetchSupported
+ override def isTruncationOnFetchSupported = metadataVersionSupplier().isTruncationOnFetchSupported
override def initiateClose(): Unit = blockingSender.initiateClose()
@@ -106,7 +110,8 @@ class RemoteLeaderEndPoint(logPrefix: String,
.setPartitionIndex(topicPartition.partition)
.setCurrentLeaderEpoch(currentLeaderEpoch)
.setTimestamp(earliestOrLatest)))
- val requestBuilder = ListOffsetsRequest.Builder.forReplica(brokerConfig.listOffsetRequestVersion, brokerConfig.brokerId)
+ val metadataVersion = metadataVersionSupplier()
+ val requestBuilder = ListOffsetsRequest.Builder.forReplica(metadataVersion.listOffsetRequestVersion, brokerConfig.brokerId)
.setTargetTimes(Collections.singletonList(topic))
val clientResponse = blockingSender.sendRequest(requestBuilder)
@@ -116,7 +121,7 @@ class RemoteLeaderEndPoint(logPrefix: String,
Errors.forCode(responsePartition.errorCode) match {
case Errors.NONE =>
- if (brokerConfig.interBrokerProtocolVersion.isAtLeast(IBP_0_10_1_IV2))
+ if (metadataVersion.isAtLeast(IBP_0_10_1_IV2))
responsePartition.offset
else
responsePartition.oldStyleOffsets.get(0)
@@ -141,7 +146,7 @@ class RemoteLeaderEndPoint(logPrefix: String,
}
val epochRequest = OffsetsForLeaderEpochRequest.Builder.forFollower(
- brokerConfig.offsetForLeaderEpochRequestVersion, topics, brokerConfig.brokerId)
+ metadataVersionSupplier().offsetForLeaderEpochRequestVersion, topics, brokerConfig.brokerId)
debug(s"Sending offset for leader epoch request $epochRequest")
try {
@@ -201,7 +206,12 @@ class RemoteLeaderEndPoint(logPrefix: String,
val fetchRequestOpt = if (fetchData.sessionPartitions.isEmpty && fetchData.toForget.isEmpty) {
None
} else {
- val version: Short = if (brokerConfig.fetchRequestVersion >= 13 && !fetchData.canUseTopicIds) 12 else brokerConfig.fetchRequestVersion
+ val metadataVersion = metadataVersionSupplier()
+ val version: Short = if (metadataVersion.fetchRequestVersion >= 13 && !fetchData.canUseTopicIds) {
+ 12
+ } else {
+ metadataVersion.fetchRequestVersion
+ }
val requestBuilder = FetchRequest.Builder
.forReplica(version, brokerConfig.brokerId, maxWait, minBytes, fetchData.toSend)
.setMaxBytes(maxBytes)
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
index feb082c5ae..33af5836cd 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
@@ -21,13 +21,15 @@ import kafka.cluster.BrokerEndPoint
import org.apache.kafka.clients.FetchSessionHandler
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.utils.{LogContext, Time}
+import org.apache.kafka.server.common.MetadataVersion
class ReplicaFetcherManager(brokerConfig: KafkaConfig,
protected val replicaManager: ReplicaManager,
metrics: Metrics,
time: Time,
threadNamePrefix: Option[String] = None,
- quotaManager: ReplicationQuotaManager)
+ quotaManager: ReplicationQuotaManager,
+ metadataVersionSupplier: () => MetadataVersion)
extends AbstractFetcherManager[ReplicaFetcherThread](
name = "ReplicaFetcherManager on broker " + brokerConfig.brokerId,
clientId = "Replica",
@@ -41,9 +43,10 @@ class ReplicaFetcherManager(brokerConfig: KafkaConfig,
val endpoint = new BrokerBlockingSender(sourceBroker, brokerConfig, metrics, time, fetcherId,
s"broker-${brokerConfig.brokerId}-fetcher-$fetcherId", logContext)
val fetchSessionHandler = new FetchSessionHandler(logContext, sourceBroker.id)
- val leader = new RemoteLeaderEndPoint(logContext.logPrefix, endpoint, fetchSessionHandler, brokerConfig, replicaManager, quotaManager)
+ val leader = new RemoteLeaderEndPoint(logContext.logPrefix, endpoint, fetchSessionHandler, brokerConfig,
+ replicaManager, quotaManager, metadataVersionSupplier)
new ReplicaFetcherThread(threadName, leader, brokerConfig, failedPartitions, replicaManager,
- quotaManager, logContext.logPrefix)
+ quotaManager, logContext.logPrefix, metadataVersionSupplier)
}
def shutdown(): Unit = {
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 86cc6b1b9d..2e728ce817 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -21,6 +21,7 @@ import kafka.log.{LeaderOffsetIncremented, LogAppendInfo}
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.requests._
import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.server.common.MetadataVersion
class ReplicaFetcherThread(name: String,
leader: LeaderEndPoint,
@@ -28,7 +29,8 @@ class ReplicaFetcherThread(name: String,
failedPartitions: FailedPartitions,
replicaMgr: ReplicaManager,
quota: ReplicaQuota,
- logPrefix: String)
+ logPrefix: String,
+ metadataVersionSupplier: () => MetadataVersion)
extends AbstractFetcherThread(name = name,
clientId = name,
leader = leader,
@@ -39,7 +41,7 @@ class ReplicaFetcherThread(name: String,
this.logIdent = logPrefix
- override protected val isOffsetForLeaderEpochSupported: Boolean = brokerConfig.interBrokerProtocolVersion.isOffsetForLeaderEpochSupported
+ override protected val isOffsetForLeaderEpochSupported: Boolean = metadataVersionSupplier().isOffsetForLeaderEpochSupported
override protected def latestEpoch(topicPartition: TopicPartition): Option[Int] = {
replicaMgr.localLogOrException(topicPartition).latestEpoch
@@ -135,7 +137,7 @@ class ReplicaFetcherThread(name: String,
def maybeWarnIfOversizedRecords(records: MemoryRecords, topicPartition: TopicPartition): Unit = {
// oversized messages don't cause replication to fail from fetch request version 3 (KIP-74)
- if (brokerConfig.fetchRequestVersion <= 2 && records.sizeInBytes > 0 && records.validBytes <= 0)
+ if (metadataVersionSupplier().fetchRequestVersion <= 2 && records.sizeInBytes > 0 && records.validBytes <= 0)
error(s"Replication is failing due to a message that is greater than replica.fetch.max.bytes for partition $topicPartition. " +
"This generally occurs when the max.message.bytes has been overridden to exceed this value and a suitably large " +
"message has also been sent. To fix this problem increase replica.fetch.max.bytes in your broker config to be " +
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 03983ad98d..aff10ee903 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -310,7 +310,7 @@ class ReplicaManager(val config: KafkaConfig,
// If inter-broker protocol (IBP) < 1.0, the controller will send LeaderAndIsrRequest V0 which does not include isNew field.
// In this case, the broker receiving the request cannot determine whether it is safe to create a partition if a log directory has failed.
// Thus, we choose to halt the broker on any log directory failure if IBP < 1.0
- val haltBrokerOnFailure = config.interBrokerProtocolVersion.isLessThan(IBP_1_0_IV0)
+ val haltBrokerOnFailure = metadataCache.metadataVersion().isLessThan(IBP_1_0_IV0)
logDirFailureHandler = new LogDirFailureHandler("LogDirFailureHandler", haltBrokerOnFailure)
logDirFailureHandler.start()
}
@@ -1773,7 +1773,7 @@ class ReplicaManager(val config: KafkaConfig,
* OffsetForLeaderEpoch request.
*/
protected def initialFetchOffset(log: UnifiedLog): Long = {
- if (config.interBrokerProtocolVersion.isTruncationOnFetchSupported() && log.latestEpoch.nonEmpty)
+ if (metadataCache.metadataVersion().isTruncationOnFetchSupported && log.latestEpoch.nonEmpty)
log.logEndOffset
else
log.highWatermark
@@ -1903,7 +1903,7 @@ class ReplicaManager(val config: KafkaConfig,
}
protected def createReplicaFetcherManager(metrics: Metrics, time: Time, threadNamePrefix: Option[String], quotaManager: ReplicationQuotaManager) = {
- new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, quotaManager)
+ new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, quotaManager, () => metadataCache.metadataVersion())
}
protected def createReplicaAlterLogDirsManager(quotaManager: ReplicationQuotaManager, brokerTopicStats: BrokerTopicStats) = {
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index 60165a399e..212f188504 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -31,7 +31,6 @@ import org.apache.kafka.common.internals.Topic
import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta, TopicsImage}
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
import org.apache.kafka.server.authorizer.Authorizer
-import org.apache.kafka.server.common.MetadataVersion
import scala.collection.mutable
@@ -132,10 +131,7 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
// Publish the new metadata image to the metadata cache.
metadataCache.setImage(newImage)
- val metadataVersionLogMsg = newImage.features().metadataVersion() match {
- case MetadataVersion.UNINITIALIZED => "un-initialized metadata.version"
- case mv: MetadataVersion => s"metadata.version ${mv.featureLevel()}"
- }
+ val metadataVersionLogMsg = s"metadata.version ${newImage.features().metadataVersion()}"
if (_firstPublish) {
info(s"Publishing initial metadata at offset $highestOffsetAndEpoch with $metadataVersionLogMsg.")
diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala b/core/src/main/scala/kafka/tools/StorageTool.scala
index 333af86314..a96275cc27 100644
--- a/core/src/main/scala/kafka/tools/StorageTool.scala
+++ b/core/src/main/scala/kafka/tools/StorageTool.scala
@@ -49,7 +49,7 @@ object StorageTool extends Logging {
val clusterId = namespace.getString("cluster_id")
val metadataVersion = getMetadataVersion(namespace)
if (!metadataVersion.isKRaftSupported) {
- throw new TerseFailure(s"Must specify a metadata version of at least 1.")
+ throw new TerseFailure(s"Must specify a valid KRaft metadata version of at least 3.0.")
}
val metaProperties = buildMetadataProperties(clusterId, config.get)
val ignoreFormatted = namespace.getBoolean("ignore_formatted")
@@ -99,7 +99,7 @@ object StorageTool extends Logging {
action(storeTrue())
formatParser.addArgument("--release-version", "-r").
action(store()).
- help(s"A release version to use for the initial metadata.version. The default is (${MetadataVersion.latest().version()})")
+ help(s"A KRaft release version to use for the initial metadata version. The minimum is 3.0, the default is ${MetadataVersion.latest().version()}")
parser.parseArgsOrFail(args)
}
diff --git a/core/src/test/java/kafka/test/ClusterConfig.java b/core/src/test/java/kafka/test/ClusterConfig.java
index 5830959283..8e9f7de96a 100644
--- a/core/src/test/java/kafka/test/ClusterConfig.java
+++ b/core/src/test/java/kafka/test/ClusterConfig.java
@@ -122,8 +122,8 @@ public class ClusterConfig {
return Optional.ofNullable(trustStoreFile);
}
- public Optional<MetadataVersion> metadataVersion() {
- return Optional.ofNullable(metadataVersion);
+ public MetadataVersion metadataVersion() {
+ return metadataVersion;
}
public Properties brokerServerProperties(int brokerId) {
@@ -133,7 +133,7 @@ public class ClusterConfig {
public Map<String, String> nameTags() {
Map<String, String> tags = new LinkedHashMap<>(4);
name().ifPresent(name -> tags.put("Name", name));
- metadataVersion().ifPresent(mv -> tags.put("MetadataVersion", mv.toString()));
+ tags.put("MetadataVersion", metadataVersion.toString());
tags.put("Security", securityProtocol.name());
listenerName().ifPresent(listener -> tags.put("Listener", listener));
return tags;
@@ -150,11 +150,12 @@ public class ClusterConfig {
}
public static Builder defaultClusterBuilder() {
- return new Builder(Type.ZK, 1, 1, true, SecurityProtocol.PLAINTEXT);
+ return new Builder(Type.ZK, 1, 1, true, SecurityProtocol.PLAINTEXT, MetadataVersion.latest());
}
- public static Builder clusterBuilder(Type type, int brokers, int controllers, boolean autoStart, SecurityProtocol securityProtocol) {
- return new Builder(type, brokers, controllers, autoStart, securityProtocol);
+ public static Builder clusterBuilder(Type type, int brokers, int controllers, boolean autoStart,
+ SecurityProtocol securityProtocol, MetadataVersion metadataVersion) {
+ return new Builder(type, brokers, controllers, autoStart, securityProtocol, metadataVersion);
}
public static class Builder {
@@ -168,12 +169,13 @@ public class ClusterConfig {
private File trustStoreFile;
private MetadataVersion metadataVersion;
- Builder(Type type, int brokers, int controllers, boolean autoStart, SecurityProtocol securityProtocol) {
+ Builder(Type type, int brokers, int controllers, boolean autoStart, SecurityProtocol securityProtocol, MetadataVersion metadataVersion) {
this.type = type;
this.brokers = brokers;
this.controllers = controllers;
this.autoStart = autoStart;
this.securityProtocol = securityProtocol;
+ this.metadataVersion = metadataVersion;
}
public Builder type(Type type) {
diff --git a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
index 8c18451efd..33780f795e 100644
--- a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
+++ b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
@@ -25,6 +25,7 @@ import kafka.test.annotation.ClusterTestDefaults;
import kafka.test.annotation.ClusterTests;
import kafka.test.annotation.Type;
import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
@@ -109,4 +110,9 @@ public class ClusterTestExtensionsTest {
clusterInstance.start();
Assertions.assertNotNull(clusterInstance.anyBrokerSocketServer());
}
+
+ @ClusterTest
+ public void testDefaults(ClusterConfig config) {
+ Assertions.assertEquals(MetadataVersion.IBP_3_3_IV3, config.metadataVersion());
+ }
}
diff --git a/core/src/test/java/kafka/test/annotation/ClusterTest.java b/core/src/test/java/kafka/test/annotation/ClusterTest.java
index b83df127f5..d1d3222a25 100644
--- a/core/src/test/java/kafka/test/annotation/ClusterTest.java
+++ b/core/src/test/java/kafka/test/annotation/ClusterTest.java
@@ -41,6 +41,6 @@ public @interface ClusterTest {
String name() default "";
SecurityProtocol securityProtocol() default SecurityProtocol.PLAINTEXT;
String listener() default "";
- MetadataVersion metadataVersion() default MetadataVersion.UNINITIALIZED;
+ MetadataVersion metadataVersion() default MetadataVersion.IBP_3_3_IV3;
ClusterConfigProperty[] serverProperties() default {};
}
diff --git a/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java b/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java
index f0c1d9bbda..bd69109c4b 100644
--- a/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java
+++ b/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java
@@ -25,7 +25,6 @@ import kafka.test.annotation.ClusterTemplate;
import kafka.test.annotation.ClusterTest;
import kafka.test.annotation.ClusterTests;
import kafka.test.annotation.Type;
-import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
import org.junit.jupiter.api.extension.TestTemplateInvocationContextProvider;
@@ -180,7 +179,8 @@ public class ClusterTestExtensions implements TestTemplateInvocationContextProvi
throw new IllegalStateException();
}
- ClusterConfig.Builder builder = ClusterConfig.clusterBuilder(type, brokers, controllers, autoStart, annot.securityProtocol());
+ ClusterConfig.Builder builder = ClusterConfig.clusterBuilder(type, brokers, controllers, autoStart,
+ annot.securityProtocol(), annot.metadataVersion());
if (!annot.name().isEmpty()) {
builder.name(annot.name());
} else {
@@ -195,10 +195,6 @@ public class ClusterTestExtensions implements TestTemplateInvocationContextProvi
properties.put(property.key(), property.value());
}
- if (!annot.metadataVersion().equals(MetadataVersion.UNINITIALIZED)) {
- builder.metadataVersion(annot.metadataVersion());
- }
-
ClusterConfig config = builder.build();
config.serverProperties().putAll(properties);
type.invocationContexts(config, testInvocations);
diff --git a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
index f0ca98a5f2..73fe67836a 100644
--- a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
+++ b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
@@ -30,7 +30,6 @@ import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.metadata.BrokerState;
-import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.extension.AfterTestExecutionCallback;
import org.junit.jupiter.api.extension.BeforeTestExecutionCallback;
import org.junit.jupiter.api.extension.Extension;
@@ -86,7 +85,7 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
return Arrays.asList(
(BeforeTestExecutionCallback) context -> {
TestKitNodes nodes = new TestKitNodes.Builder().
- setBootstrapMetadataVersion(clusterConfig.metadataVersion().orElse(MetadataVersion.latest())).
+ setBootstrapMetadataVersion(clusterConfig.metadataVersion()).
setNumBrokerNodes(clusterConfig.numBrokers()).
setNumControllerNodes(clusterConfig.numControllers()).build();
nodes.brokerNodes().forEach((brokerId, brokerNode) -> {
diff --git a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
index 02f21906ed..d8375b0127 100644
--- a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
+++ b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
@@ -108,7 +108,7 @@ public class ZkClusterInvocationContext implements TestTemplateInvocationContext
@Override
public Properties serverConfig() {
Properties props = clusterConfig.serverProperties();
- clusterConfig.metadataVersion().ifPresent(mv -> props.put(KafkaConfig.InterBrokerProtocolVersionProp(), mv.version()));
+ props.put(KafkaConfig.InterBrokerProtocolVersionProp(), metadataVersion().version());
return props;
}
diff --git a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
index 1924579e17..a930bafde6 100644
--- a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
+++ b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
@@ -340,7 +340,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
StorageTool.formatCommand(out,
JavaConverters.asScalaBuffer(Collections.singletonList(metadataLogDir)).toSeq(),
properties,
- MetadataVersion.IBP_3_0_IV0,
+ MetadataVersion.MINIMUM_KRAFT_VERSION,
false);
} finally {
for (String line : stream.toString().split(String.format("%n"))) {
diff --git a/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala b/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala
index 3b97ee8398..7d3203e930 100644
--- a/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala
@@ -46,7 +46,7 @@ class ProducerIdsIntegrationTest {
@ClusterTests(Array(
new ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_2_8_IV1),
new ClusterTest(clusterType = Type.ZK, brokers = 3, metadataVersion = MetadataVersion.IBP_3_0_IV0),
- new ClusterTest(clusterType = Type.KRAFT, brokers = 3, metadataVersion = MetadataVersion.IBP_3_0_IV0)
+ new ClusterTest(clusterType = Type.KRAFT, brokers = 3, metadataVersion = MetadataVersion.IBP_3_0_IV1)
))
def testUniqueProducerIds(clusterInstance: ClusterInstance): Unit = {
verifyUniqueIds(clusterInstance)
diff --git a/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala b/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala
index b671e5d5e3..c060e3a6da 100644
--- a/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala
@@ -32,7 +32,7 @@ import scala.jdk.CollectionConverters._
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
class MetadataVersionIntegrationTest {
@ClusterTests(value = Array(
- new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_0_IV0),
+ new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_0_IV1),
new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_1_IV0),
new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_2_IV0)
))
@@ -40,8 +40,8 @@ class MetadataVersionIntegrationTest {
val admin = clusterInstance.createAdminClient()
val describeResult = admin.describeFeatures()
val ff = describeResult.featureMetadata().get().finalizedFeatures().get(MetadataVersion.FEATURE_NAME)
- assertEquals(ff.minVersionLevel(), clusterInstance.config().metadataVersion().get().featureLevel())
- assertEquals(ff.maxVersionLevel(), clusterInstance.config().metadataVersion().get().featureLevel())
+ assertEquals(ff.minVersionLevel(), clusterInstance.config().metadataVersion().featureLevel())
+ assertEquals(ff.maxVersionLevel(), clusterInstance.config().metadataVersion().featureLevel())
// Update to new version
val updateVersion = MetadataVersion.IBP_3_3_IV0.featureLevel.shortValue
@@ -71,7 +71,8 @@ class MetadataVersionIntegrationTest {
val admin = clusterInstance.createAdminClient()
val describeResult = admin.describeFeatures()
val ff = describeResult.featureMetadata().get().finalizedFeatures().get(MetadataVersion.FEATURE_NAME)
- assertEquals(ff.minVersionLevel(), MetadataVersion.latest().featureLevel())
+ assertEquals(ff.minVersionLevel(), MetadataVersion.latest().featureLevel(),
+ "If this test fails, check the default MetadataVersion in the @ClusterTest annotation")
assertEquals(ff.maxVersionLevel(), MetadataVersion.latest().featureLevel())
}
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 1dd0a9ebc8..ee638ba893 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1570,4 +1570,32 @@ class KafkaConfigTest {
"contained in listeners or controller.listener.names",
assertThrows(classOf[ConfigException], () => new KafkaConfig(props)).getMessage)
}
+
+ // In KRaft mode a user-supplied inter.broker.protocol.version of 3.0, 3.1 or 3.2 is expected to be
+ // ignored: the config must still report MetadataVersion.MINIMUM_KRAFT_VERSION.
+ @Test
+ def testIgnoreUserInterBrokerProtocolVersionKRaft(): Unit = {
+ for (ibp <- Seq("3.0", "3.1", "3.2")) {
+ val props = new Properties()
+ props.putAll(kraftProps())
+ props.setProperty(KafkaConfig.InterBrokerProtocolVersionProp, ibp)
+ val config = new KafkaConfig(props)
+ // JUnit's contract is (expected, actual); this order keeps failure messages accurate.
+ assertEquals(MetadataVersion.MINIMUM_KRAFT_VERSION, config.interBrokerProtocolVersion)
+ }
+ }
+
+ // A pre-KRaft version (2.8) given for inter.broker.protocol.version must be rejected with a
+ // ConfigException whose message names the offending version and the minimum supported one.
+ @Test
+ def testInvalidInterBrokerProtocolVersionKRaft(): Unit = {
+ val kraftConfig = new Properties()
+ kraftConfig.putAll(kraftProps())
+ kraftConfig.setProperty(KafkaConfig.InterBrokerProtocolVersionProp, "2.8")
+ val thrown = assertThrows(classOf[ConfigException], () => new KafkaConfig(kraftConfig))
+ assertEquals("A non-KRaft version 2.8 given for inter.broker.protocol.version. The minimum version is 3.0-IV1",
+ thrown.getMessage)
+ }
+
+ // With no inter.broker.protocol.version set, a KRaft config is expected to report
+ // MetadataVersion.MINIMUM_KRAFT_VERSION.
+ @Test
+ def testDefaultInterBrokerProtocolVersionKRaft(): Unit = {
+ val props = new Properties()
+ props.putAll(kraftProps())
+ val config = new KafkaConfig(props)
+ // JUnit's contract is (expected, actual); this order keeps failure messages accurate.
+ assertEquals(MetadataVersion.MINIMUM_KRAFT_VERSION, config.interBrokerProtocolVersion)
+ }
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala b/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
index f997455f0b..17483e58a6 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
@@ -24,6 +24,7 @@ import kafka.log.UnifiedLog
import org.apache.kafka.common.{KafkaException, Uuid}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.controller.BootstrapMetadata
+import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.test.TestUtils
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
@@ -71,12 +72,13 @@ class KafkaRaftServerTest {
private def invokeLoadMetaProperties(
metaProperties: MetaProperties,
- configProperties: Properties
+ configProperties: Properties,
+ metadataVersion: Option[MetadataVersion] = Some(MetadataVersion.latest())
): (MetaProperties, BootstrapMetadata, collection.Seq[String]) = {
val tempLogDir = TestUtils.tempDirectory()
try {
writeMetaProperties(tempLogDir, metaProperties)
-
+ metadataVersion.foreach(mv => writeBootstrapMetadata(tempLogDir, mv))
configProperties.put(KafkaConfig.LogDirProp, tempLogDir.getAbsolutePath)
val config = KafkaConfig.fromProps(configProperties)
KafkaRaftServer.initializeLogDirs(config)
@@ -94,6 +96,11 @@ class KafkaRaftServerTest {
checkpoint.write(metaProperties.toProperties)
}
+ // Creates bootstrap metadata for the given metadata.version and writes it under logDir.
+ private def writeBootstrapMetadata(logDir: File, metadataVersion: MetadataVersion): Unit =
+ BootstrapMetadata.write(BootstrapMetadata.create(metadataVersion), logDir.toPath)
+
@Test
def testStartupFailsIfMetaPropertiesMissingInSomeLogDir(): Unit = {
val clusterId = clusterIdBase64
@@ -147,6 +154,7 @@ class KafkaRaftServerTest {
// One log dir is online and has properly formatted `meta.properties`
val validDir = TestUtils.tempDirectory()
writeMetaProperties(validDir, MetaProperties(clusterId, nodeId))
+ writeBootstrapMetadata(validDir, MetadataVersion.latest())
// Use a regular file as an invalid log dir to trigger an IO error
val invalidDir = TestUtils.tempFile("blah")
@@ -215,4 +223,47 @@ class KafkaRaftServerTest {
() => KafkaRaftServer.initializeLogDirs(config))
}
+ // No bootstrap metadata file is written here (None is passed to invokeLoadMetaProperties) and
+ // inter.broker.protocol.version is set to 3.2, so the loaded bootstrap metadata is expected to
+ // carry IBP_3_2_IV0.
+ @Test
+ def testKRaftUpdateWithIBP(): Unit = {
+ val clusterId = clusterIdBase64
+ val nodeId = 0
+ val metaProperties = MetaProperties(clusterId, nodeId)
+
+ val configProperties = new Properties
+ configProperties.put(KafkaConfig.ProcessRolesProp, "broker,controller")
+ configProperties.put(KafkaConfig.NodeIdProp, nodeId.toString)
+ configProperties.put(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9092,SSL://127.0.0.1:9093")
+ configProperties.put(KafkaConfig.QuorumVotersProp, s"$nodeId@localhost:9093")
+ configProperties.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
+ configProperties.put(KafkaConfig.InterBrokerProtocolVersionProp, "3.2")
+
+ val (loadedMetaProperties, bootstrapMetadata, offlineDirs) =
+ invokeLoadMetaProperties(metaProperties, configProperties, None)
+
+ assertEquals(metaProperties, loadedMetaProperties)
+ assertEquals(Seq.empty, offlineDirs)
+ // JUnit's contract is (expected, actual); this order keeps failure messages accurate.
+ assertEquals(MetadataVersion.IBP_3_2_IV0, bootstrapMetadata.metadataVersion())
+ }
+
+ // Only meta.properties is written to the log dir and inter.broker.protocol.version is left unset;
+ // initializing the log dirs is then expected to fail with a KafkaException carrying the upgrade hint.
+ @Test
+ def testKRaftUpdateWithoutIBP(): Unit = {
+ val nodeId = 0
+ val metaProps = MetaProperties(clusterIdBase64, nodeId)
+
+ val dir = TestUtils.tempDirectory()
+ writeMetaProperties(dir, metaProps)
+
+ val props = new Properties
+ props.put(KafkaConfig.ProcessRolesProp, "broker,controller")
+ props.put(KafkaConfig.NodeIdProp, nodeId.toString)
+ props.put(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9092,SSL://127.0.0.1:9093")
+ props.put(KafkaConfig.QuorumVotersProp, s"$nodeId@localhost:9093")
+ props.put(KafkaConfig.ControllerListenerNamesProp, "SSL")
+ props.put(KafkaConfig.LogDirProp, dir.getAbsolutePath)
+
+ val config = KafkaConfig.fromProps(props)
+ val failure = assertThrows(classOf[KafkaException], () => KafkaRaftServer.initializeLogDirs(config))
+ assertEquals("Cannot upgrade from KRaft version prior to 3.3 without first setting inter.broker.protocol.version on each broker.",
+ failure.getMessage)
+ }
}
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index e18dd29d69..c7a222c2d5 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -103,14 +103,16 @@ class ReplicaFetcherThreadTest {
leaderEndpointBlockingSend: BlockingSend): ReplicaFetcherThread = {
val logContext = new LogContext(s"[ReplicaFetcher replicaId=${brokerConfig.brokerId}, leaderId=${leaderEndpointBlockingSend.brokerEndPoint().id}, fetcherId=$fetcherId] ")
val fetchSessionHandler = new FetchSessionHandler(logContext, leaderEndpointBlockingSend.brokerEndPoint().id)
- val leader = new RemoteLeaderEndPoint(logContext.logPrefix, leaderEndpointBlockingSend, fetchSessionHandler, brokerConfig, replicaMgr, quota)
+ val leader = new RemoteLeaderEndPoint(logContext.logPrefix, leaderEndpointBlockingSend, fetchSessionHandler,
+ brokerConfig, replicaMgr, quota, () => brokerConfig.interBrokerProtocolVersion)
new ReplicaFetcherThread(name,
leader,
brokerConfig,
failedPartitions,
replicaMgr,
quota,
- logContext.logPrefix)
+ logContext.logPrefix,
+ () => brokerConfig.interBrokerProtocolVersion)
}
@Test
@@ -121,9 +123,9 @@ class ReplicaFetcherThreadTest {
val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
when(replicaManager.brokerTopicStats).thenReturn(mock(classOf[BrokerTopicStats]))
- assertEquals(ApiKeys.FETCH.latestVersion, config.fetchRequestVersion)
- assertEquals(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, config.offsetForLeaderEpochRequestVersion)
- assertEquals(ApiKeys.LIST_OFFSETS.latestVersion, config.listOffsetRequestVersion)
+ assertEquals(ApiKeys.FETCH.latestVersion, config.interBrokerProtocolVersion.fetchRequestVersion())
+ assertEquals(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, config.interBrokerProtocolVersion.offsetForLeaderEpochRequestVersion)
+ assertEquals(ApiKeys.LIST_OFFSETS.latestVersion, config.interBrokerProtocolVersion.listOffsetRequestVersion)
}
@Test
@@ -581,8 +583,10 @@ class ReplicaFetcherThreadTest {
val mockNetwork = new MockBlockingSender(Collections.emptyMap(), brokerEndPoint, new SystemTime())
val logContext = new LogContext(s"[ReplicaFetcher replicaId=${config.brokerId}, leaderId=${brokerEndPoint.id}, fetcherId=0] ")
val fetchSessionHandler = new FetchSessionHandler(logContext, brokerEndPoint.id)
- val leader = new RemoteLeaderEndPoint(logContext.logPrefix, mockNetwork, fetchSessionHandler, config, replicaManager, quota)
- val thread = new ReplicaFetcherThread("bob", leader, config, failedPartitions, replicaManager, quota, logContext.logPrefix) {
+ val leader = new RemoteLeaderEndPoint(logContext.logPrefix, mockNetwork, fetchSessionHandler, config,
+ replicaManager, quota, () => config.interBrokerProtocolVersion)
+ val thread = new ReplicaFetcherThread("bob", leader, config, failedPartitions,
+ replicaManager, quota, logContext.logPrefix, () => config.interBrokerProtocolVersion) {
override def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: FetchData): Option[LogAppendInfo] = None
}
thread.addPartitions(Map(t1p0 -> initialFetchState(Some(topicId1), initialLEO), t1p1 -> initialFetchState(Some(topicId1), initialLEO)))
@@ -1036,14 +1040,16 @@ class ReplicaFetcherThreadTest {
val logContext = new LogContext(s"[ReplicaFetcher replicaId=${config.brokerId}, leaderId=${brokerEndPoint.id}, fetcherId=0] ")
val fetchSessionHandler = new FetchSessionHandler(logContext, brokerEndPoint.id)
- val leader = new RemoteLeaderEndPoint(logContext.logPrefix, mockBlockingSend, fetchSessionHandler, config, replicaManager, replicaQuota)
+ val leader = new RemoteLeaderEndPoint(logContext.logPrefix, mockBlockingSend, fetchSessionHandler, config,
+ replicaManager, replicaQuota, () => config.interBrokerProtocolVersion)
val thread = new ReplicaFetcherThread("bob",
leader,
config,
failedPartitions,
replicaManager,
replicaQuota,
- logContext.logPrefix)
+ logContext.logPrefix,
+ () => config.interBrokerProtocolVersion)
val leaderEpoch = 1
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index c2c8b4ddc7..ab0649a6af 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -214,6 +214,7 @@ class ReplicaManagerTest {
val aliveBrokers = Seq(new Node(0, "host0", 0), new Node(1, "host1", 1))
val metadataCache: MetadataCache = mock(classOf[MetadataCache])
mockGetAliveBrokerFunctions(metadataCache, aliveBrokers)
+ when(metadataCache.metadataVersion()).thenReturn(config.interBrokerProtocolVersion)
val rm = new ReplicaManager(
metrics = metrics,
config = config,
@@ -1964,7 +1965,7 @@ class ReplicaManagerTest {
any[TopicPartition], any[ListenerName])).
thenReturn(Map(leaderBrokerId -> new Node(leaderBrokerId, "host1", 9092, "rack-a"),
followerBrokerId -> new Node(followerBrokerId, "host2", 9092, "rack-b")).toMap)
-
+ when(metadataCache.metadataVersion()).thenReturn(config.interBrokerProtocolVersion)
val mockProducePurgatory = new DelayedOperationPurgatory[DelayedProduce](
purgatoryName = "Produce", timer, reaperEnabled = false)
val mockFetchPurgatory = new DelayedOperationPurgatory[DelayedFetch](
@@ -2003,15 +2004,16 @@ class ReplicaManagerTest {
time: Time,
threadNamePrefix: Option[String],
replicationQuotaManager: ReplicationQuotaManager): ReplicaFetcherManager = {
- new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, replicationQuotaManager) {
+ new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, replicationQuotaManager, () => metadataCache.metadataVersion()) {
override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): ReplicaFetcherThread = {
val logContext = new LogContext(s"[ReplicaFetcher replicaId=${config.brokerId}, leaderId=${sourceBroker.id}, " +
s"fetcherId=$fetcherId] ")
val fetchSessionHandler = new FetchSessionHandler(logContext, sourceBroker.id)
- val leader = new RemoteLeaderEndPoint(logContext.logPrefix, blockingSend, fetchSessionHandler, config, replicaManager, quotaManager.follower)
+ val leader = new RemoteLeaderEndPoint(logContext.logPrefix, blockingSend, fetchSessionHandler, config,
+ replicaManager, quotaManager.follower, () => config.interBrokerProtocolVersion)
new ReplicaFetcherThread(s"ReplicaFetcherThread-$fetcherId", leader, config, failedPartitions, replicaManager,
- quotaManager.follower, logContext.logPrefix) {
+ quotaManager.follower, logContext.logPrefix, () => config.interBrokerProtocolVersion) {
override def doWork(): Unit = {
// In case the thread starts before the partition is added by AbstractFetcherManager,
// add it here (it's a no-op if already added)
@@ -2235,6 +2237,7 @@ class ReplicaManagerTest {
when(metadataCache.topicIdInfo()).thenReturn((topicIds.asJava, topicNames.asJava))
when(metadataCache.topicNamesToIds()).thenReturn(topicIds.asJava)
when(metadataCache.topicIdsToNames()).thenReturn(topicNames.asJava)
+ when(metadataCache.metadataVersion()).thenReturn(config.interBrokerProtocolVersion)
mockGetAliveBrokerFunctions(metadataCache, aliveBrokers)
val mockProducePurgatory = new DelayedOperationPurgatory[DelayedProduce](
purgatoryName = "Produce", timer, reaperEnabled = false)
@@ -2487,6 +2490,8 @@ class ReplicaManagerTest {
val aliveBrokers = Seq(new Node(0, "host0", 0), new Node(1, "host1", 1))
mockGetAliveBrokerFunctions(metadataCache0, aliveBrokers)
mockGetAliveBrokerFunctions(metadataCache1, aliveBrokers)
+ when(metadataCache0.metadataVersion()).thenReturn(config0.interBrokerProtocolVersion)
+ when(metadataCache1.metadataVersion()).thenReturn(config1.interBrokerProtocolVersion)
// each replica manager is for a broker
val rm0 = new ReplicaManager(
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
index 948e051337..f75823a029 100644
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
@@ -74,7 +74,7 @@ class BrokerMetadataListenerTest {
)
)
val imageRecords = listener.getImageRecords().get()
- assertEquals(0, imageRecords.size())
+ assertEquals(1, imageRecords.size())
assertEquals(100L, listener.highestMetadataOffset)
assertEquals(0L, metrics.lastAppliedRecordOffset.get)
assertEquals(0L, metrics.lastAppliedRecordTimestamp.get)
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
index 86400ba1e9..b0f36522f3 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
@@ -325,7 +325,8 @@ public class ReplicaFetcherThreadBenchmark {
new LogContext(String.format("[ReplicaFetcher replicaId=%d, leaderId=%d, fetcherId=%d", config.brokerId(), 3, 3)), 3),
config,
replicaManager,
- replicaQuota
+ replicaQuota,
+ config::interBrokerProtocolVersion
) {
@Override
public long fetchEarliestOffset(TopicPartition topicPartition, int currentLeaderEpoch) {
@@ -356,7 +357,8 @@ public class ReplicaFetcherThreadBenchmark {
new FailedPartitions(),
replicaManager,
replicaQuota,
- String.format("[ReplicaFetcher replicaId=%d, leaderId=%d, fetcherId=%d", config.brokerId(), 3, 3)
+ String.format("[ReplicaFetcher replicaId=%d, leaderId=%d, fetcherId=%d", config.brokerId(), 3, 3),
+ config::interBrokerProtocolVersion
);
pool = partitions;
diff --git a/metadata/src/main/java/org/apache/kafka/controller/BootstrapMetadata.java b/metadata/src/main/java/org/apache/kafka/controller/BootstrapMetadata.java
index fa031c525f..d9d0651a19 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/BootstrapMetadata.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/BootstrapMetadata.java
@@ -39,6 +39,7 @@ import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
import java.util.stream.Stream;
@@ -128,7 +129,9 @@ public class BootstrapMetadata {
public static BootstrapMetadata create(MetadataVersion metadataVersion, List<ApiMessageAndVersion> records) {
if (!metadataVersion.isKRaftSupported()) {
- throw new IllegalArgumentException("Cannot create BootstrapMetadata with a non-KRaft metadata version.");
+ throw new IllegalArgumentException(String.format(
+ "Cannot create BootstrapMetadata with a non-KRaft metadata version %s. Minimum version is %s",
+ metadataVersion, MetadataVersion.MINIMUM_KRAFT_VERSION));
}
records.add(new ApiMessageAndVersion(
new FeatureLevelRecord()
@@ -142,18 +145,24 @@ public class BootstrapMetadata {
/**
* Load a bootstrap snapshot into a read-only bootstrap metadata object and return it.
*
- * @param bootstrapDir The directory from which to read the snapshot file.
- * @param fallbackPreviewVersion The metadata.version to boostrap if upgrading from KRaft preview
- * @return The read-only bootstrap metadata
+ * @param bootstrapDir The directory from which to read the snapshot file.
+ * @param fallbackVersionSupplier A function that returns the metadata.version to use when upgrading from an older KRaft version
+ * @return The read-only bootstrap metadata
* @throws Exception
*/
- public static BootstrapMetadata load(Path bootstrapDir, MetadataVersion fallbackPreviewVersion) throws Exception {
+ public static BootstrapMetadata load(Path bootstrapDir, Supplier<MetadataVersion> fallbackVersionSupplier) throws Exception {
final Path bootstrapPath = bootstrapDir.resolve(BOOTSTRAP_FILE);
if (!Files.exists(bootstrapPath)) {
- log.debug("Missing bootstrap file, this appears to be a KRaft preview cluster. Setting metadata.version to {}.",
- fallbackPreviewVersion.featureLevel());
- return BootstrapMetadata.create(fallbackPreviewVersion);
+ // Upgrade scenario from KRaft prior to 3.3 (i.e., no bootstrap metadata present)
+ MetadataVersion fallbackVersion = fallbackVersionSupplier.get();
+ if (fallbackVersion.isKRaftSupported()) {
+ log.debug("Missing bootstrap file, this appears to be a KRaft cluster older than 3.3. Setting metadata.version to {}.",
+ fallbackVersion.featureLevel());
+ return BootstrapMetadata.create(fallbackVersion);
+ } else {
+ throw new Exception(String.format("Could not set fallback bootstrap metadata with non-KRaft metadata version of %s", fallbackVersion));
+ }
}
BootstrapListener listener = new BootstrapListener();
@@ -182,7 +191,7 @@ public class BootstrapMetadata {
if (metadataVersionRecord.isPresent()) {
return new BootstrapMetadata(MetadataVersion.fromFeatureLevel(metadataVersionRecord.get().featureLevel()), listener.records);
} else {
- throw new RuntimeException("Expected a metadata.version to exist in the snapshot " + bootstrapPath + ", but none was found");
+ throw new Exception("Expected a metadata.version to exist in the snapshot " + bootstrapPath + ", but none was found");
}
}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index 21bd01c3b9..859dedda22 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -590,11 +590,7 @@ public class ClusterControlManager {
ClusterControlIterator(long epoch) {
this.iterator = brokerRegistrations.entrySet(epoch).iterator();
- if (featureControl.metadataVersion().equals(MetadataVersion.UNINITIALIZED)) {
- this.metadataVersion = MetadataVersion.IBP_3_0_IV1;
- } else {
- this.metadataVersion = featureControl.metadataVersion();
- }
+ this.metadataVersion = featureControl.metadataVersion();
}
@Override
diff --git a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
index 19127afa72..c092abcdcc 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
@@ -52,7 +52,7 @@ public class FeatureControlManager {
private LogContext logContext = null;
private SnapshotRegistry snapshotRegistry = null;
private QuorumFeatures quorumFeatures = null;
- private MetadataVersion metadataVersion = MetadataVersion.UNINITIALIZED;
+ private MetadataVersion metadataVersion = MetadataVersion.MINIMUM_KRAFT_VERSION;
Builder setLogContext(LogContext logContext) {
this.logContext = logContext;
@@ -105,6 +105,10 @@ public class FeatureControlManager {
*/
private final TimelineObject<MetadataVersion> metadataVersion;
+ /**
+ * Whether a FeatureLevelRecord for metadata.version has been replayed.
+ */
+ private final TimelineObject<Boolean> sawMetadataVersion;
private FeatureControlManager(
LogContext logContext,
@@ -116,6 +120,7 @@ public class FeatureControlManager {
this.quorumFeatures = quorumFeatures;
this.finalizedVersions = new TimelineHashMap<>(snapshotRegistry, 0);
this.metadataVersion = new TimelineObject<>(snapshotRegistry, metadataVersion);
+ this.sawMetadataVersion = new TimelineObject<>(snapshotRegistry, false);
}
ControllerResult<Map<String, ApiError>> updateFeatures(
@@ -226,7 +231,7 @@ public class FeatureControlManager {
return invalidMetadataVersion(newVersionLevel, "Unknown metadata.version.");
}
- if (!currentVersion.equals(MetadataVersion.UNINITIALIZED) && newVersion.isLessThan(currentVersion)) {
+ if (newVersion.isLessThan(currentVersion)) {
// This is a downgrade
boolean metadataChanged = MetadataVersion.checkIfMetadataChanged(currentVersion, newVersion);
if (!metadataChanged) {
@@ -257,15 +262,20 @@ public class FeatureControlManager {
FinalizedControllerFeatures finalizedFeatures(long epoch) {
Map<String, Short> features = new HashMap<>();
- if (!metadataVersion.get(epoch).equals(MetadataVersion.UNINITIALIZED)) {
- features.put(MetadataVersion.FEATURE_NAME, metadataVersion.get(epoch).featureLevel());
- }
+ features.put(MetadataVersion.FEATURE_NAME, metadataVersion.get(epoch).featureLevel());
for (Entry<String, Short> entry : finalizedVersions.entrySet(epoch)) {
features.put(entry.getKey(), entry.getValue());
}
return new FinalizedControllerFeatures(features, epoch);
}
+ /**
+ * Reports whether a FeatureLevelRecord for "metadata.version" has been replayed.
+ *
+ * @return true once such a record has been replayed, false otherwise
+ */
+ boolean sawMetadataVersion() {
+ return sawMetadataVersion.get();
+ }
+
public void replay(FeatureLevelRecord record) {
VersionRange range = quorumFeatures.localSupportedFeature(record.name());
if (!range.contains(record.featureLevel())) {
@@ -275,6 +285,7 @@ public class FeatureControlManager {
if (record.name().equals(MetadataVersion.FEATURE_NAME)) {
log.info("Setting metadata.version to {}", record.featureLevel());
metadataVersion.set(MetadataVersion.fromFeatureLevel(record.featureLevel()));
+ sawMetadataVersion.set(true);
} else {
if (record.featureLevel() == 0) {
log.info("Removing feature {}", record.name());
@@ -294,9 +305,6 @@ public class FeatureControlManager {
FeatureControlIterator(long epoch) {
this.iterator = finalizedVersions.entrySet(epoch).iterator();
this.metadataVersion = FeatureControlManager.this.metadataVersion.get(epoch);
- if (this.metadataVersion.equals(MetadataVersion.UNINITIALIZED)) {
- this.wroteVersion = true;
- }
}
@Override
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 4a8c8c9661..88146766e3 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -283,7 +283,7 @@ public final class QuorumController implements Controller {
public QuorumController build() throws Exception {
if (raftClient == null) {
throw new IllegalStateException("You must set a raft client.");
- } else if (bootstrapMetadata == null || bootstrapMetadata.metadataVersion().equals(MetadataVersion.UNINITIALIZED)) {
+ } else if (bootstrapMetadata == null) {
throw new IllegalStateException("You must specify an initial metadata.version using the kafka-storage tool.");
} else if (quorumFeatures == null) {
throw new IllegalStateException("You must specify the quorum features");
@@ -932,21 +932,21 @@ public final class QuorumController implements Controller {
// write any other records to the log since we need the metadata.version to determine the correct
// record version
final MetadataVersion metadataVersion;
- if (featureControl.metadataVersion().equals(MetadataVersion.UNINITIALIZED)) {
+ if (!featureControl.sawMetadataVersion()) {
final CompletableFuture<Map<String, ApiError>> future;
if (!bootstrapMetadata.metadataVersion().isKRaftSupported()) {
- metadataVersion = MetadataVersion.UNINITIALIZED;
+ metadataVersion = MetadataVersion.MINIMUM_KRAFT_VERSION;
future = new CompletableFuture<>();
future.completeExceptionally(
- new IllegalStateException("Cannot become leader without an initial metadata.version of " +
- "at least 1. Got " + bootstrapMetadata.metadataVersion().featureLevel()));
+ new IllegalStateException("Cannot become leader without a KRaft supported version. " +
+ "Got " + bootstrapMetadata.metadataVersion()));
} else {
metadataVersion = bootstrapMetadata.metadataVersion();
future = appendWriteEvent("bootstrapMetadata", OptionalLong.empty(), () -> {
if (metadataVersion.isAtLeast(MetadataVersion.IBP_3_3_IV0)) {
log.info("Initializing metadata.version to {}", metadataVersion.featureLevel());
} else {
- log.info("Upgrading from KRaft preview. Initializing metadata.version to {}",
+ log.info("Upgrading KRaft cluster and initializing metadata.version to {}",
metadataVersion.featureLevel());
}
return ControllerResult.atomicOf(bootstrapMetadata.records(), null);
@@ -1964,6 +1964,11 @@ public final class QuorumController implements Controller {
return curClaimEpoch;
}
+ // Returns the metadata.version currently reported by the feature control manager.
+ // Visible for testing
+ MetadataVersion metadataVersion() {
+ return featureControl.metadataVersion();
+ }
+
@Override
public void close() throws InterruptedException {
queue.close();
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java
index 9b723515bd..36725c2518 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java
@@ -72,7 +72,9 @@ public class QuorumFeatures {
public static Map<String, VersionRange> defaultFeatureMap() {
Map<String, VersionRange> features = new HashMap<>(1);
- features.put(MetadataVersion.FEATURE_NAME, VersionRange.of(MetadataVersion.IBP_3_0_IV0.featureLevel(), MetadataVersion.latest().featureLevel()));
+ features.put(MetadataVersion.FEATURE_NAME, VersionRange.of(
+ MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(),
+ MetadataVersion.latest().featureLevel()));
return features;
}
diff --git a/metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java b/metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java
index 3843b59bc7..4cfb1260f1 100644
--- a/metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java
@@ -38,7 +38,7 @@ import static org.apache.kafka.common.metadata.MetadataRecordType.FEATURE_LEVEL_
* This class is thread-safe.
*/
public final class FeaturesImage {
- public static final FeaturesImage EMPTY = new FeaturesImage(Collections.emptyMap(), MetadataVersion.UNINITIALIZED);
+ public static final FeaturesImage EMPTY = new FeaturesImage(Collections.emptyMap(), MetadataVersion.MINIMUM_KRAFT_VERSION);
private final Map<String, Short> finalizedVersions;
@@ -68,11 +68,10 @@ public final class FeaturesImage {
public void write(Consumer<List<ApiMessageAndVersion>> out) {
List<ApiMessageAndVersion> batch = new ArrayList<>();
// Write out the metadata.version record first, and then the rest of the finalized features
- if (!metadataVersion().equals(MetadataVersion.UNINITIALIZED)) {
- batch.add(new ApiMessageAndVersion(new FeatureLevelRecord().
- setName(MetadataVersion.FEATURE_NAME).
- setFeatureLevel(metadataVersion.featureLevel()), FEATURE_LEVEL_RECORD.lowestSupportedVersion()));
- }
+ batch.add(new ApiMessageAndVersion(new FeatureLevelRecord().
+ setName(MetadataVersion.FEATURE_NAME).
+ setFeatureLevel(metadataVersion.featureLevel()), FEATURE_LEVEL_RECORD.lowestSupportedVersion()));
+
for (Entry<String, Short> entry : finalizedVersions.entrySet()) {
if (entry.getKey().equals(MetadataVersion.FEATURE_NAME)) {
continue;
diff --git a/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java
index e3cd94a0cb..55d572127e 100644
--- a/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java
@@ -121,12 +121,7 @@ public final class MetadataImage {
}
public void write(Consumer<List<ApiMessageAndVersion>> out) {
- // We use the minimum KRaft metadata version if this image does
- // not have a specific version set.
MetadataVersion metadataVersion = features.metadataVersion();
- if (metadataVersion.equals(MetadataVersion.UNINITIALIZED)) {
- metadataVersion = MetadataVersion.IBP_3_0_IV1;
- }
// Features should be written out first so we can include the metadata.version at the beginning of the
// snapshot
features.write(out);
diff --git a/metadata/src/test/java/org/apache/kafka/controller/BootstrapMetadataTest.java b/metadata/src/test/java/org/apache/kafka/controller/BootstrapMetadataTest.java
index 16e03a0501..4eb5d47478 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/BootstrapMetadataTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/BootstrapMetadataTest.java
@@ -34,28 +34,28 @@ public class BootstrapMetadataTest {
@Test
public void testWriteAndReadBootstrapFile() throws Exception {
Path tmpDir = Files.createTempDirectory("BootstrapMetadataTest");
- BootstrapMetadata metadata = BootstrapMetadata.create(MetadataVersion.IBP_3_0_IV0);
+ BootstrapMetadata metadata = BootstrapMetadata.create(MetadataVersion.MINIMUM_KRAFT_VERSION);
BootstrapMetadata.write(metadata, tmpDir);
assertTrue(Files.exists(tmpDir.resolve(BootstrapMetadata.BOOTSTRAP_FILE)));
- BootstrapMetadata newMetadata = BootstrapMetadata.load(tmpDir, MetadataVersion.IBP_3_0_IV0);
+ BootstrapMetadata newMetadata = BootstrapMetadata.load(tmpDir, () -> MetadataVersion.MINIMUM_KRAFT_VERSION);
assertEquals(metadata, newMetadata);
}
@Test
public void testNoBootstrapFile() throws Exception {
Path tmpDir = Files.createTempDirectory("BootstrapMetadataTest");
- BootstrapMetadata metadata = BootstrapMetadata.load(tmpDir, MetadataVersion.IBP_3_0_IV0);
- assertEquals(MetadataVersion.IBP_3_0_IV0, metadata.metadataVersion());
- metadata = BootstrapMetadata.load(tmpDir, MetadataVersion.IBP_3_2_IV0);
+ BootstrapMetadata metadata = BootstrapMetadata.load(tmpDir, () -> MetadataVersion.MINIMUM_KRAFT_VERSION);
+ assertEquals(MetadataVersion.MINIMUM_KRAFT_VERSION, metadata.metadataVersion());
+ metadata = BootstrapMetadata.load(tmpDir, () -> MetadataVersion.IBP_3_2_IV0);
assertEquals(MetadataVersion.IBP_3_2_IV0, metadata.metadataVersion());
}
@Test
public void testExistingBootstrapFile() throws Exception {
Path tmpDir = Files.createTempDirectory("BootstrapMetadataTest");
- BootstrapMetadata.write(BootstrapMetadata.create(MetadataVersion.IBP_3_0_IV0), tmpDir);
+ BootstrapMetadata.write(BootstrapMetadata.create(MetadataVersion.MINIMUM_KRAFT_VERSION), tmpDir);
assertThrows(IOException.class, () -> {
BootstrapMetadata.write(BootstrapMetadata.create(MetadataVersion.IBP_3_1_IV0), tmpDir);
});
@@ -65,7 +65,7 @@ public class BootstrapMetadataTest {
public void testEmptyBootstrapFile() throws Exception {
Path tmpDir = Files.createTempDirectory("BootstrapMetadataTest");
Files.createFile(tmpDir.resolve(BootstrapMetadata.BOOTSTRAP_FILE));
- assertThrows(RuntimeException.class, () -> BootstrapMetadata.load(tmpDir, MetadataVersion.IBP_3_0_IV0),
+ assertThrows(Exception.class, () -> BootstrapMetadata.load(tmpDir, () -> MetadataVersion.MINIMUM_KRAFT_VERSION),
"Should fail to load if no metadata.version is set");
}
@@ -77,7 +77,7 @@ public class BootstrapMetadataTest {
byte[] data = new byte[100];
random.nextBytes(data);
Files.write(tmpDir.resolve(BootstrapMetadata.BOOTSTRAP_FILE), data, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
- assertThrows(RuntimeException.class, () -> BootstrapMetadata.load(tmpDir, MetadataVersion.IBP_3_0_IV0),
+ assertThrows(Exception.class, () -> BootstrapMetadata.load(tmpDir, () -> MetadataVersion.MINIMUM_KRAFT_VERSION),
"Should fail on invalid data");
}
}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
index cd44e2e678..c0b163180e 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
@@ -68,7 +68,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(value = 40)
public class ClusterControlManagerTest {
@ParameterizedTest
- @EnumSource(value = MetadataVersion.class, names = {"IBP_3_0_IV0", "IBP_3_3_IV2"})
+ @EnumSource(value = MetadataVersion.class, names = {"IBP_3_0_IV1", "IBP_3_3_IV2"})
public void testReplay(MetadataVersion metadataVersion) {
MockTime time = new MockTime(0, 0, 0);
diff --git a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
index fd1e5af39e..4d4c471994 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
@@ -93,7 +93,7 @@ public class FeatureControlManagerTest {
setSnapshotRegistry(snapshotRegistry).
build();
snapshotRegistry.getOrCreateSnapshot(-1);
- assertEquals(new FinalizedControllerFeatures(Collections.emptyMap(), -1),
+ assertEquals(new FinalizedControllerFeatures(Collections.singletonMap("metadata.version", (short) 1), -1),
manager.finalizedFeatures(-1));
assertEquals(ControllerResult.atomicOf(emptyList(), Collections.
singletonMap("foo", new ApiError(Errors.INVALID_UPDATE_VERSION,
@@ -131,7 +131,7 @@ public class FeatureControlManagerTest {
build();
manager.replay(record);
snapshotRegistry.getOrCreateSnapshot(123);
- assertEquals(new FinalizedControllerFeatures(versionMap("foo", 2), 123),
+ assertEquals(new FinalizedControllerFeatures(versionMap("metadata.version", 1, "foo", 2), 123),
manager.finalizedFeatures(123));
}
@@ -210,6 +210,9 @@ public class FeatureControlManagerTest {
Collections.emptyMap(), Collections.emptyMap(), false);
RecordTestUtils.replayAll(manager, result.records());
RecordTestUtils.assertBatchIteratorContains(Arrays.asList(
+ Arrays.asList(new ApiMessageAndVersion(new FeatureLevelRecord().
+ setName("metadata.version").
+ setFeatureLevel((short) 1), (short) 0)),
Arrays.asList(new ApiMessageAndVersion(new FeatureLevelRecord().
setName("foo").
setFeatureLevel((short) 5), (short) 0)),
@@ -222,13 +225,13 @@ public class FeatureControlManagerTest {
@Test
public void testApplyMetadataVersionChangeRecord() {
QuorumFeatures features = features(MetadataVersion.FEATURE_NAME,
- MetadataVersion.IBP_3_0_IV0.featureLevel(), MetadataVersion.IBP_3_3_IV0.featureLevel());
+ MetadataVersion.IBP_3_0_IV1.featureLevel(), MetadataVersion.IBP_3_3_IV0.featureLevel());
FeatureControlManager manager = new FeatureControlManager.Builder().
setQuorumFeatures(features).build();
manager.replay(new FeatureLevelRecord().
setName(MetadataVersion.FEATURE_NAME).
- setFeatureLevel(MetadataVersion.IBP_3_0_IV0.featureLevel()));
- assertEquals(MetadataVersion.IBP_3_0_IV0, manager.metadataVersion());
+ setFeatureLevel(MetadataVersion.IBP_3_0_IV1.featureLevel()));
+ assertEquals(MetadataVersion.IBP_3_0_IV1, manager.metadataVersion());
}
@Test
@@ -258,12 +261,12 @@ public class FeatureControlManagerTest {
assertEquals(Errors.INVALID_UPDATE_VERSION, result.response().get(MetadataVersion.FEATURE_NAME).error());
result = manager.updateFeatures(
- Collections.singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_0_IV0.featureLevel()),
+ Collections.singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel()),
Collections.singletonMap(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE),
Collections.emptyMap(),
true);
assertEquals(Errors.INVALID_UPDATE_VERSION, result.response().get(MetadataVersion.FEATURE_NAME).error());
- assertEquals("Invalid update version 1 for feature metadata.version. Local controller 0 only supports versions 4-5",
+ assertEquals("Invalid update version 1 for feature metadata.version. Local controller 0 only supports versions 3-4",
result.response().get(MetadataVersion.FEATURE_NAME).message());
}
@@ -271,7 +274,7 @@ public class FeatureControlManagerTest {
public void testCreateFeatureLevelRecords() {
Map<String, VersionRange> localSupportedFeatures = new HashMap<>();
localSupportedFeatures.put(MetadataVersion.FEATURE_NAME, VersionRange.of(
- MetadataVersion.IBP_3_0_IV0.featureLevel(), MetadataVersion.latest().featureLevel()));
+ MetadataVersion.IBP_3_0_IV1.featureLevel(), MetadataVersion.latest().featureLevel()));
localSupportedFeatures.put("foo", VersionRange.of(0, 2));
FeatureControlManager manager = new FeatureControlManager.Builder().
setQuorumFeatures(new QuorumFeatures(0, new ApiVersions(), localSupportedFeatures, emptyList())).
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 87afbf4199..1729d95312 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -96,6 +96,7 @@ import org.apache.kafka.snapshot.RecordsSnapshotReader;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import org.mockito.Mockito;
import static java.util.function.Function.identity;
import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
@@ -214,7 +215,7 @@ public class QuorumControllerTest {
LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty());
QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> {
b.setConfigSchema(SCHEMA);
- }, OptionalLong.of(sessionTimeoutMillis), OptionalLong.empty(), MetadataVersion.latest());
+ }, OptionalLong.of(sessionTimeoutMillis), OptionalLong.empty(), BootstrapMetadata.create(MetadataVersion.latest()));
) {
ListenerCollection listeners = new ListenerCollection();
listeners.add(new Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
@@ -306,7 +307,7 @@ public class QuorumControllerTest {
LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty());
QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> {
b.setConfigSchema(SCHEMA);
- }, OptionalLong.of(sessionTimeoutMillis), OptionalLong.of(leaderImbalanceCheckIntervalNs), MetadataVersion.latest());
+ }, OptionalLong.of(sessionTimeoutMillis), OptionalLong.of(leaderImbalanceCheckIntervalNs), BootstrapMetadata.create(MetadataVersion.latest()));
) {
ListenerCollection listeners = new ListenerCollection();
listeners.add(new Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
@@ -537,7 +538,7 @@ public class QuorumControllerTest {
BrokerRegistrationRequestData.FeatureCollection features = new BrokerRegistrationRequestData.FeatureCollection();
features.add(new BrokerRegistrationRequestData.Feature()
.setName(MetadataVersion.FEATURE_NAME)
- .setMinSupportedVersion(MetadataVersion.IBP_3_0_IV0.featureLevel())
+ .setMinSupportedVersion(MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel())
.setMaxSupportedVersion(MetadataVersion.latest().featureLevel()));
return features;
}
@@ -1179,4 +1180,21 @@ public class QuorumControllerTest {
"authorizer " + i + " should not have completed loading.");
}
}
+
+ /**
+ * Checks that a controller started with bootstrap metadata reporting IBP_2_8_IV0 as its
+ * metadata version does not remain the active controller.
+ */
+ @Test
+ public void testInvalidBootstrapMetadata() throws Exception {
+ // We can't actually create a BootstrapMetadata with an invalid version, so we have to fake it
+ BootstrapMetadata bootstrapMetadata = Mockito.mock(BootstrapMetadata.class);
+ Mockito.when(bootstrapMetadata.metadataVersion()).thenReturn(MetadataVersion.IBP_2_8_IV0);
+ try (
+ LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty());
+ QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> {
+ b.setConfigSchema(SCHEMA);
+ }, OptionalLong.empty(), OptionalLong.empty(), bootstrapMetadata);
+ ) {
+ QuorumController active = controlEnv.activeController();
+ // Poll until the controller reports that it is no longer active.
+ TestUtils.waitForCondition(() -> !active.isActive(),
+ "Timed out waiting for controller to renounce itself after bad bootstrap metadata version.");
+ }
+ }
}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
index e1481b9fe5..68b11ae3ec 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
@@ -49,7 +49,7 @@ public class QuorumControllerTestEnv implements AutoCloseable {
LocalLogManagerTestEnv logEnv,
Consumer<QuorumController.Builder> builderConsumer
) throws Exception {
- this(logEnv, builderConsumer, OptionalLong.empty(), OptionalLong.empty(), MetadataVersion.latest());
+ this(logEnv, builderConsumer, OptionalLong.empty(), OptionalLong.empty(), BootstrapMetadata.create(MetadataVersion.latest()));
}
public QuorumControllerTestEnv(
@@ -57,7 +57,7 @@ public class QuorumControllerTestEnv implements AutoCloseable {
Consumer<Builder> builderConsumer,
OptionalLong sessionTimeoutMillis,
OptionalLong leaderImbalanceCheckIntervalNs,
- MetadataVersion metadataVersion
+ BootstrapMetadata bootstrapMetadata
) throws Exception {
this.logEnv = logEnv;
int numControllers = logEnv.logManagers().size();
@@ -68,7 +68,7 @@ public class QuorumControllerTestEnv implements AutoCloseable {
for (int i = 0; i < numControllers; i++) {
QuorumController.Builder builder = new QuorumController.Builder(i, logEnv.clusterId());
builder.setRaftClient(logEnv.logManagers().get(i));
- builder.setBootstrapMetadata(BootstrapMetadata.create(metadataVersion));
+ builder.setBootstrapMetadata(bootstrapMetadata);
builder.setLeaderImbalanceCheckIntervalNs(leaderImbalanceCheckIntervalNs);
builder.setQuorumFeatures(new QuorumFeatures(i, apiVersions, QuorumFeatures.defaultFeatureMap(), nodeIds));
sessionTimeoutMillis.ifPresent(timeout -> {
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
index aa1a416c29..55916470cb 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
@@ -43,7 +43,6 @@ import org.apache.kafka.common.record.RecordVersion;
* released version, they can use "0.10.0" when upgrading to the 0.10.0 release.
*/
public enum MetadataVersion {
- UNINITIALIZED(-1, "0.0", ""),
IBP_0_8_0(-1, "0.8.0", ""),
IBP_0_8_1(-1, "0.8.1", ""),
@@ -140,32 +139,36 @@ public enum MetadataVersion {
IBP_2_8_IV1(-1, "2.8", "IV1"),
// Introduce AllocateProducerIds (KIP-730)
- IBP_3_0_IV0(1, "3.0", "IV0", true),
+ IBP_3_0_IV0(-1, "3.0", "IV0"),
// Introduce ListOffsets V7 which supports listing offsets by max timestamp (KIP-734)
// Assume message format version is 3.0 (KIP-724)
- IBP_3_0_IV1(2, "3.0", "IV1", false),
+ IBP_3_0_IV1(1, "3.0", "IV1", true),
// Adds topic IDs to Fetch requests/responses (KIP-516)
- IBP_3_1_IV0(3, "3.1", "IV0", false),
+ IBP_3_1_IV0(2, "3.1", "IV0", false),
// Support for leader recovery for unclean leader election (KIP-704)
- IBP_3_2_IV0(4, "3.2", "IV0", true),
+ IBP_3_2_IV0(3, "3.2", "IV0", true),
// Support for metadata.version feature flag and Removes min_version_level from the finalized version range that is written to ZooKeeper (KIP-778)
- IBP_3_3_IV0(5, "3.3", "IV0", false),
+ IBP_3_3_IV0(4, "3.3", "IV0", false),
// Support NoopRecord for the cluster metadata log (KIP-835)
- IBP_3_3_IV1(6, "3.3", "IV1", true),
+ IBP_3_3_IV1(5, "3.3", "IV1", true),
// In KRaft mode, use BrokerRegistrationChangeRecord instead of UnfenceBrokerRecord and FenceBrokerRecord.
- IBP_3_3_IV2(7, "3.3", "IV2", true),
+ IBP_3_3_IV2(6, "3.3", "IV2", true),
// Adds InControlledShutdown state to RegisterBrokerRecord and BrokerRegistrationChangeRecord (KIP-841).
- IBP_3_3_IV3(8, "3.3", "IV3", true);
+ IBP_3_3_IV3(7, "3.3", "IV3", true);
+ // NOTE: update the default version in @ClusterTest annotation to point to the latest version
+
public static final String FEATURE_NAME = "metadata.version";
+ public static final MetadataVersion MINIMUM_KRAFT_VERSION = IBP_3_0_IV1;
+
public static final MetadataVersion[] VERSIONS;
private final short featureLevel;
@@ -258,12 +261,73 @@ public enum MetadataVersion {
}
}
+ /**
+ * Returns the Fetch request version that corresponds to this metadata version.
+ * <p>
+ * Thresholds are tested from newest to oldest, and the first one for which
+ * {@code isAtLeast} holds decides the result. Anything below IBP_0_9_0 maps to 0.
+ * The mapping is not contiguous: 6 and 9 are never returned.
+ *
+ * @return the Fetch request version, between 0 and 13
+ */
+ public short fetchRequestVersion() {
+ // IBP_3_1_IV0 is the version that adds topic IDs to Fetch requests/responses (KIP-516).
+ if (this.isAtLeast(IBP_3_1_IV0)) {
+ return 13;
+ } else if (this.isAtLeast(IBP_2_7_IV1)) {
+ return 12;
+ } else if (this.isAtLeast(IBP_2_3_IV1)) {
+ return 11;
+ } else if (this.isAtLeast(IBP_2_1_IV2)) {
+ return 10;
+ } else if (this.isAtLeast(IBP_2_0_IV1)) {
+ return 8;
+ } else if (this.isAtLeast(IBP_1_1_IV0)) {
+ return 7;
+ } else if (this.isAtLeast(IBP_0_11_0_IV1)) {
+ return 5;
+ } else if (this.isAtLeast(IBP_0_11_0_IV0)) {
+ return 4;
+ } else if (this.isAtLeast(IBP_0_10_1_IV1)) {
+ return 3;
+ } else if (this.isAtLeast(IBP_0_10_0_IV0)) {
+ return 2;
+ } else if (this.isAtLeast(IBP_0_9_0)) {
+ return 1;
+ } else {
+ return 0;
+ }
+ }
+
+ /**
+ * Returns the OffsetForLeaderEpoch request version that corresponds to this metadata version.
+ * <p>
+ * Thresholds are tested from newest to oldest, and the first one for which
+ * {@code isAtLeast} holds decides the result. Anything below IBP_2_0_IV0 maps to 0.
+ *
+ * @return the OffsetForLeaderEpoch request version, between 0 and 4
+ */
+ public short offsetForLeaderEpochRequestVersion() {
+ if (this.isAtLeast(IBP_2_8_IV0)) {
+ return 4;
+ } else if (this.isAtLeast(IBP_2_3_IV1)) {
+ return 3;
+ } else if (this.isAtLeast(IBP_2_1_IV1)) {
+ return 2;
+ } else if (this.isAtLeast(IBP_2_0_IV0)) {
+ return 1;
+ } else {
+ return 0;
+ }
+ }
+
+ /**
+ * Returns the ListOffsets request version that corresponds to this metadata version.
+ * <p>
+ * Thresholds are tested from newest to oldest, and the first one for which
+ * {@code isAtLeast} holds decides the result. Anything below IBP_0_10_1_IV2 maps to 0.
+ *
+ * @return the ListOffsets request version, between 0 and 7
+ */
+ public short listOffsetRequestVersion() {
+ // IBP_3_0_IV1 introduces ListOffsets V7, which supports listing offsets by max timestamp (KIP-734).
+ if (this.isAtLeast(IBP_3_0_IV1)) {
+ return 7;
+ } else if (this.isAtLeast(IBP_2_8_IV0)) {
+ return 6;
+ } else if (this.isAtLeast(IBP_2_2_IV1)) {
+ return 5;
+ } else if (this.isAtLeast(IBP_2_1_IV1)) {
+ return 4;
+ } else if (this.isAtLeast(IBP_2_0_IV1)) {
+ return 3;
+ } else if (this.isAtLeast(IBP_0_11_0_IV0)) {
+ return 2;
+ } else if (this.isAtLeast(IBP_0_10_1_IV2)) {
+ return 1;
+ } else {
+ return 0;
+ }
+ }
+
private static final Map<String, MetadataVersion> IBP_VERSIONS;
static {
{
- // Make a copy of values() and omit UNINITIALIZED
MetadataVersion[] enumValues = MetadataVersion.values();
- VERSIONS = Arrays.copyOfRange(enumValues, 1, enumValues.length);
+ VERSIONS = Arrays.copyOf(enumValues, enumValues.length);
IBP_VERSIONS = new HashMap<>();
Map<String, MetadataVersion> maxInterVersion = new HashMap<>();
@@ -289,8 +353,8 @@ public enum MetadataVersion {
Optional<MetadataVersion> previous() {
int idx = this.ordinal();
- if (idx > 1) {
- return Optional.of(VERSIONS[idx - 2]);
+ if (idx > 0) {
+ return Optional.of(VERSIONS[idx - 1]);
} else {
return Optional.empty();
}
diff --git a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
index 2f96d5fd04..99f9cc3515 100644
--- a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
+++ b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
@@ -76,7 +76,7 @@ class MetadataVersionTest {
@Test
public void testFeatureLevel() {
MetadataVersion[] metadataVersions = MetadataVersion.VERSIONS;
- int firstFeatureLevelIndex = Arrays.asList(metadataVersions).indexOf(IBP_3_0_IV0);
+ int firstFeatureLevelIndex = Arrays.asList(metadataVersions).indexOf(MetadataVersion.MINIMUM_KRAFT_VERSION);
for (int i = 0; i < firstFeatureLevelIndex; i++) {
assertTrue(metadataVersions[i].featureLevel() < 0);
}
@@ -287,7 +287,7 @@ class MetadataVersionTest {
public void testPrevious() {
for (int i = 1; i < MetadataVersion.VERSIONS.length - 2; i++) {
MetadataVersion version = MetadataVersion.VERSIONS[i];
- assertTrue(version.previous().isPresent());
+ assertTrue(version.previous().isPresent(), version.toString());
assertEquals(MetadataVersion.VERSIONS[i - 1], version.previous().get());
}
}
@@ -317,8 +317,8 @@ class MetadataVersionTest {
}
for (MetadataVersion metadataVersion : MetadataVersion.VERSIONS) {
- if (metadataVersion.isAtLeast(IBP_3_0_IV0)) {
- assertTrue(metadataVersion.isKRaftSupported());
+ if (metadataVersion.isAtLeast(IBP_3_0_IV1)) {
+ assertTrue(metadataVersion.isKRaftSupported(), metadataVersion.toString());
} else {
assertFalse(metadataVersion.isKRaftSupported());
}