You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2019/10/23 01:40:32 UTC

[samza] 01/03: SAMZA-2352: Use min.compaction.lag.ms to avoid compacting the Kafka changelog topic (#1190)

This is an automated email from the ASF dual-hosted git repository.

pmaheshwari pushed a commit to branch 1.3.0
in repository https://gitbox.apache.org/repos/asf/samza.git

commit 0de2385540747af500efcf134cf80f0748ffff40
Author: xinyuiscool <xi...@linkedin.com>
AuthorDate: Thu Oct 17 14:12:45 2019 -0700

    SAMZA-2352: Use min.compaction.lag.ms to avoid compacting the Kafka changelog topic (#1190)
---
 .../main/java/org/apache/samza/config/StorageConfig.java | 16 ++++++++++++++++
 .../java/org/apache/samza/config/TestStorageConfig.java  | 14 ++++++++++++++
 .../main/scala/org/apache/samza/config/KafkaConfig.scala | 12 +++++++++++-
 .../scala/org/apache/samza/config/TestKafkaConfig.scala  | 15 ++++++++++++++-
 4 files changed, 55 insertions(+), 2 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
index 86c7e7d..2b7ce02 100644
--- a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
@@ -31,6 +31,8 @@ import org.apache.samza.SamzaException;
 import org.apache.samza.execution.StreamManager;
 import org.apache.samza.util.StreamUtil;
 
+import static com.google.common.base.Preconditions.*;
+
 
 /**
  * Config helper methods related to storage.
@@ -45,6 +47,7 @@ public class StorageConfig extends MapConfig {
   public static final String MSG_SERDE = STORE_PREFIX + "%s.msg.serde";
   public static final String CHANGELOG_STREAM = STORE_PREFIX + "%s" + CHANGELOG_SUFFIX;
   public static final String ACCESSLOG_STREAM_SUFFIX = "access-log";
+  // TODO: setting replication.factor does not seem to work as it does in KafkaConfig.
   public static final String CHANGELOG_REPLICATION_FACTOR = STORE_PREFIX + "%s.changelog.replication.factor";
   public static final String CHANGELOG_MAX_MSG_SIZE_BYTES = STORE_PREFIX + "%s.changelog.max.message.size.bytes";
   public static final int DEFAULT_CHANGELOG_MAX_MSG_SIZE_BYTES = 1048576;
@@ -52,6 +55,10 @@ public class StorageConfig extends MapConfig {
   public static final boolean DEFAULT_DISALLOW_LARGE_MESSAGES = false;
   public static final String DROP_LARGE_MESSAGES = STORE_PREFIX + "%s.drop.large.messages";
   public static final boolean DEFAULT_DROP_LARGE_MESSAGES = false;
+  // The minimum log compaction lag time for the transactional state changelog
+  public static final String MIN_COMPACTION_LAG_MS = "min.compaction.lag.ms";
+  public static final String CHANGELOG_MIN_COMPACTION_LAG_MS = STORE_PREFIX + "%s.changelog." + MIN_COMPACTION_LAG_MS;
+  public static final long DEFAULT_CHANGELOG_MIN_COMPACTION_LAG_MS = TimeUnit.HOURS.toMillis(4);
 
   static final String CHANGELOG_SYSTEM = "job.changelog.system";
   static final String CHANGELOG_DELETE_RETENTION_MS = STORE_PREFIX + "%s.changelog.delete.retention.ms";
@@ -207,6 +214,15 @@ public class StorageConfig extends MapConfig {
     return getBoolean(String.format(DROP_LARGE_MESSAGES, storeName), DEFAULT_DROP_LARGE_MESSAGES);
   }
 
+  public long getChangelogMinCompactionLagMs(String storeName) {
+    String minCompactLagConfigName = String.format(CHANGELOG_MIN_COMPACTION_LAG_MS, storeName);
+    // Reject overrides via stores.x.changelog.kafka.min.compaction.lag.ms to avoid inconsistent settings.
+    checkArgument(get("stores." + storeName + ".changelog.kafka." + MIN_COMPACTION_LAG_MS) == null,
+        "Use " + minCompactLagConfigName + " to set kafka min.compaction.lag.ms property.");
+
+    return getLong(minCompactLagConfigName, DEFAULT_CHANGELOG_MIN_COMPACTION_LAG_MS);
+  }
+
   /**
    * Helper method to check if a system has a changelog attached to it.
    */
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
index e094de2..713aa49 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
@@ -25,6 +25,7 @@ import java.util.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import java.util.concurrent.TimeUnit;
 import org.apache.samza.SamzaException;
 import org.junit.Test;
 
@@ -296,4 +297,17 @@ public class TestStorageConfig {
         new MapConfig(ImmutableMap.of(String.format(StorageConfig.DROP_LARGE_MESSAGES, STORE_NAME0), "true")));
     assertEquals(true, storageConfig.getDropLargeMessages(STORE_NAME0));
   }
+
+  @Test
+  public void testGetChangelogMinCompactionLagMs() {
+    // empty config, return default lag ms
+    assertEquals(DEFAULT_CHANGELOG_MIN_COMPACTION_LAG_MS,
+        new StorageConfig(new MapConfig()).getChangelogMinCompactionLagMs(STORE_NAME0));
+
+    long lagOverride = TimeUnit.HOURS.toMillis(6);
+    StorageConfig storageConfig = new StorageConfig(
+        new MapConfig(ImmutableMap.of(String.format(CHANGELOG_MIN_COMPACTION_LAG_MS, STORE_NAME0),
+            String.valueOf(lagOverride))));
+    assertEquals(lagOverride, storageConfig.getChangelogMinCompactionLagMs(STORE_NAME0));
+  }
 }
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index f8051f2..75fbb6b 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -336,8 +336,18 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
         kafkaChangeLogProperties.setProperty("max.message.bytes", getChangelogStreamMaxMessageByte(name))
     }
 
