You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/09/28 05:39:59 UTC
[kafka] branch trunk updated: KAFKA-7409;
Validate message format version before creating topics or altering
configs (#5651)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 70d90c3 KAFKA-7409; Validate message format version before creating topics or altering configs (#5651)
70d90c3 is described below
commit 70d90c371833b09cf934c8c2358171433892a085
Author: huxi <hu...@hotmail.com>
AuthorDate: Fri Sep 28 13:39:45 2018 +0800
KAFKA-7409; Validate message format version before creating topics or altering configs (#5651)
Values for `message.format.version` and `log.message.format.version` should be verified before topic creation or config change.
Reviewers: Manikumar Reddy <ma...@gmail.com>, Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>
---
core/src/main/scala/kafka/api/ApiVersion.scala | 13 +++++++++++++
core/src/main/scala/kafka/log/LogConfig.scala | 4 ++--
core/src/main/scala/kafka/server/AdminManager.scala | 5 ++++-
core/src/main/scala/kafka/server/KafkaConfig.scala | 6 +++---
core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala | 12 ++++++++++++
.../scala/unit/kafka/server/CreateTopicsRequestTest.scala | 6 ++++++
6 files changed, 40 insertions(+), 6 deletions(-)
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala
index b145adf..bc3602b 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -17,6 +17,8 @@
package kafka.api
+import org.apache.kafka.common.config.ConfigDef.Validator
+import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.record.RecordVersion
/**
@@ -267,3 +269,14 @@ case object KAFKA_2_1_IV1 extends DefaultApiVersion {
val recordVersion = RecordVersion.V2
val id: Int = 18
}
+
+object ApiVersionValidator extends Validator {
+
+ override def ensureValid(name: String, value: Any): Unit = {
+ try {
+ ApiVersion(value.toString)
+ } catch {
+ case e: IllegalArgumentException => throw new ConfigException(name, value.toString, e.getMessage)
+ }
+ }
+}
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index bd4768e..d872e09 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -20,7 +20,7 @@ package kafka.log
import java.util.{Collections, Locale, Properties}
import scala.collection.JavaConverters._
-import kafka.api.ApiVersion
+import kafka.api.{ApiVersion, ApiVersionValidator}
import kafka.message.BrokerCompressionCodec
import kafka.server.{KafkaConfig, ThrottledReplicaListValidator}
import kafka.utils.Implicits._
@@ -256,7 +256,7 @@ object LogConfig {
MEDIUM, CompressionTypeDoc, KafkaConfig.CompressionTypeProp)
.define(PreAllocateEnableProp, BOOLEAN, Defaults.PreAllocateEnable, MEDIUM, PreAllocateEnableDoc,
KafkaConfig.LogPreAllocateProp)
- .define(MessageFormatVersionProp, STRING, Defaults.MessageFormatVersion, MEDIUM, MessageFormatVersionDoc,
+ .define(MessageFormatVersionProp, STRING, Defaults.MessageFormatVersion, ApiVersionValidator, MEDIUM, MessageFormatVersionDoc,
KafkaConfig.LogMessageFormatVersionProp)
.define(MessageTimestampTypeProp, STRING, Defaults.MessageTimestampType, in("CreateTime", "LogAppendTime"), MEDIUM, MessageTimestampTypeDoc,
KafkaConfig.LogMessageTimestampTypeProp)
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index e9598e3..f765f51 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -26,7 +26,7 @@ import kafka.utils._
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.admin.NewPartitions
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, ConfigResource}
-import org.apache.kafka.common.errors.{ApiException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, ReassignmentInProgressException, UnknownTopicOrPartitionException}
+import org.apache.kafka.common.errors.{ApiException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, ReassignmentInProgressException, UnknownTopicOrPartitionException, InvalidConfigurationException}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.ListenerName
@@ -131,6 +131,9 @@ class AdminManager(val config: KafkaConfig,
case e: ApiException =>
info(s"Error processing create topic request for topic $topic with arguments $arguments", e)
CreatePartitionsMetadata(topic, Map(), ApiError.fromThrowable(e))
+ case e: ConfigException =>
+ info(s"Error processing create topic request for topic $topic with arguments $arguments", e)
+ CreatePartitionsMetadata(topic, Map(), ApiError.fromThrowable(new InvalidConfigurationException(e.getMessage, e.getCause)))
case e: Throwable =>
error(s"Error processing create topic request for topic $topic with arguments $arguments", e)
CreatePartitionsMetadata(topic, Map(), ApiError.fromThrowable(e))
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 9022502..700f32c 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -20,7 +20,7 @@ package kafka.server
import java.util
import java.util.{Collections, Properties}
-import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1}
+import kafka.api.{ApiVersion, ApiVersionValidator, KAFKA_0_10_0_IV1}
import kafka.cluster.EndPoint
import kafka.coordinator.group.OffsetConfig
import kafka.coordinator.transaction.{TransactionLog, TransactionStateManager}
@@ -890,7 +890,7 @@ object KafkaConfig {
.define(NumRecoveryThreadsPerDataDirProp, INT, Defaults.NumRecoveryThreadsPerDataDir, atLeast(1), HIGH, NumRecoveryThreadsPerDataDirDoc)
.define(AutoCreateTopicsEnableProp, BOOLEAN, Defaults.AutoCreateTopicsEnable, HIGH, AutoCreateTopicsEnableDoc)
.define(MinInSyncReplicasProp, INT, Defaults.MinInSyncReplicas, atLeast(1), HIGH, MinInSyncReplicasDoc)
- .define(LogMessageFormatVersionProp, STRING, Defaults.LogMessageFormatVersion, MEDIUM, LogMessageFormatVersionDoc)
+ .define(LogMessageFormatVersionProp, STRING, Defaults.LogMessageFormatVersion, ApiVersionValidator, MEDIUM, LogMessageFormatVersionDoc)
.define(LogMessageTimestampTypeProp, STRING, Defaults.LogMessageTimestampType, in("CreateTime", "LogAppendTime"), MEDIUM, LogMessageTimestampTypeDoc)
.define(LogMessageTimestampDifferenceMaxMsProp, LONG, Defaults.LogMessageTimestampDifferenceMaxMs, MEDIUM, LogMessageTimestampDifferenceMaxMsDoc)
.define(CreateTopicPolicyClassNameProp, CLASS, null, LOW, CreateTopicPolicyClassNameDoc)
@@ -918,7 +918,7 @@ object KafkaConfig {
.define(LeaderImbalanceCheckIntervalSecondsProp, LONG, Defaults.LeaderImbalanceCheckIntervalSeconds, HIGH, LeaderImbalanceCheckIntervalSecondsDoc)
.define(UncleanLeaderElectionEnableProp, BOOLEAN, Defaults.UncleanLeaderElectionEnable, HIGH, UncleanLeaderElectionEnableDoc)
.define(InterBrokerSecurityProtocolProp, STRING, Defaults.InterBrokerSecurityProtocol, MEDIUM, InterBrokerSecurityProtocolDoc)
- .define(InterBrokerProtocolVersionProp, STRING, Defaults.InterBrokerProtocolVersion, MEDIUM, InterBrokerProtocolVersionDoc)
+ .define(InterBrokerProtocolVersionProp, STRING, Defaults.InterBrokerProtocolVersion, ApiVersionValidator, MEDIUM, InterBrokerProtocolVersionDoc)
.define(InterBrokerListenerNameProp, STRING, null, MEDIUM, InterBrokerListenerNameDoc)
/** ********* Controlled shutdown configuration ***********/
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
index 13f23e6..a469f8e 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
@@ -26,6 +26,7 @@ import kafka.admin.TopicCommand.TopicCommandOptions
import kafka.utils.ZkUtils.getDeleteTopicPath
import org.apache.kafka.common.errors.TopicExistsException
import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.config.ConfigException
class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
@@ -240,6 +241,17 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
} catch {
case _: Exception => // topic creation should fail due to the invalid config
}
+
+ // try to create the topic with another invalid config
+ try {
+ val createOpts = new TopicCommandOptions(
+ Array("--partitions", "1", "--replication-factor", "1", "--topic", "test",
+ "--config", "message.format.version=boom"))
+ TopicCommand.createTopic(zkClient, createOpts)
+ fail("Expected exception on invalid topic-level config.")
+ } catch {
+ case _: ConfigException => // topic creation should fail due to the invalid config value
+ }
}
@Test
diff --git a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
index 13a2d23..47c1765 100644
--- a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
@@ -71,6 +71,12 @@ class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest {
val invalidConfig = Map("not.a.property" -> "error").asJava
validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(Map("error-config" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort, invalidConfig)).asJava, timeout).build(),
Map("error-config" -> error(Errors.INVALID_CONFIG)), checkErrorMessage = false)
+
+ val config = Map("message.format.version" -> "invalid-value").asJava
+ validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(
+ Map("error-config-value" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort, config)).asJava, timeout).build(),
+ Map("error-config-value" -> error(Errors.INVALID_CONFIG)), checkErrorMessage = false)
+
val invalidAssignments = replicaAssignmentToJava(Map(0 -> List(0, 1), 1 -> List(0)))
validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(Map("error-assignment" -> new CreateTopicsRequest.TopicDetails(invalidAssignments)).asJava, timeout).build(),
Map("error-assignment" -> error(Errors.INVALID_REPLICA_ASSIGNMENT)), checkErrorMessage = false)