You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/03/06 03:40:48 UTC

samza git commit: SAMZA-1600: remove the combination of cleanup policy "compact,delete"…

Repository: samza
Updated Branches:
  refs/heads/master 810d8bd80 -> 78ee98261


SAMZA-1600: remove the combination of cleanup policy "compact,delete"…

… in changelog topic properties

Author: Yi Pan (Data Infrastructure) <ni...@gmail.com>

Reviewers: Jagadish <ja...@apache.org>

Closes #435 from nickpan47/SAMZA-1600


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/78ee9826
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/78ee9826
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/78ee9826

Branch: refs/heads/master
Commit: 78ee98261debd13a5f2a7fb2ea6408c63f9c4c66
Parents: 810d8bd
Author: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Authored: Mon Mar 5 19:40:45 2018 -0800
Committer: Jagadish <jv...@linkedin.com>
Committed: Mon Mar 5 19:40:45 2018 -0800

----------------------------------------------------------------------
 .../scala/org/apache/samza/config/KafkaConfig.scala  | 15 +++++++++------
 .../org/apache/samza/config/TestKafkaConfig.scala    | 10 +++++-----
 2 files changed, 14 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/78ee9826/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index 1c1cdbd..124c85a 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -250,12 +250,15 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
     val kafkaChangeLogProperties = new Properties
 
     val appConfig = new ApplicationConfig(config)
-    if (appConfig.getAppMode == ApplicationMode.STREAM) {
-      kafkaChangeLogProperties.setProperty("cleanup.policy", "compact")
-    } else{
-      kafkaChangeLogProperties.setProperty("cleanup.policy", "compact,delete")
-      kafkaChangeLogProperties.setProperty("retention.ms", String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH))
-    }
+    // SAMZA-1600: do not use the combination of "compact,delete" as cleanup policy until we pick up Kafka broker 0.11.0.3,
+    // 1.0.2, or 1.1.0 (see KAFKA-6568)
+    // if (appConfig.getAppMode == ApplicationMode.STREAM) {
+    //  kafkaChangeLogProperties.setProperty("cleanup.policy", "compact")
+    // } else{
+    //  kafkaChangeLogProperties.setProperty("cleanup.policy", "compact,delete")
+    //  kafkaChangeLogProperties.setProperty("retention.ms", String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH))
+    // }
+    kafkaChangeLogProperties.setProperty("cleanup.policy", "compact")
     kafkaChangeLogProperties.setProperty("segment.bytes", KafkaConfig.CHANGELOG_DEFAULT_SEGMENT_SIZE)
     kafkaChangeLogProperties.setProperty("delete.retention.ms", String.valueOf(new StorageConfig(config).getChangeLogDeleteRetentionInMs(name)))
     filteredConfigs.asScala.foreach { kv => kafkaChangeLogProperties.setProperty(kv._1, kv._2) }

http://git-wip-us.apache.org/repos/asf/samza/blob/78ee9826/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
index 19b2cc6..a4fe686 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
@@ -143,13 +143,13 @@ class TestKafkaConfig {
     val batchMapConfig = new MapConfig(props.asScala.asJava)
     val batchKafkaConfig = new KafkaConfig(batchMapConfig)
     assertEquals(batchKafkaConfig.getChangelogKafkaProperties("test1").getProperty("cleanup.policy"), "delete")
-    assertEquals(batchKafkaConfig.getChangelogKafkaProperties("test1").getProperty("retention.ms"),
+    assertEquals(batchKafkaConfig.getChangelogKafkaProperties("test1").getProperty("delete.retention.ms"),
       String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH))
-    assertEquals(batchKafkaConfig.getChangelogKafkaProperties("test2").getProperty("cleanup.policy"), "compact,delete")
-    assertEquals(batchKafkaConfig.getChangelogKafkaProperties("test2").getProperty("retention.ms"),
+    assertEquals(batchKafkaConfig.getChangelogKafkaProperties("test2").getProperty("cleanup.policy"), "compact")
+    assertEquals(batchKafkaConfig.getChangelogKafkaProperties("test2").getProperty("delete.retention.ms"),
       String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH))
-    assertEquals(batchKafkaConfig.getChangelogKafkaProperties("test3").getProperty("cleanup.policy"), "compact,delete")
-    assertEquals(batchKafkaConfig.getChangelogKafkaProperties("test3").getProperty("retention.ms"),
+    assertEquals(batchKafkaConfig.getChangelogKafkaProperties("test3").getProperty("cleanup.policy"), "compact")
+    assertEquals(batchKafkaConfig.getChangelogKafkaProperties("test3").getProperty("delete.retention.ms"),
       String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH))
   }