You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by lh...@apache.org on 2020/02/06 23:31:26 UTC
[samza] 04/06: Fix the RocksDB TTL type conversion in change log
properties generation. (#1254)
This is an automated email from the ASF dual-hosted git repository.
lhaiesp pushed a commit to branch 1.3.1
in repository https://gitbox.apache.org/repos/asf/samza.git
commit cccca7cf137ee168eed07238f5deeb0f60d9710d
Author: shanthoosh <sv...@linkedin.com>
AuthorDate: Fri Jan 17 10:19:34 2020 -0800
Fix the RocksDB TTL type conversion in change log properties generation. (#1254)
---
.../scala/org/apache/samza/config/KafkaConfig.scala | 2 +-
.../org/apache/samza/config/TestKafkaConfig.scala | 19 +++++++++++++++++++
2 files changed, 20 insertions(+), 1 deletion(-)
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index 3b5f5f3..69a9966 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -324,7 +324,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
// - Set topic TTL to be the same as RocksDB TTL
Option(config.get("stores.%s.rocksdb.ttl.ms" format name)) match {
case Some(rocksDbTtl) =>
- if (!rocksDbTtl.isEmpty && rocksDbTtl.toInt < 0) {
+ if (!rocksDbTtl.isEmpty && rocksDbTtl.toLong < 0) {
kafkaChangeLogProperties.setProperty("cleanup.policy", "compact")
kafkaChangeLogProperties.setProperty("max.message.bytes", getChangelogStreamMaxMessageByte(name))
} else if (!config.containsKey("stores.%s.changelog.kafka.cleanup.policy" format name)) {
diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
index 00b103d..64b476b 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
@@ -105,6 +105,25 @@ class TestKafkaConfig {
}
@Test
+ def testChangeLogPropertiesShouldReturnCorrectTopicConfigurationForLargeTTLStores(): Unit = {
+ val props = new Properties
+ props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + "bootstrap.servers", "localhost:9092")
+ props.setProperty("systems." + SYSTEM_NAME + ".consumer.zookeeper.connect", "localhost:2181/")
+ props.setProperty(JobConfig.JOB_NAME, "jobName")
+
+ props.setProperty("stores.test1.changelog", "kafka.mychangelog1")
+ // Set the RocksDB TTL to be 28 days.
+ props.setProperty("stores.test1.rocksdb.ttl.ms", "2419200000")
+
+ val mapConfig = new MapConfig(props.asScala.asJava)
+ val kafkaConfig = new KafkaConfig(mapConfig)
+ val kafkaProperties = kafkaConfig.getChangelogKafkaProperties("test1")
+ assertEquals("delete", kafkaProperties.getProperty("cleanup.policy"))
+ assertEquals("536870912", kafkaProperties.getProperty("segment.bytes"))
+ assertEquals("86400000", kafkaProperties.getProperty("delete.retention.ms"))
+ }
+
+ @Test
def testChangeLogPropertiesShouldReturnCorrectTopicConfigurationForStoresWithEmptyRocksDBTTL(): Unit = {
val props = new Properties
props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + "bootstrap.servers", "localhost:9092")