You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ya...@apache.org on 2015/05/07 02:23:05 UTC

samza git commit: SAMZA-557: Reuse local state in SamzaContainer on clean shutdown

Repository: samza
Updated Branches:
  refs/heads/master 9f30ef10b -> bb8a78a85


SAMZA-557: Reuse local state in SamzaContainer on clean shutdown


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/bb8a78a8
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/bb8a78a8
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/bb8a78a8

Branch: refs/heads/master
Commit: bb8a78a85666325641e75fe466476094de716555
Parents: 9f30ef1
Author: Navina Ramesh <na...@gmail.com>
Authored: Wed May 6 17:22:41 2015 -0700
Committer: Yan Fang <ya...@gmail.com>
Committed: Wed May 6 17:22:41 2015 -0700

----------------------------------------------------------------------
 .../samza/config/ShellCommandConfig.scala       |   8 +
 .../apache/samza/container/SamzaContainer.scala |  36 +++-
 .../samza/storage/TaskStorageManager.scala      |  83 +++++++--
 .../main/scala/org/apache/samza/util/Util.scala |  59 +++++-
 .../samza/storage/TestTaskStorageManager.scala  | 186 +++++++++++++++++++
 .../scala/org/apache/samza/util/TestUtil.scala  |  54 ++++--
 .../samza/storage/kv/RocksDbKeyValueStore.scala |   1 -
 7 files changed, 391 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/bb8a78a8/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
index e94a473..f505322 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
@@ -46,6 +46,14 @@ object ShellCommandConfig {
    */
   val ENV_JAVA_HOME = "JAVA_HOME"
 
+  /*
+   * The base directory for storing logged data stores used in Samza. This has to be set on all machines running Samza
+   * containers. For example, when using YARN, it has to be set in all NMs and passed to the containers.
+   * If this environment variable is not set, the path defaults to the "state" directory under the current working
+   * directory (the same path used for non-logged data stores), which disables local state re-use across restarts.
+   */
+  val ENV_LOGGED_STORE_BASE_DIR = "LOGGED_STORE_BASE_DIR"
+
   val COMMAND_SHELL_EXECUTE = "task.execute"
   val TASK_JVM_OPTS = "task.opts"
   val TASK_JAVA_HOME = "task.java.home"

http://git-wip-us.apache.org/repos/asf/samza/blob/bb8a78a8/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index ac4793a..e8e830e 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -369,12 +369,6 @@ object SamzaContainer extends Logging {
       metrics = systemProducersMetrics,
       dropSerializationError = dropSerializationError)
 
