You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/06/14 04:23:38 UTC
[kafka] branch trunk updated: KAFKA-7030;
Add configuration to disable message down-conversion (KIP-283)
(#5192)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9a71bfb KAFKA-7030; Add configuration to disable message down-conversion (KIP-283) (#5192)
9a71bfb is described below
commit 9a71bfb9d64dae5d0296d162a01a62d8c13324da
Author: Dhruvil Shah <dh...@confluent.io>
AuthorDate: Wed Jun 13 21:23:23 2018 -0700
KAFKA-7030; Add configuration to disable message down-conversion (KIP-283) (#5192)
Add support for the topic-level `message.downconversion.enable` config as part of KIP-283.
---
.../apache/kafka/common/config/TopicConfig.java | 7 +
core/src/main/scala/kafka/log/LogConfig.scala | 9 +-
core/src/main/scala/kafka/server/KafkaApis.scala | 90 +++++------
core/src/main/scala/kafka/server/KafkaConfig.scala | 5 +
core/src/main/scala/kafka/server/KafkaServer.scala | 1 +
.../main/scala/kafka/server/ReplicaManager.scala | 7 +-
.../server/DynamicBrokerReconfigurationTest.scala | 1 +
.../FetchRequestDownConversionConfigTest.scala | 165 +++++++++++++++++++++
.../scala/unit/kafka/server/FetchRequestTest.scala | 2 +-
9 files changed, 238 insertions(+), 49 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
index d6b7003..fb2208c 100755
--- a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
@@ -165,4 +165,11 @@ public class TopicConfig {
"the timestamp when a broker receives a message and the timestamp specified in the message. If " +
"message.timestamp.type=CreateTime, a message will be rejected if the difference in timestamp " +
"exceeds this threshold. This configuration is ignored if message.timestamp.type=LogAppendTime.";
+
+ public static final String MESSAGE_DOWNCONVERSION_ENABLE_CONFIG = "message.downconversion.enable";
+ public static final String MESSAGE_DOWNCONVERSION_ENABLE_DOC = "This configuration controls whether " +
+ "down-conversion of message formats is enabled to satisfy consume requests. When set to <code>false</code>, " +
+ "broker will not perform down-conversion for consumers expecting an older message format. The broker responds " +
+ "with <code>UNSUPPORTED_VERSION</code> error for consume requests from such older clients. This configuration " +
+ "does not apply to any message format conversion that might be required for replication to followers.";
}
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index c827121..bd4768e 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -63,6 +63,7 @@ object Defaults {
val LeaderReplicationThrottledReplicas = Collections.emptyList[String]()
val FollowerReplicationThrottledReplicas = Collections.emptyList[String]()
val MaxIdMapSnapshots = kafka.server.Defaults.MaxIdMapSnapshots
+ val MessageDownConversionEnable = kafka.server.Defaults.MessageDownConversionEnable
}
case class LogConfig(props: java.util.Map[_, _], overriddenConfigs: Set[String] = Set.empty)
@@ -96,6 +97,7 @@ case class LogConfig(props: java.util.Map[_, _], overriddenConfigs: Set[String]
val messageTimestampDifferenceMaxMs = getLong(LogConfig.MessageTimestampDifferenceMaxMsProp).longValue
val LeaderReplicationThrottledReplicas = getList(LogConfig.LeaderReplicationThrottledReplicasProp)
val FollowerReplicationThrottledReplicas = getList(LogConfig.FollowerReplicationThrottledReplicasProp)
+ val messageDownConversionEnable = getBoolean(LogConfig.MessageDownConversionEnableProp)
def randomSegmentJitter: Long =
if (segmentJitterMs == 0) 0 else Utils.abs(scala.util.Random.nextInt()) % math.min(segmentJitterMs, segmentMs)
@@ -131,6 +133,7 @@ object LogConfig {
val MessageFormatVersionProp = TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG
val MessageTimestampTypeProp = TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG
val MessageTimestampDifferenceMaxMsProp = TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG
+ val MessageDownConversionEnableProp = TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_CONFIG
// Leave these out of TopicConfig for now as they are replication quota configs
val LeaderReplicationThrottledReplicasProp = "leader.replication.throttled.replicas"
@@ -158,6 +161,7 @@ object LogConfig {
val MessageFormatVersionDoc = TopicConfig.MESSAGE_FORMAT_VERSION_DOC
val MessageTimestampTypeDoc = TopicConfig.MESSAGE_TIMESTAMP_TYPE_DOC
val MessageTimestampDifferenceMaxMsDoc = TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DOC
+ val MessageDownConversionEnableDoc = TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_DOC
val LeaderReplicationThrottledReplicasDoc = "A list of replicas for which log replication should be throttled on " +
"the leader side. The list should describe a set of replicas in the form " +
@@ -262,6 +266,8 @@ object LogConfig {
LeaderReplicationThrottledReplicasDoc, LeaderReplicationThrottledReplicasProp)
.define(FollowerReplicationThrottledReplicasProp, LIST, Defaults.FollowerReplicationThrottledReplicas, ThrottledReplicaListValidator, MEDIUM,
FollowerReplicationThrottledReplicasDoc, FollowerReplicationThrottledReplicasProp)
+ .define(MessageDownConversionEnableProp, BOOLEAN, Defaults.MessageDownConversionEnable, LOW,
+ MessageDownConversionEnableDoc, KafkaConfig.LogMessageDownConversionEnableProp)
}
def apply(): LogConfig = LogConfig(new Properties())
@@ -325,7 +331,8 @@ object LogConfig {
PreAllocateEnableProp -> KafkaConfig.LogPreAllocateProp,
MessageFormatVersionProp -> KafkaConfig.LogMessageFormatVersionProp,
MessageTimestampTypeProp -> KafkaConfig.LogMessageTimestampTypeProp,
- MessageTimestampDifferenceMaxMsProp -> KafkaConfig.LogMessageTimestampDifferenceMaxMsProp
+ MessageTimestampDifferenceMaxMsProp -> KafkaConfig.LogMessageTimestampDifferenceMaxMsProp,
+ MessageDownConversionEnableProp -> KafkaConfig.LogMessageDownConversionEnableProp
)
}
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index cdd0d72..37a11bd 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -43,7 +43,6 @@ import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal}
-import org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.{ListenerName, Send}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -507,44 +506,41 @@ class KafkaApis(val requestChannel: RequestChannel,
fetchRequest.toForget(),
fetchRequest.isFromFollower())
+ def errorResponse[T >: MemoryRecords <: BaseRecords](error: Errors): FetchResponse.PartitionData[T] = {
+ new FetchResponse.PartitionData[T](error, FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET,
+ FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)
+ }
+
val erroneous = mutable.ArrayBuffer[(TopicPartition, FetchResponse.PartitionData[Records])]()
val interesting = mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]()
if (fetchRequest.isFromFollower()) {
// The follower must have ClusterAction on ClusterResource in order to fetch partition data.
if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
- fetchContext.foreachPartition((topicPartition, data) => {
- if (!metadataCache.contains(topicPartition)) {
- erroneous += topicPartition -> new FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION,
- FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET,
- FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)
- } else {
+ fetchContext.foreachPartition { (topicPartition, data) =>
+ if (!metadataCache.contains(topicPartition))
+ erroneous += topicPartition -> errorResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)
+ else
interesting += (topicPartition -> data)
- }
- })
+ }
} else {
- fetchContext.foreachPartition((part, _) => {
- erroneous += part -> new FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
- FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET,
- FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)
- })
+ fetchContext.foreachPartition { (part, _) =>
+ erroneous += part -> errorResponse(Errors.TOPIC_AUTHORIZATION_FAILED)
+ }
}
} else {
// Regular Kafka consumers need READ permission on each partition they are fetching.
- fetchContext.foreachPartition((topicPartition, data) => {
+ fetchContext.foreachPartition { (topicPartition, data) =>
if (!authorize(request.session, Read, Resource(Topic, topicPartition.topic, LITERAL)))
- erroneous += topicPartition -> new FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
- FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET,
- FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)
+ erroneous += topicPartition -> errorResponse(Errors.TOPIC_AUTHORIZATION_FAILED)
else if (!metadataCache.contains(topicPartition))
- erroneous += topicPartition -> new FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION,
- FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET,
- FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)
+ erroneous += topicPartition -> errorResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)
else
interesting += (topicPartition -> data)
- })
+ }
}
- def convertRecords(tp: TopicPartition, unconvertedRecords: Records): BaseRecords = {
+ def maybeConvertFetchedData(tp: TopicPartition,
+ partitionData: FetchResponse.PartitionData[Records]): FetchResponse.PartitionData[BaseRecords] = {
// Down-conversion of the fetched records is needed when the stored magic version is
// greater than that supported by the client (as indicated by the fetch request version). If the
// configured magic version for the topic is less than or equal to that supported by the version of the
@@ -552,8 +548,10 @@ class KafkaApis(val requestChannel: RequestChannel,
// know it must be supported. However, if the magic version is changed from a higher version back to a
// lower version, this check will no longer be valid and we will fail to down-convert the messages
// which were written in the new format prior to the version downgrade.
- replicaManager.getMagic(tp).flatMap { magic =>
- val downConvertMagic = {
+ val unconvertedRecords = partitionData.records
+ val logConfig = replicaManager.getLogConfig(tp)
+ val downConvertMagic =
+ logConfig.map(_.messageFormatVersion.recordVersion.value).flatMap { magic =>
if (magic > RecordBatch.MAGIC_VALUE_V0 && versionId <= 1 && !unconvertedRecords.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V0))
Some(RecordBatch.MAGIC_VALUE_V0)
else if (magic > RecordBatch.MAGIC_VALUE_V1 && versionId <= 3 && !unconvertedRecords.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1))
@@ -562,28 +560,36 @@ class KafkaApis(val requestChannel: RequestChannel,
None
}
- downConvertMagic.map { magic =>
- trace(s"Down converting records from partition $tp to message format version $magic for fetch request from $clientId")
-
- // Because down-conversion is extremely memory intensive, we want to try and delay the down-conversion as much
- // as possible. With KIP-283, we have the ability to lazily down-convert in a chunked manner. The lazy, chunked
- // down-conversion always guarantees that at least one batch of messages is down-converted and sent out to the
- // client.
- new LazyDownConversionRecords(tp, unconvertedRecords, magic, fetchContext.getFetchOffset(tp).get, time)
- }
- }.getOrElse(unconvertedRecords)
+ // For fetch requests from clients, check if down-conversion is disabled for the particular partition
+ if (downConvertMagic.isDefined && !fetchRequest.isFromFollower && !logConfig.forall(_.messageDownConversionEnable)) {
+ trace(s"Conversion to message format ${downConvertMagic.get} is disabled for partition $tp. Sending unsupported version response to $clientId.")
+ errorResponse(Errors.UNSUPPORTED_VERSION)
+ } else {
+ val convertedRecords =
+ downConvertMagic.map { magic =>
+ trace(s"Down converting records from partition $tp to message format version $magic for fetch request from $clientId")
+ // Because down-conversion is extremely memory intensive, we want to try and delay the down-conversion as much
+ // as possible. With KIP-283, we have the ability to lazily down-convert in a chunked manner. The lazy, chunked
+ // down-conversion always guarantees that at least one batch of messages is down-converted and sent out to the
+ // client.
+ new LazyDownConversionRecords(tp, unconvertedRecords, magic, fetchContext.getFetchOffset(tp).get, time)
+ }.getOrElse(unconvertedRecords)
+ new FetchResponse.PartitionData[BaseRecords](partitionData.error, partitionData.highWatermark,
+ FetchResponse.INVALID_LAST_STABLE_OFFSET, partitionData.logStartOffset, partitionData.abortedTransactions,
+ convertedRecords)
+ }
}
// the callback for process a fetch response, invoked before throttling
def processResponseCallback(responsePartitionData: Seq[(TopicPartition, FetchPartitionData)]): Unit = {
val partitions = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
- responsePartitionData.foreach{ case (tp, data) =>
+ responsePartitionData.foreach { case (tp, data) =>
val abortedTransactions = data.abortedTransactions.map(_.asJava).orNull
val lastStableOffset = data.lastStableOffset.getOrElse(FetchResponse.INVALID_LAST_STABLE_OFFSET)
partitions.put(tp, new FetchResponse.PartitionData(data.error, data.highWatermark, lastStableOffset,
data.logStartOffset, abortedTransactions, data.records))
}
- erroneous.foreach{case (tp, data) => partitions.put(tp, data)}
+ erroneous.foreach { case (tp, data) => partitions.put(tp, data) }
// When this callback is triggered, the remote API call has completed.
// Record time before any byte-rate throttling.
@@ -598,14 +604,10 @@ class KafkaApis(val requestChannel: RequestChannel,
if (unconvertedPartitionData.error != Errors.NONE)
debug(s"Fetch request with correlation id ${request.header.correlationId} from client $clientId " +
s"on partition $tp failed due to ${unconvertedPartitionData.error.exceptionName}")
- val convertedRecords = convertRecords(tp, unconvertedPartitionData.records)
- val convertedPartitionData = new FetchResponse.PartitionData[BaseRecords](unconvertedPartitionData.error,
- unconvertedPartitionData.highWatermark, FetchResponse.INVALID_LAST_STABLE_OFFSET, unconvertedPartitionData.logStartOffset,
- unconvertedPartitionData.abortedTransactions, convertedRecords)
- convertedData.put(tp, convertedPartitionData)
+ convertedData.put(tp, maybeConvertFetchedData(tp, unconvertedPartitionData))
}
- // Prepare fetch resopnse from converted data
+ // Prepare fetch response from converted data
val response = new FetchResponse(unconvertedFetchResponse.error(), convertedData, throttleTimeMs,
unconvertedFetchResponse.sessionId())
response.responseData.asScala.foreach { case (topicPartition, data) =>
@@ -1455,7 +1457,7 @@ class KafkaApis(val requestChannel: RequestChannel,
duplicateTopics.keySet.map((_, new ApiError(Errors.INVALID_REQUEST, errorMessage))).toMap
} else Map.empty
- val unauthorizedTopicsResults = unauthorizedTopics.keySet.map(_ -> new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED, null))
+ val unauthorizedTopicsResults = unauthorizedTopics.keySet.map(_ -> new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED, null))
val completeResults = results ++ duplicatedTopicsResults ++ unauthorizedTopicsResults
sendResponseCallback(completeResults)
}
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index ecbb790..2760def 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -115,6 +115,7 @@ object Defaults {
val NumRecoveryThreadsPerDataDir = 1
val AutoCreateTopicsEnable = true
val MinInSyncReplicas = 1
+ val MessageDownConversionEnable = true
/** ********* Replication configuration ***********/
val ControllerSocketTimeoutMs = RequestTimeoutMs
@@ -329,6 +330,7 @@ object KafkaConfig {
val MinInSyncReplicasProp = "min.insync.replicas"
val CreateTopicPolicyClassNameProp = "create.topic.policy.class.name"
val AlterConfigPolicyClassNameProp = "alter.config.policy.class.name"
+ val LogMessageDownConversionEnableProp = LogConfigPrefix + "message.downconversion.enable"
/** ********* Replication configuration ***********/
val ControllerSocketTimeoutMsProp = "controller.socket.timeout.ms"
val DefaultReplicationFactorProp = "default.replication.factor"
@@ -598,6 +600,7 @@ object KafkaConfig {
"implement the <code>org.apache.kafka.server.policy.CreateTopicPolicy</code> interface."
val AlterConfigPolicyClassNameDoc = "The alter configs policy class that should be used for validation. The class should " +
"implement the <code>org.apache.kafka.server.policy.AlterConfigPolicy</code> interface."
+ val LogMessageDownConversionEnableDoc = TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_DOC;
/** ********* Replication configuration ***********/
val ControllerSocketTimeoutMsDoc = "The socket timeout for controller-to-broker channels"
@@ -861,6 +864,7 @@ object KafkaConfig {
.define(LogMessageTimestampDifferenceMaxMsProp, LONG, Defaults.LogMessageTimestampDifferenceMaxMs, MEDIUM, LogMessageTimestampDifferenceMaxMsDoc)
.define(CreateTopicPolicyClassNameProp, CLASS, null, LOW, CreateTopicPolicyClassNameDoc)
.define(AlterConfigPolicyClassNameProp, CLASS, null, LOW, AlterConfigPolicyClassNameDoc)
+ .define(LogMessageDownConversionEnableProp, BOOLEAN, Defaults.MessageDownConversionEnable, LOW, LogMessageDownConversionEnableDoc)
/** ********* Replication configuration ***********/
.define(ControllerSocketTimeoutMsProp, INT, Defaults.ControllerSocketTimeoutMs, MEDIUM, ControllerSocketTimeoutMsDoc)
@@ -1134,6 +1138,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
val logMessageFormatVersion = ApiVersion(logMessageFormatVersionString)
def logMessageTimestampType = TimestampType.forName(getString(KafkaConfig.LogMessageTimestampTypeProp))
def logMessageTimestampDifferenceMaxMs: Long = getLong(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp)
+ def logMessageDownConversionEnable: Boolean = getBoolean(KafkaConfig.LogMessageDownConversionEnableProp)
/** ********* Replication configuration ***********/
val controllerSocketTimeoutMs: Int = getInt(KafkaConfig.ControllerSocketTimeoutMsProp)
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 57bca69..f73ede6 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -79,6 +79,7 @@ object KafkaServer {
logProps.put(LogConfig.MessageFormatVersionProp, kafkaConfig.logMessageFormatVersion.version)
logProps.put(LogConfig.MessageTimestampTypeProp, kafkaConfig.logMessageTimestampType.name)
logProps.put(LogConfig.MessageTimestampDifferenceMaxMsProp, kafkaConfig.logMessageTimestampDifferenceMaxMs: java.lang.Long)
+ logProps.put(LogConfig.MessageDownConversionEnableProp, kafkaConfig.logMessageDownConversionEnable: java.lang.Boolean)
logProps
}
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 24f3235..965595b 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -25,7 +25,7 @@ import com.yammer.metrics.core.Gauge
import kafka.api._
import kafka.cluster.{BrokerEndPoint, Partition, Replica}
import kafka.controller.{KafkaController, StateChangeLogger}
-import kafka.log.{Log, LogAppendInfo, LogManager}
+import kafka.log.{Log, LogAppendInfo, LogConfig, LogManager}
import kafka.metrics.KafkaMetricsGroup
import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
import kafka.server.checkpoints.OffsetCheckpointFile
@@ -995,8 +995,9 @@ class ReplicaManager(val config: KafkaConfig,
quota.isThrottled(topicPartition) && quota.isQuotaExceeded && !isReplicaInSync
}
- def getMagic(topicPartition: TopicPartition): Option[Byte] =
- getReplica(topicPartition).flatMap(_.log.map(_.config.messageFormatVersion.recordVersion.value))
+ def getLogConfig(topicPartition: TopicPartition): Option[LogConfig] = getReplica(topicPartition).flatMap(_.log.map(_.config))
+
+ def getMagic(topicPartition: TopicPartition): Option[Byte] = getLogConfig(topicPartition).map(_.messageFormatVersion.recordVersion.value)
def maybeUpdateMetadataCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest) : Seq[TopicPartition] = {
replicaStateChangeLock synchronized {
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 45b3fdc..69ca317 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -369,6 +369,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
props.put(KafkaConfig.LogPreAllocateProp, true.toString)
props.put(KafkaConfig.LogMessageTimestampTypeProp, TimestampType.LOG_APPEND_TIME.toString)
props.put(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp, "1000")
+ props.put(KafkaConfig.LogMessageDownConversionEnableProp, "false")
reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.LogSegmentBytesProp, "4000"))
// Verify that all broker defaults have been updated
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
new file mode 100644
index 0000000..e5ef985
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestDownConversionConfigTest.scala
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package unit.kafka.server
+
+import java.util
+import java.util.Properties
+
+import kafka.log.LogConfig
+import kafka.server.{BaseRequestTest, KafkaConfig}
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.record.MemoryRecords
+import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
+import org.apache.kafka.common.serialization.StringSerializer
+import org.junit.Assert._
+import org.junit.Test
+
+class FetchRequestDownConversionConfigTest extends BaseRequestTest {
+ private var producer: KafkaProducer[String, String] = null
+ override def numBrokers: Int = 1
+
+ override def setUp(): Unit = {
+ super.setUp()
+ initProducer()
+ }
+
+ override def tearDown(): Unit = {
+ if (producer != null)
+ producer.close()
+ super.tearDown()
+ }
+
+ override protected def propertyOverrides(properties: Properties): Unit = {
+ super.propertyOverrides(properties)
+ properties.put(KafkaConfig.LogMessageDownConversionEnableProp, "false")
+ }
+
+ private def initProducer(): Unit = {
+ producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers),
+ retries = 5, keySerializer = new StringSerializer, valueSerializer = new StringSerializer)
+ }
+
+ private def createTopics(numTopics: Int, numPartitions: Int,
+ configs: Map[String, String] = Map.empty, topicSuffixStart: Int = 0): Map[TopicPartition, Int] = {
+ val topics = (0 until numTopics).map(t => s"topic${t + topicSuffixStart}")
+ val topicConfig = new Properties
+ topicConfig.setProperty(LogConfig.MinInSyncReplicasProp, 1.toString)
+ configs.foreach { case (k, v) => topicConfig.setProperty(k, v) }
+ topics.flatMap { topic =>
+ val partitionToLeader = createTopic(topic, numPartitions = numPartitions, replicationFactor = 1,
+ topicConfig = topicConfig)
+ partitionToLeader.map { case (partition, leader) => new TopicPartition(topic, partition) -> leader }
+ }.toMap
+ }
+
+ private def createPartitionMap(maxPartitionBytes: Int, topicPartitions: Seq[TopicPartition],
+ offsetMap: Map[TopicPartition, Long] = Map.empty): util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] = {
+ val partitionMap = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
+ topicPartitions.foreach { tp =>
+ partitionMap.put(tp, new FetchRequest.PartitionData(offsetMap.getOrElse(tp, 0), 0L, maxPartitionBytes))
+ }
+ partitionMap
+ }
+
+ private def sendFetchRequest(leaderId: Int, request: FetchRequest): FetchResponse[MemoryRecords] = {
+ val response = connectAndSend(request, ApiKeys.FETCH, destination = brokerSocketServer(leaderId))
+ FetchResponse.parse(response, request.version)
+ }
+
+ /**
+ * Tests that fetch requests that require down-conversion return an error response when down-conversion is disabled on the broker.
+ */
+ @Test
+ def testV1FetchWithDownConversionDisabled(): Unit = {
+ val topicMap = createTopics(numTopics = 5, numPartitions = 1)
+ val topicPartitions = topicMap.keySet.toSeq
+ topicPartitions.foreach(tp => producer.send(new ProducerRecord(tp.topic(), "key", "value")).get())
+ val fetchRequest = FetchRequest.Builder.forConsumer(Int.MaxValue, 0, createPartitionMap(1024,
+ topicPartitions)).build(1)
+ val fetchResponse = sendFetchRequest(topicMap.head._2, fetchRequest)
+ topicPartitions.foreach(tp => assertEquals(Errors.UNSUPPORTED_VERSION, fetchResponse.responseData().get(tp).error))
+ }
+
+ /**
+ * Tests that "message.downconversion.enable" has no effect when down-conversion is not required.
+ */
+ @Test
+ def testLatestFetchWithDownConversionDisabled(): Unit = {
+ val topicMap = createTopics(numTopics = 5, numPartitions = 1)
+ val topicPartitions = topicMap.keySet.toSeq
+ topicPartitions.foreach(tp => producer.send(new ProducerRecord(tp.topic(), "key", "value")).get())
+ val fetchRequest = FetchRequest.Builder.forConsumer(Int.MaxValue, 0, createPartitionMap(1024,
+ topicPartitions)).build()
+ val fetchResponse = sendFetchRequest(topicMap.head._2, fetchRequest)
+ topicPartitions.foreach(tp => assertEquals(Errors.NONE, fetchResponse.responseData().get(tp).error))
+ }
+
+ /**
+ * Tests that "message.downconversion.enable" can be set at topic level, and its configuration is obeyed for client
+ * fetch requests.
+ */
+ @Test
+ def testV1FetchWithTopicLevelOverrides(): Unit = {
+ // create topics with default down-conversion configuration (i.e. conversion disabled)
+ val conversionDisabledTopicsMap = createTopics(numTopics = 5, numPartitions = 1, topicSuffixStart = 0)
+ val conversionDisabledTopicPartitions = conversionDisabledTopicsMap.keySet.toSeq
+
+ // create topics with down-conversion configuration enabled
+ val topicConfig = Map(LogConfig.MessageDownConversionEnableProp -> "true")
+ val conversionEnabledTopicsMap = createTopics(numTopics = 5, numPartitions = 1, topicConfig, topicSuffixStart = 5)
+ val conversionEnabledTopicPartitions = conversionEnabledTopicsMap.keySet.toSeq
+
+ val allTopics = conversionDisabledTopicPartitions ++ conversionEnabledTopicPartitions
+ val leaderId = conversionDisabledTopicsMap.head._2
+
+ allTopics.foreach(tp => producer.send(new ProducerRecord(tp.topic(), "key", "value")).get())
+ val fetchRequest = FetchRequest.Builder.forConsumer(Int.MaxValue, 0, createPartitionMap(1024,
+ allTopics)).build(1)
+ val fetchResponse = sendFetchRequest(leaderId, fetchRequest)
+
+ conversionDisabledTopicPartitions.foreach(tp => assertEquals(Errors.UNSUPPORTED_VERSION, fetchResponse.responseData().get(tp).error))
+ conversionEnabledTopicPartitions.foreach(tp => assertEquals(Errors.NONE, fetchResponse.responseData().get(tp).error))
+ }
+
+ /**
+ * Tests that "message.downconversion.enable" has no effect on fetch requests from replicas.
+ */
+ @Test
+ def testV1FetchFromReplica(): Unit = {
+ // create topics with default down-conversion configuration (i.e. conversion disabled)
+ val conversionDisabledTopicsMap = createTopics(numTopics = 5, numPartitions = 1, topicSuffixStart = 0)
+ val conversionDisabledTopicPartitions = conversionDisabledTopicsMap.keySet.toSeq
+
+ // create topics with down-conversion configuration enabled
+ val topicConfig = Map(LogConfig.MessageDownConversionEnableProp -> "true")
+ val conversionEnabledTopicsMap = createTopics(numTopics = 5, numPartitions = 1, topicConfig, topicSuffixStart = 5)
+ val conversionEnabledTopicPartitions = conversionEnabledTopicsMap.keySet.toSeq
+
+ val allTopicPartitions = conversionDisabledTopicPartitions ++ conversionEnabledTopicPartitions
+ val leaderId = conversionDisabledTopicsMap.head._2
+
+ allTopicPartitions.foreach(tp => producer.send(new ProducerRecord(tp.topic(), "key", "value")).get())
+ val fetchRequest = FetchRequest.Builder.forReplica(1, 1, Int.MaxValue, 0,
+ createPartitionMap(1024, allTopicPartitions)).build()
+ val fetchResponse = sendFetchRequest(leaderId, fetchRequest)
+
+ allTopicPartitions.foreach(tp => assertEquals(Errors.NONE, fetchResponse.responseData().get(tp).error))
+ }
+}
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index 63e23b2..06ff2d9 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -426,7 +426,7 @@ class FetchRequestTest extends BaseRequestTest {
}
private def createTopics(numTopics: Int, numPartitions: Int, configs: Map[String, String] = Map.empty): Map[TopicPartition, Int] = {
- val topics = (0 until numPartitions).map(t => s"topic$t")
+ val topics = (0 until numTopics).map(t => s"topic$t")
val topicConfig = new Properties
topicConfig.setProperty(LogConfig.MinInSyncReplicasProp, 2.toString)
configs.foreach { case (k, v) => topicConfig.setProperty(k, v) }
--
To stop receiving notification emails like this one, please contact
jgus@apache.org.