You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/05/30 14:53:07 UTC
kafka git commit: KAFKA-5344;
set message.timestamp.difference.max.ms back to Long.MaxValue
Repository: kafka
Updated Branches:
refs/heads/trunk 6f5930d63 -> 6b0349791
KAFKA-5344; set message.timestamp.difference.max.ms back to Long.MaxValue
Author: Jiangjie Qin <be...@gmail.com>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #3163 from becketqin/KAFKA-5344
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6b034979
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6b034979
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6b034979
Branch: refs/heads/trunk
Commit: 6b03497915665bb4823073a5a34b03be709eb287
Parents: 6f5930d
Author: Jiangjie Qin <be...@gmail.com>
Authored: Tue May 30 15:38:04 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Tue May 30 15:44:34 2017 +0100
----------------------------------------------------------------------
core/src/main/scala/kafka/server/KafkaConfig.scala | 16 ++++------------
.../scala/unit/kafka/server/KafkaConfigTest.scala | 1 -
docs/upgrade.html | 2 --
3 files changed, 4 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/6b034979/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index de036a7..6e94043 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -106,7 +106,7 @@ object Defaults {
// lazy val as `InterBrokerProtocolVersion` is defined later
lazy val LogMessageFormatVersion = InterBrokerProtocolVersion
val LogMessageTimestampType = "CreateTime"
- val LogMessageTimestampDifferenceMaxMs = LogRetentionHours * 60 * 60 * 1000L
+ val LogMessageTimestampDifferenceMaxMs = Long.MaxValue
val NumRecoveryThreadsPerDataDir = 1
val AutoCreateTopicsEnable = true
val MinInSyncReplicas = 1
@@ -514,8 +514,7 @@ object KafkaConfig {
val LogMessageTimestampDifferenceMaxMsDoc = "The maximum difference allowed between the timestamp when a broker receives " +
"a message and the timestamp specified in the message. If log.message.timestamp.type=CreateTime, a message will be rejected " +
"if the difference in timestamp exceeds this threshold. This configuration is ignored if log.message.timestamp.type=LogAppendTime." +
- "The maximum timestamp difference allowed should be no greater than log.retention.ms to avoid unnecessarily frequent log rolling. For " +
- "this reason, the default is the value of log.retention.ms."
+ "The maximum timestamp difference allowed should be no greater than log.retention.ms to avoid unnecessarily frequent log rolling."
val NumRecoveryThreadsPerDataDirDoc = "The number of threads per data directory to be used for log recovery at startup and flushing at shutdown"
val AutoCreateTopicsEnableDoc = "Enable auto creation of topic on the server"
val MinInSyncReplicasDoc = "When a producer sets acks to \"all\" (or \"-1\"), " +
@@ -747,7 +746,7 @@ object KafkaConfig {
.define(MinInSyncReplicasProp, INT, Defaults.MinInSyncReplicas, atLeast(1), HIGH, MinInSyncReplicasDoc)
.define(LogMessageFormatVersionProp, STRING, Defaults.LogMessageFormatVersion, MEDIUM, LogMessageFormatVersionDoc)
.define(LogMessageTimestampTypeProp, STRING, Defaults.LogMessageTimestampType, in("CreateTime", "LogAppendTime"), MEDIUM, LogMessageTimestampTypeDoc)
- .define(LogMessageTimestampDifferenceMaxMsProp, LONG, null, MEDIUM, LogMessageTimestampDifferenceMaxMsDoc)
+ .define(LogMessageTimestampDifferenceMaxMsProp, LONG, Defaults.LogMessageTimestampDifferenceMaxMs, MEDIUM, LogMessageTimestampDifferenceMaxMsDoc)
.define(CreateTopicPolicyClassNameProp, CLASS, null, LOW, CreateTopicPolicyClassNameDoc)
/** ********* Replication configuration ***********/
@@ -959,7 +958,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
val logMessageFormatVersionString = getString(KafkaConfig.LogMessageFormatVersionProp)
val logMessageFormatVersion = ApiVersion(logMessageFormatVersionString)
val logMessageTimestampType = TimestampType.forName(getString(KafkaConfig.LogMessageTimestampTypeProp))
- val logMessageTimestampDifferenceMaxMs = getMessageTimestampDifferenceMaxMs
+ val logMessageTimestampDifferenceMaxMs: Long = getLong(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp)
/** ********* Replication configuration ***********/
val controllerSocketTimeoutMs: Int = getInt(KafkaConfig.ControllerSocketTimeoutMsProp)
@@ -1086,13 +1085,6 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
millis
}
- private def getMessageTimestampDifferenceMaxMs: Long = {
- Option(getLong(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp)) match {
- case Some(value) => value
- case None => getLogRetentionTimeMillis
- }
- }
-
private def getMap(propName: String, propValue: String): Map[String, String] = {
try {
CoreUtils.parseCsvMap(propValue)
http://git-wip-us.apache.org/repos/asf/kafka/blob/6b034979/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index bf89533..df8a6d7 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -712,7 +712,6 @@ class KafkaConfigTest {
assertEquals(12 * 60L * 1000L * 60, config.logRollTimeMillis)
assertEquals(11 * 60L * 1000L * 60, config.logRollTimeJitterMillis)
assertEquals(10 * 60L * 1000L * 60, config.logRetentionTimeMillis)
- assertEquals(config.logRetentionTimeMillis, config.logMessageTimestampDifferenceMaxMs)
assertEquals(123L, config.logFlushIntervalMs)
assertEquals(SnappyCompressionCodec, config.offsetsTopicCompressionCodec)
assertEquals(Sensor.RecordingLevel.DEBUG.toString, config.metricRecordingLevel)
http://git-wip-us.apache.org/repos/asf/kafka/blob/6b034979/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index dab5fa7..2b62a2b 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -60,8 +60,6 @@
<li>The <code>offsets.topic.replication.factor</code> broker config is now enforced upon auto topic creation. Internal
auto topic creation will fail with a GROUP_COORDINATOR_NOT_AVAILABLE error until the cluster size meets this
replication factor requirement.</li>
- <li>By default <code>message.timestamp.difference.max.ms</code> is the same as <code>retention.ms</code> instead of
- <code>Long.MAX_VALUE</code>.</li>
<li>The broker configuration <code>max.message.bytes</code> now applies to the total size of a batch of messages.
Previously the setting applied to batches of compressed messages, or to non-compressed messages individually. In practice,
the change is minor since a message batch may consist of only a single message, so the limitation on the size of