-    // TODO not sure how we should make this config based, or not. Kind of
-    // strange, since it has some dynamic directories when used with YARN.
-    val storeBaseDir = new File(System.getProperty("user.dir"), "state")
-
-    info("Got storage engine base directory: %s" format storeBaseDir)
-
     val storageEngineFactories = config
       .getStoreNames
       .map(storeName => {
@@ -443,6 +437,24 @@ object SamzaContainer extends Logging {
 
       info("Got store consumers: %s" format storeConsumers)
 
+      // TODO not sure how we should make this config based, or not. Kind of
+      // strange, since it has some dynamic directories when used with YARN.
+      val defaultStoreBaseDir = new File(System.getProperty("user.dir"), "state")
+      info("Got default storage engine base directory: %s" format defaultStoreBaseDir)
+
+      var loggedStorageBaseDir: File = null
+      if(System.getenv(ShellCommandConfig.ENV_LOGGED_STORE_BASE_DIR) != null) {
+        val jobNameAndId = Util.getJobNameAndId(config)
+        loggedStorageBaseDir = new File(System.getenv(ShellCommandConfig.ENV_LOGGED_STORE_BASE_DIR) + File.separator + jobNameAndId._1 + "-" + jobNameAndId._2)
+      } else {
+        warn("No override was provided for logged store base directory. This disables local state re-use on " +
+          "application restart. If you want to enable this feature, set LOGGED_STORE_BASE_DIR as an environment " +
+          "variable in all machines running the Samza container")
+        loggedStorageBaseDir = defaultStoreBaseDir
+      }
+
+      info("Got base directory for logged data stores: %s" format loggedStorageBaseDir)
+
       val taskStores = storageEngineFactories
         .map {
           case (storeName, storageEngineFactory) =>
@@ -459,10 +471,15 @@ object SamzaContainer extends Logging {
               case Some(msgSerde) => serdes.getOrElse(msgSerde, throw new SamzaException("No class defined for serde: %s." format msgSerde))
               case _ => null
             }
-            val storePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskName)
+            val storeBaseDir = if(changeLogSystemStreamPartition != null) {
+              TaskStorageManager.getStorePartitionDir(loggedStorageBaseDir, storeName, taskName)
+            }
+            else {
+              TaskStorageManager.getStorePartitionDir(defaultStoreBaseDir, storeName, taskName)
+            }
             val storageEngine = storageEngineFactory.getStorageEngine(
               storeName,
-              storePartitionDir,
+              storeBaseDir,
               keySerde,
               msgSerde,
               collector,
@@ -481,7 +498,8 @@ object SamzaContainer extends Logging {
         changeLogSystemStreams = changeLogSystemStreams,
         maxChangeLogStreamPartitions,
         streamMetadataCache = streamMetadataCache,
-        storeBaseDir = storeBaseDir,
+        storeBaseDir = defaultStoreBaseDir,
+        loggedStoreBaseDir = loggedStorageBaseDir,
         partition = taskModel.getChangelogPartition,
         systemAdmins = systemAdmins)
 

http://git-wip-us.apache.org/repos/asf/samza/blob/bb8a78a8/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
index f68a7fe..aeba61a 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
@@ -19,8 +19,9 @@
 
 package org.apache.samza.storage
 
-import java.io.File
-import scala.collection.Map
+import java.io._
+import java.util
+import scala.collection.{JavaConversions, Map}
 import org.apache.samza.util.Logging
 import org.apache.samza.Partition
 import org.apache.samza.system._
@@ -50,31 +51,74 @@ class TaskStorageManager(
   changeLogStreamPartitions: Int,
   streamMetadataCache: StreamMetadataCache,
   storeBaseDir: File = new File(System.getProperty("user.dir"), "state"),
+  loggedStoreBaseDir: File = new File(System.getProperty("user.dir"), "state"),
   partition: Partition,
   systemAdmins: Map[String, SystemAdmin]) extends Logging {
 
   var taskStoresToRestore = taskStores
   var changeLogOldestOffsets: Map[SystemStream, String] = Map()
+  val fileOffset: util.Map[SystemStreamPartition, String] = new util.HashMap[SystemStreamPartition, String]() // offsets parsed from OFFSET files by cleanBaseDirs; preferred over the oldest changelog offset on restore
+  val offsetFileName = "OFFSET" // written by stop() into each logged store's partition directory

   def apply(storageEngineName: String) = taskStores(storageEngineName)

   def init {
     cleanBaseDirs
+    setupBaseDirs // recreate the store directories that cleanBaseDirs may have removed
     createStreams
     startConsumers
     restoreStores
     stopConsumers
   }
 
+  private def setupBaseDirs {
+    debug("Setting up base directories for stores.")
+    val loggedStoreNames = changeLogSystemStreams.keySet
+    // Stores without a changelog live under storeBaseDir, which cleanBaseDirs wiped; recreate them empty.
+    for (storeName <- taskStores.keySet if !loggedStoreNames.contains(storeName)) {
+      TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskName).mkdirs
+    }
+    // Logged stores live under loggedStoreBaseDir; an existing directory there may hold state to re-use.
+    for (storeName <- loggedStoreNames) {
+      val partitionDir = TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName)
+      if (!partitionDir.exists()) partitionDir.mkdirs
+    }
+  }
+
   private def cleanBaseDirs {
     debug("Cleaning base directories for stores.")
+    // Everything under storeBaseDir is wiped; a logged store's directory survives only with a valid OFFSET file.
     taskStores.keys.foreach(storeName => {
       val storagePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskName)
+      info("Got default storage partition directory as %s" format storagePartitionDir.toPath.toString)

-      debug("Cleaning %s for store %s." format (storagePartitionDir, storeName))
+      if(storagePartitionDir.exists()) {
+        debug("Deleting default storage partition directory %s" format storagePartitionDir.toPath.toString)
+        Util.rm(storagePartitionDir)
+      }

-      Util.rm(storagePartitionDir)
-      storagePartitionDir.mkdirs
+      val loggedStoragePartitionDir = TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName)
+      info("Got logged storage partition directory as %s" format loggedStoragePartitionDir.toPath.toString)
+
+      // stop() writes the OFFSET file and it is deleted here whenever present, so a directory is only re-used
+      // if stop() completed since the previous start.
+      val offsetFileRef = new File(loggedStoragePartitionDir, offsetFileName)
+      val offsetFromFile = if(offsetFileRef.exists()) {
+        debug("Found offset file in partition directory: %s" format offsetFileRef.toPath.toString)
+        val offset = Util.readDataFromFile(offsetFileRef)
+        offsetFileRef.delete()
+        Option(offset).filter(!_.isEmpty)
+      } else {
+        info("No offset file found in logged storage partition directory: %s" format loggedStoragePartitionDir.toPath.toString)
+        None
+      }
+
+      // Only stores that still have a changelog can resume from the saved offset; anything else is discarded.
+      (changeLogSystemStreams.get(storeName), offsetFromFile) match {
+        case (Some(systemStream), Some(offset)) => fileOffset.put(new SystemStreamPartition(systemStream, partition), offset)
+        case _ => if(loggedStoragePartitionDir.exists()) Util.rm(loggedStoragePartitionDir)
+      }
+
     })
   }
 
@@ -82,7 +126,9 @@ class TaskStorageManager(
     info("Creating streams that are not present for changelog")
 
     for ((storeName, systemStream) <- changeLogSystemStreams) {
-      var systemAdmin = systemAdmins.getOrElse(systemStream.getSystem, throw new SamzaException("Unable to get systemAdmin for store " + storeName + " and systemStream" + systemStream))
+      val systemAdmin = systemAdmins
+        .getOrElse(systemStream.getSystem,
+                   throw new SamzaException("Unable to get systemAdmin for store " + storeName + " and systemStream" + systemStream))
       systemAdmin.createChangelogStream(systemStream.getStream, changeLogStreamPartitions)
     }
 
@@ -99,15 +145,16 @@ class TaskStorageManager(
     for ((storeName, systemStream) <- changeLogSystemStreams) {
       val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
       val consumer = storeConsumers(storeName)
-      val offset = changeLogOldestOffsets.getOrElse(systemStream, throw new SamzaException("Missing a change log offset for %s." format systemStreamPartition))
+      val offset =
+        Option(fileOffset.get(systemStreamPartition))
+          .getOrElse(changeLogOldestOffsets
+            .getOrElse(systemStream, throw new SamzaException("Missing a change log offset for %s." format systemStreamPartition)))
 
       if (offset != null) {
         info("Registering change log consumer with offset %s for %s." format (offset, systemStreamPartition))
-
         consumer.register(systemStreamPartition, offset)
       } else {
         info("Skipping change log restoration for %s because stream appears to be empty (offset was null)." format systemStreamPartition)
-
         taskStoresToRestore -= storeName
       }
     }
@@ -123,7 +170,7 @@ class TaskStorageManager(
         val systemStream = changeLogSystemStreams(storeName)
         val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
         val systemConsumer = storeConsumers(storeName)
-        val systemConsumerIterator = new SystemStreamPartitionIterator(systemConsumer, systemStreamPartition);
+        val systemConsumerIterator = new SystemStreamPartitionIterator(systemConsumer, systemStreamPartition)
         store.restore(systemConsumerIterator)
       }
     }
@@ -143,8 +190,22 @@ class TaskStorageManager(
 
   def stop() {
     debug("Stopping stores.")
-
     taskStores.values.foreach(_.stop)
+    debug("Persisting logged key value stores")
+    changeLogSystemStreams.foreach { case (store, systemStream) =>
+      // Save the changelog's newest offset so the next start can re-use this store's on-disk state.
+      val streamToMetadata = systemAdmins(systemStream.getSystem)
+        .getSystemStreamMetadata(JavaConversions.setAsJavaSet(Set(systemStream.getStream)))
+      val newestOffset = streamToMetadata
+        .get(systemStream.getStream).getSystemStreamPartitionMetadata.get(partition).getNewestOffset
+      val offsetFile = new File(TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, store, taskName), offsetFileName)
+      if (newestOffset == null) {
+        info("Not writing an OFFSET file for store %s because its changelog appears to be empty." format store)
+      } else {
+        Util.writeDataToFile(offsetFile, newestOffset)
+        info("Successfully stored offset %s for store %s in OFFSET file " format (newestOffset, store))
+      }
+    }
   }
 
 

http://git-wip-us.apache.org/repos/asf/samza/blob/bb8a78a8/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index 8a83566..2feb65b 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -20,8 +20,9 @@
 package org.apache.samza.util
 
 import java.net.{HttpURLConnection, URL}
-import java.io.{InputStream, BufferedReader, File, InputStreamReader}
+import java.io._
 import java.lang.management.ManagementFactory
+import java.util.zip.CRC32
 import org.apache.samza.{SamzaException, Partition}
 import org.apache.samza.system.{SystemFactory, SystemStreamPartition, SystemStream}
 import java.util.Random
@@ -237,4 +238,60 @@ object Util extends Logging {
      new SystemStreamPartition(new SystemStream(ssp.substring(0, idx), ssp.substring(idx + 1, lastIdx)),
                                new Partition(Integer.parseInt(ssp.substring(lastIdx + 1))))
   }
+
+  /**
+   * Computes the CRC32 checksum of a string, encoded as UTF-8 so the value does not depend on the JVM default charset.
+   * @param data The string for which the checksum is generated (must not be null)
+   * @return the CRC32 value, an unsigned 32-bit quantity held in a Long
+   * */
+  def getChecksumValue(data: String): Long = {
+    val crc = new CRC32
+    crc.update(data.getBytes("UTF-8"))
+    crc.getValue
+  }
+
+  /**
+   * Writes a checksum followed by the data to a file, replacing any previous content.
+   * The CRC32 checksum of the data is written first as an 8-byte long (ObjectOutputStream.writeLong), then the data
+   * itself via writeUTF. Use readDataFromFile to read it back.
+   * @param file The file handle to write to
+   * @param data The data to be written to the file (must not be null)
+   * */
+  def writeDataToFile(file: File, data: String): Unit = {
+    val checksum = getChecksumValue(data)
+    val fos = new FileOutputStream(file)
+    try {
+      // Closing the ObjectOutputStream flushes its internal buffer; the outer finally still releases the file
+      // handle if the ObjectOutputStream constructor itself throws.
+      val oos = new ObjectOutputStream(fos)
+      try {
+        oos.writeLong(checksum)
+        oos.writeUTF(data)
+      } finally oos.close()
+    } finally fos.close()
+  }
+
+  /**
+   * Reads data written by writeDataToFile: a CRC32 checksum (8-byte long) followed by a writeUTF-encoded string.
+   * @param file The file handle to read from
+   * @return the data, or null if the checksum does not match or the content is truncated or malformed
+   * */
+  def readDataFromFile(file: File): String = {
+    val fis = new FileInputStream(file)
+    try {
+      val ois = new ObjectInputStream(fis)
+      val checksumFromFile = ois.readLong()
+      val data = ois.readUTF()
+      if (checksumFromFile == getChecksumValue(data)) data else {
+        warn("Checksum match failed. Data in file is corrupted. Skipping content.")
+        null
+      }
+    } catch {
+      // A partially written file (e.g. the process died mid-write) is treated like a checksum mismatch,
+      // so callers can fall back instead of failing on every subsequent read.
+      case e @ (_: EOFException | _: StreamCorruptedException | _: UTFDataFormatException) =>
+        warn("Data in file %s is truncated or malformed (%s). Skipping content." format (file, e))
+        null
+    } finally fis.close()
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/bb8a78a8/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
new file mode 100644
index 0000000..6491d09
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage
+
+
+import java.io.File
+import org.junit.{After, Before, Test}
+import org.junit.Assert._
+import org.scalatest.mock.MockitoSugar
+import org.mockito.Mockito._
+import org.mockito.Matchers._
+import scala.collection.JavaConversions
+
+import org.apache.samza.container.TaskName
+import org.apache.samza.util.Util
+import org.apache.samza.system._
+import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
+import org.apache.samza.Partition
+
+class TestTaskStorageManager extends MockitoSugar {
+
+  val store = "store1"
+  val loggedStore = "loggedStore1"
+  val taskName = new TaskName("testTask")
+
+  @Before
+  def setupTestDirs() {
+    TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultStoreBaseDir, store, taskName)
+                      .mkdirs()
+    TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName)
+                      .mkdirs()
+  }
+
+  @After
+  def tearDownTestDirs() {
+    Util.rm(TaskStorageManagerBuilder.defaultStoreBaseDir)
+    Util.rm(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir)
+  }
+
+  /**
+   * Runs the private TaskStorageManager.cleanBaseDirs method through reflection.
+   */
+  private def invokeCleanBaseDirs(taskStorageManager: TaskStorageManager) {
+    val cleanDirMethod = classOf[TaskStorageManager].getDeclaredMethod("cleanBaseDirs")
+    cleanDirMethod.setAccessible(true)
+    cleanDirMethod.invoke(taskStorageManager)
+  }
+
+  @Test
+  def testCleanBaseDirs() {
+    val checkFilePath1 = new File(TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultStoreBaseDir, store, taskName), "check")
+    checkFilePath1.createNewFile()
+    val checkFilePath2 = new File(TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName), "check")
+    checkFilePath2.createNewFile()
+
+    val taskStorageManager = new TaskStorageManagerBuilder()
+      .addStore(store)
+      .addStore(loggedStore)
+      .build
+
+    //Invoke test method
+    invokeCleanBaseDirs(taskStorageManager)
+
+    assertFalse("check file was found in store partition directory. Clean up failed!", checkFilePath1.exists())
+    assertFalse("check file was found in logged store partition directory. Clean up failed!", checkFilePath2.exists())
+  }
+
+  @Test
+  def testCleanBaseDirsWithOffsetFileForLoggedStore() {
+    val checkFilePath = new File(TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName), "OFFSET")
+    Util.writeDataToFile(checkFilePath, "100")
+
+    val taskStorageManager = new TaskStorageManagerBuilder()
+      .addStore(loggedStore)
+      .build
+
+    invokeCleanBaseDirs(taskStorageManager)
+
+    assertFalse("Offset file was not removed. Clean up failed!", checkFilePath.exists())
+    assertEquals("Offset read does not match what was in the file", "100", taskStorageManager.fileOffset.get(new SystemStreamPartition("kafka", "testStream", new Partition(0))))
+  }
+
+  @Test
+  def testStopCreatesOffsetFileForLoggedStore() {
+    val partition = new Partition(0)
+
+    val offsetFilePath = new File(TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName), "OFFSET")
+
+    val mockSystemAdmin = mock[SystemAdmin]
+    // Changelog metadata (oldest "20", newest "100", upcoming "101"); stop() must persist the newest offset.
+    val mockSspMetadata = Map("testStream" -> new SystemStreamMetadata("testStream" , JavaConversions.mapAsJavaMap[Partition, SystemStreamPartitionMetadata](Map(partition -> new SystemStreamPartitionMetadata("20", "100", "101")))))
+    val myMap = JavaConversions.mapAsJavaMap[String, SystemStreamMetadata](mockSspMetadata)
+    when(mockSystemAdmin.getSystemStreamMetadata(any(JavaConversions.setAsJavaSet(Set("")).getClass))).thenReturn(myMap)
+
+    //Build TaskStorageManager
+    val taskStorageManager = new TaskStorageManagerBuilder()
+      .addStore(loggedStore)
+      .setSystemAdmin("kafka", mockSystemAdmin)
+      .setPartition(partition)
+      .build
+
+    //Invoke test method (stop is public, so it is called directly)
+    taskStorageManager.stop()
+
+    //Check conditions
+    assertTrue("Offset file doesn't exist!", offsetFilePath.exists())
+    assertEquals("Found incorrect value in offset file!", "100", Util.readDataFromFile(offsetFilePath))
+  }
+}
+
+object TaskStorageManagerBuilder { // temp-directory locations shared by the tests and the builder defaults
+  val defaultStoreBaseDir = new File(System.getProperty("java.io.tmpdir"), "store")
+  val defaultLoggedStoreBaseDir = new File(System.getProperty("java.io.tmpdir"), "loggedStore")
+}
+
+class TaskStorageManagerBuilder extends MockitoSugar {
+  // Defaults: mocked collaborators, partition 0, task "testTask" and the shared temp directories.
+  var taskStores: Map[String, StorageEngine] = Map.empty
+  var storeConsumers: Map[String, SystemConsumer] = Map.empty
+  var changeLogSystemStreams: Map[String, SystemStream] = Map.empty
+  val streamMetadataCache = mock[StreamMetadataCache]
+  var partition: Partition = new Partition(0)
+  var systemAdmins: Map[String, SystemAdmin] = Map("kafka" -> mock[SystemAdmin])
+  var taskName: TaskName = new TaskName("testTask")
+  var storeBaseDir: File = TaskStorageManagerBuilder.defaultStoreBaseDir
+  var loggedStoreBaseDir: File = TaskStorageManagerBuilder.defaultLoggedStoreBaseDir
+  var changeLogStreamPartitions: Int = 1
+
+  /** Registers a mocked store and consumer whose changelog is the kafka stream "testStream". */
+  def addStore(storeName: String): TaskStorageManagerBuilder = {
+    taskStores += (storeName -> mock[StorageEngine])
+    storeConsumers += (storeName -> mock[SystemConsumer])
+    changeLogSystemStreams += (storeName -> new SystemStream("kafka", "testStream"))
+    this
+  }
+
+  /** Sets the changelog partition of the task. */
+  def setPartition(p: Partition): TaskStorageManagerBuilder = { partition = p; this }
+
+  /** Adds (or replaces) the changelog stream of one store. */
+  def setChangeLogSystemStreams(storeName: String, systemStream: SystemStream): TaskStorageManagerBuilder = {
+    changeLogSystemStreams += (storeName -> systemStream)
+    this
+  }
+
+  /** Adds (or replaces) the SystemAdmin used for a system. */
+  def setSystemAdmin(system: String, systemAdmin: SystemAdmin): TaskStorageManagerBuilder = {
+    systemAdmins += (system -> systemAdmin)
+    this
+  }
+
+  /** Sets the name of the task whose stores are managed. */
+  def setTaskName(tn: TaskName): TaskStorageManagerBuilder = { taskName = tn; this }
+
+  /** Creates a TaskStorageManager from the current settings. */
+  def build: TaskStorageManager =
+    new TaskStorageManager(
+      taskName = taskName,
+      taskStores = taskStores,
+      storeConsumers = storeConsumers,
+      changeLogSystemStreams = changeLogSystemStreams,
+      changeLogStreamPartitions = changeLogStreamPartitions,
+      streamMetadataCache = streamMetadataCache,
+      storeBaseDir = storeBaseDir,
+      loggedStoreBaseDir = loggedStoreBaseDir,
+      partition = partition,
+      systemAdmins = systemAdmins
+    )
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/bb8a78a8/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
index b75f440..ead6f94 100644
--- a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
+++ b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
@@ -19,22 +19,48 @@
 
 package org.apache.samza.util
 
-import org.apache.samza.Partition
-import org.apache.samza.config.Config
-import org.apache.samza.config.Config
-import org.apache.samza.config.MapConfig
-import org.apache.samza.container.TaskName
-import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.system.SystemFactory
-import org.apache.samza.system.SystemFactory
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.util.Util._
+import java.io._
 import org.junit.Assert._
 import org.junit.Test
 
-import scala.collection.JavaConversions._
-import scala.util.Random
-
 class TestUtil {
+
+  val data = "100"
+  val checksum = Util.getChecksumValue(data)
+  // Unique path per test instance (JUnit creates one instance per test method); removed after each test.
+  val file = new File(System.getProperty("java.io.tmpdir"), "samza-test-util-" + java.util.UUID.randomUUID())
+
+  @org.junit.After
+  def deleteTestFile() { file.delete() }
+
+  /** Writes a checksum followed by the data, in the layout Util.readDataFromFile expects. */
+  private def writeRaw(checksumToWrite: Long) {
+    val oos = new ObjectOutputStream(new FileOutputStream(file))
+    try { oos.writeLong(checksumToWrite); oos.writeUTF(data) } finally oos.close()
+  }
+
+  @Test
+  def testWriteDataToFile() {
+    Util.writeDataToFile(file, data)
+
+    assertTrue("File was not created!", file.exists())
+    val ois = new ObjectInputStream(new FileInputStream(file))
+    try {
+      assertEquals(checksum, ois.readLong())
+      assertEquals(data, ois.readUTF())
+    } finally ois.close()
+  }
+
+  @Test
+  def testReadDataFromFile() {
+    writeRaw(checksum)
+    assertEquals(data, Util.readDataFromFile(file))
+  }
+
+  @Test
+  def testReadDataFromFileWithChecksumMismatch() {
+    // The stored checksum no longer matches the data, so the content must be rejected.
+    writeRaw(checksum + 1)
+    assertNull("Corrupted data should be skipped", Util.readDataFromFile(file))
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/bb8a78a8/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
index 1b44a51..dd20f17 100644
--- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
@@ -68,7 +68,6 @@ object RocksDbKeyValueStore extends Logging {
 
     options.setMaxWriteBufferNumber(storeConfig.get("rocksdb.num.write.buffers", "3").toInt)
     options.setCreateIfMissing(true)
-    options.setErrorIfExists(true)
     options
   }