You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@samza.apache.org by lh...@apache.org on 2020/02/06 23:31:25 UTC

[samza] 03/06: SAMZA-2431: Fix the checkpoint and changelog topic auto-creation. (#1251)

This is an automated email from the ASF dual-hosted git repository.

lhaiesp pushed a commit to branch 1.3.1
in repository https://gitbox.apache.org/repos/asf/samza.git

commit e250678fa4bb238ec82414d20ba2212da29a1f27
Author: shanthoosh <sv...@linkedin.com>
AuthorDate: Tue Jan 14 19:02:13 2020 -0800

    SAMZA-2431: Fix the checkpoint and changelog topic auto-creation. (#1251)
    
    * Fix the checkpoint and changelog topic creation configurations.
    
    * Address review comments.
    
    * Address review comments.
---
 .../java/org/apache/samza/system/StreamSpec.java   |  4 +-
 .../samza/system/kafka/KafkaSystemAdmin.java       |  7 +++-
 .../org/apache/samza/config/KafkaConfig.scala      |  6 ++-
 .../system/kafka/TestKafkaSystemAdminJava.java     | 26 ++++++++++++
 .../org/apache/samza/config/TestKafkaConfig.scala  | 46 +++++++++++++++++++++-
 5 files changed, 82 insertions(+), 7 deletions(-)

diff --git a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
index a1ad5e4..c122371 100644
--- a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
+++ b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
@@ -19,6 +19,8 @@
 
 package org.apache.samza.system;
 
+import com.google.common.base.Joiner;
+
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.HashMap;
@@ -269,6 +271,6 @@ public class StreamSpec implements Serializable {
 
   @Override
   public String toString() {
-    return String.format("StreamSpec: id=%s, systemName=%s, pName=%s, partCount=%d.", id, systemName, physicalName, partitionCount);
+    return String.format("StreamSpec: id=%s, systemName=%s, pName=%s, partCount=%d, config=%s.", id, systemName, physicalName, partitionCount, Joiner.on(",").withKeyValueSeparator("=").join(config));
   }
 }
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
index e5d6af1..ecb95a9 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
@@ -541,8 +541,11 @@ public class KafkaSystemAdmin implements SystemAdmin {
           new KafkaStreamSpec(spec.getId(), spec.getPhysicalName(), systemName, 1, coordinatorStreamReplicationFactor,
               coordinatorStreamProperties);
     } else if (spec.isCheckpointStream()) {
-      kafkaSpec = KafkaStreamSpec.fromSpec(StreamSpec.createCheckpointStreamSpec(spec.getPhysicalName(), systemName))
-              .copyWithReplicationFactor(Integer.parseInt(new KafkaConfig(config).getCheckpointReplicationFactor().get()));
+      Properties checkpointTopicProperties = new Properties();
+      checkpointTopicProperties.putAll(spec.getConfig());
+      kafkaSpec = KafkaStreamSpec.fromSpec(StreamSpec.createCheckpointStreamSpec(spec.getPhysicalName(), spec.getSystemName()))
+              .copyWithReplicationFactor(Integer.parseInt(new KafkaConfig(config).getCheckpointReplicationFactor().get()))
+              .copyWithProperties(checkpointTopicProperties);
     } else if (intermediateStreamProperties.containsKey(spec.getId())) {
       kafkaSpec = KafkaStreamSpec.fromSpec(spec);
       Properties properties = kafkaSpec.getProperties();
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index 75fbb6b..3b5f5f3 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -316,7 +316,6 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
     val filteredConfigs = config.subset(KafkaConfig.CHANGELOG_STREAM_KAFKA_SETTINGS format name, true)
     val kafkaChangeLogProperties = new Properties
 
-    val appConfig = new ApplicationConfig(config)
     // SAMZA-1600: do not use the combination of "compact,delete" as cleanup policy until we pick up Kafka broker 0.11.0.57,
     // 1.0.2, or 1.1.0 (see KAFKA-6568)
 
@@ -325,7 +324,10 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
     //  - Set topic TTL to be the same as RocksDB TTL
     Option(config.get("stores.%s.rocksdb.ttl.ms" format name)) match {
       case Some(rocksDbTtl) =>
-        if (!config.containsKey("stores.%s.changelog.kafka.cleanup.policy" format name)) {
+        if (!rocksDbTtl.isEmpty && rocksDbTtl.toInt < 0) {
+          kafkaChangeLogProperties.setProperty("cleanup.policy", "compact")
+          kafkaChangeLogProperties.setProperty("max.message.bytes", getChangelogStreamMaxMessageByte(name))
+        } else if (!config.containsKey("stores.%s.changelog.kafka.cleanup.policy" format name)) {
           kafkaChangeLogProperties.setProperty("cleanup.policy", "delete")
           if (!config.containsKey("stores.%s.changelog.kafka.retention.ms" format name)) {
             kafkaChangeLogProperties.setProperty("retention.ms", String.valueOf(rocksDbTtl))
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
index 7ca03f3..82d635f 100644
--- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
+++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
@@ -115,6 +115,32 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
   }
 
   @Test
+  public void testToKafkaSpecForCheckpointStreamShouldReturnTheCorrectStreamSpecByPreservingTheConfig() {
+    String topicName = "testStream";
+    String streamId = "samza-internal-checkpoint-stream-id";
+    int partitionCount = 1;
+    Map<String, String> map = new HashMap<>();
+    map.put("cleanup.policy", "compact");
+    map.put("replication.factor", "3");
+    map.put("segment.bytes", "536870912");
+    map.put("delete.retention.ms", "86400000");
+
+    Config config = new MapConfig(map);
+
+    StreamSpec spec = new StreamSpec(streamId, topicName, SYSTEM, partitionCount, config);
+    KafkaSystemAdmin kafkaSystemAdmin = systemAdmin();
+    KafkaStreamSpec kafkaStreamSpec = kafkaSystemAdmin.toKafkaSpec(spec);
+    System.out.println(kafkaStreamSpec);
+    assertEquals(streamId, kafkaStreamSpec.getId());
+    assertEquals(topicName, kafkaStreamSpec.getPhysicalName());
+    assertEquals(partitionCount, kafkaStreamSpec.getPartitionCount());
+    assertEquals(3, kafkaStreamSpec.getReplicationFactor());
+    assertEquals("compact", kafkaStreamSpec.getConfig().get("cleanup.policy"));
+    assertEquals("536870912", kafkaStreamSpec.getConfig().get("segment.bytes"));
+    assertEquals("86400000", kafkaStreamSpec.getConfig().get("delete.retention.ms"));
+  }
+
+  @Test
   public void testToKafkaSpec() {
     String topicName = "testStream";
 
diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
index 8558a85..00b103d 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
@@ -22,14 +22,14 @@ package org.apache.samza.config
 import java.util.Properties
 import java.util.concurrent.TimeUnit
 
-import org.apache.samza.config.factories.PropertiesConfigFactory
 import org.junit.Assert._
+import org.junit.After
+import org.junit.Before
 import org.junit.Test
 
 import scala.collection.JavaConverters._
 import org.apache.kafka.common.serialization.ByteArraySerializer
 import org.apache.kafka.clients.producer.ProducerConfig
-import org.junit.Before
 
 class TestKafkaConfig {
 
@@ -47,6 +47,10 @@ class TestKafkaConfig {
     props.setProperty(JobConfig.JOB_NAME, "jobName")
   }
 
+  @After
+  def clearUpProperties(): Unit = {
+    props.clear()
+  }
 
   @Test
   def testStreamLevelFetchSizeOverride() {
@@ -82,6 +86,44 @@ class TestKafkaConfig {
   }
 
   @Test
+  def testChangeLogPropertiesShouldReturnCorrectTopicConfigurationForInfiniteTTLStores(): Unit = {
+    val props = new Properties
+    props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + "bootstrap.servers", "localhost:9092")
+    props.setProperty("systems." + SYSTEM_NAME + ".consumer.zookeeper.connect", "localhost:2181/")
+    props.setProperty(JobConfig.JOB_NAME, "jobName")
+
+    props.setProperty("stores.test1.changelog", "kafka.mychangelog1")
+    props.setProperty("stores.test1.rocksdb.ttl.ms", "-1")
+
+    val mapConfig = new MapConfig(props.asScala.asJava)
+    val kafkaConfig = new KafkaConfig(mapConfig)
+    val kafkaProperties = kafkaConfig.getChangelogKafkaProperties("test1")
+    assertEquals("compact", kafkaProperties.getProperty("cleanup.policy"))
+    assertEquals("536870912", kafkaProperties.getProperty("segment.bytes"))
+    assertEquals("1000012", kafkaProperties.getProperty("max.message.bytes"))
+    assertEquals("86400000", kafkaProperties.getProperty("delete.retention.ms"))
+  }
+
+  @Test
+  def testChangeLogPropertiesShouldReturnCorrectTopicConfigurationForStoresWithEmptyRocksDBTTL(): Unit = {
+    val props = new Properties
+    props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + "bootstrap.servers", "localhost:9092")
+    props.setProperty("systems." + SYSTEM_NAME + ".consumer.zookeeper.connect", "localhost:2181/")
+    props.setProperty(JobConfig.JOB_NAME, "jobName")
+
+    props.setProperty("stores.test1.changelog", "kafka.mychangelog1")
+
+    val mapConfig = new MapConfig(props.asScala.asJava)
+    val kafkaConfig = new KafkaConfig(mapConfig)
+    val kafkaProperties = kafkaConfig.getChangelogKafkaProperties("test1")
+    assertEquals("compact", kafkaProperties.getProperty("cleanup.policy"))
+    assertEquals("536870912", kafkaProperties.getProperty("segment.bytes"))
+    assertEquals("86400000", kafkaProperties.getProperty("delete.retention.ms"))
+    assertEquals("1000012", kafkaProperties.getProperty("max.message.bytes"))
+
+  }
+
+  @Test
   def testChangeLogProperties() {
     props.setProperty("job.changelog.system", SYSTEM_NAME)
     props.setProperty("systems." + SYSTEM_NAME + ".samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory")