You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/12/27 20:02:56 UTC

kafka git commit: KAFKA-4092: retention.bytes should not be allowed to be less than segment.bytes

Repository: kafka
Updated Branches:
  refs/heads/trunk d09214624 -> 4ca5abe8e


KAFKA-4092: retention.bytes should not be allowed to be less than segment.bytes

adding a LogConfig value validator.  gwenshap or junrao would you mind taking a look?

Author: Dustin Cote <du...@confluent.io>

Reviewers: Ismael Juma <is...@juma.me.uk>, Ewen Cheslack-Postava <ew...@confluent.io>

Closes #1796 from cotedm/retentionbytesvalidation


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4ca5abe8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4ca5abe8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4ca5abe8

Branch: refs/heads/trunk
Commit: 4ca5abe8ee7578f602fb7653cb8a09640607ea85
Parents: d092146
Author: Dustin Cote <du...@confluent.io>
Authored: Tue Dec 27 15:02:07 2016 -0500
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Tue Dec 27 15:02:22 2016 -0500

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/LogConfig.scala          | 11 +++++++++++
 core/src/test/scala/unit/kafka/log/LogConfigTest.scala | 10 ++++++++++
 2 files changed, 21 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4ca5abe8/core/src/main/scala/kafka/log/LogConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index 55669c0..eaa6016 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -324,11 +324,22 @@ object LogConfig {
   }
 
   /**
+    * Check that the property values are valid relative to each other
+    */
+  def validateValues(props: Properties) {
+    val segmentBytes = if (props.getProperty(SegmentBytesProp) == null) Defaults.SegmentSize else props.getProperty(SegmentBytesProp).toLong
+    val retentionBytes = if (props.getProperty(RetentionBytesProp) == null) Defaults.RetentionSize else props.getProperty(RetentionBytesProp).toLong
+    if (segmentBytes > retentionBytes && retentionBytes != -1)
+      throw new InvalidConfigurationException(s"segment.bytes ${segmentBytes} is not less than or equal to retention.bytes ${retentionBytes}")
+  }
+
+  /**
    * Check that the given properties contain only valid log config names and that all values can be parsed and are valid
    */
   def validate(props: Properties) {
     validateNames(props)
     configDef.parse(props)
+    validateValues(props)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4ca5abe8/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
index e79ceff..ccb1e60 100644
--- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala
@@ -111,6 +111,16 @@ class LogConfigTest {
       case _: ConfigException => false
     }
   }
+  def testValueValidator() {
+    val p = new Properties()
+    p.setProperty(LogConfig.SegmentBytesProp, "100")
+    p.setProperty(LogConfig.RetentionBytesProp, "100")
+    LogConfig.validate(p)
+    p.setProperty(LogConfig.RetentionBytesProp, "90")
+    val except = intercept[IllegalArgumentException] {
+      LogConfig.validate(p)
+    }
+  }
 
   private def assertPropertyInvalid(name: String, values: AnyRef*) {
     values.foreach((value) => {