You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2018/06/01 14:12:55 UTC

[kafka] branch trunk updated: KAFKA-6973: Validate topic config message.timestamp.type (#5106)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0120e88  KAFKA-6973: Validate topic config message.timestamp.type (#5106)
0120e88 is described below

commit 0120e88e2cab1c02cac471ab4acc76ce876bf72b
Author: huxi <hu...@hotmail.com>
AuthorDate: Fri Jun 1 22:12:42 2018 +0800

    KAFKA-6973: Validate topic config message.timestamp.type (#5106)
    
    Specifying an invalid config (i.e. something other than `CreateTime` or
    `LogAppendTime`) via `TopicCommand` would previously cause the
    broker to fail on start-up.
    
    Reviewers: Manikumar Reddy <ma...@gmail.com>, Ismael Juma <is...@juma.me.uk>
---
 core/src/main/scala/kafka/log/LogConfig.scala           |  2 +-
 .../test/scala/unit/kafka/admin/TopicCommandTest.scala  | 17 +++++++++++++++++
 2 files changed, 18 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index 0db49e7..158209a 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -254,7 +254,7 @@ object LogConfig {
         KafkaConfig.LogPreAllocateProp)
       .define(MessageFormatVersionProp, STRING, Defaults.MessageFormatVersion, MEDIUM, MessageFormatVersionDoc,
         KafkaConfig.LogMessageFormatVersionProp)
-      .define(MessageTimestampTypeProp, STRING, Defaults.MessageTimestampType, MEDIUM, MessageTimestampTypeDoc,
+      .define(MessageTimestampTypeProp, STRING, Defaults.MessageTimestampType, in("CreateTime", "LogAppendTime"), MEDIUM, MessageTimestampTypeDoc,
         KafkaConfig.LogMessageTimestampTypeProp)
       .define(MessageTimestampDifferenceMaxMsProp, LONG, Defaults.MessageTimestampDifferenceMaxMs,
         atLeast(0), MEDIUM, MessageTimestampDifferenceMaxMsDoc, KafkaConfig.LogMessageTimestampDifferenceMaxMsProp)
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
index 6a276df..782fcf5 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
@@ -225,4 +225,21 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
     assertTrue(output.contains(topic) && output.contains(markedForDeletionList))
   }
 
+  @Test
+  def testInvalidTopicLevelConfig(): Unit = {
+    val brokers = List(0)
+    TestUtils.createBrokersInZk(zkClient, brokers)
+
+    // create the topic
+    try {
+      val createOpts = new TopicCommandOptions(
+        Array("--partitions", "1", "--replication-factor", "1", "--topic", "test",
+          "--config", "message.timestamp.type=boom"))
+      TopicCommand.createTopic(zkClient, createOpts)
+      fail("Expected exception on invalid topic-level config.")
+    } catch {
+      case _: Exception => // topic creation should fail due to the invalid config
+    }
+  }
+
 }

-- 
To stop receiving notification emails like this one, please contact
ijuma@apache.org.