You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2019/10/23 01:40:32 UTC
[samza] 01/03: SAMZA-2352: Use min.compaction.lag.ms to avoid
compacting the Kafka changelog topic (#1190)
This is an automated email from the ASF dual-hosted git repository.
pmaheshwari pushed a commit to branch 1.3.0
in repository https://gitbox.apache.org/repos/asf/samza.git
commit 0de2385540747af500efcf134cf80f0748ffff40
Author: xinyuiscool <xi...@linkedin.com>
AuthorDate: Thu Oct 17 14:12:45 2019 -0700
SAMZA-2352: Use min.compaction.lag.ms to avoid compacting the Kafka changelog topic (#1190)
---
.../main/java/org/apache/samza/config/StorageConfig.java | 16 ++++++++++++++++
.../java/org/apache/samza/config/TestStorageConfig.java | 14 ++++++++++++++
.../main/scala/org/apache/samza/config/KafkaConfig.scala | 12 +++++++++++-
.../scala/org/apache/samza/config/TestKafkaConfig.scala | 15 ++++++++++++++-
4 files changed, 55 insertions(+), 2 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
index 86c7e7d..2b7ce02 100644
--- a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
@@ -31,6 +31,8 @@ import org.apache.samza.SamzaException;
import org.apache.samza.execution.StreamManager;
import org.apache.samza.util.StreamUtil;
+import static com.google.common.base.Preconditions.checkArgument;
+
/**
* Config helper methods related to storage.
@@ -45,6 +47,7 @@ public class StorageConfig extends MapConfig {
public static final String MSG_SERDE = STORE_PREFIX + "%s.msg.serde";
public static final String CHANGELOG_STREAM = STORE_PREFIX + "%s" + CHANGELOG_SUFFIX;
public static final String ACCESSLOG_STREAM_SUFFIX = "access-log";
+ // TODO: setting the replication factor through this key does not seem to take effect the way it does in KafkaConfig.
public static final String CHANGELOG_REPLICATION_FACTOR = STORE_PREFIX + "%s.changelog.replication.factor";
public static final String CHANGELOG_MAX_MSG_SIZE_BYTES = STORE_PREFIX + "%s.changelog.max.message.size.bytes";
public static final int DEFAULT_CHANGELOG_MAX_MSG_SIZE_BYTES = 1048576;
@@ -52,6 +55,10 @@ public class StorageConfig extends MapConfig {
public static final boolean DEFAULT_DISALLOW_LARGE_MESSAGES = false;
public static final String DROP_LARGE_MESSAGES = STORE_PREFIX + "%s.drop.large.messages";
public static final boolean DEFAULT_DROP_LARGE_MESSAGES = false;
+ // Kafka min.compaction.lag.ms for store changelog topics, used to support transactional state (default: 4 hours)
+ public static final String MIN_COMPACTION_LAG_MS = "min.compaction.lag.ms";
+ public static final String CHANGELOG_MIN_COMPACTION_LAG_MS = STORE_PREFIX + "%s.changelog." + MIN_COMPACTION_LAG_MS;
+ public static final long DEFAULT_CHANGELOG_MIN_COMPACTION_LAG_MS = TimeUnit.HOURS.toMillis(4);
static final String CHANGELOG_SYSTEM = "job.changelog.system";
static final String CHANGELOG_DELETE_RETENTION_MS = STORE_PREFIX + "%s.changelog.delete.retention.ms";
@@ -207,6 +214,15 @@ public class StorageConfig extends MapConfig {
return getBoolean(String.format(DROP_LARGE_MESSAGES, storeName), DEFAULT_DROP_LARGE_MESSAGES);
}
+ public long getChangelogMinCompactionLagMs(String storeName) {
+ String minCompactLagConfigName = String.format(CHANGELOG_MIN_COMPACTION_LAG_MS, storeName);
+ // Reject stores.<store>.changelog.kafka.min.compaction.lag.ms so that this lag has exactly one config key.
+ checkArgument(get(STORE_PREFIX + storeName + ".changelog.kafka." + MIN_COMPACTION_LAG_MS) == null,
+ "Use " + minCompactLagConfigName + " to set kafka min.compaction.lag.ms property.");
+
+ return getLong(minCompactLagConfigName, DEFAULT_CHANGELOG_MIN_COMPACTION_LAG_MS);
+ }
+
/**
* Helper method to check if a system has a changelog attached to it.
*/
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
index e094de2..713aa49 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
@@ -25,6 +25,7 @@ import java.util.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import java.util.concurrent.TimeUnit;
import org.apache.samza.SamzaException;
import org.junit.Test;
@@ -296,4 +297,17 @@ public class TestStorageConfig {
new MapConfig(ImmutableMap.of(String.format(StorageConfig.DROP_LARGE_MESSAGES, STORE_NAME0), "true")));
assertEquals(true, storageConfig.getDropLargeMessages(STORE_NAME0));
}
+
+ @Test
+ public void testGetChangelogMinCompactionLagMs() {
+ // no per-store override configured: the default lag is returned
+ assertEquals(DEFAULT_CHANGELOG_MIN_COMPACTION_LAG_MS,
+ new StorageConfig(new MapConfig()).getChangelogMinCompactionLagMs(STORE_NAME0));
+
+ long lagOverride = TimeUnit.HOURS.toMillis(6); // per-store override via stores.<store>.changelog.min.compaction.lag.ms
+ StorageConfig storageConfig = new StorageConfig(
+ new MapConfig(ImmutableMap.of(String.format(CHANGELOG_MIN_COMPACTION_LAG_MS, STORE_NAME0),
+ String.valueOf(lagOverride))));
+ assertEquals(lagOverride, storageConfig.getChangelogMinCompactionLagMs(STORE_NAME0));
+ }
}
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index f8051f2..75fbb6b 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -336,8 +336,18 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
kafkaChangeLogProperties.setProperty("max.message.bytes", getChangelogStreamMaxMessageByte(name))
}
+ val storageConfig = new StorageConfig(config)
kafkaChangeLogProperties.setProperty("segment.bytes", KafkaConfig.CHANGELOG_DEFAULT_SEGMENT_SIZE)
- kafkaChangeLogProperties.setProperty("delete.retention.ms", String.valueOf(new StorageConfig(config).getChangeLogDeleteRetentionInMs(name)))
+ kafkaChangeLogProperties.setProperty("delete.retention.ms", String.valueOf(storageConfig.getChangeLogDeleteRetentionInMs(name)))
+
+ // Transactional state restore needs the head of the changelog (messages written after the last
+ // checkpoint) to stay uncompacted, so that those uncheckpointed updates can still be trimmed.
+ // We use min.compaction.lag.ms to control how long a message stays ineligible for compaction.
+ if (new TaskConfig(this).getTransactionalStateRestoreEnabled) {
+ kafkaChangeLogProperties.setProperty(StorageConfig.MIN_COMPACTION_LAG_MS,
+ String.valueOf(storageConfig.getChangelogMinCompactionLagMs(name)))
+ }
+
filteredConfigs.asScala.foreach { kv => kafkaChangeLogProperties.setProperty(kv._1, kv._2) }
kafkaChangeLogProperties
}
diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
index ea6c3f8..8558a85 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
@@ -20,6 +20,7 @@
package org.apache.samza.config
import java.util.Properties
+import java.util.concurrent.TimeUnit
import org.apache.samza.config.factories.PropertiesConfigFactory
import org.junit.Assert._
@@ -82,11 +83,11 @@ class TestKafkaConfig {
@Test
def testChangeLogProperties() {
+ props.setProperty("job.changelog.system", SYSTEM_NAME)
props.setProperty("systems." + SYSTEM_NAME + ".samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory")
props.setProperty("stores.test1.changelog", "kafka.mychangelog1")
props.setProperty("stores.test2.changelog", "kafka.mychangelog2")
props.setProperty("stores.test2.changelog.max.message.bytes", "1024000")
- props.setProperty("job.changelog.system", "kafka")
props.setProperty("stores.test3.changelog", "otherstream")
props.setProperty("stores.test1.changelog.kafka.cleanup.policy", "delete")
props.setProperty("stores.test4.rocksdb.ttl.ms", "3600")
@@ -107,6 +108,7 @@ class TestKafkaConfig {
assertEquals("otherstream", storeToChangelog.getOrDefault("test3", ""))
assertNull(kafkaConfig.getChangelogKafkaProperties("test1").getProperty("retention.ms"))
assertNull(kafkaConfig.getChangelogKafkaProperties("test2").getProperty("retention.ms"))
+ assertNull(kafkaConfig.getChangelogKafkaProperties("test1").getProperty("min.compaction.lag.ms")) // transactional state restore not enabled in props yet
props.setProperty("systems." + SYSTEM_NAME + ".samza.factory", "org.apache.samza.system.kafka.SomeOtherFactory")
val storeToChangelog1 = kafkaConfig.getKafkaChangelogEnabledStores()
@@ -138,6 +140,17 @@ class TestKafkaConfig {
String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH))
assertEquals(batchKafkaConfig.getChangelogKafkaProperties("test3").getProperty("max.message.bytes"),
KafkaConfig.DEFAULT_LOG_COMPACT_TOPIC_MAX_MESSAGE_BYTES)
+
+ // with transactional state restore enabled, min.compaction.lag.ms is the default for test1 and the per-store override for test2
+ val lagOverride = String.valueOf(TimeUnit.HOURS.toMillis(6))
+ props.setProperty(TaskConfig.TRANSACTIONAL_STATE_RESTORE_ENABLED, "true")
+ props.setProperty("stores.test2.changelog.min.compaction.lag.ms", lagOverride)
+ val tsMapConfig = new MapConfig(props.asScala.asJava)
+ val tsKafkaConfig = new KafkaConfig(tsMapConfig)
+ assertEquals(String.valueOf(StorageConfig.DEFAULT_CHANGELOG_MIN_COMPACTION_LAG_MS),
+ tsKafkaConfig.getChangelogKafkaProperties("test1").getProperty("min.compaction.lag.ms"))
+ assertEquals(lagOverride,
+ tsKafkaConfig.getChangelogKafkaProperties("test2").getProperty("min.compaction.lag.ms"))
}
@Test