You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/03/06 03:40:48 UTC
samza git commit: SAMZA-1600: remove the combination of cleanup policy "compact,delete"…
Repository: samza
Updated Branches:
refs/heads/master 810d8bd80 -> 78ee98261
SAMZA-1600: remove the combination of cleanup policy "compact,delete" in changelog topic properties
Author: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Reviewers: Jagadish <ja...@apache.org>
Closes #435 from nickpan47/SAMZA-1600
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/78ee9826
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/78ee9826
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/78ee9826
Branch: refs/heads/master
Commit: 78ee98261debd13a5f2a7fb2ea6408c63f9c4c66
Parents: 810d8bd
Author: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Authored: Mon Mar 5 19:40:45 2018 -0800
Committer: Jagadish <jv...@linkedin.com>
Committed: Mon Mar 5 19:40:45 2018 -0800
----------------------------------------------------------------------
.../scala/org/apache/samza/config/KafkaConfig.scala | 15 +++++++++------
.../org/apache/samza/config/TestKafkaConfig.scala | 10 +++++-----
2 files changed, 14 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/78ee9826/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index 1c1cdbd..124c85a 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -250,12 +250,15 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
val kafkaChangeLogProperties = new Properties
val appConfig = new ApplicationConfig(config)
- if (appConfig.getAppMode == ApplicationMode.STREAM) {
- kafkaChangeLogProperties.setProperty("cleanup.policy", "compact")
- } else{
- kafkaChangeLogProperties.setProperty("cleanup.policy", "compact,delete")
- kafkaChangeLogProperties.setProperty("retention.ms", String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH))
- }
+ // SAMZA-1600: do not use the combination of "compact,delete" as cleanup policy until we pick up Kafka broker 0.11.0.3,
+ // 1.0.2, or 1.1.0 (see KAFKA-6568)
+ // if (appConfig.getAppMode == ApplicationMode.STREAM) {
+ // kafkaChangeLogProperties.setProperty("cleanup.policy", "compact")
+ // } else{
+ // kafkaChangeLogProperties.setProperty("cleanup.policy", "compact,delete")
+ // kafkaChangeLogProperties.setProperty("retention.ms", String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH))
+ // }
+ kafkaChangeLogProperties.setProperty("cleanup.policy", "compact")
kafkaChangeLogProperties.setProperty("segment.bytes", KafkaConfig.CHANGELOG_DEFAULT_SEGMENT_SIZE)
kafkaChangeLogProperties.setProperty("delete.retention.ms", String.valueOf(new StorageConfig(config).getChangeLogDeleteRetentionInMs(name)))
filteredConfigs.asScala.foreach { kv => kafkaChangeLogProperties.setProperty(kv._1, kv._2) }
http://git-wip-us.apache.org/repos/asf/samza/blob/78ee9826/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
index 19b2cc6..a4fe686 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
@@ -143,13 +143,13 @@ class TestKafkaConfig {
val batchMapConfig = new MapConfig(props.asScala.asJava)
val batchKafkaConfig = new KafkaConfig(batchMapConfig)
assertEquals(batchKafkaConfig.getChangelogKafkaProperties("test1").getProperty("cleanup.policy"), "delete")
- assertEquals(batchKafkaConfig.getChangelogKafkaProperties("test1").getProperty("retention.ms"),
+ assertEquals(batchKafkaConfig.getChangelogKafkaProperties("test1").getProperty("delete.retention.ms"),
String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH))
- assertEquals(batchKafkaConfig.getChangelogKafkaProperties("test2").getProperty("cleanup.policy"), "compact,delete")
- assertEquals(batchKafkaConfig.getChangelogKafkaProperties("test2").getProperty("retention.ms"),
+ assertEquals(batchKafkaConfig.getChangelogKafkaProperties("test2").getProperty("cleanup.policy"), "compact")
+ assertEquals(batchKafkaConfig.getChangelogKafkaProperties("test2").getProperty("delete.retention.ms"),
String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH))
- assertEquals(batchKafkaConfig.getChangelogKafkaProperties("test3").getProperty("cleanup.policy"), "compact,delete")
- assertEquals(batchKafkaConfig.getChangelogKafkaProperties("test3").getProperty("retention.ms"),
+ assertEquals(batchKafkaConfig.getChangelogKafkaProperties("test3").getProperty("cleanup.policy"), "compact")
+ assertEquals(batchKafkaConfig.getChangelogKafkaProperties("test3").getProperty("delete.retention.ms"),
String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH))
}