You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bh...@apache.org on 2020/05/07 01:43:41 UTC

[samza] branch master updated: Setting transactional state restore to default enabled (#1346)

This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 0080433  Setting transactional state restore to default enabled (#1346)
0080433 is described below

commit 0080433d0a97a4de9173732dec12812a23b350c9
Author: bkonold <bk...@users.noreply.github.com>
AuthorDate: Wed May 6 18:43:31 2020 -0700

    Setting transactional state restore to default enabled (#1346)
---
 samza-core/src/main/java/org/apache/samza/config/TaskConfig.java      | 2 +-
 .../src/test/java/org/apache/samza/system/MockSystemFactory.java      | 4 ++--
 .../src/test/scala/org/apache/samza/config/TestKafkaConfig.scala      | 2 +-
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
index f5f09b2..461b647 100644
--- a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
@@ -111,7 +111,7 @@ public class TaskConfig extends MapConfig {
   public static final String TRANSACTIONAL_STATE_CHECKPOINT_ENABLED = "task.transactional.state.checkpoint.enabled";
   private static final boolean DEFAULT_TRANSACTIONAL_STATE_CHECKPOINT_ENABLED = true;
   public static final String TRANSACTIONAL_STATE_RESTORE_ENABLED = "task.transactional.state.restore.enabled";
-  private static final boolean DEFAULT_TRANSACTIONAL_STATE_RESTORE_ENABLED = false;
+  private static final boolean DEFAULT_TRANSACTIONAL_STATE_RESTORE_ENABLED = true;
   public static final String TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE =
       "task.transactional.state.retain.existing.state";
   private static final boolean DEFAULT_TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE = true;
diff --git a/samza-core/src/test/java/org/apache/samza/system/MockSystemFactory.java b/samza-core/src/test/java/org/apache/samza/system/MockSystemFactory.java
index e3030ab..9c8dc58 100644
--- a/samza-core/src/test/java/org/apache/samza/system/MockSystemFactory.java
+++ b/samza-core/src/test/java/org/apache/samza/system/MockSystemFactory.java
@@ -154,14 +154,14 @@ public class MockSystemFactory implements SystemFactory {
 
       @Override
       public Integer offsetComparator(String offset1, String offset2) {
-        return null;
+        return offset1.compareTo(offset2);
       }
 
       @Override
       public Map<String, SystemStreamMetadata> getSystemStreamPartitionCounts(Set<String> streamNames, long cacheTTL) {
         return getSystemStreamMetadata(streamNames);
       }
-      
+
       @Override
       public boolean createStream(StreamSpec streamSpec) {
         return true;
diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
index 64b476b..f62e8a3 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
@@ -169,7 +169,7 @@ class TestKafkaConfig {
     assertEquals("otherstream", storeToChangelog.getOrDefault("test3", ""))
     assertNull(kafkaConfig.getChangelogKafkaProperties("test1").getProperty("retention.ms"))
     assertNull(kafkaConfig.getChangelogKafkaProperties("test2").getProperty("retention.ms"))
-    assertNull(kafkaConfig.getChangelogKafkaProperties("test1").getProperty("min.compaction.lag.ms"))
+    assertNotNull(kafkaConfig.getChangelogKafkaProperties("test1").getProperty("min.compaction.lag.ms"))
 
     props.setProperty("systems." + SYSTEM_NAME + ".samza.factory", "org.apache.samza.system.kafka.SomeOtherFactory")
     val storeToChangelog1 = kafkaConfig.getKafkaChangelogEnabledStores()