You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2018/02/21 21:49:54 UTC

[kafka] branch 1.0 updated: KAFKA-6238; Fix inter-broker protocol message format compatibility check

This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.0 by this push:
     new 4b3aeff  KAFKA-6238; Fix inter-broker protocol message format compatibility check
4b3aeff is described below

commit 4b3aeff685d8c91e2ca51c72ae0d6ab9f62b0b4e
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Wed Feb 21 09:38:39 2018 +0000

    KAFKA-6238; Fix inter-broker protocol message format compatibility check
    
    This patch fixes a bug in the validation of the inter-broker protocol and the message format version. We should allow the configured message format api version to be greater than the inter-broker protocol api version as long as the actual message format versions are equal. For example, if the message format version is set to 1.0, it is fine for the inter-broker protocol version to be 0.11.0 because they both use message format v2.
    
    I have added a unit test which checks compatibility for all combinations of the message format version and the inter-broker protocol version.
    
    Author: Jason Gustafson <ja...@confluent.io>
    
    Reviewers: Ismael Juma <is...@juma.me.uk>
    
    Closes #4583 from hachikuji/KAFKA-6328-REOPENED
---
 .../apache/kafka/common/record/RecordFormat.java   | 41 ++++++++++++++++++++
 core/src/main/scala/kafka/api/ApiVersion.scala     | 45 ++++++++++++++--------
 core/src/main/scala/kafka/log/Log.scala            |  4 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  5 ++-
 core/src/main/scala/kafka/server/KafkaConfig.scala |  9 ++++-
 .../main/scala/kafka/server/ReplicaManager.scala   |  2 +-
 .../test/scala/unit/kafka/api/ApiVersionTest.scala | 13 +++++++
 .../scala/unit/kafka/server/KafkaConfigTest.scala  | 24 ++++++++++++
 docs/upgrade.html                                  | 17 ++++----
 9 files changed, 129 insertions(+), 31 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordFormat.java b/clients/src/main/java/org/apache/kafka/common/record/RecordFormat.java
new file mode 100644
index 0000000..e71ec59
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordFormat.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+public enum RecordFormat {
+    V0(0), V1(1), V2(2);
+
+    public final byte value;
+
+    RecordFormat(int value) {
+        this.value = (byte) value;
+    }
+
+    public static RecordFormat lookup(byte version) {
+        switch (version) {
+            case 0: return V0;
+            case 1: return V1;
+            case 2: return V2;
+            default: throw new IllegalArgumentException("Unknown format version: " + version);
+        }
+    }
+
+    public static RecordFormat current() {
+        return V2;
+    }
+
+}
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala
index e509fc5..cd6fdd1 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -17,7 +17,7 @@
 
 package kafka.api
 
