You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2018/05/11 05:06:26 UTC

[kafka] branch trunk updated: MINOR: Rename RecordFormat to RecordVersion (#4809)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c3921d4  MINOR: Rename RecordFormat to RecordVersion (#4809)
c3921d4 is described below

commit c3921d489f4da80aad6f387158c33ec2e4bca52d
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Thu May 10 22:06:18 2018 -0700

    MINOR: Rename RecordFormat to RecordVersion (#4809)
    
    Also include a few clean-ups:
    
    * Method/variable/parameter renames to make them consistent with
    the class name
    * Return `ApiVersion` from `minSupportedFor`
    * Use `values` to remove some code duplication
    * Reduce duplication in `ApiVersion` by introducing the `shortVersion`
    method and building the versions map programatically
    * Avoid unnecessary `regex` in `ApiVersion.apply`
    * Added scaladoc to a few methods
    
    Some of these were originally discussed in:
    
    https://github.com/apache/kafka/pull/4583#pullrequestreview-98089400
    
    Added a test for `ApiVersion.shortVersion`. Relying on existing tests
    for the rest since there is no change in behaviour.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../{RecordFormat.java => RecordVersion.java}      |  26 ++-
 core/src/main/scala/kafka/api/ApiVersion.scala     | 218 ++++++++++++---------
 core/src/main/scala/kafka/log/Log.scala            |   4 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |   2 +-
 core/src/main/scala/kafka/server/KafkaConfig.scala |   9 +-
 .../main/scala/kafka/server/ReplicaManager.scala   |   2 +-
 .../test/scala/unit/kafka/api/ApiVersionTest.scala |  26 ++-
 .../scala/unit/kafka/server/KafkaConfigTest.scala  |   2 +-
 8 files changed, 163 insertions(+), 126 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordFormat.java b/clients/src/main/java/org/apache/kafka/common/record/RecordVersion.java
similarity index 55%
rename from clients/src/main/java/org/apache/kafka/common/record/RecordFormat.java
rename to clients/src/main/java/org/apache/kafka/common/record/RecordVersion.java
index e71ec59..1f80d62 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/RecordFormat.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordVersion.java
@@ -16,25 +16,31 @@
  */
 package org.apache.kafka.common.record;
 
-public enum RecordFormat {
+/**
+ * Defines the record format versions supported by Kafka.
+ *
+ * For historical reasons, the record format version is also known as `magic` and `message format version`. Note that
+ * the version actually applies to the {@link RecordBatch} (instead of the {@link Record}). Finally, the
+ * `message.format.version` topic config confusingly expects an ApiVersion instead of a RecordVersion.
+ */
+public enum RecordVersion {
     V0(0), V1(1), V2(2);
 
+    private static final RecordVersion[] VALUES = values();
+
     public final byte value;
 
-    RecordFormat(int value) {
+    RecordVersion(int value) {
         this.value = (byte) value;
     }
 
-    public static RecordFormat lookup(byte version) {
-        switch (version) {
-            case 0: return V0;
-            case 1: return V1;
-            case 2: return V2;
-            default: throw new IllegalArgumentException("Unknown format version: " + version);
-        }
+    public static RecordVersion lookup(byte value) {
+        if (value < 0 || value >= VALUES.length)
+            throw new IllegalArgumentException("Unknown record version: " + value);
+        return VALUES[value];
     }
 
-    public static RecordFormat current() {
+    public static RecordVersion current() {
         return V2;
     }
 
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala
index ff011b2..b13d237 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -17,7 +17,7 @@
 
 package kafka.api
 
-import org.apache.kafka.common.record.RecordFormat
+import org.apache.kafka.common.record.RecordVersion
 
 /**
  * This class contains the different Kafka versions.
@@ -42,75 +42,73 @@ object ApiVersion {
   // This implicit is necessary due to: https://issues.scala-lang.org/browse/SI-8541
   implicit def orderingByVersion[A <: ApiVersion]: Ordering[A] = Ordering.by(_.id)
 
-  private val versionNameMap = Map(
-    "0.8.0" -> KAFKA_0_8_0,
-    "0.8.1" -> KAFKA_0_8_1,
-    "0.8.2" -> KAFKA_0_8_2,
-    "0.9.0" -> KAFKA_0_9_0,
+  val allVersions: Seq[ApiVersion] = Seq(
+    KAFKA_0_8_0,
+    KAFKA_0_8_1,
+    KAFKA_0_8_2,
+    KAFKA_0_9_0,
     // 0.10.0-IV0 is introduced for KIP-31/32 which changes the message format.
-    "0.10.0-IV0" -> KAFKA_0_10_0_IV0,
+    KAFKA_0_10_0_IV0,
     // 0.10.0-IV1 is introduced for KIP-36(rack awareness) and KIP-43(SASL handshake).
-    "0.10.0-IV1" -> KAFKA_0_10_0_IV1,
-    "0.10.0" -> KAFKA_0_10_0_IV1,
-
+    KAFKA_0_10_0_IV1,
     // introduced for JoinGroup protocol change in KIP-62
-    "0.10.1-IV0" -> KAFKA_0_10_1_IV0,
+    KAFKA_0_10_1_IV0,
     // 0.10.1-IV1 is introduced for KIP-74(fetch response size limit).
-    "0.10.1-IV1" -> KAFKA_0_10_1_IV1,
+    KAFKA_0_10_1_IV1,
     // introduced ListOffsetRequest v1 in KIP-79
-    "0.10.1-IV2" -> KAFKA_0_10_1_IV2,
-    "0.10.1" -> KAFKA_0_10_1_IV2,
+    KAFKA_0_10_1_IV2,
     // introduced UpdateMetadataRequest v3 in KIP-103
-    "0.10.2-IV0" -> KAFKA_0_10_2_IV0,
-    "0.10.2" -> KAFKA_0_10_2_IV0,
+    KAFKA_0_10_2_IV0,
     // KIP-98 (idempotent and transactional producer support)
-    "0.11.0-IV0" -> KAFKA_0_11_0_IV0,
+    KAFKA_0_11_0_IV0,
     // introduced DeleteRecordsRequest v0 and FetchRequest v4 in KIP-107
-    "0.11.0-IV1" -> KAFKA_0_11_0_IV1,
+    KAFKA_0_11_0_IV1,
     // Introduced leader epoch fetches to the replica fetcher via KIP-101
-    "0.11.0-IV2" -> KAFKA_0_11_0_IV2,
-    "0.11.0" -> KAFKA_0_11_0_IV2,
+    KAFKA_0_11_0_IV2,
     // Introduced LeaderAndIsrRequest V1, UpdateMetadataRequest V4 and FetchRequest V6 via KIP-112
-    "1.0-IV0" -> KAFKA_1_0_IV0,
-    "1.0" -> KAFKA_1_0_IV0,
+    KAFKA_1_0_IV0,
     // Introduced DeleteGroupsRequest V0 via KIP-229, plus KIP-227 incremental fetch requests,
     // and KafkaStorageException for fetch requests.
-    "1.1-IV0" -> KAFKA_1_1_IV0,
-    "1.1" -> KAFKA_1_1_IV0,
+    KAFKA_1_1_IV0,
     // Introduced OffsetsForLeaderEpochRequest V1 via KIP-279
-    "2.0-IV0" -> KAFKA_2_0_IV0,
-    "2.0" -> KAFKA_2_0_IV0
+    KAFKA_2_0_IV0
   )
 
-  private val versionPattern = "\\.".r
-
-  def apply(version: String): ApiVersion = {
-    val versionsSeq = versionPattern.split(version)
-    val numSegments = if (version.startsWith("0.")) 3 else 2
-    val key = versionsSeq.take(numSegments).mkString(".")
-    versionNameMap.getOrElse(key, throw new IllegalArgumentException(s"Version `$version` is not a valid version"))
+  // Map keys are the union of the short and full versions
+  private val versionMap: Map[String, ApiVersion] =
+    allVersions.map(v => v.version -> v).toMap ++ allVersions.groupBy(_.shortVersion).map { case (k, v) => k -> v.last }
+
+  /**
+   * Return an `ApiVersion` instance for `versionString`, which can be in a variety of formats (e.g. "0.8.0", "0.8.0.x",
+   * "0.10.0", "0.10.0-IV1"). `IllegalArgumentException` is thrown if `versionString` cannot be mapped to an `ApiVersion`.
+   */
+  def apply(versionString: String): ApiVersion = {
+    val versionSegments = versionString.split('.').toSeq
+    val numSegments = if (versionString.startsWith("0.")) 3 else 2
+    val key = versionSegments.take(numSegments).mkString(".")
+    versionMap.getOrElse(key, throw new IllegalArgumentException(s"Version `$versionString` is not a valid version"))
   }
 
-  def latestVersion = versionNameMap.values.max
-
-  def allVersions: Set[ApiVersion] = {
-    versionNameMap.values.toSet
-  }
-
-  def minVersionForMessageFormat(messageFormatVersion: RecordFormat): String = {
-    messageFormatVersion match {
-      case RecordFormat.V0 => "0.8.0"
-      case RecordFormat.V1 => "0.10.0"
-      case RecordFormat.V2 => "0.11.0"
-      case _ => throw new IllegalArgumentException(s"Invalid message format version $messageFormatVersion")
+  def latestVersion: ApiVersion = allVersions.last
+
+  /**
+   * Return the minimum `ApiVersion` that supports `RecordVersion`.
+   */
+  def minSupportedFor(recordVersion: RecordVersion): ApiVersion = {
+    recordVersion match {
+      case RecordVersion.V0 => KAFKA_0_8_0
+      case RecordVersion.V1 => KAFKA_0_10_0_IV0
+      case RecordVersion.V2 => KAFKA_0_11_0_IV0
+      case _ => throw new IllegalArgumentException(s"Invalid message format version $recordVersion")
     }
   }
 }
 
 sealed trait ApiVersion extends Ordered[ApiVersion] {
-  val version: String
-  val messageFormatVersion: RecordFormat
-  val id: Int
+  def version: String
+  def shortVersion: String
+  def recordVersion: RecordVersion
+  def id: Int
 
   override def compare(that: ApiVersion): Int =
     ApiVersion.orderingByVersion.compare(this, that)
@@ -118,99 +116,127 @@ sealed trait ApiVersion extends Ordered[ApiVersion] {
   override def toString: String = version
 }
 
+/**
+ * For versions before 0.10.0, `version` and `shortVersion` were the same.
+ */
+sealed trait LegacyApiVersion extends ApiVersion {
+  def version = shortVersion
+}
+
+/**
+ * From 0.10.0 onwards, each version has a sub-version. For example, IV0 is the sub-version of 0.10.0-IV0.
+ */
+sealed trait DefaultApiVersion extends ApiVersion {
+  lazy val version = shortVersion + "-" + subVersion
+  protected def subVersion: String
+}
+
 // Keep the IDs in order of versions
-case object KAFKA_0_8_0 extends ApiVersion {
-  val version: String = "0.8.0.X"
-  val messageFormatVersion = RecordFormat.V0
+case object KAFKA_0_8_0 extends LegacyApiVersion {
+  val shortVersion = "0.8.0"
+  val recordVersion = RecordVersion.V0
   val id: Int = 0
 }
 
-case object KAFKA_0_8_1 extends ApiVersion {
-  val version: String = "0.8.1.X"
-  val messageFormatVersion = RecordFormat.V0
+case object KAFKA_0_8_1 extends LegacyApiVersion {
+  val shortVersion = "0.8.1"
+  val recordVersion = RecordVersion.V0
   val id: Int = 1
 }
 
-case object KAFKA_0_8_2 extends ApiVersion {
-  val version: String = "0.8.2.X"
-  val messageFormatVersion = RecordFormat.V0
+case object KAFKA_0_8_2 extends LegacyApiVersion {
+  val shortVersion = "0.8.2"
+  val recordVersion = RecordVersion.V0
   val id: Int = 2
 }
 
-case object KAFKA_0_9_0 extends ApiVersion {
-  val version: String = "0.9.0.X"
-  val messageFormatVersion = RecordFormat.V0
+case object KAFKA_0_9_0 extends LegacyApiVersion {
+  val shortVersion = "0.9.0"
+  val subVersion = ""
+  val recordVersion = RecordVersion.V0
   val id: Int = 3
 }
 
-case object KAFKA_0_10_0_IV0 extends ApiVersion {
-  val version: String = "0.10.0-IV0"
-  val messageFormatVersion = RecordFormat.V1
+case object KAFKA_0_10_0_IV0 extends DefaultApiVersion {
+  val shortVersion = "0.10.0"
+  val subVersion = "IV0"
+  val recordVersion = RecordVersion.V1
   val id: Int = 4
 }
 
-case object KAFKA_0_10_0_IV1 extends ApiVersion {
-  val version: String = "0.10.0-IV1"
-  val messageFormatVersion = RecordFormat.V1
+case object KAFKA_0_10_0_IV1 extends DefaultApiVersion {
+  val shortVersion = "0.10.0"
+  val subVersion = "IV1"
+  val recordVersion = RecordVersion.V1
   val id: Int = 5
 }
 
-case object KAFKA_0_10_1_IV0 extends ApiVersion {
-  val version: String = "0.10.1-IV0"
-  val messageFormatVersion = RecordFormat.V1
+case object KAFKA_0_10_1_IV0 extends DefaultApiVersion {
+  val shortVersion = "0.10.1"
+  val subVersion = "IV0"
+  val recordVersion = RecordVersion.V1
   val id: Int = 6
 }
 
-case object KAFKA_0_10_1_IV1 extends ApiVersion {
-  val version: String = "0.10.1-IV1"
-  val messageFormatVersion = RecordFormat.V1
+case object KAFKA_0_10_1_IV1 extends DefaultApiVersion {
+  val shortVersion = "0.10.1"
+  val subVersion = "IV1"
+  val recordVersion = RecordVersion.V1
   val id: Int = 7
 }
 
-case object KAFKA_0_10_1_IV2 extends ApiVersion {
-  val version: String = "0.10.1-IV2"
-  val messageFormatVersion = RecordFormat.V1
+case object KAFKA_0_10_1_IV2 extends DefaultApiVersion {
+  val shortVersion = "0.10.1"
+  val subVersion = "IV2"
+  val recordVersion = RecordVersion.V1
   val id: Int = 8
 }
 
-case object KAFKA_0_10_2_IV0 extends ApiVersion {
-  val version: String = "0.10.2-IV0"
-  val messageFormatVersion = RecordFormat.V1
+case object KAFKA_0_10_2_IV0 extends DefaultApiVersion {
+  val shortVersion = "0.10.2"
+  val subVersion = "IV0"
+  val recordVersion = RecordVersion.V1
   val id: Int = 9
 }
 
-case object KAFKA_0_11_0_IV0 extends ApiVersion {
-  val version: String = "0.11.0-IV0"
-  val messageFormatVersion = RecordFormat.V2
+case object KAFKA_0_11_0_IV0 extends DefaultApiVersion {
+  val shortVersion = "0.11.0"
+  val subVersion = "IV0"
+  val recordVersion = RecordVersion.V2
   val id: Int = 10
 }
 
-case object KAFKA_0_11_0_IV1 extends ApiVersion {
-  val version: String = "0.11.0-IV1"
-  val messageFormatVersion = RecordFormat.V2
+case object KAFKA_0_11_0_IV1 extends DefaultApiVersion {
+  val shortVersion = "0.11.0"
+  val subVersion = "IV1"
+  val recordVersion = RecordVersion.V2
   val id: Int = 11
 }
 
-case object KAFKA_0_11_0_IV2 extends ApiVersion {
-  val version: String = "0.11.0-IV2"
-  val messageFormatVersion = RecordFormat.V2
+case object KAFKA_0_11_0_IV2 extends DefaultApiVersion {
+  val shortVersion = "0.11.0"
+  val subVersion = "IV2"
+  val recordVersion = RecordVersion.V2
   val id: Int = 12
 }
 
-case object KAFKA_1_0_IV0 extends ApiVersion {
-  val version: String = "1.0-IV0"
-  val messageFormatVersion = RecordFormat.V2
+case object KAFKA_1_0_IV0 extends DefaultApiVersion {
+  val shortVersion = "1.0"
+  val subVersion = "IV0"
+  val recordVersion = RecordVersion.V2
   val id: Int = 13
 }
 
-case object KAFKA_1_1_IV0 extends ApiVersion {
-  val version: String = "1.1-IV0"
-  val messageFormatVersion = RecordFormat.V2
+case object KAFKA_1_1_IV0 extends DefaultApiVersion {
+  val shortVersion = "1.1"
+  val subVersion = "IV0"
+  val recordVersion = RecordVersion.V2
   val id: Int = 14
 }
 
-case object KAFKA_2_0_IV0 extends ApiVersion {
-  val version: String = "2.0-IV0"
-  val messageFormatVersion = RecordFormat.V2
+case object KAFKA_2_0_IV0 extends DefaultApiVersion {
+  val shortVersion: String = "2.0"
+  val subVersion = "IV0"
+  val recordVersion = RecordVersion.V2
   val id: Int = 15
 }
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 0b1a18a..af83775 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -483,7 +483,7 @@ class Log(@volatile var dir: File,
 
   private def loadProducerState(lastOffset: Long, reloadFromCleanShutdown: Boolean): Unit = lock synchronized {
     checkIfMemoryMappedBufferClosed()
-    val messageFormatVersion = config.messageFormatVersion.messageFormatVersion.value
+    val messageFormatVersion = config.messageFormatVersion.recordVersion.value
     info(s"Loading producer state from offset $lastOffset with message format version $messageFormatVersion")
 
     // We want to avoid unnecessary scanning of the log to build the producer state when the broker is being
@@ -680,7 +680,7 @@ class Log(@volatile var dir: File,
               appendInfo.sourceCodec,
               appendInfo.targetCodec,
               config.compact,
-              config.messageFormatVersion.messageFormatVersion.value,
+              config.messageFormatVersion.recordVersion.value,
               config.messageTimestampType,
               config.messageTimestampDifferenceMaxMs,
               leaderEpoch,
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 7bc9e3e..828b08b 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1355,7 +1355,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         apiVersionRequest.getErrorResponse(requestThrottleMs, Errors.UNSUPPORTED_VERSION.exception)
       else
         ApiVersionsResponse.apiVersionsResponse(requestThrottleMs,
-          config.interBrokerProtocolVersion.messageFormatVersion.value)
+          config.interBrokerProtocolVersion.recordVersion.value)
     }
     sendResponseMaybeThrottle(request, createResponseCallback)
   }
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 4834791..e296e26 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -1351,11 +1351,10 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
       s"${KafkaConfig.AdvertisedListenersProp} cannot use the nonroutable meta-address 0.0.0.0. "+
       s"Use a routable IP address.")
 
-    val messageFormatVersion = logMessageFormatVersion.messageFormatVersion
-    require(interBrokerProtocolVersion.messageFormatVersion.value >= messageFormatVersion.value,
-      s"log.message.format.version $logMessageFormatVersionString can only be used when " +
-        "inter.broker.protocol.version is set to version " +
-        s"${ApiVersion.minVersionForMessageFormat(messageFormatVersion)} or higher")
+    val recordVersion = logMessageFormatVersion.recordVersion
+    require(interBrokerProtocolVersion.recordVersion.value >= recordVersion.value,
+      s"log.message.format.version $logMessageFormatVersionString can only be used when inter.broker.protocol.version " +
+      s"is set to version ${ApiVersion.minSupportedFor(recordVersion).shortVersion} or higher")
 
     val interBrokerUsesSasl = interBrokerSecurityProtocol == SecurityProtocol.SASL_PLAINTEXT || interBrokerSecurityProtocol == SecurityProtocol.SASL_SSL
     require(!interBrokerUsesSasl || saslInterBrokerHandshakeRequestEnable || saslMechanismInterBrokerProtocol == SaslConfigs.GSSAPI_MECHANISM,
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 309a599..0518e03 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -997,7 +997,7 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   def getMagic(topicPartition: TopicPartition): Option[Byte] =
-    getReplica(topicPartition).flatMap(_.log.map(_.config.messageFormatVersion.messageFormatVersion.value))
+    getReplica(topicPartition).flatMap(_.log.map(_.config.messageFormatVersion.recordVersion.value))
 
   def maybeUpdateMetadataCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest) : Seq[TopicPartition] =  {
     replicaStateChangeLock synchronized {
diff --git a/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala b/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
index 88c9d52..9aaadf1 100644
--- a/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
+++ b/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
@@ -17,7 +17,7 @@
 
 package kafka.api
 
-import org.apache.kafka.common.record.RecordFormat
+import org.apache.kafka.common.record.RecordVersion
 import org.junit.Test
 import org.junit.Assert._
 
@@ -76,15 +76,21 @@ class ApiVersionTest {
   }
 
   @Test
-  def testMinVersionForMessageFormat(): Unit = {
-    assertEquals("0.8.0", ApiVersion.minVersionForMessageFormat(RecordFormat.V0))
-    assertEquals("0.10.0", ApiVersion.minVersionForMessageFormat(RecordFormat.V1))
-    assertEquals("0.11.0", ApiVersion.minVersionForMessageFormat(RecordFormat.V2))
-
-    // Ensure that all message format versions have a defined min version so that we remember
-    // to update the function
-    for (messageFormatVersion <- RecordFormat.values)
-      assertNotNull(ApiVersion.minVersionForMessageFormat(messageFormatVersion))
+  def testMinSupportedVersionFor(): Unit = {
+    assertEquals(KAFKA_0_8_0, ApiVersion.minSupportedFor(RecordVersion.V0))
+    assertEquals(KAFKA_0_10_0_IV0, ApiVersion.minSupportedFor(RecordVersion.V1))
+    assertEquals(KAFKA_0_11_0_IV0, ApiVersion.minSupportedFor(RecordVersion.V2))
+
+    // Ensure that all record versions have a defined min version so that we remember to update the method
+    for (recordVersion <- RecordVersion.values)
+      assertNotNull(ApiVersion.minSupportedFor(recordVersion))
+  }
+
+  @Test
+  def testShortVersion(): Unit = {
+    assertEquals("0.8.0", KAFKA_0_8_0.shortVersion)
+    assertEquals("0.10.0", KAFKA_0_10_0_IV0.shortVersion)
+    assertEquals("0.11.0", KAFKA_0_11_0_IV0.shortVersion)
   }
 
 }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 39cbe40..6b86da6 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -533,7 +533,7 @@ class KafkaConfigTest {
 
     ApiVersion.allVersions.foreach { interBrokerVersion =>
       ApiVersion.allVersions.foreach { messageFormatVersion =>
-        if (interBrokerVersion.messageFormatVersion.value >= messageFormatVersion.messageFormatVersion.value) {
+        if (interBrokerVersion.recordVersion.value >= messageFormatVersion.recordVersion.value) {
           val config = buildConfig(interBrokerVersion, messageFormatVersion)
           assertEquals(messageFormatVersion, config.logMessageFormatVersion)
           assertEquals(interBrokerVersion, config.interBrokerProtocolVersion)

-- 
To stop receiving notification emails like this one, please contact
ijuma@apache.org.