You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/10/09 20:05:47 UTC

samza git commit: Fix side input stores to use logged store directory

Repository: samza
Updated Branches:
  refs/heads/master e312bb552 -> 07199cb07


Fix side input stores to use logged store directory

Side input stores are non-changelog stores, but they still need to use the logged store directory to guarantee durability.

Author: bharathkk <co...@gmail.com>

Reviewers: Prateek Maheshwari <pm...@apache.org>

Closes #701 from bharathkk/side-input-fix


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/07199cb0
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/07199cb0
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/07199cb0

Branch: refs/heads/master
Commit: 07199cb074db3e2773940e58887efb40d0771278
Parents: e312bb5
Author: bharathkk <co...@gmail.com>
Authored: Tue Oct 9 13:05:43 2018 -0700
Committer: Prateek Maheshwari <pm...@apache.org>
Committed: Tue Oct 9 13:05:43 2018 -0700

----------------------------------------------------------------------
 .../scala/org/apache/samza/container/SamzaContainer.scala     | 4 +++-
 .../apache/samza/test/table/TestLocalTableWithSideInputs.java | 7 +++++++
 2 files changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/07199cb0/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 5c4723b..3c10aae 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -553,7 +553,9 @@ object SamzaContainer extends Logging {
               case _ => null
             }
 
-            val storeDir = if (changeLogSystemStreamPartition != null) {
+            // We use the logged storage base directory for change logged and side input stores since side input stores
+            // dont have changelog configured.
+            val storeDir = if (changeLogSystemStreamPartition != null || sideInputStoresToSystemStreams.contains(storeName)) {
               TaskStorageManager.getStorePartitionDir(loggedStorageBaseDir, storeName, taskName)
             } else {
               TaskStorageManager.getStorePartitionDir(nonLoggedStorageBaseDir, storeName, taskName)

http://git-wip-us.apache.org/repos/asf/samza/blob/07199cb0/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
index c1657ff..c31052d 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
@@ -21,6 +21,7 @@ package org.apache.samza.test.table;
 
 import com.google.common.collect.ImmutableList;
 
+import java.nio.file.FileSystems;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -30,6 +31,7 @@ import java.util.stream.Collectors;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.StreamApplicationDescriptor;
 import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.StreamConfig;
 import org.apache.samza.operators.KV;
@@ -82,6 +84,11 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness
     Map<String, String> configs = new HashMap<>();
     configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), PAGEVIEW_STREAM), systemName);
     configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), PROFILE_STREAM), systemName);
+    configs.put(JobConfig.JOB_NON_LOGGED_STORE_BASE_DIR(),
+        FileSystems.getDefault().getPath("non-logged").toAbsolutePath().toString());
+    // SideInput Tables needs this to be configured for persisting data
+    configs.put(JobConfig.JOB_LOGGED_STORE_BASE_DIR(),
+        FileSystems.getDefault().getPath("logged").toAbsolutePath().toString());
     configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), ENRICHED_PAGEVIEW_STREAM), systemName);
 
     InMemorySystemDescriptor isd = new InMemorySystemDescriptor(systemName);