You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/10/25 01:20:42 UTC
samza git commit: SAMZA-1457: Set retention for internal streams for Batch application
Repository: samza
Updated Branches:
refs/heads/master cc1ca2c9d -> e6049b7dd
SAMZA-1457: Set retention for internal streams for Batch application
For a batch application, the internal streams (intermediate streams, checkpoint, and changelog) need a short retention period; this change defaults it to one day.
Author: Xinyu Liu <xi...@xiliu-ld1.linkedin.biz>
Author: xiliu <xi...@xiliu-ld1.linkedin.biz>
Reviewers: Jagadish V <vj...@gmail.com>
Closes #328 from xinyuiscool/SAMZA-1457
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/e6049b7d
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/e6049b7d
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/e6049b7d
Branch: refs/heads/master
Commit: e6049b7ddca385b5a1a363020b9d711f4a0b93b6
Parents: cc1ca2c
Author: Xinyu Liu <xi...@gmail.com>
Authored: Tue Oct 24 18:20:32 2017 -0700
Committer: xiliu <xi...@xiliu-ld1.linkedin.biz>
Committed: Tue Oct 24 18:20:32 2017 -0700
----------------------------------------------------------------------
.../samza/system/kafka/KafkaStreamSpec.java | 9 +++
.../kafka/KafkaCheckpointManagerFactory.scala | 21 ++-----
.../org/apache/samza/config/KafkaConfig.scala | 37 +++++++++++-
.../samza/system/kafka/KafkaSystemAdmin.scala | 9 ++-
.../samza/system/kafka/KafkaSystemFactory.scala | 22 ++++++-
.../TestKafkaCheckpointManagerFactory.java | 51 +++++++++++++++++
.../kafka/TestKafkaSystemFactoryJava.java | 60 ++++++++++++++++++++
.../kafka/TestKafkaCheckpointManager.scala | 6 +-
.../apache/samza/config/TestKafkaConfig.scala | 13 +++++
9 files changed, 204 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/e6049b7d/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
index a49c022..de7b7b0 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
@@ -167,6 +167,15 @@ public class KafkaStreamSpec extends StreamSpec {
return new KafkaStreamSpec(getId(), getPhysicalName(), getSystemName(), partitionCount, getReplicationFactor(), getProperties());
}
+ /**
+ * Make a copy of the spec with new properties
+ * @param properties properties of the Kafka stream
+ * @return new instance of {@link KafkaStreamSpec}
+ */
+ public KafkaStreamSpec copyWithProperties(Properties properties) {
+ return new KafkaStreamSpec(getId(), getPhysicalName(), getSystemName(), getPartitionCount(), getReplicationFactor(), properties);
+ }
+
public int getReplicationFactor() {
return replicationFactor;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/e6049b7d/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
index 402248f..8ac347c 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
@@ -21,11 +21,13 @@ package org.apache.samza.checkpoint.kafka
import java.util.Properties
+import com.google.common.collect.ImmutableMap
import kafka.utils.ZkUtils
import org.apache.samza.SamzaException
import org.apache.samza.checkpoint.{CheckpointManager, CheckpointManagerFactory}
+import org.apache.samza.config.ApplicationConfig.ApplicationMode
import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.config.{Config, KafkaConfig, SystemConfig}
+import org.apache.samza.config._
import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.system.SystemFactory
import org.apache.samza.util.{ClientUtilTopicMetadataStore, KafkaUtil, Logging, Util, _}
@@ -38,21 +40,6 @@ object KafkaCheckpointManagerFactory {
// on log compacted topics. Details in SAMZA-586.
"compression.type" -> "none")
- // Set the checkpoint topic configs to have a very small segment size and
- // enable log compaction. This keeps job startup time small since there
- // are fewer useless (overwritten) messages to read from the checkpoint
- // topic.
- def getCheckpointTopicProperties(config: Config) = {
- val segmentBytes: Int = if (config == null) {
- KafkaConfig.DEFAULT_CHECKPOINT_SEGMENT_BYTES
- } else {
- new KafkaConfig(config).getCheckpointSegmentBytes()
- }
- (new Properties /: Map(
- "cleanup.policy" -> "compact",
- "segment.bytes" -> String.valueOf(segmentBytes))) { case (props, (k, v)) => props.put(k, v); props }
- }
-
/**
* Get the checkpoint system and system factory from the configuration
* @param config
@@ -113,6 +100,6 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin
connectZk,
config.getSystemStreamPartitionGrouperFactory, // To find out the SSPGrouperFactory class so it can be included/verified in the key
config.failOnCheckpointValidation,
- checkpointTopicProperties = getCheckpointTopicProperties(config))
+ checkpointTopicProperties = new KafkaConfig(config).getCheckpointTopicProperties())
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/e6049b7d/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index 9c33b16..1c1cdbd 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -21,13 +21,16 @@ package org.apache.samza.config
import java.util
+import java.util.concurrent.TimeUnit
import java.util.regex.Pattern
import java.util.{Properties, UUID}
+import com.google.common.collect.ImmutableMap
import kafka.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.samza.SamzaException
+import org.apache.samza.config.ApplicationConfig.ApplicationMode
import org.apache.samza.config.SystemConfig.Config2System
import org.apache.samza.util.{Logging, Util}
@@ -76,6 +79,8 @@ object KafkaConfig {
*/
val CONSUMER_FETCH_THRESHOLD_BYTES = SystemConfig.SYSTEM_PREFIX + "samza.fetch.threshold.bytes"
+ val DEFAULT_RETENTION_MS_FOR_BATCH = TimeUnit.DAYS.toMillis(1)
+
implicit def Config2Kafka(config: Config) = new KafkaConfig(config)
}
@@ -243,13 +248,43 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
def getChangelogKafkaProperties(name: String) = {
val filteredConfigs = config.subset(KafkaConfig.CHANGELOG_STREAM_KAFKA_SETTINGS format name, true)
val kafkaChangeLogProperties = new Properties
- kafkaChangeLogProperties.setProperty("cleanup.policy", "compact")
+
+ val appConfig = new ApplicationConfig(config)
+ if (appConfig.getAppMode == ApplicationMode.STREAM) {
+ kafkaChangeLogProperties.setProperty("cleanup.policy", "compact")
+ } else{
+ kafkaChangeLogProperties.setProperty("cleanup.policy", "compact,delete")
+ kafkaChangeLogProperties.setProperty("retention.ms", String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH))
+ }
kafkaChangeLogProperties.setProperty("segment.bytes", KafkaConfig.CHANGELOG_DEFAULT_SEGMENT_SIZE)
kafkaChangeLogProperties.setProperty("delete.retention.ms", String.valueOf(new StorageConfig(config).getChangeLogDeleteRetentionInMs(name)))
filteredConfigs.asScala.foreach { kv => kafkaChangeLogProperties.setProperty(kv._1, kv._2) }
kafkaChangeLogProperties
}
+ // Set the checkpoint topic configs to have a very small segment size and
+ // enable log compaction. This keeps job startup time small since there
+ // are fewer useless (overwritten) messages to read from the checkpoint
+ // topic.
+ def getCheckpointTopicProperties() = {
+ val segmentBytes: Int = getCheckpointSegmentBytes()
+ val appConfig = new ApplicationConfig(config)
+ val isStreamMode = appConfig.getAppMode == ApplicationMode.STREAM
+ val properties = new Properties()
+
+ if (isStreamMode) {
+ properties.putAll(ImmutableMap.of(
+ "cleanup.policy", "compact",
+ "segment.bytes", String.valueOf(segmentBytes)))
+ } else {
+ properties.putAll(ImmutableMap.of(
+ "cleanup.policy", "compact,delete",
+ "retention.ms", String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH),
+ "segment.bytes", String.valueOf(segmentBytes)))
+ }
+ properties
+ }
+
// kafka config
def getKafkaSystemConsumerConfig( systemName: String,
clientId: String,
http://git-wip-us.apache.org/repos/asf/samza/blob/e6049b7d/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
index a2256c8..013b292 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
@@ -139,7 +139,12 @@ class KafkaSystemAdmin(
* Replication factor for the Changelog topic in kafka
* Kafka properties to be used during the Changelog topic creation
*/
- topicMetaInformation: Map[String, ChangelogInfo] = Map[String, ChangelogInfo]()) extends ExtendedSystemAdmin with Logging {
+ topicMetaInformation: Map[String, ChangelogInfo] = Map[String, ChangelogInfo](),
+
+ /**
+ * Kafka properties to be used during the intermediate topic creation
+ */
+ intermediateStreamProperties: Map[String, Properties] = Map()) extends ExtendedSystemAdmin with Logging {
import KafkaSystemAdmin._
@@ -450,6 +455,8 @@ class KafkaSystemAdmin(
new KafkaStreamSpec(spec.getId, topicName, systemName, spec.getPartitionCount, topicMeta.replicationFactor, topicMeta.kafkaProps)
} else if (spec.isCoordinatorStream){
new KafkaStreamSpec(spec.getId, spec.getPhysicalName, systemName, 1, coordinatorStreamReplicationFactor, coordinatorStreamProperties)
+ } else if (intermediateStreamProperties.contains(spec.getId)) {
+ KafkaStreamSpec.fromSpec(spec).copyWithProperties(intermediateStreamProperties(spec.getId))
} else {
KafkaStreamSpec.fromSpec(spec)
}
http://git-wip-us.apache.org/repos/asf/samza/blob/e6049b7d/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index e0b5540..a480042 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -22,8 +22,9 @@ package org.apache.samza.system.kafka
import java.util.Properties
import kafka.utils.ZkUtils
import org.apache.samza.SamzaException
+import org.apache.samza.config.ApplicationConfig.ApplicationMode
import org.apache.samza.util.{Logging, KafkaUtil, ExponentialSleepStrategy, ClientUtilTopicMetadataStore}
-import org.apache.samza.config.Config
+import org.apache.samza.config.{KafkaConfig, ApplicationConfig, StreamConfig, Config}
import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.config.KafkaConfig.Config2Kafka
import org.apache.samza.config.TaskConfig.Config2Task
@@ -125,6 +126,8 @@ class KafkaSystemFactory extends SystemFactory with Logging {
(topicName, changelogInfo)
}}.toMap
+
+ val intermediateStreamProperties: Map[String, Properties] = getIntermediateStreamProperties(config)
new KafkaSystemAdmin(
systemName,
bootstrapServers,
@@ -134,7 +137,8 @@ class KafkaSystemFactory extends SystemFactory with Logging {
timeout,
bufferSize,
clientId,
- topicMetaInformation)
+ topicMetaInformation,
+ intermediateStreamProperties)
}
def getCoordinatorTopicProperties(config: Config) = {
@@ -144,4 +148,18 @@ class KafkaSystemFactory extends SystemFactory with Logging {
"segment.bytes" -> segmentBytes)) { case (props, (k, v)) => props.put(k, v); props }
}
+ def getIntermediateStreamProperties(config : Config): Map[String, Properties] = {
+ val appConfig = new ApplicationConfig(config)
+ if (appConfig.getAppMode == ApplicationMode.BATCH) {
+ val streamConfig = new StreamConfig(config)
+ streamConfig.getStreamIds().filter(streamConfig.getIsIntermediate(_)).map(streamId => {
+ val properties = new Properties()
+ properties.putAll(streamConfig.getStreamProperties(streamId))
+ properties.putIfAbsent("retention.ms", String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH))
+ (streamId, properties)
+ }).toMap
+ } else {
+ Map()
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/e6049b7d/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaCheckpointManagerFactory.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaCheckpointManagerFactory.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaCheckpointManagerFactory.java
new file mode 100644
index 0000000..1846ea8
--- /dev/null
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaCheckpointManagerFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kafka;
+
+import org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory;
+import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.KafkaConfig;
+import org.apache.samza.config.MapConfig;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestKafkaCheckpointManagerFactory {
+
+ @Test
+ public void testGetCheckpointTopicProperties() {
+ Map<String, String> config = new HashMap<>();
+ Properties properties = new KafkaConfig(new MapConfig(config)).getCheckpointTopicProperties();
+
+ assertEquals(properties.getProperty("cleanup.policy"), "compact");
+ assertEquals(properties.getProperty("segment.bytes"), String.valueOf(KafkaConfig.DEFAULT_CHECKPOINT_SEGMENT_BYTES()));
+
+ config.put(ApplicationConfig.APP_MODE, ApplicationConfig.ApplicationMode.BATCH.name());
+ properties = new KafkaConfig(new MapConfig(config)).getCheckpointTopicProperties();
+
+ assertEquals(properties.getProperty("cleanup.policy"), "compact,delete");
+ assertEquals(properties.getProperty("segment.bytes"), String.valueOf(KafkaConfig.DEFAULT_CHECKPOINT_SEGMENT_BYTES()));
+ assertEquals(properties.getProperty("retention.ms"), String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/e6049b7d/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemFactoryJava.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemFactoryJava.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemFactoryJava.java
new file mode 100644
index 0000000..a886bab
--- /dev/null
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemFactoryJava.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kafka;
+
+import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.KafkaConfig;
+import org.apache.samza.config.MapConfig;
+import org.junit.Test;
+import scala.collection.JavaConversions;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestKafkaSystemFactoryJava {
+
+ @Test
+ public void testGetIntermediateStreamProperties() {
+ Map<String, String> config = new HashMap<>();
+ KafkaSystemFactory factory = new KafkaSystemFactory();
+ Map<String, Properties> properties = JavaConversions.mapAsJavaMap(
+ factory.getIntermediateStreamProperties(new MapConfig(config)));
+ assertTrue(properties.isEmpty());
+
+ // no properties for stream
+ config.put("streams.test.samza.intermediate", "true");
+ config.put("streams.test.compression.type", "lz4"); //some random config
+ properties = JavaConversions.mapAsJavaMap(
+ factory.getIntermediateStreamProperties(new MapConfig(config)));
+ assertTrue(properties.isEmpty());
+
+ config.put(ApplicationConfig.APP_MODE, ApplicationConfig.ApplicationMode.BATCH.name());
+ properties = JavaConversions.mapAsJavaMap(
+ factory.getIntermediateStreamProperties(new MapConfig(config)));
+ assertTrue(!properties.isEmpty());
+ Properties prop = properties.get("test");
+ assertEquals(prop.getProperty("retention.ms"), String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH()));
+ assertEquals(prop.getProperty("compression.type"), "lz4");
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/e6049b7d/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index 3337a36..eba2033 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -56,7 +56,7 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
val checkpointTopic = "checkpoint-topic"
val serdeCheckpointTopic = "checkpoint-topic-invalid-serde"
- val checkpointTopicConfig = KafkaCheckpointManagerFactory.getCheckpointTopicProperties(null)
+ val checkpointTopicConfig = new org.apache.samza.config.KafkaConfig(new MapConfig()).getCheckpointTopicProperties()
val zkSecure = JaasUtils.isZkSecurityEnabled()
@@ -307,7 +307,7 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure),
systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString,
failOnCheckpointValidation = failOnTopicValidation,
- checkpointTopicProperties = KafkaCheckpointManagerFactory.getCheckpointTopicProperties(new MapConfig(Map[String, String]().asJava)))
+ checkpointTopicProperties = new org.apache.samza.config.KafkaConfig(new MapConfig()).getCheckpointTopicProperties())
// CheckpointManager with a specific checkpoint topic
private def getKafkaCheckpointManager = getKafkaCheckpointManagerWithParam(checkpointTopic)
@@ -329,7 +329,7 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString,
failOnCheckpointValidation = failOnTopicValidation,
serde = new InvalideSerde(exception),
- checkpointTopicProperties = KafkaCheckpointManagerFactory.getCheckpointTopicProperties(new MapConfig(Map[String, String]().asJava)))
+ checkpointTopicProperties = new org.apache.samza.config.KafkaConfig(new MapConfig()).getCheckpointTopicProperties())
class InvalideSerde(exception: String) extends CheckpointSerde {
override def fromBytes(bytes: Array[Byte]): Checkpoint = {
http://git-wip-us.apache.org/repos/asf/samza/blob/e6049b7d/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
index 0474cbe..19b2cc6 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
@@ -138,6 +138,19 @@ class TestKafkaConfig {
assertEquals("mychangelog1", storeToChangelog1.get("test1").getOrElse(""))
assertEquals("mychangelog2", storeToChangelog1.get("test2").getOrElse(""))
assertEquals("otherstream", storeToChangelog1.get("test3").getOrElse(""))
+
+ props.setProperty(ApplicationConfig.APP_MODE, ApplicationConfig.ApplicationMode.BATCH.name())
+ val batchMapConfig = new MapConfig(props.asScala.asJava)
+ val batchKafkaConfig = new KafkaConfig(batchMapConfig)
+ assertEquals(batchKafkaConfig.getChangelogKafkaProperties("test1").getProperty("cleanup.policy"), "delete")
+ assertEquals(batchKafkaConfig.getChangelogKafkaProperties("test1").getProperty("retention.ms"),
+ String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH))
+ assertEquals(batchKafkaConfig.getChangelogKafkaProperties("test2").getProperty("cleanup.policy"), "compact,delete")
+ assertEquals(batchKafkaConfig.getChangelogKafkaProperties("test2").getProperty("retention.ms"),
+ String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH))
+ assertEquals(batchKafkaConfig.getChangelogKafkaProperties("test3").getProperty("cleanup.policy"), "compact,delete")
+ assertEquals(batchKafkaConfig.getChangelogKafkaProperties("test3").getProperty("retention.ms"),
+ String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH))
}
@Test