Posted to commits@samza.apache.org by bh...@apache.org on 2019/07/19 22:14:10 UTC

[samza] branch master updated: SAMZA-2274: Cleanup store directories on startup for non-logged store only

This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new e0ea235  SAMZA-2274: Cleanup store directories on startup for non-logged store only
e0ea235 is described below

commit e0ea235155ac5b619f0c5c67d6e86dc45363bda5
Author: mynameborat <bh...@gmail.com>
AuthorDate: Fri Jul 19 15:13:56 2019 -0700

    SAMZA-2274: Cleanup store directories on startup for non-logged store only
    
    During startup, the ContainerStorageManager (CSM) cleans up the base directories for stores. This cleanup should only apply to non-logged stores. In the current setup, if an application configures the logged and non-logged store base directories to the same path, the logged store directories are deleted as well, which results in an unnecessary bootstrap of those stores from their changelogs.
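To make the failure mode concrete, here is a minimal sketch. It assumes a simplified `<baseDir>/<storeName>/<taskName>` directory layout; `storePartitionDir` is a hypothetical stand-in for `StorageManagerUtil.getStorePartitionDir`, and the paths and store names are made up. When both base directories are the same, the "non-logged" partition directory computed for a store is the same path as its logged one, so deleting it unconditionally also deletes logged data:

```java
import java.nio.file.Path;
import java.nio.file.Paths;

public class SharedBaseDirDemo {
  // Hypothetical stand-in for StorageManagerUtil.getStorePartitionDir.
  // Assumes a <baseDir>/<storeName>/<taskName> layout, which may differ from Samza's real layout.
  static Path storePartitionDir(Path baseDir, String storeName, String taskName) {
    return baseDir.resolve(storeName).resolve(taskName);
  }

  public static void main(String[] args) {
    // The same (made-up) value configured for both logged and non-logged base directories.
    Path shared = Paths.get("/data/samza/state");
    Path nonLoggedDir = storePartitionDir(shared, "my-store", "Partition_0");
    Path loggedDir = storePartitionDir(shared, "my-store", "Partition_0");
    // Before the fix, startup deleted nonLoggedDir for every store, including logged ones;
    // with a shared base directory that is also the logged store's directory.
    System.out.println("Same directory: " + nonLoggedDir.equals(loggedDir));
  }
}
```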
    
    In this PR, we:
     * Modify the cleanup behavior on startup so that only non-logged store directories are deleted unconditionally; logged store directories are deleted only if they are not valid.
     * Update the configuration documentation to describe the implications of not configuring `job.logged.store.base.dir` and `job.non-logged.store.base.dir` for stateful applications.
     * Add a warning if the logged and non-logged store base directories are configured to the same path.
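The new startup rule can be sketched as a pure function that only computes which directories would be deleted. This is an illustration under stated assumptions, not the patched code: `dirsToDelete`, the `Map<String, Boolean>` of store names to "is logged" flags, and the `Predicate<Path>` standing in for `isLoggedStoreValid` are all hypothetical, and the directory layout is simplified.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class StartupCleanupSketch {
  // Hypothetical, simplified layout: <baseDir>/<storeName>/<taskName>.
  static Path storePartitionDir(Path baseDir, String storeName, String taskName) {
    return baseDir.resolve(storeName).resolve(taskName);
  }

  /**
   * Returns the directories that startup cleanup would delete for one task.
   * storeIsLogged maps each store name to whether it has a changelog;
   * isLoggedStoreValid is a hypothetical stand-in for the container's validity check.
   */
  static List<Path> dirsToDelete(Map<String, Boolean> storeIsLogged, Path loggedBase, Path nonLoggedBase,
      String taskName, Predicate<Path> isLoggedStoreValid) {
    List<Path> toDelete = new ArrayList<>();
    storeIsLogged.forEach((storeName, isLogged) -> {
      if (!isLogged) {
        // Non-logged stores: always wiped on startup, as in the patched code.
        toDelete.add(storePartitionDir(nonLoggedBase, storeName, taskName));
      } else {
        // Logged stores: only the logged directory is considered, and only an invalid one is deleted.
        Path loggedDir = storePartitionDir(loggedBase, storeName, taskName);
        if (!isLoggedStoreValid.test(loggedDir)) {
          toDelete.add(loggedDir);
        }
      }
    });
    return toDelete;
  }

  public static void main(String[] args) {
    Path shared = Paths.get("/data/samza/state");
    Map<String, Boolean> stores = new LinkedHashMap<>();
    stores.put("cache-store", false);
    stores.put("counts-store", true);
    // Even with a shared base directory, a valid logged store is not selected for deletion.
    System.out.println(dirsToDelete(stores, shared, shared, "Partition_0", dir -> true));
  }
}
```

Passing the same path for both base directories, as `main` does, leaves the valid logged store untouched, which is the behavior this change is after; the patched code additionally logs a warning in that configuration.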
    
    Author: mynameborat <bh...@gmail.com>
    
    Reviewers: Hai Lu <ha...@linkedin.com>
    
    Closes #1107 from mynameborat/SAMZA-2274 and squashes the following commits:
    
    7d47a90b [mynameborat] Fix the logged store condition check
    35b1bb3d [mynameborat] Cleanup store directories on startup for non-logged store only
---
 .../versioned/jobs/samza-configurations.md         |  6 +--
 .../samza/storage/ContainerStorageManager.java     | 54 +++++++++++++---------
 2 files changed, 34 insertions(+), 26 deletions(-)
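The documentation change below describes how the logged store base directory is chosen: the `LOGGED_STORE_BASE_DIR` environment variable takes precedence over `job.logged.store.base.dir`, and the documented default is the `user.dir` property if set, else the process working directory. A minimal sketch of that precedence order follows, with the environment and config passed in as plain maps so it can be tested; the class and method names are hypothetical and this is not Samza's implementation.

```java
import java.util.HashMap;
import java.util.Map;

public class LoggedStoreBaseDirResolution {
  /**
   * Resolves the logged store base directory using the precedence described in the docs:
   * environment variable, then job config, then the user.dir property, then the working directory.
   */
  static String resolveLoggedStoreBaseDir(Map<String, String> env, Map<String, String> config) {
    String fromEnv = env.get("LOGGED_STORE_BASE_DIR");
    if (fromEnv != null) {
      return fromEnv; // the environment variable wins over the config
    }
    String fromConfig = config.get("job.logged.store.base.dir");
    if (fromConfig != null) {
      return fromConfig;
    }
    // user.dir is normally set by the JVM; "." stands in for the process working directory.
    return System.getProperty("user.dir", ".");
  }

  public static void main(String[] args) {
    // Separate paths for durable and non-durable stores, as the updated docs recommend.
    Map<String, String> config = new HashMap<>();
    config.put("job.logged.store.base.dir", "/data/samza/logged");
    config.put("job.non-logged.store.base.dir", "/data/samza/non-logged");
    System.out.println(resolveLoggedStoreBaseDir(System.getenv(), config));
  }
}
```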

diff --git a/docs/learn/documentation/versioned/jobs/samza-configurations.md b/docs/learn/documentation/versioned/jobs/samza-configurations.md
index 3f12594..d24efc2 100644
--- a/docs/learn/documentation/versioned/jobs/samza-configurations.md
+++ b/docs/learn/documentation/versioned/jobs/samza-configurations.md
@@ -245,7 +245,7 @@ Configs for producing to [ElasticSearch](https://www.elastic.co/products/elastic
 |systems.**_system-name_**.<br>bulk.flush.interval.ms|never|How often buffered messages should be flushed.|
 
 ### <a name="state-storage"></a>[4. State Storage](#state-storage)
-These properties define Samza's storage mechanism for efficient [stateful stream processing](../container/state-management.html).
+These properties define Samza's storage mechanism for efficient [stateful stream processing](../container/state-management.html). Stateful applications should configure base directories for durable and non-durable stores using `job.logged.store.base.dir` and `job.non-logged.store.base.dir` respectively.
 
 |Name|Default|Description|
 |--- |--- |--- |
@@ -254,12 +254,12 @@ These properties define Samza's storage mechanism for efficient [stateful stream
 |stores.**_store-name_**.msg.serde| |If the storage engine expects values in the store to be simple byte arrays, this [serde](../container/serialization.html) allows the stream task to access the store using another object type as value. The value of this property must be a serde-name that is registered with serializers.registry.*.class. If this property is not set, values are passed unmodified to the storage engine (and the changelog stream, if appropriate).|
 |stores.**_store-name_**.changelog| |Samza stores are local to a container. If the container fails, the contents of the store are lost. To prevent loss of data, you need to set this property to configure a changelog stream: Samza then ensures that writes to the store are replicated to this stream, and the store is restored from this stream after a failure. The value of this property is given in the form system-name.stream-name. The "system-name" part is optional. If it is omitted you mus [...]
 |stores.**_store-name_**.rocksdb.ttl.ms| |__For RocksDB:__ The time-to-live of the store. Please note it's not a strict TTL limit (removed only after compaction). Please use caution opening a database with and without TTL, as it might corrupt the database. Please make sure to read the [constraints](https://github.com/facebook/rocksdb/wiki/Time-to-Live) before using.|
+|job.logged.store.base.dir|_user.dir_ environment property if set, else current working directory of the process|The base directory for changelog stores used by Samza application. Another way to configure the base directory is by setting environment variable `LOGGED_STORE_BASE_DIR`. __Note:__ The environment variable takes precedence over `job.logged.store.base.dir`. <br>By opting in, users are responsible for cleaning up the store directories if necessary. Jobs using host affinity shoul [...]
+|job.non-logged.store.base.dir|_user.dir_ environment property if set, else current working directory of the process|The base directory for non-changelog stores used by Samza application. <br>In YARN, the default behaviour without the configuration is to create non-changelog store directories in CWD which happens to be the YARN container directory. This gets cleaned up periodically as part of NodeManager's deletion service, which is controlled by the YARN config `yarn.nodemanager.delete. [...]
 
 ##### <a name="advanced-storage-configurations"></a>[4.1 Advanced Storage Configurations](#advanced-storage-configurations)
 |Name|Default|Description|
 |--- |--- |--- |
-|job.logged.store.base.dir|_user.dir_ environment property if set, else current working directory of the process|The base directory for changelog stores used by Samza application. Another way to configure the base directory is by setting environment variable `LOGGED_STORE_BASE_DIR`. __Note:__ The environment variable takes precedence over `job.logged.store.base.dir`. <br>By opting in, users are responsible for cleaning up the store directories if necessary. Jobs using host affinity shoul [...]
-|job.non-logged.store.base.dir|_user.dir_ environment property if set, else current working directory of the process|The base directory for non-changelog stores used by Samza application. <br>In YARN, the default behaviour without the configuration is to create non-changelog store directories in CWD which happens to be the YARN container directory. This gets cleaned up periodically as part of NodeManager's deletion service, which is controlled by the YARN config `yarn.nodemanager.delete. [...]
 |stores.default.changelog.<br>replication.factor|2|This property defines the default number of replicas to use for the change log stream.|
 |stores.**_store-name_**.changelog.<br>replication.factor|stores.default.changelog.<br>replication.factor|The property defines the number of replicas to use for the change log stream.|
 |stores.**_store-name_**.changelog.<br>kafka.topic-level-property| |The property allows you to specify topic level settings for the changelog topic to be created. For e.g., you can specify the clean up policy as "stores.mystore.changelog.cleanup.policy=delete". Please refer to the [Kafka documentation](http://kafka.apache.org/documentation.html#configuration) for more topic level configurations.|
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index d3fa6ec..a1a2f05 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -203,6 +203,12 @@ public class ContainerStorageManager {
     this.loggedStoreBaseDirectory = loggedStoreBaseDirectory;
     this.nonLoggedStoreBaseDirectory = nonLoggedStoreBaseDirectory;
 
+    if (loggedStoreBaseDirectory != null && loggedStoreBaseDirectory.equals(nonLoggedStoreBaseDirectory)) {
+      LOG.warn("Logged and non-logged store base directories are configured to the same path: {}. It is recommended to configure "
+          + "them separately to ensure clean up of non-logged store data doesn't accidentally impact logged store data.",
+          loggedStoreBaseDirectory);
+    }
+
     // set the config
     this.config = config;
 
@@ -955,33 +961,35 @@ public class ContainerStorageManager {
     private void cleanBaseDirsAndReadOffsetFiles() {
       LOG.debug("Cleaning base directories for stores.");
 
-      taskStores.keySet().forEach(storeName -> {
-          File nonLoggedStorePartitionDir =
-              StorageManagerUtil.getStorePartitionDir(nonLoggedStoreBaseDirectory, storeName, taskModel.getTaskName(), taskModel.getTaskMode());
-          LOG.info("Got non logged storage partition directory as " + nonLoggedStorePartitionDir.toPath().toString());
-
-          if (nonLoggedStorePartitionDir.exists()) {
-            LOG.info("Deleting non logged storage partition directory " + nonLoggedStorePartitionDir.toPath().toString());
-            FileUtil.rm(nonLoggedStorePartitionDir);
-          }
-
-          File loggedStorePartitionDir =
-              StorageManagerUtil.getStorePartitionDir(loggedStoreBaseDirectory, storeName, taskModel.getTaskName(), taskModel.getTaskMode());
-          LOG.info("Got logged storage partition directory as " + loggedStorePartitionDir.toPath().toString());
+      taskStores.forEach((storeName, storageEngine) -> {
+          if (!storageEngine.getStoreProperties().isLoggedStore()) {
+            File nonLoggedStorePartitionDir =
+                StorageManagerUtil.getStorePartitionDir(nonLoggedStoreBaseDirectory, storeName, taskModel.getTaskName(), taskModel.getTaskMode());
+            LOG.info("Got non logged storage partition directory as " + nonLoggedStorePartitionDir.toPath().toString());
 
-          // Delete the logged store if it is not valid.
-          if (!isLoggedStoreValid(storeName, loggedStorePartitionDir)) {
-            LOG.info("Deleting logged storage partition directory " + loggedStorePartitionDir.toPath().toString());
-            FileUtil.rm(loggedStorePartitionDir);
+            if (nonLoggedStorePartitionDir.exists()) {
+              LOG.info("Deleting non logged storage partition directory " + nonLoggedStorePartitionDir.toPath().toString());
+              FileUtil.rm(nonLoggedStorePartitionDir);
+            }
           } else {
+            File loggedStorePartitionDir =
+                StorageManagerUtil.getStorePartitionDir(loggedStoreBaseDirectory, storeName, taskModel.getTaskName(), taskModel.getTaskMode());
+            LOG.info("Got logged storage partition directory as " + loggedStorePartitionDir.toPath().toString());
+
+            // Delete the logged store if it is not valid.
+            if (!isLoggedStoreValid(storeName, loggedStorePartitionDir)) {
+              LOG.info("Deleting logged storage partition directory " + loggedStorePartitionDir.toPath().toString());
+              FileUtil.rm(loggedStorePartitionDir);
+            } else {
 
-            SystemStreamPartition changelogSSP = new SystemStreamPartition(changelogSystemStreams.get(storeName), taskModel.getChangelogPartition());
-            Map<SystemStreamPartition, String> offset =
-                StorageManagerUtil.readOffsetFile(loggedStorePartitionDir, Collections.singleton(changelogSSP), false);
-            LOG.info("Read offset {} for the store {} from logged storage partition directory {}", offset, storeName, loggedStorePartitionDir);
+              SystemStreamPartition changelogSSP = new SystemStreamPartition(changelogSystemStreams.get(storeName), taskModel.getChangelogPartition());
+              Map<SystemStreamPartition, String> offset =
+                  StorageManagerUtil.readOffsetFile(loggedStorePartitionDir, Collections.singleton(changelogSSP), false);
+              LOG.info("Read offset {} for the store {} from logged storage partition directory {}", offset, storeName, loggedStorePartitionDir);
 
-            if (offset.containsKey(changelogSSP)) {
-              fileOffsets.put(changelogSSP, offset.get(changelogSSP));
+              if (offset.containsKey(changelogSSP)) {
+                fileOffsets.put(changelogSSP, offset.get(changelogSSP));
+              }
             }
           }
         });