You are viewing a plain-text version of this content. (The original page linked to the canonical version with the word "here"; that link was not preserved in this copy.)
Posted to commits@samza.apache.org by xi...@apache.org on 2017/10/25 01:20:42 UTC

samza git commit: SAMZA-1457: Set retention for internal streams for Batch application

Repository: samza
Updated Branches:
  refs/heads/master cc1ca2c9d -> e6049b7dd


SAMZA-1457: Set retention for internal streams for Batch application

For intermediate streams, checkpoint topics and changelog topics, we need to set a short retention period (1 day by default) when the application runs in batch mode.

Author: Xinyu Liu <xi...@xiliu-ld1.linkedin.biz>
Author: xiliu <xi...@xiliu-ld1.linkedin.biz>

Reviewers: Jagadish V <vj...@gmail.com>

Closes #328 from xinyuiscool/SAMZA-1457


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/e6049b7d
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/e6049b7d
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/e6049b7d

Branch: refs/heads/master
Commit: e6049b7ddca385b5a1a363020b9d711f4a0b93b6
Parents: cc1ca2c
Author: Xinyu Liu <xi...@gmail.com>
Authored: Tue Oct 24 18:20:32 2017 -0700
Committer: xiliu <xi...@xiliu-ld1.linkedin.biz>
Committed: Tue Oct 24 18:20:32 2017 -0700

----------------------------------------------------------------------
 .../samza/system/kafka/KafkaStreamSpec.java     |  9 +++
 .../kafka/KafkaCheckpointManagerFactory.scala   | 21 ++-----
 .../org/apache/samza/config/KafkaConfig.scala   | 37 +++++++++++-
 .../samza/system/kafka/KafkaSystemAdmin.scala   |  9 ++-
 .../samza/system/kafka/KafkaSystemFactory.scala | 22 ++++++-
 .../TestKafkaCheckpointManagerFactory.java      | 51 +++++++++++++++++
 .../kafka/TestKafkaSystemFactoryJava.java       | 60 ++++++++++++++++++++
 .../kafka/TestKafkaCheckpointManager.scala      |  6 +-
 .../apache/samza/config/TestKafkaConfig.scala   | 13 +++++
 9 files changed, 204 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/e6049b7d/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
index a49c022..de7b7b0 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
@@ -167,6 +167,19 @@ public class KafkaStreamSpec extends StreamSpec {
     return new KafkaStreamSpec(getId(), getPhysicalName(), getSystemName(), partitionCount, getReplicationFactor(), getProperties());
   }
 
+  /**
+   * Make a copy of this spec with the given properties.
+   *
+   * <p>The id, physical name, system name, partition count and replication factor are carried
+   * over unchanged; the current properties are replaced (not merged) by {@code properties}.
+   *
+   * @param properties properties of the Kafka stream
+   * @return new instance of {@link KafkaStreamSpec}
+   */
+  public KafkaStreamSpec copyWithProperties(Properties properties) {
+    return new KafkaStreamSpec(getId(), getPhysicalName(), getSystemName(), getPartitionCount(), getReplicationFactor(), properties);
+  }
+
   public int getReplicationFactor() {
     return replicationFactor;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/e6049b7d/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
index 402248f..8ac347c 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
@@ -38,21 +38,6 @@ object KafkaCheckpointManagerFactory {
     // on log compacted topics. Details in SAMZA-586.
     "compression.type" -> "none")
 
-  // Set the checkpoint topic configs to have a very small segment size and
-  // enable log compaction. This keeps job startup time small since there
-  // are fewer useless (overwritten) messages to read from the checkpoint
-  // topic.
-  def getCheckpointTopicProperties(config: Config) = {
-    val segmentBytes: Int = if (config == null) {
-      KafkaConfig.DEFAULT_CHECKPOINT_SEGMENT_BYTES
-    } else {
-      new KafkaConfig(config).getCheckpointSegmentBytes()
-    }
-    (new Properties /: Map(
-      "cleanup.policy" -> "compact",
-      "segment.bytes" -> String.valueOf(segmentBytes))) { case (props, (k, v)) => props.put(k, v); props }
-  }
-
   /**
    * Get the checkpoint system and system factory from the configuration
    * @param config
@@ -113,6 +98,6 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin
       connectZk,
       config.getSystemStreamPartitionGrouperFactory,      // To find out the SSPGrouperFactory class so it can be included/verified in the key
       config.failOnCheckpointValidation,
-      checkpointTopicProperties = getCheckpointTopicProperties(config))
+      checkpointTopicProperties = new KafkaConfig(config).getCheckpointTopicProperties())
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/e6049b7d/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index 9c33b16..1c1cdbd 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -21,13 +21,16 @@ package org.apache.samza.config
 
 
 import java.util
+import java.util.concurrent.TimeUnit
 import java.util.regex.Pattern
 import java.util.{Properties, UUID}
 
+import com.google.common.collect.ImmutableMap
 import kafka.consumer.ConsumerConfig
 import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.common.serialization.ByteArraySerializer
 import org.apache.samza.SamzaException
+import org.apache.samza.config.ApplicationConfig.ApplicationMode
 import org.apache.samza.config.SystemConfig.Config2System
 import org.apache.samza.util.{Logging, Util}
 
@@ -76,6 +79,12 @@ object KafkaConfig {
     */
   val CONSUMER_FETCH_THRESHOLD_BYTES = SystemConfig.SYSTEM_PREFIX + "samza.fetch.threshold.bytes"
 
+  /**
+    * Retention (1 day) for the internal changelog, checkpoint and intermediate topics
+    * created when the application runs in batch mode.
+    */
+  val DEFAULT_RETENTION_MS_FOR_BATCH = TimeUnit.DAYS.toMillis(1)
+
   implicit def Config2Kafka(config: Config) = new KafkaConfig(config)
 }
 
@@ -243,13 +252,46 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
   def getChangelogKafkaProperties(name: String) = {
     val filteredConfigs = config.subset(KafkaConfig.CHANGELOG_STREAM_KAFKA_SETTINGS format name, true)
     val kafkaChangeLogProperties = new Properties
-    kafkaChangeLogProperties.setProperty("cleanup.policy", "compact")
+
+    val appConfig = new ApplicationConfig(config)
+    if (appConfig.getAppMode == ApplicationMode.STREAM) {
+      kafkaChangeLogProperties.setProperty("cleanup.policy", "compact")
+    } else {
+      // Non-stream (batch) jobs keep compaction but also expire data after a short retention.
+      kafkaChangeLogProperties.setProperty("cleanup.policy", "compact,delete")
+      kafkaChangeLogProperties.setProperty("retention.ms", String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH))
+    }
     kafkaChangeLogProperties.setProperty("segment.bytes", KafkaConfig.CHANGELOG_DEFAULT_SEGMENT_SIZE)
     kafkaChangeLogProperties.setProperty("delete.retention.ms", String.valueOf(new StorageConfig(config).getChangeLogDeleteRetentionInMs(name)))
+    // Store-specific changelog settings are applied last, so they override the defaults above.
     filteredConfigs.asScala.foreach { kv => kafkaChangeLogProperties.setProperty(kv._1, kv._2) }
     kafkaChangeLogProperties
   }
 
