You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/10/09 20:05:47 UTC
samza git commit: Fix side input stores to use logged store directory
Repository: samza
Updated Branches:
refs/heads/master e312bb552 -> 07199cb07
Fix side input stores to use logged store directory
Side input stores are non-changelog stores, but they still need to use the logged store directory to guarantee durability.
Author: bharathkk <co...@gmail.com>
Reviewers: Prateek Maheshwari <pm...@apache.org>
Closes #701 from bharathkk/side-input-fix
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/07199cb0
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/07199cb0
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/07199cb0
Branch: refs/heads/master
Commit: 07199cb074db3e2773940e58887efb40d0771278
Parents: e312bb5
Author: bharathkk <co...@gmail.com>
Authored: Tue Oct 9 13:05:43 2018 -0700
Committer: Prateek Maheshwari <pm...@apache.org>
Committed: Tue Oct 9 13:05:43 2018 -0700
----------------------------------------------------------------------
.../scala/org/apache/samza/container/SamzaContainer.scala | 4 +++-
.../apache/samza/test/table/TestLocalTableWithSideInputs.java | 7 +++++++
2 files changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/07199cb0/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 5c4723b..3c10aae 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -553,7 +553,9 @@ object SamzaContainer extends Logging {
case _ => null
}
- val storeDir = if (changeLogSystemStreamPartition != null) {
+ // We use the logged storage base directory for change logged and side input stores since side input stores
+ // dont have changelog configured.
+ val storeDir = if (changeLogSystemStreamPartition != null || sideInputStoresToSystemStreams.contains(storeName)) {
TaskStorageManager.getStorePartitionDir(loggedStorageBaseDir, storeName, taskName)
} else {
TaskStorageManager.getStorePartitionDir(nonLoggedStorageBaseDir, storeName, taskName)
http://git-wip-us.apache.org/repos/asf/samza/blob/07199cb0/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
index c1657ff..c31052d 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
@@ -21,6 +21,7 @@ package org.apache.samza.test.table;
import com.google.common.collect.ImmutableList;
+import java.nio.file.FileSystems;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
@@ -30,6 +31,7 @@ import java.util.stream.Collectors;
import org.apache.samza.SamzaException;
import org.apache.samza.application.StreamApplicationDescriptor;
import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.StreamConfig;
import org.apache.samza.operators.KV;
@@ -82,6 +84,11 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness
Map<String, String> configs = new HashMap<>();
configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), PAGEVIEW_STREAM), systemName);
configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), PROFILE_STREAM), systemName);
+ configs.put(JobConfig.JOB_NON_LOGGED_STORE_BASE_DIR(),
+ FileSystems.getDefault().getPath("non-logged").toAbsolutePath().toString());
+ // SideInput Tables needs this to be configured for persisting data
+ configs.put(JobConfig.JOB_LOGGED_STORE_BASE_DIR(),
+ FileSystems.getDefault().getPath("logged").toAbsolutePath().toString());
configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), ENRICHED_PAGEVIEW_STREAM), systemName);
InMemorySystemDescriptor isd = new InMemorySystemDescriptor(systemName);