You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/06/14 04:28:09 UTC

[kafka] branch 2.0 updated: KAFKA-7030; Add configuration to disable message down-conversion (KIP-283) (#5192)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new 0db889d  KAFKA-7030; Add configuration to disable message down-conversion (KIP-283) (#5192)
0db889d is described below

commit 0db889de723ca9e9b03dd4bf7ce90acaabb073ab
Author: Dhruvil Shah <dh...@confluent.io>
AuthorDate: Wed Jun 13 21:23:23 2018 -0700

    KAFKA-7030; Add configuration to disable message down-conversion (KIP-283) (#5192)
    
    Add support for the topic-level `message.downconversion.enable` config as part of KIP-283.
---
 .../apache/kafka/common/config/TopicConfig.java    |   7 +
 core/src/main/scala/kafka/log/LogConfig.scala      |   9 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  90 +++++------
 core/src/main/scala/kafka/server/KafkaConfig.scala |   5 +
 core/src/main/scala/kafka/server/KafkaServer.scala |   1 +
 .../main/scala/kafka/server/ReplicaManager.scala   |   7 +-
 .../server/DynamicBrokerReconfigurationTest.scala  |   1 +
 .../FetchRequestDownConversionConfigTest.scala     | 165 +++++++++++++++++++++
 .../scala/unit/kafka/server/FetchRequestTest.scala |   2 +-
 9 files changed, 238 insertions(+), 49 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
index d6b7003..fb2208c 100755
--- a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
@@ -165,4 +165,11 @@ public class TopicConfig {
         "the timestamp when a broker receives a message and the timestamp specified in the message. If " +
         "message.timestamp.type=CreateTime, a message will be rejected if the difference in timestamp " +
         "exceeds this threshold. This configuration is ignored if message.timestamp.type=LogAppendTime.";
+
+    public static final String MESSAGE_DOWNCONVERSION_ENABLE_CONFIG = "message.downconversion.enable";
+    public static final String MESSAGE_DOWNCONVERSION_ENABLE_DOC = "This configuration controls whether " +
+        "down-conversion of message formats is enabled to satisfy consume requests. When set to <code>false</code>, " +
+        "broker will not perform down-conversion for consumers expecting an older message format. The broker responds " +
+        "with <code>UNSUPPORTED_VERSION</code> error for consume requests from such older clients. This configuration " +
+        "does not apply to any message format conversion that might be required for replication to followers.";
 }
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index c827121..bd4768e 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -63,6 +63,7 @@ object Defaults {
   val LeaderReplicationThrottledReplicas = Collections.emptyList[String]()
   val FollowerReplicationThrottledReplicas = Collections.emptyList[String]()
   val MaxIdMapSnapshots = kafka.server.Defaults.MaxIdMapSnapshots
+  val MessageDownConversionEnable = kafka.server.Defaults.MessageDownConversionEnable
 }
 
 case class LogConfig(props: java.util.Map[_, _], overriddenConfigs: Set[String] = Set.empty)
@@ -96,6 +97,7 @@ case class LogConfig(props: java.util.Map[_, _], overriddenConfigs: Set[String]
   val messageTimestampDifferenceMaxMs = getLong(LogConfig.MessageTimestampDifferenceMaxMsProp).longValue
   val LeaderReplicationThrottledReplicas = getList(LogConfig.LeaderReplicationThrottledReplicasProp)
   val FollowerReplicationThrottledReplicas = getList(LogConfig.FollowerReplicationThrottledReplicasProp)
+  val messageDownConversionEnable = getBoolean(LogConfig.MessageDownConversionEnableProp)
 
   def randomSegmentJitter: Long =
     if (segmentJitterMs == 0) 0 else Utils.abs(scala.util.Random.nextInt()) % math.min(segmentJitterMs, segmentMs)
@@ -131,6 +133,7 @@ object LogConfig {
   val MessageFormatVersionProp = TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG
   val MessageTimestampTypeProp = TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG
   val MessageTimestampDifferenceMaxMsProp = TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG
+  val MessageDownConversionEnableProp = TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_CONFIG
 
   // Leave these out of TopicConfig for now as they are replication quota configs
   val LeaderReplicationThrottledReplicasProp = "leader.replication.throttled.replicas"
@@ -158,6 +161,7 @@ object LogConfig {
   val MessageFormatVersionDoc = TopicConfig.MESSAGE_FORMAT_VERSION_DOC
   val MessageTimestampTypeDoc = TopicConfig.MESSAGE_TIMESTAMP_TYPE_DOC
   val MessageTimestampDifferenceMaxMsDoc = TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DOC
+  val MessageDownConversionEnableDoc = TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_DOC
 
   val LeaderReplicationThrottledReplicasDoc = "A list of replicas for which log replication should be throttled on " +
     "the leader side. The list should describe a set of replicas in the form " +
@@ -262,6 +266,8 @@ object LogConfig {
         LeaderReplicationThrottledReplicasDoc, LeaderReplicationThrottledReplicasProp)
       .define(FollowerReplicationThrottledReplicasProp, LIST, Defaults.FollowerReplicationThrottledReplicas, ThrottledReplicaListValidator, MEDIUM,
         FollowerReplicationThrottledReplicasDoc, FollowerReplicationThrottledReplicasProp)
+      .define(MessageDownConversionEnableProp, BOOLEAN, Defaults.MessageDownConversionEnable, LOW,
+        MessageDownConversionEnableDoc, KafkaConfig.LogMessageDownConversionEnableProp)
   }
 
   def apply(): LogConfig = LogConfig(new Properties())
@@ -325,7 +331,8 @@ object LogConfig {
     PreAllocateEnableProp -> KafkaConfig.LogPreAllocateProp,
     MessageFormatVersionProp -> KafkaConfig.LogMessageFormatVersionProp,
     MessageTimestampTypeProp -> KafkaConfig.LogMessageTimestampTypeProp,
-    MessageTimestampDifferenceMaxMsProp -> KafkaConfig.LogMessageTimestampDifferenceMaxMsProp
+    MessageTimestampDifferenceMaxMsProp -> KafkaConfig.LogMessageTimestampDifferenceMaxMsProp,
+    MessageDownConversionEnableProp -> KafkaConfig.LogMessageDownConversionEnableProp
   )
 
 }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index cdd0d72..37a11bd 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -43,7 +43,6 @@ import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal}
-import org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.{ListenerName, Send}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -507,44 +506,41 @@ class KafkaApis(val requestChannel: RequestChannel,
           fetchRequest.toForget(),
           fetchRequest.isFromFollower())
 
+    def errorResponse[T >: MemoryRecords <: BaseRecords](error: Errors): FetchResponse.PartitionData[T] = {
+      new FetchResponse.PartitionData[T](error, FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET,
+        FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)
+    }
+
     val erroneous = mutable.ArrayBuffer[(TopicPartition, FetchResponse.PartitionData[Records])]()
     val interesting = mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]()
     if (fetchRequest.isFromFollower()) {
       // The follower must have ClusterAction on ClusterResource in order to fetch partition data.
       if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
-        fetchContext.foreachPartition((topicPartition, data) => {
-          if (!metadataCache.contains(topicPartition)) {
-            erroneous += topicPartition -> new FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION,
-              FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET,
-              FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)
-          } else {
+        fetchContext.foreachPartition { (topicPartition, data) =>
+          if (!metadataCache.contains(topicPartition))
+            erroneous += topicPartition -> errorResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)
+          else
             interesting += (topicPartition -> data)
-          }
-        })
+        }
       } else {
-        fetchContext.foreachPartition((part, _) => {
-          erroneous += part -> new FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
-            FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET,
-            FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)
-        })
+        fetchContext.foreachPartition { (part, _) =>
+          erroneous += part -> errorResponse(Errors.TOPIC_AUTHORIZATION_FAILED)
+        }
       }
     } else {
       // Regular Kafka consumers need READ permission on each partition they are fetching.
-      fetchContext.foreachPartition((topicPartition, data) => {
+      fetchContext.foreachPartition { (topicPartition, data) =>
         if (!authorize(request.session, Read, Resource(Topic, topicPartition.topic, LITERAL)))
-          erroneous += topicPartition -> new FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
-            FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET,
-            FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)
+          erroneous += topicPartition -> errorResponse(Errors.TOPIC_AUTHORIZATION_FAILED)
         else if (!metadataCache.contains(topicPartition))
-          erroneous += topicPartition -> new FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION,
-            FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET,
-            FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)
+          erroneous += topicPartition -> errorResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)
         else
           interesting += (topicPartition -> data)
-      })
+      }
     }
 
