You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by we...@apache.org on 2018/10/05 22:23:47 UTC

samza git commit: SAMZA-1929: Automatically adjust changelog topic level settings, when RocksDB TTL is set

Repository: samza
Updated Branches:
  refs/heads/master 531b35e9f -> aac736060


SAMZA-1929: Automatically adjust changelog topic level settings, when RocksDB TTL is set

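When a TTL is set on a RocksDB store, the default log-compacted changelog topic is no longer appropriate: compaction keeps the latest value for every key indefinitely, whereas the store expires entries after the TTL. Ideally, the changelog topic should expire data in the same way the store does.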

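So we adjust the changelog topic settings when all of the following hold:
1) A TTL is set for the RocksDB store,
2) Changelog is enabled for the store, and
3) The user has not explicitly set the corresponding changelog topic settings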

In that case we
1) Disable log compaction on the Kafka changelog topic (cleanup.policy=delete)
2) Set the topic retention (retention.ms) to the same value as the RocksDB TTL

Users can still override this behavior by explicitly setting the changelog topic's cleanup.policy or retention.ms, e.g. through TableDescriptor.withConfig()

Author: Wei Song <ws...@linkedin.com>

Reviewers: Prateek Maheshwari <pm...@linkedin.com>

Closes #682 from weisong44/SAMZA-1929 and squashes the following commits:

bedfd280 [Wei Song] Merge branch 'master' into SAMZA-1929
097958c8 [Wei Song] Merge remote-tracking branch 'upstream/master'
9c35a0e1 [Wei Song] Updated comments to indicate cleanup.policy=compact,delete is available in 0.11.1.57 and later, for now use delete
9c7410cc [Wei Song] Merge branch 'master' into SAMZA-1929
05822f0a [Wei Song] Merge remote-tracking branch 'upstream/master'
dbda1c17 [Wei Song] SAMZA-1929: Automatically adjust changelog topic level settings, when RocksDB TTL is set
f7480505 [Wei Song] Merge remote-tracking branch 'upstream/master'
7706ab1f [Wei Song] Merge remote-tracking branch 'upstream/master'
f5731b10 [Wei Song] Merge remote-tracking branch 'upstream/master'
1e5de45a [Wei Song] Merge remote-tracking branch 'upstream/master'
c85604e0 [Wei Song] Merge remote-tracking branch 'upstream/master'
242d8442 [Wei Song] Merge remote-tracking branch 'upstream/master'
ec7d8409 [Wei Song] Merge remote-tracking branch 'upstream/master'
e19b4dc9 [Wei Song] Merge remote-tracking branch 'upstream/master'
8ee78441 [Wei Song] Merge remote-tracking branch 'upstream/master'
1c6a2eae [Wei Song] Merge remote-tracking branch 'upstream/master'
a6c94add [Wei Song] Merge remote-tracking branch 'upstream/master'
41299b5b [Wei Song] Merge remote-tracking branch 'upstream/master'
239a0950 [Wei Song] Merge remote-tracking branch 'upstream/master'
eca00204 [Wei Song] Merge remote-tracking branch 'upstream/master'
51562391 [Wei Song] Merge remote-tracking branch 'upstream/master'
de708f5e [Wei Song] Merge remote-tracking branch 'upstream/master'
df2f8d7b [Wei Song] Merge remote-tracking branch 'upstream/master'
f28b491d [Wei Song] Merge remote-tracking branch 'upstream/master'
4782c61d [Wei Song] Merge remote-tracking branch 'upstream/master'
0440f75f [Wei Song] Merge remote-tracking branch 'upstream/master'
aae0f380 [Wei Song] Merge remote-tracking branch 'upstream/master'
a15a7c9a [Wei Song] Merge remote-tracking branch 'upstream/master'
5cbf9af9 [Wei Song] Merge remote-tracking branch 'upstream/master'
3f7ed71f [Wei Song] Added self to committer list


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/aac73606
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/aac73606
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/aac73606

Branch: refs/heads/master
Commit: aac73606096f39eaa6041d82617f114bad0df515
Parents: 531b35e
Author: Wei Song <ws...@linkedin.com>
Authored: Fri Oct 5 15:23:37 2018 -0700
Committer: Wei Song <ws...@linkedin.com>
Committed: Fri Oct 5 15:23:37 2018 -0700

----------------------------------------------------------------------
 .../org/apache/samza/config/KafkaConfig.scala   | 25 +++++++++++++-------
 .../apache/samza/config/TestKafkaConfig.scala   | 18 ++++++++++++--
 2 files changed, 33 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/aac73606/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index ef43e72..e5cca36 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -251,15 +251,24 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
     val kafkaChangeLogProperties = new Properties
 
     val appConfig = new ApplicationConfig(config)
-    // SAMZA-1600: do not use the combination of "compact,delete" as cleanup policy until we pick up Kafka broker 0.11.0.3,
+    // SAMZA-1600: do not use the combination of "compact,delete" as cleanup policy until we pick up Kafka broker 0.11.0.57,
     // 1.0.2, or 1.1.0 (see KAFKA-6568)
-    // if (appConfig.getAppMode == ApplicationMode.STREAM) {
-    //  kafkaChangeLogProperties.setProperty("cleanup.policy", "compact")
-    // } else{
-    //  kafkaChangeLogProperties.setProperty("cleanup.policy", "compact,delete")
-    //  kafkaChangeLogProperties.setProperty("retention.ms", String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH))
-    // }
-    kafkaChangeLogProperties.setProperty("cleanup.policy", "compact")
+
+    // Adjust changelog topic setting, when TTL is set on a RocksDB store
+    //  - Disable log compaction on Kafka changelog topic
+    //  - Set topic TTL to be the same as RocksDB TTL
+    Option(config.get("stores.%s.rocksdb.ttl.ms" format name)) match {
+      case Some(rocksDbTtl) =>
+        if (!config.containsKey("stores.%s.changelog.kafka.cleanup.policy" format name)) {
+          kafkaChangeLogProperties.setProperty("cleanup.policy", "delete")
+          if (!config.containsKey("stores.%s.changelog.kafka.retention.ms" format name)) {
+            kafkaChangeLogProperties.setProperty("retention.ms", String.valueOf(rocksDbTtl))
+          }
+        }
+      case _ =>
+        kafkaChangeLogProperties.setProperty("cleanup.policy", "compact")
+    }
+
     kafkaChangeLogProperties.setProperty("segment.bytes", KafkaConfig.CHANGELOG_DEFAULT_SEGMENT_SIZE)
     kafkaChangeLogProperties.setProperty("delete.retention.ms", String.valueOf(new StorageConfig(config).getChangeLogDeleteRetentionInMs(name)))
     filteredConfigs.asScala.foreach { kv => kafkaChangeLogProperties.setProperty(kv._1, kv._2) }

http://git-wip-us.apache.org/repos/asf/samza/blob/aac73606/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
index a4fe686..b8467b8 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
@@ -120,6 +120,11 @@ class TestKafkaConfig {
     props.setProperty("job.changelog.system", "kafka")
     props.setProperty("stores.test3.changelog", "otherstream")
     props.setProperty("stores.test1.changelog.kafka.cleanup.policy", "delete")
+    props.setProperty("stores.test4.rocksdb.ttl.ms", "3600")
+    props.setProperty("stores.test5.rocksdb.ttl.ms", "3600")
+    props.setProperty("stores.test5.changelog.kafka.retention.ms", "1000")
+    props.setProperty("stores.test6.rocksdb.ttl.ms", "3600")
+    props.setProperty("stores.test6.changelog.kafka.cleanup.policy", "compact")
 
     val mapConfig = new MapConfig(props.asScala.asJava)
     val kafkaConfig = new KafkaConfig(mapConfig)
@@ -130,15 +135,24 @@ class TestKafkaConfig {
     assertEquals("mychangelog1", storeToChangelog.get("test1").getOrElse(""))
     assertEquals("mychangelog2", storeToChangelog.get("test2").getOrElse(""))
     assertEquals("otherstream", storeToChangelog.get("test3").getOrElse(""))
+    assertNull(kafkaConfig.getChangelogKafkaProperties("test1").getProperty("retention.ms"))
+    assertNull(kafkaConfig.getChangelogKafkaProperties("test2").getProperty("retention.ms"))
 
     props.setProperty("systems." + SYSTEM_NAME + ".samza.factory", "org.apache.samza.system.kafka.SomeOtherFactory")
-    val mapConfig1 = new MapConfig(props.asScala.asJava)
-    val kafkaConfig1 = new KafkaConfig(mapConfig)
     val storeToChangelog1 = kafkaConfig.getKafkaChangelogEnabledStores()
     assertEquals("mychangelog1", storeToChangelog1.get("test1").getOrElse(""))
     assertEquals("mychangelog2", storeToChangelog1.get("test2").getOrElse(""))
     assertEquals("otherstream", storeToChangelog1.get("test3").getOrElse(""))
 
+    assertEquals(kafkaConfig.getChangelogKafkaProperties("test4").getProperty("cleanup.policy"), "delete")
+    assertEquals(kafkaConfig.getChangelogKafkaProperties("test4").getProperty("retention.ms"), "3600")
+
+    assertEquals(kafkaConfig.getChangelogKafkaProperties("test5").getProperty("cleanup.policy"), "delete")
+    assertEquals(kafkaConfig.getChangelogKafkaProperties("test5").getProperty("retention.ms"), "1000")
+
+    assertEquals(kafkaConfig.getChangelogKafkaProperties("test6").getProperty("cleanup.policy"), "compact")
+    assertNull(kafkaConfig.getChangelogKafkaProperties("test6").getProperty("retention.ms"))
+
     props.setProperty(ApplicationConfig.APP_MODE, ApplicationConfig.ApplicationMode.BATCH.name())
     val batchMapConfig = new MapConfig(props.asScala.asJava)
     val batchKafkaConfig = new KafkaConfig(batchMapConfig)