You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/09/28 05:39:59 UTC

[kafka] branch trunk updated: KAFKA-7409; Validate message format version before creating topics or altering configs (#5651)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 70d90c3  KAFKA-7409; Validate message format version before creating topics or altering configs (#5651)
70d90c3 is described below

commit 70d90c371833b09cf934c8c2358171433892a085
Author: huxi <hu...@hotmail.com>
AuthorDate: Fri Sep 28 13:39:45 2018 +0800

    KAFKA-7409; Validate message format version before creating topics or altering configs (#5651)
    
    Values for `message.format.version` and `log.message.format.version` should be verified before topic creation or config change.
    
    Reviewers: Manikumar Reddy <ma...@gmail.com>, Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>
---
 core/src/main/scala/kafka/api/ApiVersion.scala              | 13 +++++++++++++
 core/src/main/scala/kafka/log/LogConfig.scala               |  4 ++--
 core/src/main/scala/kafka/server/AdminManager.scala         |  5 ++++-
 core/src/main/scala/kafka/server/KafkaConfig.scala          |  6 +++---
 core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala | 12 ++++++++++++
 .../scala/unit/kafka/server/CreateTopicsRequestTest.scala   |  6 ++++++
 6 files changed, 40 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala
index b145adf..bc3602b 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -17,6 +17,8 @@
 
 package kafka.api
 
+import org.apache.kafka.common.config.ConfigDef.Validator
+import org.apache.kafka.common.config.ConfigException
 import org.apache.kafka.common.record.RecordVersion
 
 /**
@@ -267,3 +269,14 @@ case object KAFKA_2_1_IV1 extends DefaultApiVersion {
   val recordVersion = RecordVersion.V2
   val id: Int = 18
 }
+
+object ApiVersionValidator extends Validator {
+
+  override def ensureValid(name: String, value: Any): Unit = {
+    try {
+      ApiVersion(value.toString)
+    } catch {
+      case e: IllegalArgumentException => throw new ConfigException(name, value.toString, e.getMessage)
+    }
+  }
+}
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index bd4768e..d872e09 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -20,7 +20,7 @@ package kafka.log
 import java.util.{Collections, Locale, Properties}
 
 import scala.collection.JavaConverters._
-import kafka.api.ApiVersion
+import kafka.api.{ApiVersion, ApiVersionValidator}
 import kafka.message.BrokerCompressionCodec
 import kafka.server.{KafkaConfig, ThrottledReplicaListValidator}
 import kafka.utils.Implicits._
@@ -256,7 +256,7 @@ object LogConfig {
         MEDIUM, CompressionTypeDoc, KafkaConfig.CompressionTypeProp)
       .define(PreAllocateEnableProp, BOOLEAN, Defaults.PreAllocateEnable, MEDIUM, PreAllocateEnableDoc,
         KafkaConfig.LogPreAllocateProp)
-      .define(MessageFormatVersionProp, STRING, Defaults.MessageFormatVersion, MEDIUM, MessageFormatVersionDoc,
+      .define(MessageFormatVersionProp, STRING, Defaults.MessageFormatVersion, ApiVersionValidator, MEDIUM, MessageFormatVersionDoc,
         KafkaConfig.LogMessageFormatVersionProp)
       .define(MessageTimestampTypeProp, STRING, Defaults.MessageTimestampType, in("CreateTime", "LogAppendTime"), MEDIUM, MessageTimestampTypeDoc,
         KafkaConfig.LogMessageTimestampTypeProp)
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index e9598e3..f765f51 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -26,7 +26,7 @@ import kafka.utils._
 import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.clients.admin.NewPartitions
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, ConfigResource}
-import org.apache.kafka.common.errors.{ApiException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, ReassignmentInProgressException, UnknownTopicOrPartitionException}
+import org.apache.kafka.common.errors.{ApiException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, ReassignmentInProgressException, UnknownTopicOrPartitionException, InvalidConfigurationException}
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.ListenerName
@@ -131,6 +131,9 @@ class AdminManager(val config: KafkaConfig,
         case e: ApiException =>
           info(s"Error processing create topic request for topic $topic with arguments $arguments", e)
           CreatePartitionsMetadata(topic, Map(), ApiError.fromThrowable(e))
+        case e: ConfigException =>
+          info(s"Error processing create topic request for topic $topic with arguments $arguments", e)
+          CreatePartitionsMetadata(topic, Map(), ApiError.fromThrowable(new InvalidConfigurationException(e.getMessage, e.getCause)))
         case e: Throwable =>
           error(s"Error processing create topic request for topic $topic with arguments $arguments", e)
           CreatePartitionsMetadata(topic, Map(), ApiError.fromThrowable(e))
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 9022502..700f32c 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -20,7 +20,7 @@ package kafka.server
 import java.util
 import java.util.{Collections, Properties}
 
-import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1}
+import kafka.api.{ApiVersion, ApiVersionValidator, KAFKA_0_10_0_IV1}
 import kafka.cluster.EndPoint
 import kafka.coordinator.group.OffsetConfig
 import kafka.coordinator.transaction.{TransactionLog, TransactionStateManager}
@@ -890,7 +890,7 @@ object KafkaConfig {
       .define(NumRecoveryThreadsPerDataDirProp, INT, Defaults.NumRecoveryThreadsPerDataDir, atLeast(1), HIGH, NumRecoveryThreadsPerDataDirDoc)
       .define(AutoCreateTopicsEnableProp, BOOLEAN, Defaults.AutoCreateTopicsEnable, HIGH, AutoCreateTopicsEnableDoc)
       .define(MinInSyncReplicasProp, INT, Defaults.MinInSyncReplicas, atLeast(1), HIGH, MinInSyncReplicasDoc)
-      .define(LogMessageFormatVersionProp, STRING, Defaults.LogMessageFormatVersion, MEDIUM, LogMessageFormatVersionDoc)
+      .define(LogMessageFormatVersionProp, STRING, Defaults.LogMessageFormatVersion, ApiVersionValidator, MEDIUM, LogMessageFormatVersionDoc)
       .define(LogMessageTimestampTypeProp, STRING, Defaults.LogMessageTimestampType, in("CreateTime", "LogAppendTime"), MEDIUM, LogMessageTimestampTypeDoc)
       .define(LogMessageTimestampDifferenceMaxMsProp, LONG, Defaults.LogMessageTimestampDifferenceMaxMs, MEDIUM, LogMessageTimestampDifferenceMaxMsDoc)
       .define(CreateTopicPolicyClassNameProp, CLASS, null, LOW, CreateTopicPolicyClassNameDoc)
@@ -918,7 +918,7 @@ object KafkaConfig {
       .define(LeaderImbalanceCheckIntervalSecondsProp, LONG, Defaults.LeaderImbalanceCheckIntervalSeconds, HIGH, LeaderImbalanceCheckIntervalSecondsDoc)
       .define(UncleanLeaderElectionEnableProp, BOOLEAN, Defaults.UncleanLeaderElectionEnable, HIGH, UncleanLeaderElectionEnableDoc)
       .define(InterBrokerSecurityProtocolProp, STRING, Defaults.InterBrokerSecurityProtocol, MEDIUM, InterBrokerSecurityProtocolDoc)
-      .define(InterBrokerProtocolVersionProp, STRING, Defaults.InterBrokerProtocolVersion, MEDIUM, InterBrokerProtocolVersionDoc)
+      .define(InterBrokerProtocolVersionProp, STRING, Defaults.InterBrokerProtocolVersion, ApiVersionValidator, MEDIUM, InterBrokerProtocolVersionDoc)
       .define(InterBrokerListenerNameProp, STRING, null, MEDIUM, InterBrokerListenerNameDoc)
 
       /** ********* Controlled shutdown configuration ***********/
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
index 13f23e6..a469f8e 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
@@ -26,6 +26,7 @@ import kafka.admin.TopicCommand.TopicCommandOptions
 import kafka.utils.ZkUtils.getDeleteTopicPath
 import org.apache.kafka.common.errors.TopicExistsException
 import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.config.ConfigException
 
 class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
 
@@ -240,6 +241,17 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
     } catch {
       case _: Exception => // topic creation should fail due to the invalid config
     }
+
+    // try to create the topic with another invalid config
+    try {
+      val createOpts = new TopicCommandOptions(
+        Array("--partitions", "1", "--replication-factor", "1", "--topic", "test",
+          "--config", "message.format.version=boom"))
+      TopicCommand.createTopic(zkClient, createOpts)
+      fail("Expected exception on invalid topic-level config.")
+    } catch {
+      case _: ConfigException => // topic creation should fail due to the invalid config value
+    }
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
index 13a2d23..47c1765 100644
--- a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
@@ -71,6 +71,12 @@ class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest {
     val invalidConfig = Map("not.a.property" -> "error").asJava
     validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(Map("error-config" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort, invalidConfig)).asJava, timeout).build(),
       Map("error-config" -> error(Errors.INVALID_CONFIG)), checkErrorMessage = false)
+
+    val config = Map("message.format.version" -> "invalid-value").asJava
+    validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(
+      Map("error-config-value" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort, config)).asJava, timeout).build(),
+      Map("error-config-value" -> error(Errors.INVALID_CONFIG)), checkErrorMessage = false)
+
     val invalidAssignments = replicaAssignmentToJava(Map(0 -> List(0, 1), 1 -> List(0)))
     validateErrorCreateTopicsRequests(new CreateTopicsRequest.Builder(Map("error-assignment" -> new CreateTopicsRequest.TopicDetails(invalidAssignments)).asJava, timeout).build(),
       Map("error-assignment" -> error(Errors.INVALID_REPLICA_ASSIGNMENT)), checkErrorMessage = false)