-import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.record.RecordFormat
 
 /**
  * This class contains the different Kafka versions.
@@ -86,11 +86,23 @@ object ApiVersion {
 
   def latestVersion = versionNameMap.values.max
 
+  def allVersions: Set[ApiVersion] = {
+    versionNameMap.values.toSet
+  }
+
+  def minVersionForMessageFormat(messageFormatVersion: RecordFormat): String = {
+    messageFormatVersion match {
+      case RecordFormat.V0 => "0.8.0"
+      case RecordFormat.V1 => "0.10.0"
+      case RecordFormat.V2 => "0.11.0"
+      case _ => throw new IllegalArgumentException(s"Invalid message format version $messageFormatVersion")
+    }
+  }
 }
 
 sealed trait ApiVersion extends Ordered[ApiVersion] {
   val version: String
-  val messageFormatVersion: Byte
+  val messageFormatVersion: RecordFormat
   val id: Int
 
   override def compare(that: ApiVersion): Int =
@@ -102,85 +114,84 @@ sealed trait ApiVersion extends Ordered[ApiVersion] {
 // Keep the IDs in order of versions
 case object KAFKA_0_8_0 extends ApiVersion {
   val version: String = "0.8.0.X"
-  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V0
+  val messageFormatVersion = RecordFormat.V0
   val id: Int = 0
 }
 
 case object KAFKA_0_8_1 extends ApiVersion {
   val version: String = "0.8.1.X"
-  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V0
+  val messageFormatVersion = RecordFormat.V0
   val id: Int = 1
 }
 
 case object KAFKA_0_8_2 extends ApiVersion {
   val version: String = "0.8.2.X"
-  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V0
+  val messageFormatVersion = RecordFormat.V0
   val id: Int = 2
 }
 
 case object KAFKA_0_9_0 extends ApiVersion {
   val version: String = "0.9.0.X"
-  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V0
+  val messageFormatVersion = RecordFormat.V0
   val id: Int = 3
 }
 
 case object KAFKA_0_10_0_IV0 extends ApiVersion {
   val version: String = "0.10.0-IV0"
-  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V1
+  val messageFormatVersion = RecordFormat.V1
   val id: Int = 4
 }
 
 case object KAFKA_0_10_0_IV1 extends ApiVersion {
   val version: String = "0.10.0-IV1"
-  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V1
+  val messageFormatVersion = RecordFormat.V1
   val id: Int = 5
 }
 
 case object KAFKA_0_10_1_IV0 extends ApiVersion {
   val version: String = "0.10.1-IV0"
-  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V1
+  val messageFormatVersion = RecordFormat.V1
   val id: Int = 6
 }
 
 case object KAFKA_0_10_1_IV1 extends ApiVersion {
   val version: String = "0.10.1-IV1"
-  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V1
+  val messageFormatVersion = RecordFormat.V1
   val id: Int = 7
 }
 
 case object KAFKA_0_10_1_IV2 extends ApiVersion {
   val version: String = "0.10.1-IV2"
-  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V1
+  val messageFormatVersion = RecordFormat.V1
   val id: Int = 8
 }
 
 case object KAFKA_0_10_2_IV0 extends ApiVersion {
   val version: String = "0.10.2-IV0"
-  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V1
+  val messageFormatVersion = RecordFormat.V1
   val id: Int = 9
 }
 
 case object KAFKA_0_11_0_IV0 extends ApiVersion {
   val version: String = "0.11.0-IV0"
-  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V2
+  val messageFormatVersion = RecordFormat.V2
   val id: Int = 10
 }
 
 case object KAFKA_0_11_0_IV1 extends ApiVersion {
   val version: String = "0.11.0-IV1"
-  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V2
+  val messageFormatVersion = RecordFormat.V2
   val id: Int = 11
 }
 
 case object KAFKA_0_11_0_IV2 extends ApiVersion {
   val version: String = "0.11.0-IV2"
-  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V2
+  val messageFormatVersion = RecordFormat.V2
   val id: Int = 12
 }
 
 case object KAFKA_1_0_IV0 extends ApiVersion {
   val version: String = "1.0-IV0"
-  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V2
+  val messageFormatVersion = RecordFormat.V2
   val id: Int = 13
 }
-
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index bd192a3..9ea3e3e 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -473,7 +473,7 @@ class Log(@volatile var dir: File,
 
   private def loadProducerState(lastOffset: Long, reloadFromCleanShutdown: Boolean): Unit = lock synchronized {
     checkIfMemoryMappedBufferClosed()
-    val messageFormatVersion = config.messageFormatVersion.messageFormatVersion
+    val messageFormatVersion = config.messageFormatVersion.messageFormatVersion.value
     info(s"Loading producer state from offset $lastOffset for partition $topicPartition with message " +
       s"format version $messageFormatVersion")
 
@@ -647,7 +647,7 @@ class Log(@volatile var dir: File,
               appendInfo.sourceCodec,
               appendInfo.targetCodec,
               config.compact,
-              config.messageFormatVersion.messageFormatVersion,
+              config.messageFormatVersion.messageFormatVersion.value,
               config.messageTimestampType,
               config.messageTimestampDifferenceMaxMs,
               leaderEpoch,
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index fa4dfbd..d6b478d 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -56,7 +56,7 @@ import org.apache.kafka.common.resource.{Resource => AdminResource}
 import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding}
 import DescribeLogDirsResponse.LogDirInfo
 
-import scala.collection.{mutable, _}
+import scala.collection._
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.util.{Failure, Success, Try}
@@ -1288,7 +1288,8 @@ class KafkaApis(val requestChannel: RequestChannel,
       if (apiVersionRequest.hasUnsupportedRequestVersion)
         apiVersionRequest.getErrorResponse(requestThrottleMs, Errors.UNSUPPORTED_VERSION.exception)
       else
-        ApiVersionsResponse.apiVersionsResponse(requestThrottleMs, config.interBrokerProtocolVersion.messageFormatVersion)
+        ApiVersionsResponse.apiVersionsResponse(requestThrottleMs,
+          config.interBrokerProtocolVersion.messageFormatVersion.value)
     }
     sendResponseMaybeThrottle(request, createResponseCallback)
   }
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 5ebe8e5..183fb6f 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -1202,8 +1202,13 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
     require(!advertisedListeners.exists(endpoint => endpoint.host=="0.0.0.0"),
       s"${KafkaConfig.AdvertisedListenersProp} cannot use the nonroutable meta-address 0.0.0.0. "+
       s"Use a routable IP address.")
-    require(interBrokerProtocolVersion >= logMessageFormatVersion,
-      s"log.message.format.version $logMessageFormatVersionString cannot be used when inter.broker.protocol.version is set to $interBrokerProtocolVersionString")
+
+    val messageFormatVersion = logMessageFormatVersion.messageFormatVersion
+    require(interBrokerProtocolVersion.messageFormatVersion.value >= messageFormatVersion.value,
+      s"log.message.format.version $logMessageFormatVersionString can only be used when " +
+        "inter.broker.protocol.version is set to version " +
+        s"${ApiVersion.minVersionForMessageFormat(messageFormatVersion)} or higher")
+
     val interBrokerUsesSasl = interBrokerSecurityProtocol == SecurityProtocol.SASL_PLAINTEXT || interBrokerSecurityProtocol == SecurityProtocol.SASL_SSL
     require(!interBrokerUsesSasl || saslInterBrokerHandshakeRequestEnable || saslMechanismInterBrokerProtocol == SaslConfigs.GSSAPI_MECHANISM,
       s"Only GSSAPI mechanism is supported for inter-broker communication with SASL when inter.broker.protocol.version is set to $interBrokerProtocolVersionString")
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 0362e1e..9e0c83e 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -982,7 +982,7 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   def getMagic(topicPartition: TopicPartition): Option[Byte] =
-    getReplica(topicPartition).flatMap(_.log.map(_.config.messageFormatVersion.messageFormatVersion))
+    getReplica(topicPartition).flatMap(_.log.map(_.config.messageFormatVersion.messageFormatVersion.value))
 
   def maybeUpdateMetadataCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest) : Seq[TopicPartition] =  {
     replicaStateChangeLock synchronized {
diff --git a/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala b/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
index 6fc6974..88c9d52 100644
--- a/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
+++ b/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
@@ -17,6 +17,7 @@
 
 package kafka.api
 
+import org.apache.kafka.common.record.RecordFormat
 import org.junit.Test
 import org.junit.Assert._
 
@@ -74,4 +75,16 @@ class ApiVersionTest {
     assertEquals(KAFKA_1_0_IV0, ApiVersion("1.0.1"))
   }
 
+  @Test
+  def testMinVersionForMessageFormat(): Unit = {
+    assertEquals("0.8.0", ApiVersion.minVersionForMessageFormat(RecordFormat.V0))
+    assertEquals("0.10.0", ApiVersion.minVersionForMessageFormat(RecordFormat.V1))
+    assertEquals("0.11.0", ApiVersion.minVersionForMessageFormat(RecordFormat.V2))
+
+    // Ensure that all message format versions have a defined min version so that we remember
+    // to update the function
+    for (messageFormatVersion <- RecordFormat.values)
+      assertNotNull(ApiVersion.minVersionForMessageFormat(messageFormatVersion))
+  }
+
 }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 522941d..1972ab6 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -523,6 +523,30 @@ class KafkaConfigTest {
   }
 
   @Test
+  def testInterBrokerVersionMessageFormatCompatibility(): Unit = {
+    def buildConfig(interBrokerProtocol: ApiVersion, messageFormat: ApiVersion): KafkaConfig = {
+      val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
+      props.put(KafkaConfig.InterBrokerProtocolVersionProp, interBrokerProtocol.version)
+      props.put(KafkaConfig.LogMessageFormatVersionProp, messageFormat.version)
+      KafkaConfig.fromProps(props)
+    }
+
+    ApiVersion.allVersions.foreach { interBrokerVersion =>
+      ApiVersion.allVersions.foreach { messageFormatVersion =>
+        if (interBrokerVersion.messageFormatVersion.value >= messageFormatVersion.messageFormatVersion.value) {
+          val config = buildConfig(interBrokerVersion, messageFormatVersion)
+          assertEquals(messageFormatVersion, config.logMessageFormatVersion)
+          assertEquals(interBrokerVersion, config.interBrokerProtocolVersion)
+        } else {
+          intercept[IllegalArgumentException] {
+            buildConfig(interBrokerVersion, messageFormatVersion)
+          }
+        }
+      }
+    }
+  }
+
+  @Test
   def testFromPropsInvalid() {
     def getBaseProperties(): Properties = {
       val validRequiredProperties = new Properties()
diff --git a/docs/upgrade.html b/docs/upgrade.html
index ad665ad..caf8e1c 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -34,12 +34,13 @@
         <ul>
             <li>inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2, 0.9.0, 0.10.0, 0.10.1, 0.10.2, 0.11.0).</li>
             <li>log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION  (See <a href="#upgrade_10_performance_impact">potential performance impact
-		following the upgrade</a> for the details on what this configuration does.)</li>
+                following the upgrade</a> for the details on what this configuration does.)</li>
         </ul>
-	If you are upgrading from 0.11.0.x and you have not overridden the message format, then you only need to override
-	the inter-broker protocol format.
+        If you are upgrading from 0.11.0.x and you have not overridden the message format, you must set
+        both the message format version and the inter-broker protocol version to 0.11.0.
         <ul>
-            <li>inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2, 0.9.0, 0.10.0, 0.10.1, 0.10.2, 0.11.0).</li>
+            <li>inter.broker.protocol.version=0.11.0</li>
+            <li>log.message.format.version=0.11.0</li>
         </ul>
     </li>
     <li> Upgrade the brokers one at a time: shut down the broker, update the code, and restart it. </li>
@@ -47,9 +48,11 @@
     <li> Restart the brokers one by one for the new protocol version to take effect. </li>
     <li> If you have overridden the message format version as instructed above, then you need to do one more rolling restart to
         upgrade it to its latest version. Once all (or most) consumers have been upgraded to 0.11.0 or later,
-        change log.message.format.version to 1.0 on each broker and restart them one by one. Note that the older Scala consumer
-        does not support the new message format introduced in 0.11, so to avoid the performance cost of down-conversion (or to
-        take advantage of <a href="#upgrade_11_exactly_once_semantics">exactly once semantics</a>), the newer Java consumer must be used.</li>
+        change log.message.format.version to 1.0 on each broker and restart them one by one. If you are upgrading from
+        0.11.0 and log.message.format.version is set to 0.11.0, you can update the config and skip the rolling restart.
+        Note that the older Scala consumer does not support the new message format introduced in 0.11, so to avoid the
+        performance cost of down-conversion (or to take advantage of <a href="#upgrade_11_exactly_once_semantics">exactly once semantics</a>),
+        the newer Java consumer must be used.</li>
 </ol>
 
 <p><b>Additional Upgrade Notes:</b></p>

-- 
To stop receiving notification emails like this one, please contact
ewencp@apache.org.