You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bh...@apache.org on 2020/05/07 01:43:41 UTC
[samza] branch master updated: Setting transactional state restore
to default enabled (#1346)
This is an automated email from the ASF dual-hosted git repository.
bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 0080433 Setting transactional state restore to default enabled (#1346)
0080433 is described below
commit 0080433d0a97a4de9173732dec12812a23b350c9
Author: bkonold <bk...@users.noreply.github.com>
AuthorDate: Wed May 6 18:43:31 2020 -0700
Setting transactional state restore to default enabled (#1346)
---
samza-core/src/main/java/org/apache/samza/config/TaskConfig.java | 2 +-
.../src/test/java/org/apache/samza/system/MockSystemFactory.java | 4 ++--
.../src/test/scala/org/apache/samza/config/TestKafkaConfig.scala | 2 +-
3 files changed, 4 insertions(+), 4 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
index f5f09b2..461b647 100644
--- a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
@@ -111,7 +111,7 @@ public class TaskConfig extends MapConfig {
public static final String TRANSACTIONAL_STATE_CHECKPOINT_ENABLED = "task.transactional.state.checkpoint.enabled";
private static final boolean DEFAULT_TRANSACTIONAL_STATE_CHECKPOINT_ENABLED = true;
public static final String TRANSACTIONAL_STATE_RESTORE_ENABLED = "task.transactional.state.restore.enabled";
- private static final boolean DEFAULT_TRANSACTIONAL_STATE_RESTORE_ENABLED = false;
+ private static final boolean DEFAULT_TRANSACTIONAL_STATE_RESTORE_ENABLED = true;
public static final String TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE =
"task.transactional.state.retain.existing.state";
private static final boolean DEFAULT_TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE = true;
diff --git a/samza-core/src/test/java/org/apache/samza/system/MockSystemFactory.java b/samza-core/src/test/java/org/apache/samza/system/MockSystemFactory.java
index e3030ab..9c8dc58 100644
--- a/samza-core/src/test/java/org/apache/samza/system/MockSystemFactory.java
+++ b/samza-core/src/test/java/org/apache/samza/system/MockSystemFactory.java
@@ -154,14 +154,14 @@ public class MockSystemFactory implements SystemFactory {
@Override
public Integer offsetComparator(String offset1, String offset2) {
- return null;
+ return offset1.compareTo(offset2);
}
@Override
public Map<String, SystemStreamMetadata> getSystemStreamPartitionCounts(Set<String> streamNames, long cacheTTL) {
return getSystemStreamMetadata(streamNames);
}
-
+
@Override
public boolean createStream(StreamSpec streamSpec) {
return true;
diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
index 64b476b..f62e8a3 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
@@ -169,7 +169,7 @@ class TestKafkaConfig {
assertEquals("otherstream", storeToChangelog.getOrDefault("test3", ""))
assertNull(kafkaConfig.getChangelogKafkaProperties("test1").getProperty("retention.ms"))
assertNull(kafkaConfig.getChangelogKafkaProperties("test2").getProperty("retention.ms"))
- assertNull(kafkaConfig.getChangelogKafkaProperties("test1").getProperty("min.compaction.lag.ms"))
+ assertNotNull(kafkaConfig.getChangelogKafkaProperties("test1").getProperty("min.compaction.lag.ms"))
props.setProperty("systems." + SYSTEM_NAME + ".samza.factory", "org.apache.samza.system.kafka.SomeOtherFactory")
val storeToChangelog1 = kafkaConfig.getKafkaChangelogEnabledStores()