You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ya...@apache.org on 2015/05/07 02:23:05 UTC
samza git commit: SAMZA-557: Reuse local state in SamzaContainer on
clean shutdown
Repository: samza
Updated Branches:
refs/heads/master 9f30ef10b -> bb8a78a85
SAMZA-557: Reuse local state in SamzaContainer on clean shutdown
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/bb8a78a8
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/bb8a78a8
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/bb8a78a8
Branch: refs/heads/master
Commit: bb8a78a85666325641e75fe466476094de716555
Parents: 9f30ef1
Author: Navina Ramesh <na...@gmail.com>
Authored: Wed May 6 17:22:41 2015 -0700
Committer: Yan Fang <ya...@gmail.com>
Committed: Wed May 6 17:22:41 2015 -0700
----------------------------------------------------------------------
.../samza/config/ShellCommandConfig.scala | 8 +
.../apache/samza/container/SamzaContainer.scala | 36 +++-
.../samza/storage/TaskStorageManager.scala | 83 +++++++--
.../main/scala/org/apache/samza/util/Util.scala | 59 +++++-
.../samza/storage/TestTaskStorageManager.scala | 186 +++++++++++++++++++
.../scala/org/apache/samza/util/TestUtil.scala | 54 ++++--
.../samza/storage/kv/RocksDbKeyValueStore.scala | 1 -
7 files changed, 391 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/bb8a78a8/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
index e94a473..f505322 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
@@ -46,6 +46,14 @@ object ShellCommandConfig {
*/
val ENV_JAVA_HOME = "JAVA_HOME"
+ /*
+ * The base directory for storing logged data stores used in Samza. This has to be set on all machines running Samza
+ * containers. For example, when using YARN, it has to be set in all NMs and passed to the containers.
+ * If this environment variable is not set, the path defaults to current working directory (which is the same as the
+ * path for persisting non-logged data stores)
+ */
+ val ENV_LOGGED_STORE_BASE_DIR = "LOGGED_STORE_BASE_DIR"
+
val COMMAND_SHELL_EXECUTE = "task.execute"
val TASK_JVM_OPTS = "task.opts"
val TASK_JAVA_HOME = "task.java.home"
http://git-wip-us.apache.org/repos/asf/samza/blob/bb8a78a8/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index ac4793a..e8e830e 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -369,12 +369,6 @@ object SamzaContainer extends Logging {
metrics = systemProducersMetrics,
dropSerializationError = dropSerializationError)
- // TODO not sure how we should make this config based, or not. Kind of
- // strange, since it has some dynamic directories when used with YARN.
- val storeBaseDir = new File(System.getProperty("user.dir"), "state")
-
- info("Got storage engine base directory: %s" format storeBaseDir)
-
val storageEngineFactories = config
.getStoreNames
.map(storeName => {
@@ -443,6 +437,24 @@ object SamzaContainer extends Logging {
info("Got store consumers: %s" format storeConsumers)
+ // TODO not sure how we should make this config based, or not. Kind of
+ // strange, since it has some dynamic directories when used with YARN.
+ val defaultStoreBaseDir = new File(System.getProperty("user.dir"), "state")
+ info("Got default storage engine base directory: %s" format defaultStoreBaseDir)
+
+ var loggedStorageBaseDir: File = null
+ if(System.getenv(ShellCommandConfig.ENV_LOGGED_STORE_BASE_DIR) != null) {
+ val jobNameAndId = Util.getJobNameAndId(config)
+ loggedStorageBaseDir = new File(System.getenv(ShellCommandConfig.ENV_LOGGED_STORE_BASE_DIR) + File.separator + jobNameAndId._1 + "-" + jobNameAndId._2)
+ } else {
+ warn("No override was provided for logged store base directory. This disables local state re-use on " +
+ "application restart. If you want to enable this feature, set LOGGED_STORE_BASE_DIR as an environment " +
+ "variable in all machines running the Samza container")
+ loggedStorageBaseDir = defaultStoreBaseDir
+ }
+
+ info("Got base directory for logged data stores: %s" format loggedStorageBaseDir)
+
val taskStores = storageEngineFactories
.map {
case (storeName, storageEngineFactory) =>
@@ -459,10 +471,15 @@ object SamzaContainer extends Logging {
case Some(msgSerde) => serdes.getOrElse(msgSerde, throw new SamzaException("No class defined for serde: %s." format msgSerde))
case _ => null
}
- val storePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskName)
+ val storeBaseDir = if(changeLogSystemStreamPartition != null) {
+ TaskStorageManager.getStorePartitionDir(loggedStorageBaseDir, storeName, taskName)
+ }
+ else {
+ TaskStorageManager.getStorePartitionDir(defaultStoreBaseDir, storeName, taskName)
+ }
val storageEngine = storageEngineFactory.getStorageEngine(
storeName,
- storePartitionDir,
+ storeBaseDir,
keySerde,
msgSerde,
collector,
@@ -481,7 +498,8 @@ object SamzaContainer extends Logging {
changeLogSystemStreams = changeLogSystemStreams,
maxChangeLogStreamPartitions,
streamMetadataCache = streamMetadataCache,
- storeBaseDir = storeBaseDir,
+ storeBaseDir = defaultStoreBaseDir,
+ loggedStoreBaseDir = loggedStorageBaseDir,
partition = taskModel.getChangelogPartition,
systemAdmins = systemAdmins)
http://git-wip-us.apache.org/repos/asf/samza/blob/bb8a78a8/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
index f68a7fe..aeba61a 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
@@ -19,8 +19,9 @@
package org.apache.samza.storage
-import java.io.File
-import scala.collection.Map
+import java.io._
+import java.util
+import scala.collection.{JavaConversions, Map}
import org.apache.samza.util.Logging
import org.apache.samza.Partition
import org.apache.samza.system._
@@ -50,31 +51,74 @@ class TaskStorageManager(
changeLogStreamPartitions: Int,
streamMetadataCache: StreamMetadataCache,
storeBaseDir: File = new File(System.getProperty("user.dir"), "state"),
+ loggedStoreBaseDir: File = new File(System.getProperty("user.dir"), "state"),
partition: Partition,
systemAdmins: Map[String, SystemAdmin]) extends Logging {
var taskStoresToRestore = taskStores
var changeLogOldestOffsets: Map[SystemStream, String] = Map()
+ val fileOffset: util.Map[SystemStreamPartition, String] = new util.HashMap[SystemStreamPartition, String]()
+ val offsetFileName = "OFFSET"
def apply(storageEngineName: String) = taskStores(storageEngineName)
def init {
cleanBaseDirs
+ setupBaseDirs
createStreams
startConsumers
restoreStores
stopConsumers
}
+ /**
+  * Creates the partition directory of every store owned by this task: stores without a
+  * changelog get theirs under storeBaseDir, stores with a changelog under loggedStoreBaseDir.
+  */
+ private def setupBaseDirs {
+   debug("Setting up base directories for stores.")
+
+   val loggedStoreNames = changeLogSystemStreams.keySet
+   val nonLoggedStoreNames = taskStores.keySet -- loggedStoreNames
+
+   for (storeName <- nonLoggedStoreNames) {
+     TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskName).mkdirs
+   }
+
+   // A logged store directory is only created when it is not already on disk.
+   for (storeName <- loggedStoreNames) {
+     val loggedDir = TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName)
+     if (!loggedDir.exists()) {
+       loggedDir.mkdirs
+     }
+   }
+ }
+
+ /**
+  * Removes on-disk store state that must not survive a restart.
+  *
+  * For every store of this task, the partition directory under storeBaseDir is deleted when present.
+  * The partition directory under loggedStoreBaseDir is kept only when it contains an OFFSET file
+  * holding a non-empty value accepted by Util.readDataFromFile; that value is recorded in fileOffset.
+  * The OFFSET file itself is deleted once it has been read.
+  */
private def cleanBaseDirs {
debug("Cleaning base directories for stores.")
+
taskStores.keys.foreach(storeName => {
val storagePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskName)
+ info("Got default storage partition directory as %s" format storagePartitionDir.toPath.toString)
- debug("Cleaning %s for store %s." format (storagePartitionDir, storeName))
+ if(storagePartitionDir.exists()) {
+ debug("Deleting default storage partition directory %s" format storagePartitionDir.toPath.toString)
+ Util.rm(storagePartitionDir)
+ }
- Util.rm(storagePartitionDir)
- storagePartitionDir.mkdirs
+ val loggedStoragePartitionDir = TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName)
+ info("Got logged storage partition directory as %s" format loggedStoragePartitionDir.toPath.toString)
+
+ var deleteLoggedStoragePartitionDir = false
+
+ val offsetFileRef = new File(loggedStoragePartitionDir, offsetFileName)
+ if(offsetFileRef.exists()) {
+ debug("Found offset file in partition directory: %s" format offsetFileRef.toPath.toString)
+ val offset = Util.readDataFromFile(offsetFileRef)
+ if(offset != null && !offset.isEmpty) {
+ // NOTE(review): changeLogSystemStreams(storeName) throws if this store has no changelog entry;
+ // confirm that an OFFSET file can only ever exist for logged stores.
+ fileOffset.put(new SystemStreamPartition(changeLogSystemStreams(storeName), partition), offset)
+ } else {
+ deleteLoggedStoragePartitionDir = true
+ }
+ // The offset file is removed after reading, whether or not its content was usable.
+ offsetFileRef.delete()
+ } else {
+ info("No offset file found in logged storage partition directory: %s" format loggedStoragePartitionDir.toPath.toString)
+ deleteLoggedStoragePartitionDir = true
+ }
+ if(deleteLoggedStoragePartitionDir && loggedStoragePartitionDir.exists()) {
+ Util.rm(loggedStoragePartitionDir)
+ }
})
}
@@ -82,7 +126,9 @@ class TaskStorageManager(
info("Creating streams that are not present for changelog")
for ((storeName, systemStream) <- changeLogSystemStreams) {
- var systemAdmin = systemAdmins.getOrElse(systemStream.getSystem, throw new SamzaException("Unable to get systemAdmin for store " + storeName + " and systemStream" + systemStream))
+ val systemAdmin = systemAdmins
+ .getOrElse(systemStream.getSystem,
+ throw new SamzaException("Unable to get systemAdmin for store " + storeName + " and systemStream" + systemStream))
systemAdmin.createChangelogStream(systemStream.getStream, changeLogStreamPartitions)
}
@@ -99,15 +145,16 @@ class TaskStorageManager(
for ((storeName, systemStream) <- changeLogSystemStreams) {
val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
val consumer = storeConsumers(storeName)
- val offset = changeLogOldestOffsets.getOrElse(systemStream, throw new SamzaException("Missing a change log offset for %s." format systemStreamPartition))
+ val offset =
+ Option(fileOffset.get(systemStreamPartition))
+ .getOrElse(changeLogOldestOffsets
+ .getOrElse(systemStream, throw new SamzaException("Missing a change log offset for %s." format systemStreamPartition)))
if (offset != null) {
info("Registering change log consumer with offset %s for %s." format (offset, systemStreamPartition))
-
consumer.register(systemStreamPartition, offset)
} else {
info("Skipping change log restoration for %s because stream appears to be empty (offset was null)." format systemStreamPartition)
-
taskStoresToRestore -= storeName
}
}
@@ -123,7 +170,7 @@ class TaskStorageManager(
val systemStream = changeLogSystemStreams(storeName)
val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
val systemConsumer = storeConsumers(storeName)
- val systemConsumerIterator = new SystemStreamPartitionIterator(systemConsumer, systemStreamPartition);
+ val systemConsumerIterator = new SystemStreamPartitionIterator(systemConsumer, systemStreamPartition)
store.restore(systemConsumerIterator)
}
}
@@ -143,8 +190,22 @@ class TaskStorageManager(
+ /**
+  * Stops every store, then, for each store that has a changelog, writes the newest offset of this
+  * task's changelog partition to an OFFSET file in the store's logged partition directory.
+  * No OFFSET file is written for a store whose metadata or newest offset is unavailable (null).
+  */
def stop() {
debug("Stopping stores.")
-
taskStores.values.foreach(_.stop)
+
+ debug("Persisting logged key value stores")
+ changeLogSystemStreams.foreach { case (store, systemStream) => {
+   val streamToMetadata = systemAdmins(systemStream.getSystem)
+     .getSystemStreamMetadata(JavaConversions.setAsJavaSet(Set(systemStream.getStream)))
+
+   // Each Java lookup may yield null; writing a null offset would fail with a
+   // NullPointerException inside Util.writeDataToFile and abort the shutdown.
+   val newestOffsetOption = Option(streamToMetadata.get(systemStream.getStream))
+     .flatMap(streamMetadata => Option(streamMetadata.getSystemStreamPartitionMetadata.get(partition)))
+     .flatMap(sspMetadata => Option(sspMetadata.getNewestOffset))
+
+   newestOffsetOption match {
+     case Some(newestOffset) =>
+       val offsetFile = new File(TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, store, taskName), offsetFileName)
+       Util.writeDataToFile(offsetFile, newestOffset)
+       info("Successfully stored offset %s for store %s in OFFSET file " format (newestOffset, store))
+     case None =>
+       info("Not storing OFFSET file for store %s because no newest changelog offset is available." format store)
+   }
+ }}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/bb8a78a8/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index 8a83566..2feb65b 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -20,8 +20,9 @@
package org.apache.samza.util
import java.net.{HttpURLConnection, URL}
-import java.io.{InputStream, BufferedReader, File, InputStreamReader}
+import java.io._
import java.lang.management.ManagementFactory
+import java.util.zip.CRC32
import org.apache.samza.{SamzaException, Partition}
import org.apache.samza.system.{SystemFactory, SystemStreamPartition, SystemStream}
import java.util.Random
@@ -237,4 +238,60 @@ object Util extends Logging {
new SystemStreamPartition(new SystemStream(ssp.substring(0, idx), ssp.substring(idx + 1, lastIdx)),
new Partition(Integer.parseInt(ssp.substring(lastIdx + 1))))
}
+
+ /**
+  * Computes the CRC32 checksum of a string's UTF-8 encoding.
+  *
+  * The charset is fixed so that the checksum does not depend on the JVM's default charset.
+  *
+  * @param data The string for which the checksum has to be generated
+  * @return The CRC32 value of the encoded bytes, as a long
+  */
+ def getChecksumValue(data: String): Long = {
+   val crc = new CRC32
+   crc.update(data.getBytes(java.nio.charset.StandardCharsets.UTF_8))
+   crc.getValue
+ }
+
+ /**
+  * Writes a checksum followed by the data to a file.
+  *
+  * The checksum from getChecksumValue is written first with writeLong (a 64-bit long),
+  * then the data with writeUTF, both through an ObjectOutputStream.
+  *
+  * @param file The file handle to write to
+  * @param data The data to be written to the file
+  */
+ def writeDataToFile(file: File, data: String): Unit = {
+   val checksum = getChecksumValue(data)
+   val fos = new FileOutputStream(file)
+   try {
+     val oos = new ObjectOutputStream(fos)
+     try {
+       oos.writeLong(checksum)
+       oos.writeUTF(data)
+     } finally {
+       oos.close()
+     }
+   } finally {
+     // Also reached when the ObjectOutputStream could not be created, so the file
+     // handle is always released and the original exception is not masked.
+     fos.close()
+   }
+ }
+
+ /**
+  * Reads data from a file that has a checksum prepended to it.
+  *
+  * Expects a long checksum (readLong) followed by a string (readUTF), both read through an
+  * ObjectInputStream.
+  *
+  * @param file The file handle to read from
+  * @return The data when the stored checksum equals getChecksumValue(data); null otherwise
+  */
+ def readDataFromFile(file: File): String = {
+   val fis = new FileInputStream(file)
+   try {
+     val ois = new ObjectInputStream(fis)
+     try {
+       val checksumFromFile = ois.readLong()
+       val data = ois.readUTF()
+       if (checksumFromFile == getChecksumValue(data)) {
+         data
+       } else {
+         info("Checksum match failed. Data in file is corrupted. Skipping content.")
+         null
+       }
+     } finally {
+       ois.close()
+     }
+   } finally {
+     // Also reached when the ObjectInputStream could not be created, so the file
+     // handle is always released and the original exception is not masked.
+     fis.close()
+   }
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/bb8a78a8/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
new file mode 100644
index 0000000..6491d09
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage
+
+
+import java.io.File
+import org.junit.{After, Before, Test}
+import org.junit.Assert._
+import org.scalatest.mock.MockitoSugar
+import org.mockito.Mockito._
+import org.mockito.Matchers._
+import scala.collection.JavaConversions
+
+import org.apache.samza.container.TaskName
+import org.apache.samza.util.Util
+import org.apache.samza.system._
+import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
+import org.apache.samza.Partition
+
+/**
+ * Tests for the on-disk state handling of TaskStorageManager: cleaning the store partition
+ * directories and reading / writing the OFFSET file of logged stores.
+ */
+class TestTaskStorageManager extends MockitoSugar {
+
+  val store = "store1"
+  val loggedStore = "loggedStore1"
+  val taskName = new TaskName("testTask")
+
+  @Before
+  def setupTestDirs() {
+    TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultStoreBaseDir, store , taskName)
+      .mkdirs()
+    TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName)
+      .mkdirs()
+  }
+
+  @After
+  def tearDownTestDirs() {
+    Util.rm(TaskStorageManagerBuilder.defaultStoreBaseDir)
+    Util.rm(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir)
+  }
+
+  /** Calls the private cleanBaseDirs method of the given manager through reflection. */
+  private def invokeCleanBaseDirs(taskStorageManager: TaskStorageManager) {
+    val cleanDirMethod = taskStorageManager.getClass.getDeclaredMethod("cleanBaseDirs")
+    cleanDirMethod.setAccessible(true)
+    cleanDirMethod.invoke(taskStorageManager)
+  }
+
+  @Test
+  def testCleanBaseDirs() {
+    val checkFilePath1 = new File(TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultStoreBaseDir, store, taskName), "check")
+    checkFilePath1.createNewFile()
+    val checkFilePath2 = new File(TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName), "check")
+    checkFilePath2.createNewFile()
+
+    val taskStorageManager = new TaskStorageManagerBuilder()
+      .addStore(store)
+      .addStore(loggedStore)
+      .build
+
+    //Invoke test method
+    invokeCleanBaseDirs(taskStorageManager)
+
+    assertFalse("check file was found in store partition directory. Clean up failed!", checkFilePath1.exists())
+    assertFalse("check file was found in logged store partition directory. Clean up failed!", checkFilePath2.exists())
+  }
+
+  @Test
+  def testCleanBaseDirsWithOffsetFileForLoggedStore() {
+    val checkFilePath = new File(TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName), "OFFSET")
+    Util.writeDataToFile(checkFilePath, "100")
+
+    val taskStorageManager = new TaskStorageManagerBuilder()
+      .addStore(loggedStore)
+      .build
+
+    //Invoke test method
+    invokeCleanBaseDirs(taskStorageManager)
+
+    assertFalse("Offset file was not removed. Clean up failed!", checkFilePath.exists())
+    assertEquals("Offset read does not match what was in the file", "100", taskStorageManager.fileOffset.get(new SystemStreamPartition("kafka", "testStream", new Partition(0))))
+  }
+
+  @Test
+  def testStopCreatesOffsetFileForLoggedStore() {
+    val partition = new Partition(0)
+
+    val offsetFilePath = new File(TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName), "OFFSET")
+
+    val mockSystemAdmin = mock[SystemAdmin]
+    val mockSspMetadata = Map("testStream" -> new SystemStreamMetadata("testStream" , JavaConversions.mapAsJavaMap[Partition, SystemStreamPartitionMetadata](Map(partition -> new SystemStreamPartitionMetadata("20", "100", "101")))))
+    val myMap = JavaConversions.mapAsJavaMap[String, SystemStreamMetadata](mockSspMetadata)
+    when(mockSystemAdmin.getSystemStreamMetadata(any(JavaConversions.setAsJavaSet(Set("")).getClass))).thenReturn(myMap)
+
+    //Build TaskStorageManager
+    val taskStorageManager = new TaskStorageManagerBuilder()
+      .addStore(loggedStore)
+      .setSystemAdmin("kafka", mockSystemAdmin)
+      .setPartition(partition)
+      .build
+
+    //Invoke test method (stop is public, so no reflection is needed)
+    taskStorageManager.stop()
+
+    //Check conditions
+    assertTrue("Offset file doesn't exist!", offsetFilePath.exists())
+    assertEquals("Found incorrect value in offset file!", "100", Util.readDataFromFile(offsetFilePath))
+  }
+}
+
+/**
+ * Default base directories used by TaskStorageManagerBuilder; both are located
+ * under the directory named by the java.io.tmpdir system property.
+ */
+object TaskStorageManagerBuilder {
+  private def underTmpDir(dirName: String): File =
+    new File(System.getProperty("java.io.tmpdir") + File.separator + dirName)
+
+  val defaultStoreBaseDir = underTmpDir("store")
+  val defaultLoggedStoreBaseDir = underTmpDir("loggedStore")
+}
+
+/**
+ * Fluent test helper that assembles a TaskStorageManager. Every store added through
+ * addStore gets a mocked engine, a mocked consumer and the kafka/testStream changelog.
+ */
+class TaskStorageManagerBuilder extends MockitoSugar {
+  var taskStores: Map[String, StorageEngine] = Map()
+  var storeConsumers: Map[String, SystemConsumer] = Map()
+  var changeLogSystemStreams: Map[String, SystemStream] = Map()
+  val streamMetadataCache = mock[StreamMetadataCache]
+  var partition: Partition = new Partition(0)
+  var systemAdmins: Map[String, SystemAdmin] = Map("kafka" -> mock[SystemAdmin])
+  var taskName: TaskName = new TaskName("testTask")
+  var storeBaseDir: File = TaskStorageManagerBuilder.defaultStoreBaseDir
+  var loggedStoreBaseDir: File = TaskStorageManagerBuilder.defaultLoggedStoreBaseDir
+  var changeLogStreamPartitions: Int = 1
+
+  /** Registers a store together with its mocked engine, mocked consumer and changelog stream. */
+  def addStore(storeName: String): TaskStorageManagerBuilder = {
+    taskStores += (storeName -> mock[StorageEngine])
+    storeConsumers += (storeName -> mock[SystemConsumer])
+    changeLogSystemStreams += (storeName -> new SystemStream("kafka", "testStream"))
+    this
+  }
+
+  /** Sets the changelog partition handed to the manager. */
+  def setPartition(p: Partition): TaskStorageManagerBuilder = {
+    partition = p
+    this
+  }
+
+  /** Adds or replaces the changelog stream of one store. */
+  def setChangeLogSystemStreams(storeName: String, systemStream: SystemStream): TaskStorageManagerBuilder = {
+    changeLogSystemStreams += (storeName -> systemStream)
+    this
+  }
+
+  /** Adds or replaces the admin registered for one system. */
+  def setSystemAdmin(system: String, systemAdmin: SystemAdmin): TaskStorageManagerBuilder = {
+    systemAdmins += (system -> systemAdmin)
+    this
+  }
+
+  /** Sets the task name handed to the manager. */
+  def setTaskName(tn: TaskName): TaskStorageManagerBuilder = {
+    taskName = tn
+    this
+  }
+
+  /** Creates the TaskStorageManager from the current builder state. */
+  def build: TaskStorageManager = new TaskStorageManager(
+    taskName = taskName,
+    taskStores = taskStores,
+    storeConsumers = storeConsumers,
+    changeLogSystemStreams = changeLogSystemStreams,
+    changeLogStreamPartitions = changeLogStreamPartitions,
+    streamMetadataCache = streamMetadataCache,
+    storeBaseDir = storeBaseDir,
+    loggedStoreBaseDir = loggedStoreBaseDir,
+    partition = partition,
+    systemAdmins = systemAdmins
+  )
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/bb8a78a8/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
index b75f440..ead6f94 100644
--- a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
+++ b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
@@ -19,22 +19,48 @@
package org.apache.samza.util
-import org.apache.samza.Partition
-import org.apache.samza.config.Config
-import org.apache.samza.config.Config
-import org.apache.samza.config.MapConfig
-import org.apache.samza.container.TaskName
-import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.system.SystemFactory
-import org.apache.samza.system.SystemFactory
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.util.Util._
+import java.io._
import org.junit.Assert._
import org.junit.Test
-import scala.collection.JavaConversions._
-import scala.util.Random
-
+/**
+ * Tests for the checksum-prefixed file helpers of Util (writeDataToFile / readDataFromFile).
+ * Each test removes the temporary file it creates.
+ */
class TestUtil {
+
+  val data = "100"
+  val checksum = Util.getChecksumValue(data)
+  val file = new File(System.getProperty("java.io.tmpdir"), "test")
+
+  @Test
+  def testWriteDataToFile() {
+    try {
+      // Invoke test
+      Util.writeDataToFile(file, data)
+
+      // Check that file exists
+      assertTrue("File was not created!", file.exists())
+      val fis = new FileInputStream(file)
+      try {
+        val ois = new ObjectInputStream(fis)
+        try {
+          // Check content of the file is as expected
+          assertEquals(checksum, ois.readLong())
+          assertEquals(data, ois.readUTF())
+        } finally {
+          ois.close()
+        }
+      } finally {
+        // Streams are released even when an assertion above fails
+        fis.close()
+      }
+    } finally {
+      file.delete()
+    }
+  }
+
+  @Test
+  def testReadDataFromFile() {
+    try {
+      // Setup
+      val fos = new FileOutputStream(file)
+      try {
+        val oos = new ObjectOutputStream(fos)
+        try {
+          oos.writeLong(checksum)
+          oos.writeUTF(data)
+        } finally {
+          oos.close()
+        }
+      } finally {
+        fos.close()
+      }
+
+      // Invoke test
+      val result = Util.readDataFromFile(file)
+
+      // Check data returned
+      assertEquals(data, result)
+    } finally {
+      file.delete()
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/bb8a78a8/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
index 1b44a51..dd20f17 100644
--- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
@@ -68,7 +68,6 @@ object RocksDbKeyValueStore extends Logging {
options.setMaxWriteBufferNumber(storeConfig.get("rocksdb.num.write.buffers", "3").toInt)
options.setCreateIfMissing(true)
- options.setErrorIfExists(true)
options
}