You are viewing a plain text version of this content. The canonical link was not preserved in this plain-text extraction.
Posted to commits@samza.apache.org by lh...@apache.org on 2020/02/06 23:31:25 UTC
[samza] 03/06: SAMZA-2431: Fix the checkpoint and changelog topic
auto-creation. (#1251)
This is an automated email from the ASF dual-hosted git repository.
lhaiesp pushed a commit to branch 1.3.1
in repository https://gitbox.apache.org/repos/asf/samza.git
commit e250678fa4bb238ec82414d20ba2212da29a1f27
Author: shanthoosh <sv...@linkedin.com>
AuthorDate: Tue Jan 14 19:02:13 2020 -0800
SAMZA-2431: Fix the checkpoint and changelog topic auto-creation. (#1251)
* Fix the checkpoint and changelog topic creation configurations.
* Address review comments.
* Address review comments.
---
.../java/org/apache/samza/system/StreamSpec.java | 4 +-
.../samza/system/kafka/KafkaSystemAdmin.java | 7 +++-
.../org/apache/samza/config/KafkaConfig.scala | 6 ++-
.../system/kafka/TestKafkaSystemAdminJava.java | 26 ++++++++++++
.../org/apache/samza/config/TestKafkaConfig.scala | 46 +++++++++++++++++++++-
5 files changed, 82 insertions(+), 7 deletions(-)
diff --git a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
index a1ad5e4..c122371 100644
--- a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
+++ b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
@@ -19,6 +19,8 @@
package org.apache.samza.system;
+import com.google.common.base.Joiner;
+
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
@@ -269,6 +271,6 @@ public class StreamSpec implements Serializable {
@Override
public String toString() {
- return String.format("StreamSpec: id=%s, systemName=%s, pName=%s, partCount=%d.", id, systemName, physicalName, partitionCount);
+ return String.format("StreamSpec: id=%s, systemName=%s, pName=%s, partCount=%d, config=%s.", id, systemName, physicalName, partitionCount, Joiner.on(",").withKeyValueSeparator("=").join(config));
}
}
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
index e5d6af1..ecb95a9 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
@@ -541,8 +541,11 @@ public class KafkaSystemAdmin implements SystemAdmin {
new KafkaStreamSpec(spec.getId(), spec.getPhysicalName(), systemName, 1, coordinatorStreamReplicationFactor,
coordinatorStreamProperties);
} else if (spec.isCheckpointStream()) {
- kafkaSpec = KafkaStreamSpec.fromSpec(StreamSpec.createCheckpointStreamSpec(spec.getPhysicalName(), systemName))
- .copyWithReplicationFactor(Integer.parseInt(new KafkaConfig(config).getCheckpointReplicationFactor().get()));
+ Properties checkpointTopicProperties = new Properties();
+ checkpointTopicProperties.putAll(spec.getConfig());
+ kafkaSpec = KafkaStreamSpec.fromSpec(StreamSpec.createCheckpointStreamSpec(spec.getPhysicalName(), spec.getSystemName()))
+ .copyWithReplicationFactor(Integer.parseInt(new KafkaConfig(config).getCheckpointReplicationFactor().get()))
+ .copyWithProperties(checkpointTopicProperties);
} else if (intermediateStreamProperties.containsKey(spec.getId())) {
kafkaSpec = KafkaStreamSpec.fromSpec(spec);
Properties properties = kafkaSpec.getProperties();
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index 75fbb6b..3b5f5f3 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -316,7 +316,6 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
val filteredConfigs = config.subset(KafkaConfig.CHANGELOG_STREAM_KAFKA_SETTINGS format name, true)
val kafkaChangeLogProperties = new Properties
- val appConfig = new ApplicationConfig(config)
// SAMZA-1600: do not use the combination of "compact,delete" as cleanup policy until we pick up Kafka broker 0.11.0.57,
// 1.0.2, or 1.1.0 (see KAFKA-6568)
@@ -325,7 +324,10 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
// - Set topic TTL to be the same as RocksDB TTL
Option(config.get("stores.%s.rocksdb.ttl.ms" format name)) match {
case Some(rocksDbTtl) =>
- if (!config.containsKey("stores.%s.changelog.kafka.cleanup.policy" format name)) {
+ if (!rocksDbTtl.isEmpty && rocksDbTtl.toInt < 0) {
+ kafkaChangeLogProperties.setProperty("cleanup.policy", "compact")
+ kafkaChangeLogProperties.setProperty("max.message.bytes", getChangelogStreamMaxMessageByte(name))
+ } else if (!config.containsKey("stores.%s.changelog.kafka.cleanup.policy" format name)) {
kafkaChangeLogProperties.setProperty("cleanup.policy", "delete")
if (!config.containsKey("stores.%s.changelog.kafka.retention.ms" format name)) {
kafkaChangeLogProperties.setProperty("retention.ms", String.valueOf(rocksDbTtl))
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
index 7ca03f3..82d635f 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
@@ -115,6 +115,32 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
}
@Test
+ public void testToKafkaSpecForCheckpointStreamShouldReturnTheCorrectStreamSpecByPreservingTheConfig() {
+ String topicName = "testStream";
+ String streamId = "samza-internal-checkpoint-stream-id";
+ int partitionCount = 1;
+ Map<String, String> map = new HashMap<>();
+ map.put("cleanup.policy", "compact");
+ map.put("replication.factor", "3");
+ map.put("segment.bytes", "536870912");
+ map.put("delete.retention.ms", "86400000");
+
+ Config config = new MapConfig(map);
+
+ StreamSpec spec = new StreamSpec(streamId, topicName, SYSTEM, partitionCount, config);
+ KafkaSystemAdmin kafkaSystemAdmin = systemAdmin();
+ KafkaStreamSpec kafkaStreamSpec = kafkaSystemAdmin.toKafkaSpec(spec);
+ System.out.println(kafkaStreamSpec);
+ assertEquals(streamId, kafkaStreamSpec.getId());
+ assertEquals(topicName, kafkaStreamSpec.getPhysicalName());
+ assertEquals(partitionCount, kafkaStreamSpec.getPartitionCount());
+ assertEquals(3, kafkaStreamSpec.getReplicationFactor());
+ assertEquals("compact", kafkaStreamSpec.getConfig().get("cleanup.policy"));
+ assertEquals("536870912", kafkaStreamSpec.getConfig().get("segment.bytes"));
+ assertEquals("86400000", kafkaStreamSpec.getConfig().get("delete.retention.ms"));
+ }
+
+ @Test
public void testToKafkaSpec() {
String topicName = "testStream";
diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
index 8558a85..00b103d 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
@@ -22,14 +22,14 @@ package org.apache.samza.config
import java.util.Properties
import java.util.concurrent.TimeUnit
-import org.apache.samza.config.factories.PropertiesConfigFactory
import org.junit.Assert._
+import org.junit.After
+import org.junit.Before
import org.junit.Test
import scala.collection.JavaConverters._
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.kafka.clients.producer.ProducerConfig
-import org.junit.Before
class TestKafkaConfig {
@@ -47,6 +47,10 @@ class TestKafkaConfig {
props.setProperty(JobConfig.JOB_NAME, "jobName")
}
+ @After
+ def clearUpProperties(): Unit = {
+ props.clear()
+ }
@Test
def testStreamLevelFetchSizeOverride() {
@@ -82,6 +86,44 @@ class TestKafkaConfig {
}
@Test
+ def testChangeLogPropertiesShouldReturnCorrectTopicConfigurationForInfiniteTTLStores(): Unit = {
+ val props = new Properties
+ props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + "bootstrap.servers", "localhost:9092")
+ props.setProperty("systems." + SYSTEM_NAME + ".consumer.zookeeper.connect", "localhost:2181/")
+ props.setProperty(JobConfig.JOB_NAME, "jobName")
+
+ props.setProperty("stores.test1.changelog", "kafka.mychangelog1")
+ props.setProperty("stores.test1.rocksdb.ttl.ms", "-1")
+
+ val mapConfig = new MapConfig(props.asScala.asJava)
+ val kafkaConfig = new KafkaConfig(mapConfig)
+ val kafkaProperties = kafkaConfig.getChangelogKafkaProperties("test1")
+ assertEquals("compact", kafkaProperties.getProperty("cleanup.policy"))
+ assertEquals("536870912", kafkaProperties.getProperty("segment.bytes"))
+ assertEquals("1000012", kafkaProperties.getProperty("max.message.bytes"))
+ assertEquals("86400000", kafkaProperties.getProperty("delete.retention.ms"))
+ }
+
+ @Test
+ def testChangeLogPropertiesShouldReturnCorrectTopicConfigurationForStoresWithEmptyRocksDBTTL(): Unit = {
+ val props = new Properties
+ props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + "bootstrap.servers", "localhost:9092")
+ props.setProperty("systems." + SYSTEM_NAME + ".consumer.zookeeper.connect", "localhost:2181/")
+ props.setProperty(JobConfig.JOB_NAME, "jobName")
+
+ props.setProperty("stores.test1.changelog", "kafka.mychangelog1")
+
+ val mapConfig = new MapConfig(props.asScala.asJava)
+ val kafkaConfig = new KafkaConfig(mapConfig)
+ val kafkaProperties = kafkaConfig.getChangelogKafkaProperties("test1")
+ assertEquals("compact", kafkaProperties.getProperty("cleanup.policy"))
+ assertEquals("536870912", kafkaProperties.getProperty("segment.bytes"))
+ assertEquals("86400000", kafkaProperties.getProperty("delete.retention.ms"))
+ assertEquals("1000012", kafkaProperties.getProperty("max.message.bytes"))
+
+ }
+
+ @Test
def testChangeLogProperties() {
props.setProperty("job.changelog.system", SYSTEM_NAME)
props.setProperty("systems." + SYSTEM_NAME + ".samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory")