You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2019/10/02 01:57:59 UTC

[samza] branch master updated: Transactional State [3/5]: Made StorageManagerUtil and FileUtil non-static for mocking side effects in tests

This is an automated email from the ASF dual-hosted git repository.

pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new a4d8014  Transactional State [3/5]: Made StorageManagerUtil and FileUtil non-static for mocking side effects in tests
a4d8014 is described below

commit a4d8014108e53b8e5bc602ddc7dcd90ade801635
Author: Prateek Maheshwari <pr...@utexas.edu>
AuthorDate: Tue Oct 1 18:57:55 2019 -0700

    Transactional State [3/5]: Made StorageManagerUtil and FileUtil non-static for mocking side effects in tests
    
    This PR changes the StorageManagerUtil and FileUtil methods from static methods to instance methods. Since many of these methods have side effects (they modify disk contents), this makes it possible to mock their behavior in unit tests without performing actual I/O or using PowerMock.
---
 .../apache/samza/storage/StorageManagerUtil.java   | 55 +++++++++--------
 .../samza/storage/TaskSideInputStorageManager.java | 17 +++---
 .../org/apache/samza/util/DiagnosticsUtil.java     |  2 +-
 .../samza/storage/ContainerStorageManager.java     | 33 ++++++-----
 .../apache/samza/storage/TaskStorageManager.scala  |  6 +-
 .../scala/org/apache/samza/util/FileUtil.scala     | 22 +++++--
 .../samza/storage/TestTaskStorageManager.scala     | 68 +++++++++++-----------
 .../scala/org/apache/samza/util/TestFileUtil.scala | 15 ++---
 .../samza/storage/kv/RocksDbOptionsHelper.java     |  3 +-
 .../apache/samza/monitor/LocalStoreMonitor.java    |  4 +-
 .../apache/samza/test/framework/TestRunner.java    |  2 +-
 .../test/performance/TestKeyValuePerformance.scala |  2 +-
 12 files changed, 125 insertions(+), 104 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java b/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
index a3f9712..604f7d1 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
@@ -67,7 +67,7 @@ public class StorageManagerUtil {
    * @param oldestOffset oldest offset for the ssp from the source
    * @return starting offset for the incoming {@link SystemStreamPartition}
    */
-  public static String getStartingOffset(
+  public String getStartingOffset(
       SystemStreamPartition ssp, SystemAdmin admin, String fileOffset, String oldestOffset) {
     String startingOffset = oldestOffset;
     if (fileOffset != null) {
@@ -97,7 +97,7 @@ public class StorageManagerUtil {
    * @param isSideInput true if store is a side-input store, false if it is a regular store
    * @return true if the store is stale, false otherwise
    */
-  public static boolean isStaleStore(File storeDir, long storeDeleteRetentionInMs, long currentTimeMs, boolean isSideInput) {
+  public boolean isStaleStore(File storeDir, long storeDeleteRetentionInMs, long currentTimeMs, boolean isSideInput) {
     long offsetFileLastModifiedTime;
     boolean isStaleStore = false;
     String storePath = storeDir.toPath().toString();
@@ -137,20 +137,20 @@ public class StorageManagerUtil {
    * An offset file associated with logged store {@code storeDir} is valid if it exists and is not empty.
    *
    * @param storeDir the base directory of the store
-   * @param storeSSPs storeSSPs (if any) associated with the store
+   * @param storeSSPs ssps (if any) associated with the store
    * @param isSideInput true if store is a side-input store, false if it is a regular store
    * @return true if the offset file is valid. false otherwise.
    */
-  public static boolean isOffsetFileValid(File storeDir, Set<SystemStreamPartition> storeSSPs, boolean isSideInput) {
+  public boolean isOffsetFileValid(File storeDir, Set<SystemStreamPartition> storeSSPs, boolean isSideInput) {
     boolean hasValidOffsetFile = false;
     if (storeDir.exists()) {
       Map<SystemStreamPartition, String> offsetContents = readOffsetFile(storeDir, storeSSPs, isSideInput);
       if (offsetContents == null) {
-        LOG.info("Offset file does not exist. Store directory: {}", storeDir.toPath());
+        LOG.info("Offset file is invalid since it does not exist. Store directory: {}", storeDir.toPath());
       } else if (offsetContents.isEmpty()) {
-        LOG.info("Offset file is empty. Store directory: {}", storeDir.toPath());
+        LOG.info("Offset file is invalid since it is empty. Store directory: {}", storeDir.toPath());
       } else if (!offsetContents.keySet().equals(storeSSPs)) {
-        LOG.info("Offset file is invalid since change-log SSPs don't match. "
+        LOG.info("Offset file is invalid since changelog or side input SSPs don't match. "
             + "Store directory: {}. SSPs from offset-file: {} SSPs expected: {} ",
             storeDir.toPath(), offsetContents.keySet(), storeSSPs);
       } else {
@@ -163,29 +163,27 @@ public class StorageManagerUtil {
 
   /**
    * Write the given SSP-Offset map into the offsets file.
-   * @param storeBaseDir the base directory of the store
-   * @param storeName the store name to use
-   * @param taskName the task name which is referencing the store
+   * @param storeDir the directory of the store
    * @param offsets The SSP-offset to write
    * @param isSideInput true if store is a side-input store, false if it is a regular store
    * @throws IOException because of deserializing to json
    */
-  public static void writeOffsetFile(File storeBaseDir, String storeName, TaskName taskName, TaskMode taskMode,
-      Map<SystemStreamPartition, String> offsets, boolean isSideInput) throws IOException {
+  public void writeOffsetFile(File storeDir, Map<SystemStreamPartition, String> offsets, boolean isSideInput) throws IOException {
 
     // First, we write the new-format offset file
-    File offsetFile = new File(getStorePartitionDir(storeBaseDir, storeName, taskName, taskMode), OFFSET_FILE_NAME_NEW);
+    File offsetFile = new File(storeDir, OFFSET_FILE_NAME_NEW);
     String fileContents = OBJECT_WRITER.writeValueAsString(offsets);
-    FileUtil.writeWithChecksum(offsetFile, fileContents);
+    FileUtil fileUtil = new FileUtil();
+    fileUtil.writeWithChecksum(offsetFile, fileContents);
 
     // Now we write the old format offset file, which are different for store-offset and side-inputs
     if (isSideInput) {
-      offsetFile = new File(getStorePartitionDir(storeBaseDir, storeName, taskName, taskMode), SIDE_INPUT_OFFSET_FILE_NAME_LEGACY);
+      offsetFile = new File(storeDir, SIDE_INPUT_OFFSET_FILE_NAME_LEGACY);
       fileContents = OBJECT_WRITER.writeValueAsString(offsets);
-      FileUtil.writeWithChecksum(offsetFile, fileContents);
+      fileUtil.writeWithChecksum(offsetFile, fileContents);
     } else {
-      offsetFile = new File(getStorePartitionDir(storeBaseDir, storeName, taskName, taskMode), OFFSET_FILE_NAME_LEGACY);
-      FileUtil.writeWithChecksum(offsetFile, offsets.entrySet().iterator().next().getValue());
+      offsetFile = new File(storeDir, OFFSET_FILE_NAME_LEGACY);
+      fileUtil.writeWithChecksum(offsetFile, offsets.entrySet().iterator().next().getValue());
     }
   }
 
@@ -195,7 +193,7 @@ public class StorageManagerUtil {
    * @param storeName the store name to use
    * @param taskName the task name which is referencing the store
    */
-  public static void deleteOffsetFile(File storeBaseDir, String storeName, TaskName taskName) {
+  public void deleteOffsetFile(File storeBaseDir, String storeName, TaskName taskName) {
     deleteOffsetFile(storeBaseDir, storeName, taskName, OFFSET_FILE_NAME_NEW);
     deleteOffsetFile(storeBaseDir, storeName, taskName, OFFSET_FILE_NAME_LEGACY);
   }
@@ -203,10 +201,10 @@ public class StorageManagerUtil {
   /**
    * Delete the given offsetFile for the store if it exists.
    */
-  private static void deleteOffsetFile(File storeBaseDir, String storeName, TaskName taskName, String offsetFileName) {
-    File offsetFile = new File(getStorePartitionDir(storeBaseDir, storeName, taskName, TaskMode.Active), offsetFileName);
+  private void deleteOffsetFile(File storeBaseDir, String storeName, TaskName taskName, String offsetFileName) {
+    File offsetFile = new File(getTaskStoreDir(storeBaseDir, storeName, taskName, TaskMode.Active), offsetFileName);
     if (offsetFile.exists()) {
-      FileUtil.rm(offsetFile);
+      new FileUtil().rm(offsetFile);
     }
   }
 
@@ -216,7 +214,7 @@ public class StorageManagerUtil {
    * @param storeDir the base directory of the store
    * @return true if a non-empty storeDir exists, false otherwise
    */
-  public static boolean storeExists(File storeDir) {
+  public boolean storeExists(File storeDir) {
     return storeDir.exists() && storeDir.list().length > 0;
   }
 
@@ -228,7 +226,7 @@ public class StorageManagerUtil {
    * @param isSideInput, true if the store is a side-input store, false otherwise
    * @return the content of the offset file if it exists for the store, null otherwise.
    */
-  public static Map<SystemStreamPartition, String> readOffsetFile(File storagePartitionDir, Set<SystemStreamPartition> storeSSPs, boolean isSideInput) {
+  public Map<SystemStreamPartition, String> readOffsetFile(File storagePartitionDir, Set<SystemStreamPartition> storeSSPs, boolean isSideInput) {
 
     File offsetFileRefNew = new File(storagePartitionDir, OFFSET_FILE_NAME_NEW);
     File offsetFileRefLegacy = new File(storagePartitionDir, OFFSET_FILE_NAME_LEGACY);
@@ -247,7 +245,6 @@ public class StorageManagerUtil {
     } else {
       return new HashMap<>();
     }
-
   }
 
   /**
@@ -258,16 +255,16 @@ public class StorageManagerUtil {
    * @param storeSSPs SSPs associated with the store (if any)
    * @return the content of the offset file if it exists for the store, null otherwise.
    */
-  private static Map<SystemStreamPartition, String> readOffsetFile(File storagePartitionDir, String offsetFileName, Set<SystemStreamPartition> storeSSPs) {
+  private Map<SystemStreamPartition, String> readOffsetFile(File storagePartitionDir, String offsetFileName, Set<SystemStreamPartition> storeSSPs) {
     Map<SystemStreamPartition, String> offsets = new HashMap<>();
     String fileContents = null;
     File offsetFileRef = new File(storagePartitionDir, offsetFileName);
     String storePath = storagePartitionDir.getPath();
 
     if (offsetFileRef.exists()) {
-      LOG.info("Found offset file in storage partition directory: {}", storePath);
+      LOG.debug("Found offset file in storage partition directory: {}", storePath);
       try {
-        fileContents = FileUtil.readWithChecksum(offsetFileRef);
+        fileContents = new FileUtil().readWithChecksum(offsetFileRef);
         offsets = OBJECT_MAPPER.readValue(fileContents, OFFSETS_TYPE_REFERENCE);
       } catch (JsonParseException | JsonMappingException e) {
         LOG.info("Exception in json-parsing offset file {} {}, reading as string offset-value", storagePartitionDir.toPath(), offsetFileName);
@@ -293,7 +290,7 @@ public class StorageManagerUtil {
    * @param taskMode the mode of the given task
    * @return the partition directory for the store
    */
-  public static File getStorePartitionDir(File storeBaseDir, String storeName, TaskName taskName, TaskMode taskMode) {
+  public File getTaskStoreDir(File storeBaseDir, String storeName, TaskName taskName, TaskMode taskMode) {
     TaskName taskNameForDirName = taskName;
     if (taskMode.equals(TaskMode.Standby)) {
       taskNameForDirName =  StandbyTaskUtil.getActiveTaskName(taskName);
diff --git a/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java b/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
index 1bdd6b7..e2cfe1d 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
@@ -72,6 +72,7 @@ public class TaskSideInputStorageManager {
   private final TaskName taskName;
   private final TaskMode taskMode;
   private final Map<SystemStreamPartition, String> lastProcessedOffsets = new ConcurrentHashMap<>();
+  private final StorageManagerUtil storageManagerUtil = new StorageManagerUtil();
 
   private Map<SystemStreamPartition, String> startingOffsets;
 
@@ -244,7 +245,7 @@ public class TaskSideInputStorageManager {
         String storePath = storeLocation.toPath().toString();
         if (!isValidSideInputStore(storeName, storeLocation)) {
           LOG.info("Cleaning up the store directory at {} for {}", storePath, storeName);
-          FileUtil.rm(storeLocation);
+          new FileUtil().rm(storeLocation);
         }
 
         if (isPersistedStore(storeName) && !storeLocation.exists()) {
@@ -269,7 +270,8 @@ public class TaskSideInputStorageManager {
               .collect(Collectors.toMap(Function.identity(), lastProcessedOffsets::get));
 
             try {
-              StorageManagerUtil.writeOffsetFile(storeBaseDir, storeName, taskName, taskMode, offsets, true);
+              File taskStoreDir = storageManagerUtil.getTaskStoreDir(storeBaseDir, storeName, taskName, taskMode);
+              storageManagerUtil.writeOffsetFile(taskStoreDir, offsets, true);
             } catch (Exception e) {
               throw new SamzaException("Failed to write offset file for side input store: " + storeName, e);
             }
@@ -294,7 +296,8 @@ public class TaskSideInputStorageManager {
         if (isValidSideInputStore(storeName, storeLocation)) {
           try {
 
-            Map<SystemStreamPartition, String> offsets = StorageManagerUtil.readOffsetFile(storeLocation, storeToSSps.get(storeName), true);
+            Map<SystemStreamPartition, String> offsets =
+                storageManagerUtil.readOffsetFile(storeLocation, storeToSSps.get(storeName), true);
             fileOffsets.putAll(offsets);
           } catch (Exception e) {
             LOG.warn("Failed to load the offset file for side input store:" + storeName, e);
@@ -307,7 +310,7 @@ public class TaskSideInputStorageManager {
 
   @VisibleForTesting
   File getStoreLocation(String storeName) {
-    return StorageManagerUtil.getStorePartitionDir(storeBaseDir, storeName, taskName, taskMode);
+    return storageManagerUtil.getTaskStoreDir(storeBaseDir, storeName, taskName, taskMode);
   }
 
   /**
@@ -329,7 +332,7 @@ public class TaskSideInputStorageManager {
         String oldestOffset = oldestOffsets.get(ssp);
 
         startingOffsets.put(ssp,
-          StorageManagerUtil.getStartingOffset(
+          storageManagerUtil.getStartingOffset(
             ssp, systemAdmins.getSystemAdmin(ssp.getSystem()), fileOffset, oldestOffset));
       });
 
@@ -377,8 +380,8 @@ public class TaskSideInputStorageManager {
 
   private boolean isValidSideInputStore(String storeName, File storeLocation) {
     return isPersistedStore(storeName)
-        && !StorageManagerUtil.isStaleStore(storeLocation, STORE_DELETE_RETENTION_MS, clock.currentTimeMillis(), true)
-        && StorageManagerUtil.isOffsetFileValid(storeLocation, storeToSSps.get(storeName), true);
+        && !storageManagerUtil.isStaleStore(storeLocation, STORE_DELETE_RETENTION_MS, clock.currentTimeMillis(), true)
+        && storageManagerUtil.isOffsetFileValid(storeLocation, storeToSSps.get(storeName), true);
   }
 
   private boolean isPersistedStore(String storeName) {
diff --git a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
index 2870153..2f5d74b 100644
--- a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
@@ -80,7 +80,7 @@ public class DiagnosticsUtil {
       MetricsSnapshot metricsSnapshot = new MetricsSnapshot(metricsHeader, new Metrics());
       MetadataFileContents metadataFileContents =
           new MetadataFileContents("1", new String(new MetricsSnapshotSerdeV2().toBytes(metricsSnapshot)));
-      FileUtil.writeToTextFile(metadataFile.get(), new String(new JsonSerde<>().toBytes(metadataFileContents)), false);
+      new FileUtil().writeToTextFile(metadataFile.get(), new String(new JsonSerde<>().toBytes(metadataFileContents)), false);
     } else {
       log.info("Skipping writing metadata file.");
     }
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index d4f7424..58ec045 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -168,6 +168,7 @@ public class ContainerStorageManager {
   private volatile Throwable sideInputException = null;
 
   private final Config config;
+  private final StorageManagerUtil storageManagerUtil = new StorageManagerUtil();
 
   public ContainerStorageManager(ContainerModel containerModel,
       StreamMetadataCache streamMetadataCache,
@@ -470,9 +471,11 @@ public class ContainerStorageManager {
     // for non logged stores
     File storeDirectory;
     if (changeLogSystemStreamPartition != null || sideInputSystemStreams.containsKey(storeName)) {
-      storeDirectory = StorageManagerUtil.getStorePartitionDir(this.loggedStoreBaseDirectory, storeName, taskName, taskModel.getTaskMode());
+      storeDirectory = storageManagerUtil.getTaskStoreDir(this.loggedStoreBaseDirectory, storeName, taskName,
+          taskModel.getTaskMode());
     } else {
-      storeDirectory = StorageManagerUtil.getStorePartitionDir(this.nonLoggedStoreBaseDirectory, storeName, taskName, taskModel.getTaskMode());
+      storeDirectory = storageManagerUtil.getTaskStoreDir(this.nonLoggedStoreBaseDirectory, storeName, taskName,
+          taskModel.getTaskMode());
     }
 
     this.storeDirectoryPaths.add(storeDirectory.toPath());
@@ -957,31 +960,33 @@ public class ContainerStorageManager {
      */
     private void cleanBaseDirsAndReadOffsetFiles() {
       LOG.debug("Cleaning base directories for stores.");
-
+      FileUtil fileUtil = new FileUtil();
       taskStores.forEach((storeName, storageEngine) -> {
           if (!storageEngine.getStoreProperties().isLoggedStore()) {
             File nonLoggedStorePartitionDir =
-                StorageManagerUtil.getStorePartitionDir(nonLoggedStoreBaseDirectory, storeName, taskModel.getTaskName(), taskModel.getTaskMode());
+                storageManagerUtil.getTaskStoreDir(nonLoggedStoreBaseDirectory, storeName, taskModel.getTaskName(),
+                    taskModel.getTaskMode());
             LOG.info("Got non logged storage partition directory as " + nonLoggedStorePartitionDir.toPath().toString());
 
             if (nonLoggedStorePartitionDir.exists()) {
               LOG.info("Deleting non logged storage partition directory " + nonLoggedStorePartitionDir.toPath().toString());
-              FileUtil.rm(nonLoggedStorePartitionDir);
+              fileUtil.rm(nonLoggedStorePartitionDir);
             }
           } else {
             File loggedStorePartitionDir =
-                StorageManagerUtil.getStorePartitionDir(loggedStoreBaseDirectory, storeName, taskModel.getTaskName(), taskModel.getTaskMode());
+                storageManagerUtil.getTaskStoreDir(loggedStoreBaseDirectory, storeName, taskModel.getTaskName(),
+                    taskModel.getTaskMode());
             LOG.info("Got logged storage partition directory as " + loggedStorePartitionDir.toPath().toString());
 
             // Delete the logged store if it is not valid.
             if (!isLoggedStoreValid(storeName, loggedStorePartitionDir)) {
               LOG.info("Deleting logged storage partition directory " + loggedStorePartitionDir.toPath().toString());
-              FileUtil.rm(loggedStorePartitionDir);
+              fileUtil.rm(loggedStorePartitionDir);
             } else {
 
               SystemStreamPartition changelogSSP = new SystemStreamPartition(changelogSystemStreams.get(storeName), taskModel.getChangelogPartition());
               Map<SystemStreamPartition, String> offset =
-                  StorageManagerUtil.readOffsetFile(loggedStorePartitionDir, Collections.singleton(changelogSSP), false);
+                  storageManagerUtil.readOffsetFile(loggedStorePartitionDir, Collections.singleton(changelogSSP), false);
               LOG.info("Read offset {} for the store {} from logged storage partition directory {}", offset, storeName, loggedStorePartitionDir);
 
               if (offset.containsKey(changelogSSP)) {
@@ -1003,11 +1008,11 @@ public class ContainerStorageManager {
      */
     private boolean isLoggedStoreValid(String storeName, File loggedStoreDir) {
       long changeLogDeleteRetentionInMs = new StorageConfig(config).getChangeLogDeleteRetentionInMs(storeName);
-
       if (changelogSystemStreams.containsKey(storeName)) {
         SystemStreamPartition changelogSSP = new SystemStreamPartition(changelogSystemStreams.get(storeName), taskModel.getChangelogPartition());
-        return this.taskStores.get(storeName).getStoreProperties().isPersistedToDisk() && StorageManagerUtil.isOffsetFileValid(loggedStoreDir, Collections.singleton(changelogSSP), false)
-            && !StorageManagerUtil.isStaleStore(loggedStoreDir, changeLogDeleteRetentionInMs, clock.currentTimeMillis(), false);
+        return this.taskStores.get(storeName).getStoreProperties().isPersistedToDisk()
+            && storageManagerUtil.isOffsetFileValid(loggedStoreDir, Collections.singleton(changelogSSP), false)
+            && !storageManagerUtil.isStaleStore(loggedStoreDir, changeLogDeleteRetentionInMs, clock.currentTimeMillis(), false);
       }
 
       return false;
@@ -1022,7 +1027,7 @@ public class ContainerStorageManager {
           if (storageEngine.getStoreProperties().isLoggedStore()) {
 
             File loggedStorePartitionDir =
-                StorageManagerUtil.getStorePartitionDir(loggedStoreBaseDirectory, storeName, taskModel.getTaskName(), taskModel.getTaskMode());
+                storageManagerUtil.getTaskStoreDir(loggedStoreBaseDirectory, storeName, taskModel.getTaskName(), taskModel.getTaskMode());
 
             LOG.info("Using logged storage partition directory: " + loggedStorePartitionDir.toPath().toString()
                 + " for store: " + storeName);
@@ -1032,7 +1037,7 @@ public class ContainerStorageManager {
             }
           } else {
             File nonLoggedStorePartitionDir =
-                StorageManagerUtil.getStorePartitionDir(nonLoggedStoreBaseDirectory, storeName, taskModel.getTaskName(), taskModel.getTaskMode());
+                storageManagerUtil.getTaskStoreDir(nonLoggedStoreBaseDirectory, storeName, taskModel.getTaskName(), taskModel.getTaskMode());
             LOG.info("Using non logged storage partition directory: " + nonLoggedStorePartitionDir.toPath().toString()
                 + " for store: " + storeName);
             nonLoggedStorePartitionDir.mkdirs();
@@ -1140,7 +1145,7 @@ public class ContainerStorageManager {
       }
 
       String oldestOffset = changeLogOldestOffsets.get(systemStreamPartition.getSystemStream());
-      return StorageManagerUtil.getStartingOffset(systemStreamPartition, systemAdmin, fileOffset, oldestOffset);
+      return storageManagerUtil.getStartingOffset(systemStreamPartition, systemAdmin, fileOffset, oldestOffset);
     }
 
 
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
index 046d9a6..be089c6 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
@@ -89,15 +89,17 @@ class TaskStorageManager(
         val newestOffset = if (sspMetadata == null) null else sspMetadata.getNewestOffset
         debug("Got offset %s for store %s" format(newestOffset, storeName))
 
+        val storageManagerUtil = new StorageManagerUtil()
         if (newestOffset != null) {
           debug("Storing offset for store in OFFSET file ")
 
           // TaskStorageManagers are only spun-up for active tasks
-          StorageManagerUtil.writeOffsetFile(loggedStoreBaseDir, storeName, taskName, TaskMode.Active, Map(ssp -> newestOffset).asJava, false)
+          val currentStoreDir = storageManagerUtil.getTaskStoreDir(loggedStoreBaseDir, storeName, taskName, TaskMode.Active)
+          storageManagerUtil.writeOffsetFile(currentStoreDir, Map(ssp -> newestOffset).asJava, false)
           debug("Successfully stored offset %s for store %s in OFFSET file " format(newestOffset, storeName))
         } else {
           //if newestOffset is null, then it means the store is (or has become) empty. No need to persist the offset file
-          StorageManagerUtil.deleteOffsetFile(loggedStoreBaseDir, storeName, taskName);
+          storageManagerUtil.deleteOffsetFile(loggedStoreBaseDir, storeName, taskName);
           debug("Not storing OFFSET file for taskName %s. Store %s backed by changelog topic: %s, partition: %s is empty. " format (taskName, storeName, systemStream.getStream, partition.getPartitionId))
         }
       } catch {
diff --git a/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala
index 98b1447..f45f28e 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala
@@ -27,7 +27,7 @@ import java.util.zip.CRC32
 
 import org.apache.samza.util.Util.info
 
-object FileUtil extends Logging {
+class FileUtil extends Logging {
   /**
     * Writes checksum & data to a file
     * Checksum is pre-fixed to the data and is a 32-bit long type data.
@@ -51,7 +51,7 @@ object FileUtil extends Logging {
     }
 
     //atomic swap of tmp and real offset file
-    swapFiles(tmpFile, file)
+    move(tmpFile, file)
   }
 
   /**
@@ -68,7 +68,7 @@ object FileUtil extends Logging {
 
     //atomic swap of tmp and real file if we need to append
     if (append) {
-      swapFiles(file, tmpFile)
+      move(file, tmpFile)
     }
 
     try {
@@ -83,11 +83,13 @@ object FileUtil extends Logging {
     }
 
     //atomic swap of tmp and real file
-    swapFiles(tmpFile, file)
+    move(tmpFile, file)
   }
 
-  private def swapFiles(source: File, destination: File) : Unit = {
-    //atomic swap of source and destination file
+  /**
+   * Moves source file to destination file, replacing destination file if it already exists.
+   */
+  def move(source: File, destination: File) : Unit = {
     try {
       if (source.exists()) {
         Files.move(source.toPath, destination.toPath, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING)
@@ -141,6 +143,14 @@ object FileUtil extends Logging {
     }
   }
 
+  def exists(path: Path): Boolean = {
+    Files.exists(path)
+  }
+
+  def createDirectories(path: Path): Path = {
+    Files.createDirectories(path)
+  }
+
   /**
     * Generates the CRC32 checksum code for any given data
     * @param data The string for which checksum has to be generated
diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
index 84b4b3f..5451854 100644
--- a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
@@ -19,7 +19,6 @@
 
 package org.apache.samza.storage
 
-
 import java.io.{File, FileOutputStream, ObjectOutputStream}
 import java.util
 
@@ -62,19 +61,21 @@ class TestTaskStorageManager(offsetFileName: String) extends MockitoSugar {
   val store = "store1"
   val loggedStore = "loggedStore1"
   val taskName = new TaskName("testTask")
+  val storageManagerUtil = new StorageManagerUtil
+  val fileUtil = new FileUtil
 
   @Before
   def setupTestDirs() {
-    StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultStoreBaseDir, store, taskName, TaskMode.Active)
+    storageManagerUtil.getTaskStoreDir(TaskStorageManagerBuilder.defaultStoreBaseDir, store, taskName, TaskMode.Active)
       .mkdirs()
-    StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active)
+    storageManagerUtil.getTaskStoreDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active)
       .mkdirs()
   }
 
   @After
   def tearDownTestDirs() {
-    FileUtil.rm(TaskStorageManagerBuilder.defaultStoreBaseDir)
-    FileUtil.rm(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir)
+    fileUtil.rm(TaskStorageManagerBuilder.defaultStoreBaseDir)
+    fileUtil.rm(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir)
   }
 
   def getStreamName(storeName : String): String = {
@@ -92,7 +93,8 @@ class TestTaskStorageManager(offsetFileName: String) extends MockitoSugar {
     val ss = new SystemStream("kafka", getStreamName(loggedStore))
     val partition = new Partition(0)
     val ssp = new SystemStreamPartition(ss, partition)
-    val storeDirectory = StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active)
+    val storeDirectory = storageManagerUtil.getTaskStoreDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, 
+      loggedStore, taskName, TaskMode.Active)
     val storeFile = new File(storeDirectory, "store.sst")
     val offsetFile = new File(storeDirectory, offsetFileName)
 
@@ -182,7 +184,7 @@ class TestTaskStorageManager(offsetFileName: String) extends MockitoSugar {
     val ss = new SystemStream("kafka", getStreamName(store))
     val partition = new Partition(0)
     val ssp = new SystemStreamPartition(ss, partition)
-    val storeDirectory = StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, store, taskName, TaskMode.Active)
+    val storeDirectory = storageManagerUtil.getTaskStoreDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, store, taskName, TaskMode.Active)
 
     val mockStorageEngine: StorageEngine = createMockStorageEngine(isLoggedStore = true, isPersistedStore = false, null)
 
@@ -255,9 +257,9 @@ class TestTaskStorageManager(offsetFileName: String) extends MockitoSugar {
 
   @Test
   def testStoreDirsWithoutOffsetFileAreDeletedInCleanBaseDirs() {
-    val checkFilePath1 = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultStoreBaseDir, store, taskName, TaskMode.Active), "check")
+    val checkFilePath1 = new File(storageManagerUtil.getTaskStoreDir(TaskStorageManagerBuilder.defaultStoreBaseDir, store, taskName, TaskMode.Active), "check")
     checkFilePath1.createNewFile()
-    val checkFilePath2 = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active), "check")
+    val checkFilePath2 = new File(storageManagerUtil.getTaskStoreDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active), "check")
     checkFilePath2.createNewFile()
 
     val taskStorageManager = new TaskStorageManagerBuilder()
@@ -273,8 +275,8 @@ class TestTaskStorageManager(offsetFileName: String) extends MockitoSugar {
 
   @Test
   def testLoggedStoreDirsWithOffsetFileAreNotDeletedInCleanBaseDirs() {
-    val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active), offsetFileName)
-    FileUtil.writeWithChecksum(offsetFilePath, "100")
+    val offsetFilePath = new File(storageManagerUtil.getTaskStoreDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active), offsetFileName)
+    fileUtil.writeWithChecksum(offsetFilePath, "100")
 
     val taskStorageManager = new TaskStorageManagerBuilder()
       .addLoggedStore(loggedStore, true)
@@ -289,11 +291,11 @@ class TestTaskStorageManager(offsetFileName: String) extends MockitoSugar {
   def testStoreDeletedWhenOffsetFileOlderThanDeleteRetention() {
     // This test ensures that store gets deleted when lastModifiedTime of the offset file
     // is older than deletionRetention of the changeLog.
-    val storeDirectory = StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active)
+    val storeDirectory = storageManagerUtil.getTaskStoreDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active)
     storeDirectory.setLastModified(0)
     val offsetFile = new File(storeDirectory, offsetFileName)
     offsetFile.createNewFile()
-    FileUtil.writeWithChecksum(offsetFile, "Test Offset Data")
+    fileUtil.writeWithChecksum(offsetFile, "Test Offset Data")
     offsetFile.setLastModified(0)
 
     val taskStorageManager = new TaskStorageManagerBuilder().addStore(store, false)
@@ -308,8 +310,8 @@ class TestTaskStorageManager(offsetFileName: String) extends MockitoSugar {
 
   @Test
   def testOffsetFileIsRemovedInCleanBaseDirsForInMemoryLoggedStore() {
-    val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active), offsetFileName)
-    FileUtil.writeWithChecksum(offsetFilePath, "100")
+    val offsetFilePath = new File(storageManagerUtil.getTaskStoreDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active), offsetFileName)
+    fileUtil.writeWithChecksum(offsetFilePath, "100")
 
     val taskStorageManager = new TaskStorageManagerBuilder()
       .addLoggedStore(loggedStore, false)
@@ -324,7 +326,7 @@ class TestTaskStorageManager(offsetFileName: String) extends MockitoSugar {
   def testStopCreatesOffsetFileForLoggedStore() {
     val partition = new Partition(0)
 
-    val storeDirectory = StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active)
+    val storeDirectory = storageManagerUtil.getTaskStoreDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active)
     val offsetFile = new File(storeDirectory, offsetFileName)
 
     val sspMetadataCache = mock[SSPMetadataCache]
@@ -365,9 +367,9 @@ class TestTaskStorageManager(offsetFileName: String) extends MockitoSugar {
   def testFlushCreatesOffsetFileForLoggedStore() {
     val partition = new Partition(0)
 
-    val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active) + File.separator + offsetFileName)
+    val offsetFilePath = new File(storageManagerUtil.getTaskStoreDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active) + File.separator + offsetFileName)
     val anotherOffsetPath = new File(
-      StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, store, taskName, TaskMode.Active) + File.separator + offsetFileName)
+      storageManagerUtil.getTaskStoreDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, store, taskName, TaskMode.Active) + File.separator + offsetFileName)
 
     val sspMetadataCache = mock[SSPMetadataCache]
     val sspMetadata = new SystemStreamPartitionMetadata("20", "100", "101")
@@ -403,7 +405,7 @@ class TestTaskStorageManager(offsetFileName: String) extends MockitoSugar {
   def testFlushDeletesOffsetFileForLoggedStoreForEmptyPartition() {
     val partition = new Partition(0)
 
-    val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active) + File.separator + offsetFileName)
+    val offsetFilePath = new File(storageManagerUtil.getTaskStoreDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active) + File.separator + offsetFileName)
 
     val sspMetadataCache = mock[SSPMetadataCache]
     val sspMetadata = new SystemStreamPartitionMetadata("0", "100", "101")
@@ -450,8 +452,8 @@ class TestTaskStorageManager(offsetFileName: String) extends MockitoSugar {
     val partition = new Partition(0)
     val ssp = new SystemStreamPartition("kafka", getStreamName(loggedStore), partition)
 
-    val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active) + File.separator + offsetFileName)
-    FileUtil.writeWithChecksum(offsetFilePath, "100")
+    val offsetFilePath = new File(storageManagerUtil.getTaskStoreDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active) + File.separator + offsetFileName)
+    fileUtil.writeWithChecksum(offsetFilePath, "100")
 
     val sspMetadataCache = mock[SSPMetadataCache]
     val sspMetadata = new SystemStreamPartitionMetadata("20", "139", "140")
@@ -503,9 +505,9 @@ class TestTaskStorageManager(offsetFileName: String) extends MockitoSugar {
   private def validateOffsetFileContents(offsetFile: File, ssp: String, offset: String): Unit = {
 
     if (offsetFile.getCanonicalFile.getName.equals(StorageManagerUtil.OFFSET_FILE_NAME_NEW)) {
-      assertEquals("Found incorrect value in offset file!", "{\"" + ssp + "\":\"" + offset + "\"}", FileUtil.readWithChecksum(offsetFile))
+      assertEquals("Found incorrect value in offset file!", "{\"" + ssp + "\":\"" + offset + "\"}", fileUtil.readWithChecksum(offsetFile))
     } else if (offsetFile.getCanonicalFile.getName.equals(StorageManagerUtil.OFFSET_FILE_NAME_LEGACY)) {
-      assertEquals("Found incorrect value in offset file!", offset, FileUtil.readWithChecksum(offsetFile))
+      assertEquals("Found incorrect value in offset file!", offset, fileUtil.readWithChecksum(offsetFile))
     } else {
       throw new IllegalArgumentException("Invalid offset file name");
     }
@@ -515,7 +517,7 @@ class TestTaskStorageManager(offsetFileName: String) extends MockitoSugar {
   def testStopShouldNotCreateOffsetFileForEmptyStore() {
     val partition = new Partition(0)
 
-    val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active) + File.separator + offsetFileName)
+    val offsetFilePath = new File(storageManagerUtil.getTaskStoreDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active) + File.separator + offsetFileName)
 
 
     val sspMetadataCache = mock[SSPMetadataCache]
@@ -593,28 +595,28 @@ class TestTaskStorageManager(offsetFileName: String) extends MockitoSugar {
   @Test
   def testReadOfOldOffsetFormat(): Unit = {
     // Create a file in old single-offset format, with a sample offset
-    val storeDirectory = StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active)
+    val storeDirectory = storageManagerUtil.getTaskStoreDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active)
     val storeFile = new File(storeDirectory, "store.sst")
     val offsetFile = new File(storeDirectory, offsetFileName)
     val sampleOldOffset = "912321"
-    FileUtil.writeWithChecksum(offsetFile, sampleOldOffset)
+    fileUtil.writeWithChecksum(offsetFile, sampleOldOffset)
 
 
     // read offset against a given ssp from the file
     var ssp = new SystemStreamPartition("kafka", "test-stream", new Partition(0))
-    val offsets = StorageManagerUtil.readOffsetFile(storeDirectory, Set(ssp).asJava, false)
+    val offsets = storageManagerUtil.readOffsetFile(storeDirectory, Set(ssp).asJava, false)
     assertTrue(offsets.get(ssp).equals(sampleOldOffset))
   }
 
   @Test
   def testReadOfOffsetInCaseOfBothFilesPresent(): Unit = {
     // Create a file in old single-offset format, with a sample offset, and another with the new-offset format
-    val storeDirectory = StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active)
+    val storeDirectory = storageManagerUtil.getTaskStoreDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active)
     val storeFile = new File(storeDirectory, "store.sst")
     val sampleOldOffset = "100000001"
     val sampleNewOffset = "{\"kafka.test-stream.0\":\"200000002\"}"
-    FileUtil.writeWithChecksum(new File(storeDirectory, StorageManagerUtil.OFFSET_FILE_NAME_LEGACY), sampleOldOffset)
-    FileUtil.writeWithChecksum(new File(storeDirectory, StorageManagerUtil.OFFSET_FILE_NAME_NEW), sampleNewOffset)
+    fileUtil.writeWithChecksum(new File(storeDirectory, StorageManagerUtil.OFFSET_FILE_NAME_LEGACY), sampleOldOffset)
+    fileUtil.writeWithChecksum(new File(storeDirectory, StorageManagerUtil.OFFSET_FILE_NAME_NEW), sampleNewOffset)
 
     // Ensure that the files exist
     assertTrue(new File(storeDirectory, StorageManagerUtil.OFFSET_FILE_NAME_LEGACY).exists())
@@ -622,7 +624,7 @@ class TestTaskStorageManager(offsetFileName: String) extends MockitoSugar {
 
     // read offset against a given ssp from the file, and check that the one in the new file should be read
     var ssp = new SystemStreamPartition("kafka", "test-stream", new Partition(0))
-    val offsets = StorageManagerUtil.readOffsetFile(storeDirectory, Set(ssp).asJava, false)
+    val offsets = storageManagerUtil.readOffsetFile(storeDirectory, Set(ssp).asJava, false)
 
     assertEquals(1, offsets.size())
     assertEquals("200000002", offsets.get(ssp))
@@ -636,13 +638,13 @@ class TestTaskStorageManager(offsetFileName: String) extends MockitoSugar {
     val ss = new SystemStream(systemName, streamName)
     val partition = new Partition(0)
     val ssp = new SystemStreamPartition(ss, partition)
-    val storeDirectory = StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active  )
+    val storeDirectory = storageManagerUtil.getTaskStoreDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active  )
     val storeFile = new File(storeDirectory, "store.sst")
 
     if (writeOffsetFile) {
       val offsetFile = new File(storeDirectory, offsetFileName)
       if (fileOffset != null) {
-        FileUtil.writeWithChecksum(offsetFile, fileOffset)
+        fileUtil.writeWithChecksum(offsetFile, fileOffset)
       } else {
         // Write garbage to produce a null result when it's read
         val fos = new FileOutputStream(offsetFile)
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestFileUtil.scala b/samza-core/src/test/scala/org/apache/samza/util/TestFileUtil.scala
index ddda268..6604ae0 100644
--- a/samza-core/src/test/scala/org/apache/samza/util/TestFileUtil.scala
+++ b/samza-core/src/test/scala/org/apache/samza/util/TestFileUtil.scala
@@ -28,13 +28,14 @@ import org.junit.Test
 
 class TestFileUtil {
   val data = "100"
-  val checksum = FileUtil.getChecksum(data)
+  val fileUtil = new FileUtil()
+  val checksum = fileUtil.getChecksum(data)
   val file = new File(System.getProperty("java.io.tmpdir"), "test")
 
   @Test
   def testWriteDataToFile() {
     // Invoke test
-    FileUtil.writeWithChecksum(file, data)
+    fileUtil.writeWithChecksum(file, data)
 
     // Check that file exists
     assertTrue("File was not created!", file.exists())
@@ -53,9 +54,9 @@ class TestFileUtil {
     // Invoke test
     val file = new File(System.getProperty("java.io.tmpdir"), "test2")
     // write the same file three times
-    FileUtil.writeWithChecksum(file, data)
-    FileUtil.writeWithChecksum(file, data)
-    FileUtil.writeWithChecksum(file, data)
+    fileUtil.writeWithChecksum(file, data)
+    fileUtil.writeWithChecksum(file, data)
+    fileUtil.writeWithChecksum(file, data)
 
     // Check that file exists
     assertTrue("File was not created!", file.exists())
@@ -81,7 +82,7 @@ class TestFileUtil {
     fos.close()
 
     // Invoke test
-    val result = FileUtil.readWithChecksum(file)
+    val result = fileUtil.readWithChecksum(file)
 
     // Check data returned
     assertEquals(data, result)
@@ -98,7 +99,7 @@ class TestFileUtil {
     fos.close()
 
     // Invoke test
-    val result = FileUtil.readWithChecksum(file)
+    val result = fileUtil.readWithChecksum(file)
 
     // Check data returned
     assertNull(result)
diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java
index aacf848..252eb83 100644
--- a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java
+++ b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java
@@ -116,7 +116,8 @@ public class RocksDbOptionsHelper {
     // use prepareForBulk load only when i. the store is being requested in BulkLoad mode
     // and ii. the storeDirectory does not exist (fresh restore), because bulk load does not work seamlessly with
     // existing stores : https://github.com/facebook/rocksdb/issues/2734
-    if(storeMode.equals(StorageEngineFactory.StoreMode.BulkLoad) && !StorageManagerUtil.storeExists(storeDir)) {
+    StorageManagerUtil storageManagerUtil = new StorageManagerUtil();
+    if(storeMode.equals(StorageEngineFactory.StoreMode.BulkLoad) && !storageManagerUtil.storeExists(storeDir)) {
       log.info("Using prepareForBulkLoad for restore to " + storeDir);
       options.prepareForBulkLoad();
     }
diff --git a/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java b/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java
index 4da49f4..ebf81f2 100644
--- a/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java
+++ b/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java
@@ -100,8 +100,8 @@ public class LocalStoreMonitor implements Monitor {
               LOG.info(String.format("Local store: %s is actively used by the task: %s.", storeName, task.getTaskName()));
             } else {
               LOG.info(String.format("Local store: %s not used by the task: %s.", storeName, task.getTaskName()));
-              markSweepTaskStore(StorageManagerUtil.getStorePartitionDir(jobDir, storeName, new TaskName(task.getTaskName()),
-                  TaskMode.Active));
+              markSweepTaskStore(new StorageManagerUtil().getTaskStoreDir(jobDir, storeName,
+                  new TaskName(task.getTaskName()), TaskMode.Active));
             }
           }
         }
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
index afdbde7..bca168e 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
@@ -419,7 +419,7 @@ public class TestRunner {
   private void deleteDirectory(String path) {
     File dir = new File(path);
     LOG.info("Deleting the directory " + path);
-    FileUtil.rm(dir);
+    new FileUtil().rm(dir);
     if (dir.exists()) {
       LOG.warn("Could not delete the directory " + path);
     }
diff --git a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
index 9d15138..9eaed56 100644
--- a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
+++ b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
@@ -155,7 +155,7 @@ object TestKeyValuePerformance extends Logging {
         // Run the test method
         testMethod(db, storageConfig.subset("set-" + testSet + ".", true))
 
-        FileUtil.rm(output)
+        new FileUtil().rm(output)
       })
     }
   }