You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2018/05/11 05:06:26 UTC
[kafka] branch trunk updated: MINOR: Rename RecordFormat to
RecordVersion (#4809)
This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c3921d4 MINOR: Rename RecordFormat to RecordVersion (#4809)
c3921d4 is described below
commit c3921d489f4da80aad6f387158c33ec2e4bca52d
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Thu May 10 22:06:18 2018 -0700
MINOR: Rename RecordFormat to RecordVersion (#4809)
Also include a few clean-ups:
* Method/variable/parameter renames to make them consistent with
the class name
* Return `ApiVersion` from `minSupportedFor`
* Use `values` to remove some code duplication
* Reduce duplication in `ApiVersion` by introducing the `shortVersion`
method and building the versions map programatically
* Avoid unnecessary `regex` in `ApiVersion.apply`
* Added scaladoc to a few methods
Some of these were originally discussed in:
https://github.com/apache/kafka/pull/4583#pullrequestreview-98089400
Added a test for `ApiVersion.shortVersion`. Relying on existing tests
for the rest since there is no change in behaviour.
Reviewers: Jason Gustafson <ja...@confluent.io>
---
.../{RecordFormat.java => RecordVersion.java} | 26 ++-
core/src/main/scala/kafka/api/ApiVersion.scala | 218 ++++++++++++---------
core/src/main/scala/kafka/log/Log.scala | 4 +-
core/src/main/scala/kafka/server/KafkaApis.scala | 2 +-
core/src/main/scala/kafka/server/KafkaConfig.scala | 9 +-
.../main/scala/kafka/server/ReplicaManager.scala | 2 +-
.../test/scala/unit/kafka/api/ApiVersionTest.scala | 26 ++-
.../scala/unit/kafka/server/KafkaConfigTest.scala | 2 +-
8 files changed, 163 insertions(+), 126 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordFormat.java b/clients/src/main/java/org/apache/kafka/common/record/RecordVersion.java
similarity index 55%
rename from clients/src/main/java/org/apache/kafka/common/record/RecordFormat.java
rename to clients/src/main/java/org/apache/kafka/common/record/RecordVersion.java
index e71ec59..1f80d62 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/RecordFormat.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordVersion.java
@@ -16,25 +16,31 @@
*/
package org.apache.kafka.common.record;
-public enum RecordFormat {
+/**
+ * Defines the record format versions supported by Kafka.
+ *
+ * For historical reasons, the record format version is also known as `magic` and `message format version`. Note that
+ * the version actually applies to the {@link RecordBatch} (instead of the {@link Record}). Finally, the
+ * `message.format.version` topic config confusingly expects an ApiVersion instead of a RecordVersion.
+ */
+public enum RecordVersion {
V0(0), V1(1), V2(2);
+ private static final RecordVersion[] VALUES = values();
+
public final byte value;
- RecordFormat(int value) {
+ RecordVersion(int value) {
this.value = (byte) value;
}
- public static RecordFormat lookup(byte version) {
- switch (version) {
- case 0: return V0;
- case 1: return V1;
- case 2: return V2;
- default: throw new IllegalArgumentException("Unknown format version: " + version);
- }
+ public static RecordVersion lookup(byte value) {
+ if (value < 0 || value >= VALUES.length)
+ throw new IllegalArgumentException("Unknown record version: " + value);
+ return VALUES[value];
}
- public static RecordFormat current() {
+ public static RecordVersion current() {
return V2;
}
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala
index ff011b2..b13d237 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -17,7 +17,7 @@
package kafka.api
-import org.apache.kafka.common.record.RecordFormat
+import org.apache.kafka.common.record.RecordVersion
/**
* This class contains the different Kafka versions.
@@ -42,75 +42,73 @@ object ApiVersion {
// This implicit is necessary due to: https://issues.scala-lang.org/browse/SI-8541
implicit def orderingByVersion[A <: ApiVersion]: Ordering[A] = Ordering.by(_.id)
- private val versionNameMap = Map(
- "0.8.0" -> KAFKA_0_8_0,
- "0.8.1" -> KAFKA_0_8_1,
- "0.8.2" -> KAFKA_0_8_2,
- "0.9.0" -> KAFKA_0_9_0,
+ val allVersions: Seq[ApiVersion] = Seq(
+ KAFKA_0_8_0,
+ KAFKA_0_8_1,
+ KAFKA_0_8_2,
+ KAFKA_0_9_0,
// 0.10.0-IV0 is introduced for KIP-31/32 which changes the message format.
- "0.10.0-IV0" -> KAFKA_0_10_0_IV0,
+ KAFKA_0_10_0_IV0,
// 0.10.0-IV1 is introduced for KIP-36(rack awareness) and KIP-43(SASL handshake).
- "0.10.0-IV1" -> KAFKA_0_10_0_IV1,
- "0.10.0" -> KAFKA_0_10_0_IV1,
-
+ KAFKA_0_10_0_IV1,
// introduced for JoinGroup protocol change in KIP-62
- "0.10.1-IV0" -> KAFKA_0_10_1_IV0,
+ KAFKA_0_10_1_IV0,
// 0.10.1-IV1 is introduced for KIP-74(fetch response size limit).
- "0.10.1-IV1" -> KAFKA_0_10_1_IV1,
+ KAFKA_0_10_1_IV1,
// introduced ListOffsetRequest v1 in KIP-79
- "0.10.1-IV2" -> KAFKA_0_10_1_IV2,
- "0.10.1" -> KAFKA_0_10_1_IV2,
+ KAFKA_0_10_1_IV2,
// introduced UpdateMetadataRequest v3 in KIP-103
- "0.10.2-IV0" -> KAFKA_0_10_2_IV0,
- "0.10.2" -> KAFKA_0_10_2_IV0,
+ KAFKA_0_10_2_IV0,
// KIP-98 (idempotent and transactional producer support)
- "0.11.0-IV0" -> KAFKA_0_11_0_IV0,
+ KAFKA_0_11_0_IV0,
// introduced DeleteRecordsRequest v0 and FetchRequest v4 in KIP-107
- "0.11.0-IV1" -> KAFKA_0_11_0_IV1,
+ KAFKA_0_11_0_IV1,
// Introduced leader epoch fetches to the replica fetcher via KIP-101
- "0.11.0-IV2" -> KAFKA_0_11_0_IV2,
- "0.11.0" -> KAFKA_0_11_0_IV2,
+ KAFKA_0_11_0_IV2,
// Introduced LeaderAndIsrRequest V1, UpdateMetadataRequest V4 and FetchRequest V6 via KIP-112
- "1.0-IV0" -> KAFKA_1_0_IV0,
- "1.0" -> KAFKA_1_0_IV0,
+ KAFKA_1_0_IV0,
// Introduced DeleteGroupsRequest V0 via KIP-229, plus KIP-227 incremental fetch requests,
// and KafkaStorageException for fetch requests.
- "1.1-IV0" -> KAFKA_1_1_IV0,
- "1.1" -> KAFKA_1_1_IV0,
+ KAFKA_1_1_IV0,
// Introduced OffsetsForLeaderEpochRequest V1 via KIP-279
- "2.0-IV0" -> KAFKA_2_0_IV0,
- "2.0" -> KAFKA_2_0_IV0
+ KAFKA_2_0_IV0
)
- private val versionPattern = "\\.".r
-
- def apply(version: String): ApiVersion = {
- val versionsSeq = versionPattern.split(version)
- val numSegments = if (version.startsWith("0.")) 3 else 2
- val key = versionsSeq.take(numSegments).mkString(".")
- versionNameMap.getOrElse(key, throw new IllegalArgumentException(s"Version `$version` is not a valid version"))
+ // Map keys are the union of the short and full versions
+ private val versionMap: Map[String, ApiVersion] =
+ allVersions.map(v => v.version -> v).toMap ++ allVersions.groupBy(_.shortVersion).map { case (k, v) => k -> v.last }
+
+ /**
+ * Return an `ApiVersion` instance for `versionString`, which can be in a variety of formats (e.g. "0.8.0", "0.8.0.x",
+ * "0.10.0", "0.10.0-IV1"). `IllegalArgumentException` is thrown if `versionString` cannot be mapped to an `ApiVersion`.
+ */
+ def apply(versionString: String): ApiVersion = {
+ val versionSegments = versionString.split('.').toSeq
+ val numSegments = if (versionString.startsWith("0.")) 3 else 2
+ val key = versionSegments.take(numSegments).mkString(".")
+ versionMap.getOrElse(key, throw new IllegalArgumentException(s"Version `$versionString` is not a valid version"))
}
- def latestVersion = versionNameMap.values.max
-
- def allVersions: Set[ApiVersion] = {
- versionNameMap.values.toSet
- }
-
- def minVersionForMessageFormat(messageFormatVersion: RecordFormat): String = {
- messageFormatVersion match {
- case RecordFormat.V0 => "0.8.0"
- case RecordFormat.V1 => "0.10.0"
- case RecordFormat.V2 => "0.11.0"
- case _ => throw new IllegalArgumentException(s"Invalid message format version $messageFormatVersion")
+ def latestVersion: ApiVersion = allVersions.last
+
+ /**
+ * Return the minimum `ApiVersion` that supports `RecordVersion`.
+ */
+ def minSupportedFor(recordVersion: RecordVersion): ApiVersion = {
+ recordVersion match {
+ case RecordVersion.V0 => KAFKA_0_8_0
+ case RecordVersion.V1 => KAFKA_0_10_0_IV0
+ case RecordVersion.V2 => KAFKA_0_11_0_IV0
+ case _ => throw new IllegalArgumentException(s"Invalid message format version $recordVersion")
}
}
}
sealed trait ApiVersion extends Ordered[ApiVersion] {
- val version: String
- val messageFormatVersion: RecordFormat
- val id: Int
+ def version: String
+ def shortVersion: String
+ def recordVersion: RecordVersion
+ def id: Int
override def compare(that: ApiVersion): Int =
ApiVersion.orderingByVersion.compare(this, that)
@@ -118,99 +116,127 @@ sealed trait ApiVersion extends Ordered[ApiVersion] {
override def toString: String = version
}
+/**
+ * For versions before 0.10.0, `version` and `shortVersion` were the same.
+ */
+sealed trait LegacyApiVersion extends ApiVersion {
+ def version = shortVersion
+}
+
+/**
+ * From 0.10.0 onwards, each version has a sub-version. For example, IV0 is the sub-version of 0.10.0-IV0.
+ */
+sealed trait DefaultApiVersion extends ApiVersion {
+ lazy val version = shortVersion + "-" + subVersion
+ protected def subVersion: String
+}
+
// Keep the IDs in order of versions
-case object KAFKA_0_8_0 extends ApiVersion {
- val version: String = "0.8.0.X"
- val messageFormatVersion = RecordFormat.V0
+case object KAFKA_0_8_0 extends LegacyApiVersion {
+ val shortVersion = "0.8.0"
+ val recordVersion = RecordVersion.V0
val id: Int = 0
}
-case object KAFKA_0_8_1 extends ApiVersion {
- val version: String = "0.8.1.X"
- val messageFormatVersion = RecordFormat.V0
+case object KAFKA_0_8_1 extends LegacyApiVersion {
+ val shortVersion = "0.8.1"
+ val recordVersion = RecordVersion.V0
val id: Int = 1
}
-case object KAFKA_0_8_2 extends ApiVersion {
- val version: String = "0.8.2.X"
- val messageFormatVersion = RecordFormat.V0
+case object KAFKA_0_8_2 extends LegacyApiVersion {
+ val shortVersion = "0.8.2"
+ val recordVersion = RecordVersion.V0
val id: Int = 2
}
-case object KAFKA_0_9_0 extends ApiVersion {
- val version: String = "0.9.0.X"
- val messageFormatVersion = RecordFormat.V0
+case object KAFKA_0_9_0 extends LegacyApiVersion {
+ val shortVersion = "0.9.0"
+ val subVersion = ""
+ val recordVersion = RecordVersion.V0
val id: Int = 3
}
-case object KAFKA_0_10_0_IV0 extends ApiVersion {
- val version: String = "0.10.0-IV0"
- val messageFormatVersion = RecordFormat.V1
+case object KAFKA_0_10_0_IV0 extends DefaultApiVersion {
+ val shortVersion = "0.10.0"
+ val subVersion = "IV0"
+ val recordVersion = RecordVersion.V1
val id: Int = 4
}
-case object KAFKA_0_10_0_IV1 extends ApiVersion {
- val version: String = "0.10.0-IV1"
- val messageFormatVersion = RecordFormat.V1
+case object KAFKA_0_10_0_IV1 extends DefaultApiVersion {
+ val shortVersion = "0.10.0"
+ val subVersion = "IV1"
+ val recordVersion = RecordVersion.V1
val id: Int = 5
}
-case object KAFKA_0_10_1_IV0 extends ApiVersion {
- val version: String = "0.10.1-IV0"
- val messageFormatVersion = RecordFormat.V1
+case object KAFKA_0_10_1_IV0 extends DefaultApiVersion {
+ val shortVersion = "0.10.1"
+ val subVersion = "IV0"
+ val recordVersion = RecordVersion.V1
val id: Int = 6
}
-case object KAFKA_0_10_1_IV1 extends ApiVersion {
- val version: String = "0.10.1-IV1"
- val messageFormatVersion = RecordFormat.V1
+case object KAFKA_0_10_1_IV1 extends DefaultApiVersion {
+ val shortVersion = "0.10.1"
+ val subVersion = "IV1"
+ val recordVersion = RecordVersion.V1
val id: Int = 7
}
-case object KAFKA_0_10_1_IV2 extends ApiVersion {
- val version: String = "0.10.1-IV2"
- val messageFormatVersion = RecordFormat.V1
+case object KAFKA_0_10_1_IV2 extends DefaultApiVersion {
+ val shortVersion = "0.10.1"
+ val subVersion = "IV2"
+ val recordVersion = RecordVersion.V1
val id: Int = 8
}
-case object KAFKA_0_10_2_IV0 extends ApiVersion {
- val version: String = "0.10.2-IV0"
- val messageFormatVersion = RecordFormat.V1
+case object KAFKA_0_10_2_IV0 extends DefaultApiVersion {
+ val shortVersion = "0.10.2"
+ val subVersion = "IV0"
+ val recordVersion = RecordVersion.V1
val id: Int = 9
}
-case object KAFKA_0_11_0_IV0 extends ApiVersion {
- val version: String = "0.11.0-IV0"
- val messageFormatVersion = RecordFormat.V2
+case object KAFKA_0_11_0_IV0 extends DefaultApiVersion {
+ val shortVersion = "0.11.0"
+ val subVersion = "IV0"
+ val recordVersion = RecordVersion.V2
val id: Int = 10
}
-case object KAFKA_0_11_0_IV1 extends ApiVersion {
- val version: String = "0.11.0-IV1"
- val messageFormatVersion = RecordFormat.V2
+case object KAFKA_0_11_0_IV1 extends DefaultApiVersion {
+ val shortVersion = "0.11.0"
+ val subVersion = "IV1"
+ val recordVersion = RecordVersion.V2
val id: Int = 11
}
-case object KAFKA_0_11_0_IV2 extends ApiVersion {
- val version: String = "0.11.0-IV2"
- val messageFormatVersion = RecordFormat.V2
+case object KAFKA_0_11_0_IV2 extends DefaultApiVersion {
+ val shortVersion = "0.11.0"
+ val subVersion = "IV2"
+ val recordVersion = RecordVersion.V2
val id: Int = 12
}
-case object KAFKA_1_0_IV0 extends ApiVersion {
- val version: String = "1.0-IV0"
- val messageFormatVersion = RecordFormat.V2
+case object KAFKA_1_0_IV0 extends DefaultApiVersion {
+ val shortVersion = "1.0"
+ val subVersion = "IV0"
+ val recordVersion = RecordVersion.V2
val id: Int = 13
}
-case object KAFKA_1_1_IV0 extends ApiVersion {
- val version: String = "1.1-IV0"
- val messageFormatVersion = RecordFormat.V2
+case object KAFKA_1_1_IV0 extends DefaultApiVersion {
+ val shortVersion = "1.1"
+ val subVersion = "IV0"
+ val recordVersion = RecordVersion.V2
val id: Int = 14
}
-case object KAFKA_2_0_IV0 extends ApiVersion {
- val version: String = "2.0-IV0"
- val messageFormatVersion = RecordFormat.V2
+case object KAFKA_2_0_IV0 extends DefaultApiVersion {
+ val shortVersion: String = "2.0"
+ val subVersion = "IV0"
+ val recordVersion = RecordVersion.V2
val id: Int = 15
}
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 0b1a18a..af83775 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -483,7 +483,7 @@ class Log(@volatile var dir: File,
private def loadProducerState(lastOffset: Long, reloadFromCleanShutdown: Boolean): Unit = lock synchronized {
checkIfMemoryMappedBufferClosed()
- val messageFormatVersion = config.messageFormatVersion.messageFormatVersion.value
+ val messageFormatVersion = config.messageFormatVersion.recordVersion.value
info(s"Loading producer state from offset $lastOffset with message format version $messageFormatVersion")
// We want to avoid unnecessary scanning of the log to build the producer state when the broker is being
@@ -680,7 +680,7 @@ class Log(@volatile var dir: File,
appendInfo.sourceCodec,
appendInfo.targetCodec,
config.compact,
- config.messageFormatVersion.messageFormatVersion.value,
+ config.messageFormatVersion.recordVersion.value,
config.messageTimestampType,
config.messageTimestampDifferenceMaxMs,
leaderEpoch,
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 7bc9e3e..828b08b 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1355,7 +1355,7 @@ class KafkaApis(val requestChannel: RequestChannel,
apiVersionRequest.getErrorResponse(requestThrottleMs, Errors.UNSUPPORTED_VERSION.exception)
else
ApiVersionsResponse.apiVersionsResponse(requestThrottleMs,
- config.interBrokerProtocolVersion.messageFormatVersion.value)
+ config.interBrokerProtocolVersion.recordVersion.value)
}
sendResponseMaybeThrottle(request, createResponseCallback)
}
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 4834791..e296e26 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -1351,11 +1351,10 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
s"${KafkaConfig.AdvertisedListenersProp} cannot use the nonroutable meta-address 0.0.0.0. "+
s"Use a routable IP address.")
- val messageFormatVersion = logMessageFormatVersion.messageFormatVersion
- require(interBrokerProtocolVersion.messageFormatVersion.value >= messageFormatVersion.value,
- s"log.message.format.version $logMessageFormatVersionString can only be used when " +
- "inter.broker.protocol.version is set to version " +
- s"${ApiVersion.minVersionForMessageFormat(messageFormatVersion)} or higher")
+ val recordVersion = logMessageFormatVersion.recordVersion
+ require(interBrokerProtocolVersion.recordVersion.value >= recordVersion.value,
+ s"log.message.format.version $logMessageFormatVersionString can only be used when inter.broker.protocol.version " +
+ s"is set to version ${ApiVersion.minSupportedFor(recordVersion).shortVersion} or higher")
val interBrokerUsesSasl = interBrokerSecurityProtocol == SecurityProtocol.SASL_PLAINTEXT || interBrokerSecurityProtocol == SecurityProtocol.SASL_SSL
require(!interBrokerUsesSasl || saslInterBrokerHandshakeRequestEnable || saslMechanismInterBrokerProtocol == SaslConfigs.GSSAPI_MECHANISM,
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 309a599..0518e03 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -997,7 +997,7 @@ class ReplicaManager(val config: KafkaConfig,
}
def getMagic(topicPartition: TopicPartition): Option[Byte] =
- getReplica(topicPartition).flatMap(_.log.map(_.config.messageFormatVersion.messageFormatVersion.value))
+ getReplica(topicPartition).flatMap(_.log.map(_.config.messageFormatVersion.recordVersion.value))
def maybeUpdateMetadataCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest) : Seq[TopicPartition] = {
replicaStateChangeLock synchronized {
diff --git a/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala b/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
index 88c9d52..9aaadf1 100644
--- a/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
+++ b/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
@@ -17,7 +17,7 @@
package kafka.api
-import org.apache.kafka.common.record.RecordFormat
+import org.apache.kafka.common.record.RecordVersion
import org.junit.Test
import org.junit.Assert._
@@ -76,15 +76,21 @@ class ApiVersionTest {
}
@Test
- def testMinVersionForMessageFormat(): Unit = {
- assertEquals("0.8.0", ApiVersion.minVersionForMessageFormat(RecordFormat.V0))
- assertEquals("0.10.0", ApiVersion.minVersionForMessageFormat(RecordFormat.V1))
- assertEquals("0.11.0", ApiVersion.minVersionForMessageFormat(RecordFormat.V2))
-
- // Ensure that all message format versions have a defined min version so that we remember
- // to update the function
- for (messageFormatVersion <- RecordFormat.values)
- assertNotNull(ApiVersion.minVersionForMessageFormat(messageFormatVersion))
+ def testMinSupportedVersionFor(): Unit = {
+ assertEquals(KAFKA_0_8_0, ApiVersion.minSupportedFor(RecordVersion.V0))
+ assertEquals(KAFKA_0_10_0_IV0, ApiVersion.minSupportedFor(RecordVersion.V1))
+ assertEquals(KAFKA_0_11_0_IV0, ApiVersion.minSupportedFor(RecordVersion.V2))
+
+ // Ensure that all record versions have a defined min version so that we remember to update the method
+ for (recordVersion <- RecordVersion.values)
+ assertNotNull(ApiVersion.minSupportedFor(recordVersion))
+ }
+
+ @Test
+ def testShortVersion(): Unit = {
+ assertEquals("0.8.0", KAFKA_0_8_0.shortVersion)
+ assertEquals("0.10.0", KAFKA_0_10_0_IV0.shortVersion)
+ assertEquals("0.11.0", KAFKA_0_11_0_IV0.shortVersion)
}
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 39cbe40..6b86da6 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -533,7 +533,7 @@ class KafkaConfigTest {
ApiVersion.allVersions.foreach { interBrokerVersion =>
ApiVersion.allVersions.foreach { messageFormatVersion =>
- if (interBrokerVersion.messageFormatVersion.value >= messageFormatVersion.messageFormatVersion.value) {
+ if (interBrokerVersion.recordVersion.value >= messageFormatVersion.recordVersion.value) {
val config = buildConfig(interBrokerVersion, messageFormatVersion)
assertEquals(messageFormatVersion, config.logMessageFormatVersion)
assertEquals(interBrokerVersion, config.interBrokerProtocolVersion)
--
To stop receiving notification emails like this one, please contact
ijuma@apache.org.