You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/05/30 14:53:07 UTC

kafka git commit: KAFKA-5344; set message.timestamp.difference.max.ms back to Long.MaxValue

Repository: kafka
Updated Branches:
  refs/heads/trunk 6f5930d63 -> 6b0349791


KAFKA-5344; set message.timestamp.difference.max.ms back to Long.MaxValue

Author: Jiangjie Qin <be...@gmail.com>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #3163 from becketqin/KAFKA-5344


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6b034979
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6b034979
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6b034979

Branch: refs/heads/trunk
Commit: 6b03497915665bb4823073a5a34b03be709eb287
Parents: 6f5930d
Author: Jiangjie Qin <be...@gmail.com>
Authored: Tue May 30 15:38:04 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Tue May 30 15:44:34 2017 +0100

----------------------------------------------------------------------
 core/src/main/scala/kafka/server/KafkaConfig.scala  | 16 ++++------------
 .../scala/unit/kafka/server/KafkaConfigTest.scala   |  1 -
 docs/upgrade.html                                   |  2 --
 3 files changed, 4 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6b034979/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index de036a7..6e94043 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -106,7 +106,7 @@ object Defaults {
   // lazy val as `InterBrokerProtocolVersion` is defined later
   lazy val LogMessageFormatVersion = InterBrokerProtocolVersion
   val LogMessageTimestampType = "CreateTime"
-  val LogMessageTimestampDifferenceMaxMs = LogRetentionHours * 60 * 60 * 1000L
+  val LogMessageTimestampDifferenceMaxMs = Long.MaxValue
   val NumRecoveryThreadsPerDataDir = 1
   val AutoCreateTopicsEnable = true
   val MinInSyncReplicas = 1
@@ -514,8 +514,7 @@ object KafkaConfig {
   val LogMessageTimestampDifferenceMaxMsDoc = "The maximum difference allowed between the timestamp when a broker receives " +
     "a message and the timestamp specified in the message. If log.message.timestamp.type=CreateTime, a message will be rejected " +
     "if the difference in timestamp exceeds this threshold. This configuration is ignored if log.message.timestamp.type=LogAppendTime." +
-    "The maximum timestamp difference allowed should be no greater than log.retention.ms to avoid unnecessarily frequent log rolling. For " +
-    "this reason, the default is the value of log.retention.ms."
+    "The maximum timestamp difference allowed should be no greater than log.retention.ms to avoid unnecessarily frequent log rolling."
   val NumRecoveryThreadsPerDataDirDoc = "The number of threads per data directory to be used for log recovery at startup and flushing at shutdown"
   val AutoCreateTopicsEnableDoc = "Enable auto creation of topic on the server"
   val MinInSyncReplicasDoc = "When a producer sets acks to \"all\" (or \"-1\"), " +
@@ -747,7 +746,7 @@ object KafkaConfig {
       .define(MinInSyncReplicasProp, INT, Defaults.MinInSyncReplicas, atLeast(1), HIGH, MinInSyncReplicasDoc)
       .define(LogMessageFormatVersionProp, STRING, Defaults.LogMessageFormatVersion, MEDIUM, LogMessageFormatVersionDoc)
       .define(LogMessageTimestampTypeProp, STRING, Defaults.LogMessageTimestampType, in("CreateTime", "LogAppendTime"), MEDIUM, LogMessageTimestampTypeDoc)
-      .define(LogMessageTimestampDifferenceMaxMsProp, LONG, null, MEDIUM, LogMessageTimestampDifferenceMaxMsDoc)
+      .define(LogMessageTimestampDifferenceMaxMsProp, LONG, Defaults.LogMessageTimestampDifferenceMaxMs, MEDIUM, LogMessageTimestampDifferenceMaxMsDoc)
       .define(CreateTopicPolicyClassNameProp, CLASS, null, LOW, CreateTopicPolicyClassNameDoc)
 
       /** ********* Replication configuration ***********/
@@ -959,7 +958,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
   val logMessageFormatVersionString = getString(KafkaConfig.LogMessageFormatVersionProp)
   val logMessageFormatVersion = ApiVersion(logMessageFormatVersionString)
   val logMessageTimestampType = TimestampType.forName(getString(KafkaConfig.LogMessageTimestampTypeProp))
-  val logMessageTimestampDifferenceMaxMs = getMessageTimestampDifferenceMaxMs
+  val logMessageTimestampDifferenceMaxMs: Long = getLong(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp)
 
   /** ********* Replication configuration ***********/
   val controllerSocketTimeoutMs: Int = getInt(KafkaConfig.ControllerSocketTimeoutMsProp)
@@ -1086,13 +1085,6 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
     millis
   }
 
-  private def getMessageTimestampDifferenceMaxMs: Long = {
-    Option(getLong(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp)) match {
-      case Some(value) => value
-      case None => getLogRetentionTimeMillis
-    }
-  }
-
   private def getMap(propName: String, propValue: String): Map[String, String] = {
     try {
       CoreUtils.parseCsvMap(propValue)

http://git-wip-us.apache.org/repos/asf/kafka/blob/6b034979/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index bf89533..df8a6d7 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -712,7 +712,6 @@ class KafkaConfigTest {
     assertEquals(12 * 60L * 1000L * 60, config.logRollTimeMillis)
     assertEquals(11 * 60L * 1000L * 60, config.logRollTimeJitterMillis)
     assertEquals(10 * 60L * 1000L * 60, config.logRetentionTimeMillis)
-    assertEquals(config.logRetentionTimeMillis, config.logMessageTimestampDifferenceMaxMs)
     assertEquals(123L, config.logFlushIntervalMs)
     assertEquals(SnappyCompressionCodec, config.offsetsTopicCompressionCodec)
     assertEquals(Sensor.RecordingLevel.DEBUG.toString, config.metricRecordingLevel)

http://git-wip-us.apache.org/repos/asf/kafka/blob/6b034979/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index dab5fa7..2b62a2b 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -60,8 +60,6 @@
     <li>The <code>offsets.topic.replication.factor</code> broker config is now enforced upon auto topic creation. Internal
         auto topic creation will fail with a GROUP_COORDINATOR_NOT_AVAILABLE error until the cluster size meets this
         replication factor requirement.</li>
-    <li>By default <code>message.timestamp.difference.max.ms</code> is the same as <code>retention.ms</code> instead of
-        <code>Long.MAX_VALUE</code>.</li>
     <li>The broker configuration <code>max.message.bytes</code> now applies to the total size of a batch of messages.
         Previously the setting applied to batches of compressed messages, or to non-compressed messages individually. In practice,
         the change is minor since a message batch may consist of only a single message, so the limitation on the size of