+  // Set the checkpoint topic configs to have a very small segment size and
+  // enable log compaction. This keeps job startup time small since there
+  // are fewer useless (overwritten) messages to read from the checkpoint
+  // topic. In batch mode the topic additionally uses time-based deletion
+  // ("compact,delete" with retention.ms = DEFAULT_RETENTION_MS_FOR_BATCH).
+  def getCheckpointTopicProperties(): Properties = {
+    val segmentBytes: Int = getCheckpointSegmentBytes()
+    val appConfig = new ApplicationConfig(config)
+    val isStreamMode = appConfig.getAppMode == ApplicationMode.STREAM
+    val properties = new Properties()
+
+    if (isStreamMode) {
+      properties.putAll(ImmutableMap.of(
+        "cleanup.policy", "compact",
+        "segment.bytes", String.valueOf(segmentBytes)))
+    } else {
+      properties.putAll(ImmutableMap.of(
+        "cleanup.policy", "compact,delete",
+        "retention.ms", String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH),
+        "segment.bytes", String.valueOf(segmentBytes)))
+    }
+    properties
+  }
+
   // kafka config
   def getKafkaSystemConsumerConfig( systemName: String,
                                     clientId: String,

http://git-wip-us.apache.org/repos/asf/samza/blob/e6049b7d/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
index a2256c8..013b292 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
@@ -139,7 +139,14 @@ class KafkaSystemAdmin(
    * Replication factor for the Changelog topic in kafka
    * Kafka properties to be used during the Changelog topic creation
    */
-  topicMetaInformation: Map[String, ChangelogInfo] = Map[String, ChangelogInfo]()) extends ExtendedSystemAdmin with Logging {
+  topicMetaInformation: Map[String, ChangelogInfo] = Map[String, ChangelogInfo](),
+
+  /**
+   * Kafka properties to be used during intermediate topic creation, keyed by stream id.
+   * For a spec whose id is a key here (and which is not already handled as a changelog or
+   * the coordinator stream), these properties replace the spec's own properties.
+   */
+  intermediateStreamProperties: Map[String, Properties] = Map()) extends ExtendedSystemAdmin with Logging {
 
   import KafkaSystemAdmin._
 
@@ -450,6 +457,8 @@ class KafkaSystemAdmin(
       new KafkaStreamSpec(spec.getId, topicName, systemName, spec.getPartitionCount, topicMeta.replicationFactor, topicMeta.kafkaProps)
     } else if (spec.isCoordinatorStream){
       new KafkaStreamSpec(spec.getId, spec.getPhysicalName, systemName, 1, coordinatorStreamReplicationFactor, coordinatorStreamProperties)
+    } else if (intermediateStreamProperties.contains(spec.getId)) {
+      KafkaStreamSpec.fromSpec(spec).copyWithProperties(intermediateStreamProperties(spec.getId))
     } else {
       KafkaStreamSpec.fromSpec(spec)
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/e6049b7d/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index e0b5540..a480042 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -22,8 +22,9 @@ package org.apache.samza.system.kafka
 import java.util.Properties
 import kafka.utils.ZkUtils
 import org.apache.samza.SamzaException
+import org.apache.samza.config.ApplicationConfig.ApplicationMode
 import org.apache.samza.util.{Logging, KafkaUtil, ExponentialSleepStrategy, ClientUtilTopicMetadataStore}
-import org.apache.samza.config.Config
+import org.apache.samza.config.{KafkaConfig, ApplicationConfig, StreamConfig, Config}
 import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.config.KafkaConfig.Config2Kafka
 import org.apache.samza.config.TaskConfig.Config2Task
@@ -125,6 +126,7 @@ class KafkaSystemFactory extends SystemFactory with Logging {
        (topicName, changelogInfo)
     }}.toMap
 
+    val intermediateStreamProperties: Map[String, Properties] = getIntermediateStreamProperties(config)
     new KafkaSystemAdmin(
       systemName,
       bootstrapServers,
@@ -134,7 +136,8 @@ class KafkaSystemFactory extends SystemFactory with Logging {
       timeout,
       bufferSize,
       clientId,
-      topicMetaInformation)
+      topicMetaInformation,
+      intermediateStreamProperties)
   }
 
   def getCoordinatorTopicProperties(config: Config) = {
@@ -144,4 +147,27 @@ class KafkaSystemFactory extends SystemFactory with Logging {
       "segment.bytes" -> segmentBytes)) { case (props, (k, v)) => props.put(k, v); props }
   }
 