+    val storageConfig = new StorageConfig(config)
     kafkaChangeLogProperties.setProperty("segment.bytes", KafkaConfig.CHANGELOG_DEFAULT_SEGMENT_SIZE)
-    kafkaChangeLogProperties.setProperty("delete.retention.ms", String.valueOf(new StorageConfig(config).getChangeLogDeleteRetentionInMs(name)))
+    kafkaChangeLogProperties.setProperty("delete.retention.ms", String.valueOf(storageConfig.getChangeLogDeleteRetentionInMs(name)))
+
+    // To enable transactional state, we will need to avoid the head of the changelog
+    // (the messages after last checkpoint) being log-compacted so we can trim the rest of the updates.
+    // We use min.compaction.lag.ms to control the compaction time.
+    if (new TaskConfig(this).getTransactionalStateRestoreEnabled) {
+      kafkaChangeLogProperties.setProperty(StorageConfig.MIN_COMPACTION_LAG_MS,
+        String.valueOf(storageConfig.getChangelogMinCompactionLagMs(name)))
+    }
+
     filteredConfigs.asScala.foreach { kv => kafkaChangeLogProperties.setProperty(kv._1, kv._2) }
     kafkaChangeLogProperties
   }
diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
index ea6c3f8..8558a85 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
@@ -20,6 +20,7 @@
 package org.apache.samza.config
 
 import java.util.Properties
+import java.util.concurrent.TimeUnit
 
 import org.apache.samza.config.factories.PropertiesConfigFactory
 import org.junit.Assert._
@@ -82,11 +83,11 @@ class TestKafkaConfig {
 
   @Test
   def testChangeLogProperties() {
+    props.setProperty("job.changelog.system", SYSTEM_NAME)
     props.setProperty("systems." + SYSTEM_NAME + ".samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory")
     props.setProperty("stores.test1.changelog", "kafka.mychangelog1")
     props.setProperty("stores.test2.changelog", "kafka.mychangelog2")
     props.setProperty("stores.test2.changelog.max.message.bytes", "1024000")
-    props.setProperty("job.changelog.system", "kafka")
     props.setProperty("stores.test3.changelog", "otherstream")
     props.setProperty("stores.test1.changelog.kafka.cleanup.policy", "delete")
     props.setProperty("stores.test4.rocksdb.ttl.ms", "3600")
@@ -107,6 +108,7 @@ class TestKafkaConfig {
     assertEquals("otherstream", storeToChangelog.getOrDefault("test3", ""))
     assertNull(kafkaConfig.getChangelogKafkaProperties("test1").getProperty("retention.ms"))
     assertNull(kafkaConfig.getChangelogKafkaProperties("test2").getProperty("retention.ms"))
+    assertNull(kafkaConfig.getChangelogKafkaProperties("test1").getProperty("min.compaction.lag.ms"))
 
     props.setProperty("systems." + SYSTEM_NAME + ".samza.factory", "org.apache.samza.system.kafka.SomeOtherFactory")
     val storeToChangelog1 = kafkaConfig.getKafkaChangelogEnabledStores()
@@ -138,6 +140,17 @@ class TestKafkaConfig {
       String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH))
     assertEquals(batchKafkaConfig.getChangelogKafkaProperties("test3").getProperty("max.message.bytes"),
       KafkaConfig.DEFAULT_LOG_COMPACT_TOPIC_MAX_MESSAGE_BYTES)
+
+    // test compaction config for transactional state
+    val lagOverride = String.valueOf(TimeUnit.HOURS.toMillis(6))
+    props.setProperty(TaskConfig.TRANSACTIONAL_STATE_RESTORE_ENABLED, "true")
+    props.setProperty("stores.test2.changelog.min.compaction.lag.ms", lagOverride)
+    val tsMapConfig = new MapConfig(props.asScala.asJava)
+    val tsKafkaConfig = new KafkaConfig(tsMapConfig)
+    assertEquals(String.valueOf(StorageConfig.DEFAULT_CHANGELOG_MIN_COMPACTION_LAG_MS),
+      tsKafkaConfig.getChangelogKafkaProperties("test1").getProperty("min.compaction.lag.ms"))
+    assertEquals(lagOverride,
+      tsKafkaConfig.getChangelogKafkaProperties("test2").getProperty("min.compaction.lag.ms"))
   }
 
   @Test