You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2018/05/07 19:34:33 UTC
samza git commit: SAMZA-1563: Make RocksDB store base directory
configurable
Repository: samza
Updated Branches:
refs/heads/master 40154b4f5 -> 164fa5f03
SAMZA-1563: Make RocksDB store base directory configurable
xinyuiscool ^^
Author: Bharath Kumarasubramanian <bk...@linkedin.com>
Reviewers: Prateek M <pr...@apache.org>
Closes #491 from bharathkk/samza-1563
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/164fa5f0
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/164fa5f0
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/164fa5f0
Branch: refs/heads/master
Commit: 164fa5f03f26f3b118675469f236e790c0e48e38
Parents: 40154b4
Author: Bharath Kumarasubramanian <bk...@linkedin.com>
Authored: Mon May 7 12:34:26 2018 -0700
Committer: xiliu <xi...@linkedin.com>
Committed: Mon May 7 12:34:26 2018 -0700
----------------------------------------------------------------------
.../versioned/jobs/configuration-table.html | 31 +++++++++
.../org/apache/samza/config/JobConfig.scala | 11 ++++
.../apache/samza/container/SamzaContainer.scala | 69 ++++++++++++++------
.../samza/storage/TaskStorageManager.scala | 18 ++---
.../samza/storage/TestTaskStorageManager.scala | 2 +-
.../samza/monitor/LocalStoreMonitorConfig.java | 4 +-
6 files changed, 104 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/164fa5f0/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 86ac427..8557480 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -472,8 +472,39 @@
<dt><code>org.apache.samza.coordinator.AzureCoordinationUtilsFactory</code></dt>
<dd>Azure based coordination utils.</dd>
These coordination utils are currently used for intermediate stream creation.
+ </dl>
</td>
</tr>
+
+ <tr>
+ <td class="property" id="job.logged.store.base.dir">job.logged.store.base.dir</td>
+ <td class="default">
+ <i>state</i> subdirectory of the <i>user.dir</i> Java system property if set, else a <i>state</i> subdirectory of the current working directory of the process
+ </td>
+ <td class="description">
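+ The base directory for changelog-backed stores used by the Samza application. The base directory can also be configured by setting the environment variable <i>LOGGED_STORE_BASE_DIR</i>.
+ <b>Note:</b> The environment variable takes precedence over <i>job.logged.store.base.dir</i>. When it is set, stores are placed under <i>$LOGGED_STORE_BASE_DIR/&lt;job.name&gt;-&lt;job.id&gt;</i> (<i>job.id</i> defaults to 1).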
+
+ <br>By opting in, users are responsible for cleaning up the store directories if necessary. Jobs using host affinity should ensure that the stores are persisted across application/container restarts.
+ This means that the location and cleanup of this directory should be separate from the container lifecycle and resource cleanup.
+ </td>
+ </tr>
+
+ <tr>
+ <td class="property" id="job.non-logged.store.base.dir">job.non-logged.store.base.dir</td>
+ <td class="default">
+ <i>state</i> subdirectory of the <i>user.dir</i> Java system property if set, else a <i>state</i> subdirectory of the current working directory of the process
+ </td>
+ <td class="description">
+ The base directory for non-changelog stores used by the Samza application.
+
+ <br>In YARN, when this configuration is not set, non-changelog store directories are created under the current working directory, which is the YARN container directory.
+ This gets cleaned up periodically as part of NodeManager's deletion service, which is controlled by the YARN config <i>yarn.nodemanager.delete.debug-delay-sec</i>.
+
+ <br>In non-YARN deployments, or when using a directory other than the YARN container directory, users must clean up the stores periodically.
+ </td>
+ </tr>
+
<tr>
<!-- change link to StandAlone design/tutorial doc. SAMZA-1299 -->
<th colspan="3" class="section" id="ZkBasedJobCoordination"><a href="../index.html">Zookeeper-based job configuration</a></th>
http://git-wip-us.apache.org/repos/asf/samza/blob/164fa5f0/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index de83919..75e8005 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -81,6 +81,13 @@ object JobConfig {
val PROCESSOR_ID = "processor.id"
val PROCESSOR_LIST = "processor.list"
+ // Represents the store path for non-changelog stores.
+ val JOB_NON_LOGGED_STORE_BASE_DIR = "job.non-logged.store.base.dir"
+
+ // Represents the store path for stores with changelog enabled. Typically the stores are not cleaned up
+ // across application restarts
+ val JOB_LOGGED_STORE_BASE_DIR = "job.logged.store.base.dir"
+
implicit def Config2Job(config: Config) = new JobConfig(config)
/**
@@ -175,4 +182,8 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
}
def getDebounceTimeMs = getInt(JobConfig.JOB_DEBOUNCE_TIME_MS, JobConfig.DEFAULT_DEBOUNCE_TIME_MS)
+
+ def getNonLoggedStorePath = getOption(JobConfig.JOB_NON_LOGGED_STORE_BASE_DIR)
+
+ def getLoggedStorePath = getOption(JobConfig.JOB_LOGGED_STORE_BASE_DIR)
}
http://git-wip-us.apache.org/repos/asf/samza/blob/164fa5f0/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 7aec8e1..ad5cb9a 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -76,6 +76,47 @@ object SamzaContainer extends Logging {
classOf[JobModel])
}
+  // TODO: SAMZA-1701 SamzaContainer should not contain any logic related to store directories
+  /**
+   * Returns the base directory for non-logged (non-changelog) stores: the value of
+   * job.non-logged.store.base.dir when configured, otherwise defaultStoreBaseDir. Nothing is created here.
+   */
+  def getNonLoggedStorageBaseDir(config: Config, defaultStoreBaseDir: File): File =
+    config.getNonLoggedStorePath
+      .map(nonLoggedStorePath => new File(nonLoggedStorePath))
+      .getOrElse(defaultStoreBaseDir)
+
+  // TODO: SAMZA-1701 SamzaContainer should not contain any logic related to store directories
+  /**
+   * Resolves the base directory for logged (changelog-backed) stores, in order of precedence:
+   *  1. the LOGGED_STORE_BASE_DIR environment variable, suffixed with "<job.name>-<job.id>"
+   *     (job.id defaults to "1"; a missing job.name raises a ConfigException),
+   *  2. the job.logged.store.base.dir config,
+   *  3. defaultStoreBaseDir, with a warning that local state will not be re-used on restart.
+   * @return the resolved directory; it is not created here.
+   */
+  def getLoggedStorageBaseDir(config: Config, defaultStoreBaseDir: File): File = {
+    val envLoggedStoreBaseDir = System.getenv(ShellCommandConfig.ENV_LOGGED_STORE_BASE_DIR)
+
+    if (envLoggedStoreBaseDir != null) {
+      val jobName = config.getName.getOrElse(throw new ConfigException("Missing required config: job.name"))
+      val jobId = config.getJobId.getOrElse("1")
+      // Scope the directory to this job instance: <base>/<job.name>-<job.id>.
+      new File(envLoggedStoreBaseDir + File.separator + jobName + "-" + jobId)
+    } else {
+      // No environment override: use job.logged.store.base.dir if configured, else the default.
+      config.getLoggedStorePath match {
+        case Some(loggedStorePath) =>
+          new File(loggedStorePath)
+        case None =>
+          warn("No override was provided for logged store base directory. This disables local state re-use on " +
+            "application restart. If you want to enable this feature, set LOGGED_STORE_BASE_DIR as an environment " +
+            "variable in all machines running the Samza container or configure job.logged.store.base.dir for your application")
+          defaultStoreBaseDir
+      }
+    }
+  }
+
def apply(
containerId: String,
jobModel: JobModel,
@@ -431,10 +472,6 @@ object SamzaContainer extends Logging {
.toSet
val containerContext = new SamzaContainerContext(containerId, config, taskNames.asJava, samzaContainerMetrics.registry)
- // TODO not sure how we should make this config based, or not. Kind of
- // strange, since it has some dynamic directories when used with YARN.
- val defaultStoreBaseDir = new File(System.getProperty("user.dir"), "state")
- info("Got default storage engine base directory: %s" format defaultStoreBaseDir)
val storeWatchPaths = new util.HashSet[Path]()
@@ -468,21 +505,13 @@ object SamzaContainer extends Logging {
info("Got store consumers: %s" format storeConsumers)
- var loggedStorageBaseDir: File = null
- if(System.getenv(ShellCommandConfig.ENV_LOGGED_STORE_BASE_DIR) != null) {
- val jobNameAndId = (
- config.getName.getOrElse(throw new ConfigException("Missing required config: job.name")),
- config.getJobId.getOrElse("1")
- )
- loggedStorageBaseDir = new File(System.getenv(ShellCommandConfig.ENV_LOGGED_STORE_BASE_DIR)
- + File.separator + jobNameAndId._1 + "-" + jobNameAndId._2)
- } else {
- warn("No override was provided for logged store base directory. This disables local state re-use on " +
- "application restart. If you want to enable this feature, set LOGGED_STORE_BASE_DIR as an environment " +
- "variable in all machines running the Samza container")
- loggedStorageBaseDir = defaultStoreBaseDir
- }
+ val defaultStoreBaseDir = new File(System.getProperty("user.dir"), "state")
+ info("Got default storage engine base directory: %s" format defaultStoreBaseDir)
+
+ val nonLoggedStorageBaseDir = getNonLoggedStorageBaseDir(config, defaultStoreBaseDir)
+ info("Got base directory for non logged data stores: %s" format nonLoggedStorageBaseDir)
+ var loggedStorageBaseDir = getLoggedStorageBaseDir(config, defaultStoreBaseDir)
info("Got base directory for logged data stores: %s" format loggedStorageBaseDir)
val taskStores = storageEngineFactories
@@ -509,7 +538,7 @@ object SamzaContainer extends Logging {
val storeDir = if (changeLogSystemStreamPartition != null) {
TaskStorageManager.getStorePartitionDir(loggedStorageBaseDir, storeName, taskName)
} else {
- TaskStorageManager.getStorePartitionDir(defaultStoreBaseDir, storeName, taskName)
+ TaskStorageManager.getStorePartitionDir(nonLoggedStorageBaseDir, storeName, taskName)
}
storeWatchPaths.add(storeDir.toPath)
@@ -535,7 +564,7 @@ object SamzaContainer extends Logging {
changeLogSystemStreams = changeLogSystemStreams,
maxChangeLogStreamPartitions,
streamMetadataCache = streamMetadataCache,
- storeBaseDir = defaultStoreBaseDir,
+ nonLoggedStoreBaseDir = nonLoggedStorageBaseDir,
loggedStoreBaseDir = loggedStorageBaseDir,
partition = taskModel.getChangelogPartition,
systemAdmins = systemAdmins,
http://git-wip-us.apache.org/repos/asf/samza/blob/164fa5f0/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
index 00dc20f..09744cf 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
@@ -51,7 +51,7 @@ class TaskStorageManager(
changeLogSystemStreams: Map[String, SystemStream] = Map(),
changeLogStreamPartitions: Int,
streamMetadataCache: StreamMetadataCache,
- storeBaseDir: File = new File(System.getProperty("user.dir"), "state"),
+ nonLoggedStoreBaseDir: File = new File(System.getProperty("user.dir"), "state"),
loggedStoreBaseDir: File = new File(System.getProperty("user.dir"), "state"),
partition: Partition,
systemAdmins: SystemAdmins,
@@ -84,12 +84,12 @@ class TaskStorageManager(
debug("Cleaning base directories for stores.")
taskStores.keys.foreach(storeName => {
- val storePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskName)
- info("Got default storage partition directory as %s" format storePartitionDir.toPath.toString)
+ val nonLoggedStorePartitionDir = TaskStorageManager.getStorePartitionDir(nonLoggedStoreBaseDir, storeName, taskName)
+ info("Got non logged storage partition directory as %s" format nonLoggedStorePartitionDir.toPath.toString)
- if(storePartitionDir.exists()) {
- info("Deleting default storage partition directory %s" format storePartitionDir.toPath.toString)
- FileUtil.rm(storePartitionDir)
+ if(nonLoggedStorePartitionDir.exists()) {
+ info("Deleting non logged storage partition directory %s" format nonLoggedStorePartitionDir.toPath.toString)
+ FileUtil.rm(nonLoggedStorePartitionDir)
}
val loggedStorePartitionDir = TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName)
@@ -179,9 +179,9 @@ class TaskStorageManager(
info("Using logged storage partition directory: %s for store: %s." format(loggedStorePartitionDir.toPath.toString, storeName))
if (!loggedStorePartitionDir.exists()) loggedStorePartitionDir.mkdirs()
} else {
- val storePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskName)
- info("Using storage partition directory: %s for store: %s." format(storePartitionDir.toPath.toString, storeName))
- storePartitionDir.mkdirs()
+ val nonLoggedStorePartitionDir = TaskStorageManager.getStorePartitionDir(nonLoggedStoreBaseDir, storeName, taskName)
+ info("Using non logged storage partition directory: %s for store: %s." format(nonLoggedStorePartitionDir.toPath.toString, storeName))
+ nonLoggedStorePartitionDir.mkdirs()
}
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/164fa5f0/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
index bbdb819..31d3ef6 100644
--- a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
@@ -715,7 +715,7 @@ class TaskStorageManagerBuilder extends MockitoSugar {
changeLogSystemStreams = changeLogSystemStreams,
changeLogStreamPartitions = changeLogStreamPartitions,
streamMetadataCache = streamMetadataCache,
- storeBaseDir = storeBaseDir,
+ nonLoggedStoreBaseDir = storeBaseDir,
loggedStoreBaseDir = loggedStoreBaseDir,
partition = partition,
systemAdmins = new SystemAdmins(systemAdmins.asJava),
http://git-wip-us.apache.org/repos/asf/samza/blob/164fa5f0/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitorConfig.java
----------------------------------------------------------------------
diff --git a/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitorConfig.java b/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitorConfig.java
index 8413194..11806f3 100644
--- a/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitorConfig.java
+++ b/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitorConfig.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
@@ -32,6 +33,7 @@ public class LocalStoreMonitorConfig extends MapConfig {
/**
* Defines the local store directory of the job.
+ * @deprecated in favor of {@link org.apache.samza.config.JobConfig#JOB_LOGGED_STORE_BASE_DIR}
*/
static final String CONFIG_LOCAL_STORE_DIR = "job.local.store.dir";
@@ -68,7 +70,7 @@ public class LocalStoreMonitorConfig extends MapConfig {
* @return the location of the job's local directory.
*/
public String getLocalStoreBaseDir() {
- return get(CONFIG_LOCAL_STORE_DIR);
+ return get(JobConfig.JOB_LOGGED_STORE_BASE_DIR(), get(CONFIG_LOCAL_STORE_DIR));
}
/**