+  /**
+   * Builds the Kafka topic properties for intermediate streams when the application runs in batch mode.
+   *
+   * Each intermediate stream's configured stream properties are copied, and "retention.ms" defaults to
+   * KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH unless the stream already sets it.
+   *
+   * @param config the job config
+   * @return a map from intermediate stream id to its topic properties; empty unless the app mode is batch
+   */
+  def getIntermediateStreamProperties(config: Config): Map[String, Properties] = {
+    val appConfig = new ApplicationConfig(config)
+    if (appConfig.getAppMode == ApplicationMode.BATCH) {
+      val streamConfig = new StreamConfig(config)
+      streamConfig.getStreamIds().filter(streamConfig.getIsIntermediate(_)).map { streamId =>
+        val properties = new Properties()
+        properties.putAll(streamConfig.getStreamProperties(streamId))
+        properties.putIfAbsent("retention.ms", String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH))
+        (streamId, properties)
+      }.toMap
+    } else {
+      Map()
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/e6049b7d/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaCheckpointManagerFactory.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaCheckpointManagerFactory.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaCheckpointManagerFactory.java
new file mode 100644
index 0000000..1846ea8
--- /dev/null
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaCheckpointManagerFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kafka;
+
+import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.KafkaConfig;
+import org.apache.samza.config.MapConfig;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestKafkaCheckpointManagerFactory {
+
+  @Test
+  public void testGetCheckpointTopicProperties() {
+    Map<String, String> config = new HashMap<>();
+    // No app mode configured: the checkpoint topic is compacted only.
+    Properties properties = new KafkaConfig(new MapConfig(config)).getCheckpointTopicProperties();
+
+    assertEquals("compact", properties.getProperty("cleanup.policy"));
+    assertEquals(String.valueOf(KafkaConfig.DEFAULT_CHECKPOINT_SEGMENT_BYTES()), properties.getProperty("segment.bytes"));
+
+    // Batch mode: compaction plus time-based deletion after the batch retention.
+    config.put(ApplicationConfig.APP_MODE, ApplicationConfig.ApplicationMode.BATCH.name());
+    properties = new KafkaConfig(new MapConfig(config)).getCheckpointTopicProperties();
+
+    assertEquals("compact,delete", properties.getProperty("cleanup.policy"));
+    assertEquals(String.valueOf(KafkaConfig.DEFAULT_CHECKPOINT_SEGMENT_BYTES()), properties.getProperty("segment.bytes"));
+    assertEquals(String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH()), properties.getProperty("retention.ms"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e6049b7d/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemFactoryJava.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemFactoryJava.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemFactoryJava.java
new file mode 100644
index 0000000..a886bab
--- /dev/null
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemFactoryJava.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kafka;
+
+import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.KafkaConfig;
+import org.apache.samza.config.MapConfig;
+import org.junit.Test;
+import scala.collection.JavaConverters;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestKafkaSystemFactoryJava {
+
+  @Test
+  public void testGetIntermediateStreamProperties() {
+    Map<String, String> config = new HashMap<>();
+    KafkaSystemFactory factory = new KafkaSystemFactory();
+    Map<String, Properties> properties = getIntermediateStreamProperties(factory, config);
+    assertTrue(properties.isEmpty());
+
+    // An intermediate stream is configured, but the app is not in batch mode, so no properties are generated.
+    config.put("streams.test.samza.intermediate", "true");
+    config.put("streams.test.compression.type", "lz4"); //some random config
+    properties = getIntermediateStreamProperties(factory, config);
+    assertTrue(properties.isEmpty());
+
+    // Batch mode: the stream's own properties are kept and retention.ms gets the batch default.
+    config.put(ApplicationConfig.APP_MODE, ApplicationConfig.ApplicationMode.BATCH.name());
+    properties = getIntermediateStreamProperties(factory, config);
+    assertFalse(properties.isEmpty());
+    Properties prop = properties.get("test");
+    assertEquals(String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH()), prop.getProperty("retention.ms"));
+    assertEquals("lz4", prop.getProperty("compression.type"));
+  }
+
+  // Invokes the Scala factory method and exposes its result as a java.util.Map.
+  private static Map<String, Properties> getIntermediateStreamProperties(KafkaSystemFactory factory,
+      Map<String, String> config) {
+    return JavaConverters.mapAsJavaMapConverter(
+        factory.getIntermediateStreamProperties(new MapConfig(config))).asJava();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e6049b7d/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index 3337a36..eba2033 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -56,7 +56,10 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
 
   val checkpointTopic = "checkpoint-topic"
   val serdeCheckpointTopic = "checkpoint-topic-invalid-serde"
-  val checkpointTopicConfig = KafkaCheckpointManagerFactory.getCheckpointTopicProperties(null)
+  val checkpointTopicConfig = newCheckpointTopicProperties()
+
+  // Builds a fresh set of checkpoint topic properties from an empty config (no app mode set).
+  private def newCheckpointTopicProperties() = new org.apache.samza.config.KafkaConfig(new MapConfig()).getCheckpointTopicProperties()
 
   val zkSecure = JaasUtils.isZkSecurityEnabled()
 
@@ -307,7 +310,7 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
     connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure),
     systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString,
     failOnCheckpointValidation = failOnTopicValidation,
-    checkpointTopicProperties = KafkaCheckpointManagerFactory.getCheckpointTopicProperties(new MapConfig(Map[String, String]().asJava)))
+    checkpointTopicProperties = newCheckpointTopicProperties())
 
   // CheckpointManager with a specific checkpoint topic
   private def getKafkaCheckpointManager = getKafkaCheckpointManagerWithParam(checkpointTopic)
@@ -329,7 +332,7 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
     systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString,
     failOnCheckpointValidation = failOnTopicValidation,
     serde = new InvalideSerde(exception),
-    checkpointTopicProperties = KafkaCheckpointManagerFactory.getCheckpointTopicProperties(new MapConfig(Map[String, String]().asJava)))
+    checkpointTopicProperties = newCheckpointTopicProperties())
 
   class InvalideSerde(exception: String) extends CheckpointSerde {
     override def fromBytes(bytes: Array[Byte]): Checkpoint = {

http://git-wip-us.apache.org/repos/asf/samza/blob/e6049b7d/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
index 0474cbe..19b2cc6 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
@@ -138,6 +138,20 @@ class TestKafkaConfig {
     assertEquals("mychangelog1", storeToChangelog1.get("test1").getOrElse(""))
     assertEquals("mychangelog2", storeToChangelog1.get("test2").getOrElse(""))
     assertEquals("otherstream", storeToChangelog1.get("test3").getOrElse(""))
+
+    // Batch mode: every changelog gets the batch retention. test1 expects "delete", which can only come
+    // from a store-level changelog override (presumably configured earlier in this test), so such
+    // overrides must still win over the batch default policy.
+    props.setProperty(ApplicationConfig.APP_MODE, ApplicationConfig.ApplicationMode.BATCH.name())
+    val batchMapConfig = new MapConfig(props.asScala.asJava)
+    val batchKafkaConfig = new KafkaConfig(batchMapConfig)
+    val batchRetentionMs = String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH)
+    Seq("test1" -> "delete", "test2" -> "compact,delete", "test3" -> "compact,delete").foreach {
+      case (store, expectedPolicy) =>
+        val changelogProps = batchKafkaConfig.getChangelogKafkaProperties(store)
+        assertEquals(s"cleanup.policy of $store", expectedPolicy, changelogProps.getProperty("cleanup.policy"))
+        assertEquals(s"retention.ms of $store", batchRetentionMs, changelogProps.getProperty("retention.ms"))
+    }
   }
 
   @Test