You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by lh...@apache.org on 2020/02/06 23:31:26 UTC

[samza] 04/06: Fix the RocksDB TTL type conversion in change log properties generation. (#1254)

This is an automated email from the ASF dual-hosted git repository.

lhaiesp pushed a commit to branch 1.3.1
in repository https://gitbox.apache.org/repos/asf/samza.git

commit cccca7cf137ee168eed07238f5deeb0f60d9710d
Author: shanthoosh <sv...@linkedin.com>
AuthorDate: Fri Jan 17 10:19:34 2020 -0800

    Fix the RocksDB TTL type conversion in change log properties generation. (#1254)
---
 .../scala/org/apache/samza/config/KafkaConfig.scala   |  2 +-
 .../org/apache/samza/config/TestKafkaConfig.scala     | 19 +++++++++++++++++++
 2 files changed, 20 insertions(+), 1 deletion(-)

diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index 3b5f5f3..69a9966 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -324,7 +324,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
     //  - Set topic TTL to be the same as RocksDB TTL
     Option(config.get("stores.%s.rocksdb.ttl.ms" format name)) match {
       case Some(rocksDbTtl) =>
-        if (!rocksDbTtl.isEmpty && rocksDbTtl.toInt < 0) {
+        if (!rocksDbTtl.isEmpty && rocksDbTtl.toLong < 0) {
           kafkaChangeLogProperties.setProperty("cleanup.policy", "compact")
           kafkaChangeLogProperties.setProperty("max.message.bytes", getChangelogStreamMaxMessageByte(name))
         } else if (!config.containsKey("stores.%s.changelog.kafka.cleanup.policy" format name)) {
diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
index 00b103d..64b476b 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
@@ -105,6 +105,25 @@ class TestKafkaConfig {
   }
 
   @Test
+  def testChangeLogPropertiesShouldReturnCorrectTopicConfigurationForLargeTTLStores(): Unit = {
+    val props = new Properties
+    props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + "bootstrap.servers", "localhost:9092")
+    props.setProperty("systems." + SYSTEM_NAME + ".consumer.zookeeper.connect", "localhost:2181/")
+    props.setProperty(JobConfig.JOB_NAME, "jobName")
+
+    props.setProperty("stores.test1.changelog", "kafka.mychangelog1")
+    // Set the RocksDB TTL to 28 days (2,419,200,000 ms), which exceeds Int.MaxValue and must be parsed as a Long.
+    props.setProperty("stores.test1.rocksdb.ttl.ms", "2419200000")
+
+    val mapConfig = new MapConfig(props.asScala.asJava)
+    val kafkaConfig = new KafkaConfig(mapConfig)
+    val kafkaProperties = kafkaConfig.getChangelogKafkaProperties("test1")
+    assertEquals("delete", kafkaProperties.getProperty("cleanup.policy"))
+    assertEquals("536870912", kafkaProperties.getProperty("segment.bytes"))
+    assertEquals("86400000", kafkaProperties.getProperty("delete.retention.ms"))
+  }
+
+  @Test
   def testChangeLogPropertiesShouldReturnCorrectTopicConfigurationForStoresWithEmptyRocksDBTTL(): Unit = {
     val props = new Properties
     props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + "bootstrap.servers", "localhost:9092")