-    def convertRecords(tp: TopicPartition, unconvertedRecords: Records): BaseRecords = {
+    def maybeConvertFetchedData(tp: TopicPartition,
+                                partitionData: FetchResponse.PartitionData[Records]): FetchResponse.PartitionData[BaseRecords] = {
       // Down-conversion of the fetched records is needed when the stored magic version is
       // greater than that supported by the client (as indicated by the fetch request version). If the
       // configured magic version for the topic is less than or equal to that supported by the version of the
@@ -552,8 +548,10 @@ class KafkaApis(val requestChannel: RequestChannel,
       // know it must be supported. However, if the magic version is changed from a higher version back to a
       // lower version, this check will no longer be valid and we will fail to down-convert the messages
       // which were written in the new format prior to the version downgrade.
-      replicaManager.getMagic(tp).flatMap { magic =>
-        val downConvertMagic = {
+      val unconvertedRecords = partitionData.records
+      val logConfig = replicaManager.getLogConfig(tp)
+      val downConvertMagic =
+        logConfig.map(_.messageFormatVersion.recordVersion.value).flatMap { magic =>
           if (magic > RecordBatch.MAGIC_VALUE_V0 && versionId <= 1 && !unconvertedRecords.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V0))
             Some(RecordBatch.MAGIC_VALUE_V0)
           else if (magic > RecordBatch.MAGIC_VALUE_V1 && versionId <= 3 && !unconvertedRecords.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1))
@@ -562,28 +560,36 @@ class KafkaApis(val requestChannel: RequestChannel,
             None
         }
 
-        downConvertMagic.map { magic =>
-          trace(s"Down converting records from partition $tp to message format version $magic for fetch request from $clientId")
-
-          // Because down-conversion is extremely memory intensive, we want to try and delay the down-conversion as much
-          // as possible. With KIP-283, we have the ability to lazily down-convert in a chunked manner. The lazy, chunked
-          // down-conversion always guarantees that at least one batch of messages is down-converted and sent out to the
-          // client.
-          new LazyDownConversionRecords(tp, unconvertedRecords, magic, fetchContext.getFetchOffset(tp).get, time)
-        }
-      }.getOrElse(unconvertedRecords)
+      // For fetch requests from clients, check if down-conversion is disabled for the particular partition
+      if (downConvertMagic.isDefined && !fetchRequest.isFromFollower && !logConfig.forall(_.messageDownConversionEnable)) {
+        trace(s"Conversion to message format ${downConvertMagic.get} is disabled for partition $tp. Sending unsupported version response to $clientId.")
+        errorResponse(Errors.UNSUPPORTED_VERSION)
+      } else {
+        val convertedRecords =
+          downConvertMagic.map { magic =>
+            trace(s"Down converting records from partition $tp to message format version $magic for fetch request from $clientId")
+            // Because down-conversion is extremely memory intensive, we want to try and delay the down-conversion as much
+            // as possible. With KIP-283, we have the ability to lazily down-convert in a chunked manner. The lazy, chunked
+            // down-conversion always guarantees that at least one batch of messages is down-converted and sent out to the
+            // client.
+            new LazyDownConversionRecords(tp, unconvertedRecords, magic, fetchContext.getFetchOffset(tp).get, time)
+          }.getOrElse(unconvertedRecords)
+        new FetchResponse.PartitionData[BaseRecords](partitionData.error, partitionData.highWatermark,
+          FetchResponse.INVALID_LAST_STABLE_OFFSET, partitionData.logStartOffset, partitionData.abortedTransactions,
+          convertedRecords)
+      }
     }
 
     // the callback for process a fetch response, invoked before throttling
     def processResponseCallback(responsePartitionData: Seq[(TopicPartition, FetchPartitionData)]): Unit = {
       val partitions = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
-      responsePartitionData.foreach{ case (tp, data) =>
+      responsePartitionData.foreach { case (tp, data) =>
         val abortedTransactions = data.abortedTransactions.map(_.asJava).orNull
         val lastStableOffset = data.lastStableOffset.getOrElse(FetchResponse.INVALID_LAST_STABLE_OFFSET)
         partitions.put(tp, new FetchResponse.PartitionData(data.error, data.highWatermark, lastStableOffset,
           data.logStartOffset, abortedTransactions, data.records))
       }
-      erroneous.foreach{case (tp, data) => partitions.put(tp, data)}
+      erroneous.foreach { case (tp, data) => partitions.put(tp, data) }
 
       // When this callback is triggered, the remote API call has completed.
       // Record time before any byte-rate throttling.
@@ -598,14 +604,10 @@ class KafkaApis(val requestChannel: RequestChannel,
           if (unconvertedPartitionData.error != Errors.NONE)
             debug(s"Fetch request with correlation id ${request.header.correlationId} from client $clientId " +
               s"on partition $tp failed due to ${unconvertedPartitionData.error.exceptionName}")
-          val convertedRecords = convertRecords(tp, unconvertedPartitionData.records)
-          val convertedPartitionData = new FetchResponse.PartitionData[BaseRecords](unconvertedPartitionData.error,
-            unconvertedPartitionData.highWatermark, FetchResponse.INVALID_LAST_STABLE_OFFSET, unconvertedPartitionData.logStartOffset,
-            unconvertedPartitionData.abortedTransactions, convertedRecords)
-          convertedData.put(tp, convertedPartitionData)
+          convertedData.put(tp, maybeConvertFetchedData(tp, unconvertedPartitionData))
         }
 
-        // Prepare fetch resopnse from converted data
+        // Prepare fetch response from converted data
         val response = new FetchResponse(unconvertedFetchResponse.error(), convertedData, throttleTimeMs,
           unconvertedFetchResponse.sessionId())
         response.responseData.asScala.foreach { case (topicPartition, data) =>
@@ -1455,7 +1457,7 @@ class KafkaApis(val requestChannel: RequestChannel,
             duplicateTopics.keySet.map((_, new ApiError(Errors.INVALID_REQUEST, errorMessage))).toMap
           } else Map.empty
 
-        val unauthorizedTopicsResults = unauthorizedTopics.keySet.map(_ -> new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED, null)) 
+        val unauthorizedTopicsResults = unauthorizedTopics.keySet.map(_ -> new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED, null))
         val completeResults = results ++ duplicatedTopicsResults ++ unauthorizedTopicsResults
         sendResponseCallback(completeResults)
       }
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index ecbb790..2760def 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -115,6 +115,7 @@ object Defaults {
   val NumRecoveryThreadsPerDataDir = 1
   val AutoCreateTopicsEnable = true
   val MinInSyncReplicas = 1
+  val MessageDownConversionEnable = true
 
   /** ********* Replication configuration ***********/
   val ControllerSocketTimeoutMs = RequestTimeoutMs
@@ -329,6 +330,7 @@ object KafkaConfig {
   val MinInSyncReplicasProp = "min.insync.replicas"
   val CreateTopicPolicyClassNameProp = "create.topic.policy.class.name"
   val AlterConfigPolicyClassNameProp = "alter.config.policy.class.name"
+  val LogMessageDownConversionEnableProp = LogConfigPrefix + "message.downconversion.enable"
   /** ********* Replication configuration ***********/
   val ControllerSocketTimeoutMsProp = "controller.socket.timeout.ms"
   val DefaultReplicationFactorProp = "default.replication.factor"
@@ -598,6 +600,7 @@ object KafkaConfig {
     "implement the <code>org.apache.kafka.server.policy.CreateTopicPolicy</code> interface."
   val AlterConfigPolicyClassNameDoc = "The alter configs policy class that should be used for validation. The class should " +
     "implement the <code>org.apache.kafka.server.policy.AlterConfigPolicy</code> interface."
+  val LogMessageDownConversionEnableDoc = TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_DOC;
 
   /** ********* Replication configuration ***********/
   val ControllerSocketTimeoutMsDoc = "The socket timeout for controller-to-broker channels"
@@ -861,6 +864,7 @@ object KafkaConfig {
       .define(LogMessageTimestampDifferenceMaxMsProp, LONG, Defaults.LogMessageTimestampDifferenceMaxMs, MEDIUM, LogMessageTimestampDifferenceMaxMsDoc)
       .define(CreateTopicPolicyClassNameProp, CLASS, null, LOW, CreateTopicPolicyClassNameDoc)
       .define(AlterConfigPolicyClassNameProp, CLASS, null, LOW, AlterConfigPolicyClassNameDoc)
+      .define(LogMessageDownConversionEnableProp, BOOLEAN, Defaults.MessageDownConversionEnable, LOW, LogMessageDownConversionEnableDoc)
 
       /** ********* Replication configuration ***********/
       .define(ControllerSocketTimeoutMsProp, INT, Defaults.ControllerSocketTimeoutMs, MEDIUM, ControllerSocketTimeoutMsDoc)
@@ -1134,6 +1138,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
   val logMessageFormatVersion = ApiVersion(logMessageFormatVersionString)
   def logMessageTimestampType = TimestampType.forName(getString(KafkaConfig.LogMessageTimestampTypeProp))
   def logMessageTimestampDifferenceMaxMs: Long = getLong(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp)
+  def logMessageDownConversionEnable: Boolean = getBoolean(KafkaConfig.LogMessageDownConversionEnableProp)
 
   /** ********* Replication configuration ***********/
   val controllerSocketTimeoutMs: Int = getInt(KafkaConfig.ControllerSocketTimeoutMsProp)
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 57bca69..f73ede6 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -79,6 +79,7 @@ object KafkaServer {
     logProps.put(LogConfig.MessageFormatVersionProp, kafkaConfig.logMessageFormatVersion.version)
     logProps.put(LogConfig.MessageTimestampTypeProp, kafkaConfig.logMessageTimestampType.name)
     logProps.put(LogConfig.MessageTimestampDifferenceMaxMsProp, kafkaConfig.logMessageTimestampDifferenceMaxMs: java.lang.Long)
+    logProps.put(LogConfig.MessageDownConversionEnableProp, kafkaConfig.logMessageDownConversionEnable: java.lang.Boolean)
     logProps
   }
 
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 24f3235..965595b 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -25,7 +25,7 @@ import com.yammer.metrics.core.Gauge
 import kafka.api._
 import kafka.cluster.{BrokerEndPoint, Partition, Replica}
 import kafka.controller.{KafkaController, StateChangeLogger}
-import kafka.log.{Log, LogAppendInfo, LogManager}
+import kafka.log.{Log, LogAppendInfo, LogConfig, LogManager}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
 import kafka.server.checkpoints.OffsetCheckpointFile
@@ -995,8 +995,9 @@ class ReplicaManager(val config: KafkaConfig,
     quota.isThrottled(topicPartition) && quota.isQuotaExceeded && !isReplicaInSync
   }
 
-  def getMagic(topicPartition: TopicPartition): Option[Byte] =
-    getReplica(topicPartition).flatMap(_.log.map(_.config.messageFormatVersion.recordVersion.value))
+  def getLogConfig(topicPartition: TopicPartition): Option[LogConfig] = getReplica(topicPartition).flatMap(_.log.map(_.config))
+
+  def getMagic(topicPartition: TopicPartition): Option[Byte] = getLogConfig(topicPartition).map(_.messageFormatVersion.recordVersion.value)
 
   def maybeUpdateMetadataCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest) : Seq[TopicPartition] =  {
     replicaStateChangeLock synchronized {
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 45b3fdc..69ca317 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -369,6 +369,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     props.put(KafkaConfig.LogPreAllocateProp, true.toString)
     props.put(KafkaConfig.LogMessageTimestampTypeProp, TimestampType.LOG_APPEND_TIME.toString)
     props.put(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp, "1000")
+    props.put(KafkaConfig.LogMessageDownConversionEnableProp, "false")
     reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.LogSegmentBytesProp, "4000"))
 
     // Verify that all broker defaults have been updated
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
new file mode 100644
index 0000000..e5ef985
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package unit.kafka.server
+
+import java.util
+import java.util.Properties
+
+import kafka.log.LogConfig
+import kafka.server.{BaseRequestTest, KafkaConfig}
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.record.MemoryRecords
+import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
+import org.apache.kafka.common.serialization.StringSerializer
+import org.junit.Assert._
+import org.junit.Test
+
+class FetchRequestDownConversionConfigTest extends BaseRequestTest {
+  private var producer: KafkaProducer[String, String] = null
+  override def numBrokers: Int = 1
+
+  override def setUp(): Unit = {
+    super.setUp()
+    initProducer()
+  }
+
+  override def tearDown(): Unit = {
+    if (producer != null)
+      producer.close()
+    super.tearDown()
+  }
+
+  override protected def propertyOverrides(properties: Properties): Unit = {
+    super.propertyOverrides(properties)
+    properties.put(KafkaConfig.LogMessageDownConversionEnableProp, "false")
+  }
+
+  private def initProducer(): Unit = {
+    producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers),
+      retries = 5, keySerializer = new StringSerializer, valueSerializer = new StringSerializer)
+  }
+
+  private def createTopics(numTopics: Int, numPartitions: Int,
+                           configs: Map[String, String] = Map.empty, topicSuffixStart: Int = 0): Map[TopicPartition, Int] = {
+    val topics = (0 until numTopics).map(t => s"topic${t + topicSuffixStart}")
+    val topicConfig = new Properties
+    topicConfig.setProperty(LogConfig.MinInSyncReplicasProp, 1.toString)
+    configs.foreach { case (k, v) => topicConfig.setProperty(k, v) }
+    topics.flatMap { topic =>
+      val partitionToLeader = createTopic(topic, numPartitions = numPartitions, replicationFactor = 1,
+        topicConfig = topicConfig)
+      partitionToLeader.map { case (partition, leader) => new TopicPartition(topic, partition) -> leader }
+    }.toMap
+  }
+
+  private def createPartitionMap(maxPartitionBytes: Int, topicPartitions: Seq[TopicPartition],
+                                 offsetMap: Map[TopicPartition, Long] = Map.empty): util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] = {
+    val partitionMap = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
+    topicPartitions.foreach { tp =>
+      partitionMap.put(tp, new FetchRequest.PartitionData(offsetMap.getOrElse(tp, 0), 0L, maxPartitionBytes))
+    }
+    partitionMap
+  }
+
+  private def sendFetchRequest(leaderId: Int, request: FetchRequest): FetchResponse[MemoryRecords] = {
+    val response = connectAndSend(request, ApiKeys.FETCH, destination = brokerSocketServer(leaderId))
+    FetchResponse.parse(response, request.version)
+  }
+
+  /**
+   * Tests that a fetch request that requires down-conversion returns an error response when down-conversion is disabled on the broker.
+   */
+  @Test
+  def testV1FetchWithDownConversionDisabled(): Unit = {
+    val topicMap = createTopics(numTopics = 5, numPartitions = 1)
+    val topicPartitions = topicMap.keySet.toSeq
+    topicPartitions.foreach(tp => producer.send(new ProducerRecord(tp.topic(), "key", "value")).get())
+    val fetchRequest = FetchRequest.Builder.forConsumer(Int.MaxValue, 0, createPartitionMap(1024,
+      topicPartitions)).build(1)
+    val fetchResponse = sendFetchRequest(topicMap.head._2, fetchRequest)
+    topicPartitions.foreach(tp => assertEquals(Errors.UNSUPPORTED_VERSION, fetchResponse.responseData().get(tp).error))
+  }
+
+  /**
+   * Tests that "message.downconversion.enable" has no effect when down-conversion is not required.
+   */
+  @Test
+  def testLatestFetchWithDownConversionDisabled(): Unit = {
+    val topicMap = createTopics(numTopics = 5, numPartitions = 1)
+    val topicPartitions = topicMap.keySet.toSeq
+    topicPartitions.foreach(tp => producer.send(new ProducerRecord(tp.topic(), "key", "value")).get())
+    val fetchRequest = FetchRequest.Builder.forConsumer(Int.MaxValue, 0, createPartitionMap(1024,
+      topicPartitions)).build()
+    val fetchResponse = sendFetchRequest(topicMap.head._2, fetchRequest)
+    topicPartitions.foreach(tp => assertEquals(Errors.NONE, fetchResponse.responseData().get(tp).error))
+  }
+
+  /**
+   * Tests that "message.downconversion.enable" can be set at topic level, and its configuration is obeyed for client
+   * fetch requests.
+   */
+  @Test
+  def testV1FetchWithTopicLevelOverrides(): Unit = {
+    // create topics with default down-conversion configuration (i.e. conversion disabled)
+    val conversionDisabledTopicsMap = createTopics(numTopics = 5, numPartitions = 1, topicSuffixStart = 0)
+    val conversionDisabledTopicPartitions = conversionDisabledTopicsMap.keySet.toSeq
+
+    // create topics with down-conversion configuration enabled
+    val topicConfig = Map(LogConfig.MessageDownConversionEnableProp -> "true")
+    val conversionEnabledTopicsMap = createTopics(numTopics = 5, numPartitions = 1, topicConfig, topicSuffixStart = 5)
+    val conversionEnabledTopicPartitions = conversionEnabledTopicsMap.keySet.toSeq
+
+    val allTopics = conversionDisabledTopicPartitions ++ conversionEnabledTopicPartitions
+    val leaderId = conversionDisabledTopicsMap.head._2
+
+    allTopics.foreach(tp => producer.send(new ProducerRecord(tp.topic(), "key", "value")).get())
+    val fetchRequest = FetchRequest.Builder.forConsumer(Int.MaxValue, 0, createPartitionMap(1024,
+      allTopics)).build(1)
+    val fetchResponse = sendFetchRequest(leaderId, fetchRequest)
+
+    conversionDisabledTopicPartitions.foreach(tp => assertEquals(Errors.UNSUPPORTED_VERSION, fetchResponse.responseData().get(tp).error))
+    conversionEnabledTopicPartitions.foreach(tp => assertEquals(Errors.NONE, fetchResponse.responseData().get(tp).error))
+  }
+
+  /**
+   * Tests that "message.downconversion.enable" has no effect on fetch requests from replicas.
+   */
+  @Test
+  def testV1FetchFromReplica(): Unit = {
+    // create topics with default down-conversion configuration (i.e. conversion disabled)
+    val conversionDisabledTopicsMap = createTopics(numTopics = 5, numPartitions = 1, topicSuffixStart = 0)
+    val conversionDisabledTopicPartitions = conversionDisabledTopicsMap.keySet.toSeq
+
+    // create topics with down-conversion configuration enabled
+    val topicConfig = Map(LogConfig.MessageDownConversionEnableProp -> "true")
+    val conversionEnabledTopicsMap = createTopics(numTopics = 5, numPartitions = 1, topicConfig, topicSuffixStart = 5)
+    val conversionEnabledTopicPartitions = conversionEnabledTopicsMap.keySet.toSeq
+
+    val allTopicPartitions = conversionDisabledTopicPartitions ++ conversionEnabledTopicPartitions
+    val leaderId = conversionDisabledTopicsMap.head._2
+
+    allTopicPartitions.foreach(tp => producer.send(new ProducerRecord(tp.topic(), "key", "value")).get())
+    val fetchRequest = FetchRequest.Builder.forReplica(1, 1, Int.MaxValue, 0,
+      createPartitionMap(1024, allTopicPartitions)).build()
+    val fetchResponse = sendFetchRequest(leaderId, fetchRequest)
+
+    allTopicPartitions.foreach(tp => assertEquals(Errors.NONE, fetchResponse.responseData().get(tp).error))
+  }
+}
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index 63e23b2..06ff2d9 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -426,7 +426,7 @@ class FetchRequestTest extends BaseRequestTest {
   }
 
   private def createTopics(numTopics: Int, numPartitions: Int, configs: Map[String, String] = Map.empty): Map[TopicPartition, Int] = {
-    val topics = (0 until numPartitions).map(t => s"topic$t")
+    val topics = (0 until numTopics).map(t => s"topic$t")
     val topicConfig = new Properties
     topicConfig.setProperty(LogConfig.MinInSyncReplicasProp, 2.toString)
     configs.foreach { case (k, v) => topicConfig.setProperty(k, v) }

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.