You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2015/04/08 01:25:50 UTC
[18/36] samza git commit: SAMZA-543; disable WAL for rocksdb kv store
SAMZA-543; disable WAL for rocksdb kv store
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a6ada1f6
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a6ada1f6
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a6ada1f6
Branch: refs/heads/samza-sql
Commit: a6ada1f6fa4fa401201ad863eda3264c1782e276
Parents: 137d0d6
Author: Navina Ramesh <na...@gmail.com>
Authored: Wed Mar 18 18:47:08 2015 -0700
Committer: Chris Riccomini <cr...@apache.org>
Committed: Wed Mar 18 18:47:08 2015 -0700
----------------------------------------------------------------------
.../RocksDbKeyValueStorageEngineFactory.scala | 4 +-
.../samza/storage/kv/RocksDbKeyValueStore.scala | 11 +-
.../src/main/config/perf/kv-perf.properties | 28 ++-
.../performance/TestKeyValuePerformance.scala | 175 ++++++++++++-------
4 files changed, 142 insertions(+), 76 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/a6ada1f6/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
index 14eeba5..5ab6859 100644
--- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
@@ -24,6 +24,7 @@ import org.apache.samza.container.SamzaContainerContext
import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.storage.kv._
import org.apache.samza.system.SystemStreamPartition
+import org.rocksdb.WriteOptions
class RocksDbKeyValueStorageEngineFactory [K, V] extends BaseKeyValueStorageEngineFactory[K, V]
{
@@ -44,7 +45,8 @@ class RocksDbKeyValueStorageEngineFactory [K, V] extends BaseKeyValueStorageEngi
    val storageConfig = containerContext.config.subset("stores." + storeName + ".", true)
    val rocksDbMetrics = new KeyValueStoreMetrics(storeName, registry)
    val rocksDbOptions = RocksDbKeyValueStore.options(storageConfig, containerContext)
-    val rocksDb = new RocksDbKeyValueStore(storeDir, rocksDbOptions, rocksDbMetrics)
+    val rocksDbWriteOptions = new WriteOptions().setDisableWAL(true) // SAMZA-543: WAL is disabled for every store this factory builds (not configurable)
+    val rocksDb = new RocksDbKeyValueStore(storeDir, rocksDbOptions, rocksDbWriteOptions, rocksDbMetrics) // NOTE(review): presumably crash recovery relies on the changelog rather than the WAL — confirm
    rocksDb
  }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a6ada1f6/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
index eae2a5a..66c2a0d 100644
--- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
@@ -29,7 +29,7 @@ object RocksDbKeyValueStore extends Logging {
  def options(storeConfig: Config, containerContext: SamzaContainerContext) = {
    val cacheSize = storeConfig.getLong("container.cache.size.bytes", 100 * 1024 * 1024L)
    val writeBufSize = storeConfig.getLong("container.write.buffer.size.bytes", 32 * 1024 * 1024)
-    val options = new Options();
+    val options = new Options()
    // Cache size and write buffer size are specified on a per-container basis.
    val numTasks = containerContext.taskNames.size
@@ -77,6 +77,7 @@ object RocksDbKeyValueStore extends Logging {
class RocksDbKeyValueStore(
  val dir: File,
  val options: Options,
+  val writeOptions: WriteOptions = new WriteOptions(), // used by every db.put/db.remove below; NOTE(review): inserted before `metrics`, so positional (dir, options, metrics) callers no longer compile — confirm all callers were updated
  val metrics: KeyValueStoreMetrics = new KeyValueStoreMetrics) extends KeyValueStore[Array[Byte], Array[Byte]] with Logging {
  private lazy val db = RocksDB.open(options, dir.toString)
@@ -97,11 +98,11 @@ class RocksDbKeyValueStore(
    metrics.puts.inc
    require(key != null, "Null key not allowed.")
    if (value == null) {
-      db.remove(key)
+      db.remove(writeOptions, key)
      deletesSinceLastCompaction += 1
    } else {
      metrics.bytesWritten.inc(key.size + value.size)
-      db.put(key, value)
+      db.put(writeOptions, key, value)
    }
  }
@@ -115,12 +116,12 @@ class RocksDbKeyValueStore(
      val curr = iter.next()
      if (curr.getValue == null) {
        deletes += 1
-        db.remove(curr.getKey);
+        db.remove(writeOptions, curr.getKey)
      } else {
        val key = curr.getKey
        val value = curr.getValue
        metrics.bytesWritten.inc(key.size + value.size)
-        db.put(key, value)
+        db.put(writeOptions, key, value)
      }
    }
    metrics.puts.inc(wrote)
http://git-wip-us.apache.org/repos/asf/samza/blob/a6ada1f6/samza-test/src/main/config/perf/kv-perf.properties
----------------------------------------------------------------------
diff --git a/samza-test/src/main/config/perf/kv-perf.properties b/samza-test/src/main/config/perf/kv-perf.properties
index 0d487b1..33fcd8d 100644
--- a/samza-test/src/main/config/perf/kv-perf.properties
+++ b/samza-test/src/main/config/perf/kv-perf.properties
@@ -15,7 +15,27 @@
# specific language governing permissions and limitations
# under the License.
-stores.test.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
-stores.test.compaction.delete.threshold=1000
-test.partition.count=4
-test.num.loops=1000
+# Config for the all-with-deletes test; the test reads these keys relative to "test.all-with-deletes."
+test.all-with-deletes.stores.test-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
+
+test.all-with-deletes.partition.count=4
+
+test.all-with-deletes.set.count=1
+test.all-with-deletes.set-1.num.loops=1000
+
+# Config for the rocksdb-write-performance test; the test reads these keys relative to "test.rocksdb-write-performance."
+test.rocksdb-write-performance.stores.test-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
+test.rocksdb-write-performance.partition.count=4
+
+test.rocksdb-write-performance.set.count=3
+test.rocksdb-write-performance.set-1.message.size=256
+test.rocksdb-write-performance.set-1.message.count=1000000
+
+test.rocksdb-write-performance.set-2.message.size=512
+test.rocksdb-write-performance.set-2.message.count=1000000
+
+test.rocksdb-write-performance.set-3.message.size=1024
+test.rocksdb-write-performance.set-3.message.count=1000000
+
+# Comma-separated list of tests to execute (each name must be a key of TestKeyValuePerformance.testMethods)
+test.methods=rocksdb-write-performance
http://git-wip-us.apache.org/repos/asf/samza/blob/a6ada1f6/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
index b4fa7d3..0858b98 100644
--- a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
+++ b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
@@ -22,114 +22,134 @@ package org.apache.samza.test.performance
import org.apache.samza.util.Logging
import org.apache.samza.config.Config
import org.apache.samza.config.StorageConfig._
-import org.apache.samza.container.{ TaskName, SamzaContainerContext }
+import org.apache.samza.container.{TaskName, SamzaContainerContext}
import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.storage.kv.KeyValueStore
import org.apache.samza.storage.kv.KeyValueStorageEngine
import org.apache.samza.storage.StorageEngineFactory
import org.apache.samza.util.CommandLine
import org.apache.samza.util.Util
-import org.apache.samza.serializers.ByteSerde
+import org.apache.samza.serializers.{StringSerde, ByteSerde, SerdeManager} // NOTE(review): StringSerde is not used in the code visible in this diff — confirm it is needed
import org.apache.samza.Partition
import org.apache.samza.SamzaException
import org.apache.samza.task.TaskInstanceCollector
import org.apache.samza.system.SystemProducers
import org.apache.samza.system.SystemProducer
-import org.apache.samza.serializers.SerdeManager
import java.io.File
import java.util.UUID
+import java.util
/**
 * A simple CLI-based tool for running various key-value performance tests.
+ *
+ * The tests to run are listed in the 'test.methods' configuration as a comma-separated value
+ * (default: "rocksdb-write-performance,all-with-deletes"). The tool splits this list to determine which tests to run.
+ *
+ * Each test should define its own set of configuration for partition count, stores etc.
+ * using the "test.<test-name>.<config-string>=<config-value>" pattern
+ *
+ * Each test may define one or more sets of test parameters.
+ * For example, test1 can define 2 sets of parameters by specifying "test.test1.set.count=2" and
+ * define each set as:
+ * "test.test1.set-1.<param-name>=<param-value>"
+ * "test.test1.set-2.<param-name>=<param-value>"
+ */
+
object TestKeyValuePerformance extends Logging {
  val Encoding = "UTF-8"
-  /**
-   * KeyValuePerformance job configs must define a 'test.method' configuration.
-   * This configuration must be the value of one of the keys in this map. The
-   * test uses this key to determine which test to run.
-   */
-  val testMethods: Map[String, (Config) => Unit] = Map("testAllWithDeletes" -> runTestAllWithDeletes)
+  /**
+   * Maps each test name accepted in 'test.methods' to the function that runs it
+   * against a store, given that test's per-set ("set-<n>.") configuration.
+   */
+  val testMethods: Map[String, (KeyValueStorageEngine[Array[Byte], Array[Byte]], Config) => Unit] = Map(
+    "all-with-deletes" -> runTestAllWithDeletes,
+    "rocksdb-write-performance" -> runTestMsgWritePerformance
+  )
def main(args: Array[String]) {
val cmdline = new CommandLine
val options = cmdline.parser.parse(args: _*)
val config = cmdline.loadConfig(options)
- val testMethod = config.get("test.method", "testAllWithDeletes")
-
- info("Got arguments: %s" format args.toList)
- info("Using config: %s" format config)
- info("Using test method: %s" format testMethod)
-
- if (testMethods.contains(testMethod)) {
- testMethods(testMethod)(config)
- } else {
- error("Invalid test method. Valid methods are: %s" format testMethods.keys)
+ val tests = config.get("test.methods", "rocksdb-write-performance,all-with-deletes").split(",")
- throw new SamzaException("Unknown test method: %s" format testMethod)
+ tests.foreach{ test =>
+ info("Running test: %s" format test)
+ if(testMethods.contains(test)) {
+ invokeTest(test, testMethods(test), config.subset("test." + test + ".", true))
+ } else {
+ error("Invalid test method. valid methods are: %s" format testMethods.keys)
+ throw new SamzaException("Unknown test method: %s" format test)
+ }
}
}
-  /**
-   * Do wiring for testAllWithDeletes, and run the test.
-   */
-  def runTestAllWithDeletes(config: Config) {
-    val test = new TestKeyValuePerformance
-    val serde = new ByteSerde
-    val partitionCount = config.getInt("test.partition.count", 1)
-    val numLoops = config.getInt("test.num.loops", 100)
-    val messagesPerBatch = config.getInt("test.messages.per.batch", 10000)
-    val messageSizeBytes = config.getInt("test.message.size.bytes", 200)
-    val taskNames = new java.util.ArrayList[TaskName]()
-
+  /**
+   * Runs `testMethod` against a fresh store for every configured store and parameter set.
+   *
+   * For each store defined in `config` and each set index 1..`set.count` (default 1), a new storage
+   * engine is created in a random directory under /tmp, and the test receives the "set-<n>." subset
+   * of `config`. Afterwards the store is closed and its directory deleted, even if the test throws.
+   *
+   * @param testName   name of the test being run (not currently used by this method)
+   * @param testMethod test body, invoked with the store and the per-set config
+   * @param config     this test's config; main passes the "test.<name>." subset
+   */
+  def invokeTest(testName: String, testMethod: (KeyValueStorageEngine[Array[Byte], Array[Byte]], Config) => Unit, config: Config) {
+    val taskNames = new util.ArrayList[TaskName]()
+    val partitionCount = config.getInt("partition.count", 1)
    (0 until partitionCount).map(p => taskNames.add(new TaskName(new Partition(p).toString)))
-    info("Using partition count: %s" format partitionCount)
-    info("Using num loops: %s" format numLoops)
-    info("Using messages per batch: %s" format messagesPerBatch)
-    info("Using message size: %s bytes" format messageSizeBytes)
-
+    val producerMultiplexer = new SystemProducers(
+      Map[String, SystemProducer](),
+      new SerdeManager
+    )
    // Build a Map[String, StorageEngineFactory]. The key is the store name.
-    val storageEngineFactories = config
+    val storageEngineMappings = config
      .getStoreNames
      .map(storeName => {
-        val storageFactoryClassName = config
-          .getStorageFactoryClassName(storeName)
-          .getOrElse(throw new SamzaException("Missing storage factory for %s." format storeName))
+        val storageFactoryClassName =
+          config.getStorageFactoryClassName(storeName)
+            .getOrElse(throw new SamzaException("Missing storage factory for %s." format storeName))
        (storeName, Util.getObj[StorageEngineFactory[Array[Byte], Array[Byte]]](storageFactoryClassName))
-      }).toMap
+      })
-    val producerMultiplexer = new SystemProducers(
-      Map[String, SystemProducer](),
-      new SerdeManager)
+    // The number of parameter sets does not depend on the store, so read it once.
+    val testSetCount = config.getInt("set.count", 1)
+    for((storeName, storageEngine) <- storageEngineMappings) {
+      (1 to testSetCount).foreach(testSet => {
+        // Create a new DB instance for each test set
+        val output = new File("/tmp/" + UUID.randomUUID())
+        val byteSerde = new ByteSerde
+        info("Using output directory %s for %s using %s." format (output, storeName, storageEngine.getClass.getCanonicalName))
+        val engine = storageEngine.getStorageEngine(
+          storeName,
+          output,
+          byteSerde,
+          byteSerde,
+          new TaskInstanceCollector(producerMultiplexer),
+          new MetricsRegistryMap,
+          null,
+          new SamzaContainerContext(0, config, taskNames)
+        )
+
+        val db = if(!engine.isInstanceOf[KeyValueStorageEngine[_,_]]) {
+          throw new SamzaException("This test can only run with KeyValueStorageEngine configured as store factory.")
+        } else {
+          engine.asInstanceOf[KeyValueStorageEngine[Array[Byte], Array[Byte]]]
+        }
+
+        try {
+          // Run the test method
+          testMethod(db, config.subset("set-" + testSet + ".", true))
+        } finally {
+          // Close the store before deleting its files so it releases its resources; previously each set leaked an open store.
+          db.close()
+          info("Cleaning up output directory for %s." format storeName)
+          Util.rm(output)
+        }
+      })
+    }
+  }
-    for ((storeName, storageEngine) <- storageEngineFactories) {
-      val output = new File("/tmp/" + UUID.randomUUID)
+  /**
+   * Runs testAllWithDeletes on `db`, reading its parameters from the per-set `config`:
+   * "num.loops" (default 100), "messages.per.batch" (default 10000) and "message.size.bytes" (default 200).
+   */
+  def runTestAllWithDeletes(db: KeyValueStore[Array[Byte], Array[Byte]], config: Config) {
+    val numLoops = config.getInt("num.loops", 100)
+    val messagesPerBatch = config.getInt("messages.per.batch", 10000)
+    val messageSizeBytes = config.getInt("message.size.bytes", 200)
-      info("Using output directory %s for %s." format (output, storeName))
+    info("Using (num loops, messages per batch, message size in bytes) => (%s, %s, %s)" format (numLoops, messagesPerBatch, messageSizeBytes))
+    new TestKeyValuePerformance().testAllWithDeletes(db, numLoops, messagesPerBatch, messageSizeBytes)
-      val engine = storageEngine.getStorageEngine(
-        storeName,
-        output,
-        serde,
-        serde,
-        new TaskInstanceCollector(producerMultiplexer),
-        new MetricsRegistryMap,
-        null,
-        new SamzaContainerContext(0, config, taskNames))
+  }
-      val db = if (!engine.isInstanceOf[KeyValueStorageEngine[_, _]]) {
-        throw new SamzaException("This test can only run with KeyValueStorageEngine configured as store factory.")
-      } else {
-        engine.asInstanceOf[KeyValueStorageEngine[Array[Byte], Array[Byte]]]
-      }
+  /**
+   * Runs testMsgWritePerformance on `db`, reading its parameters from the per-set `config`:
+   * "message.count" (default 10000) and "message.size" in bytes (default 200).
+   * Note: this key is "message.size", unlike "message.size.bytes" read by runTestAllWithDeletes.
+   */
+  def runTestMsgWritePerformance(db: KeyValueStore[Array[Byte], Array[Byte]], config: Config) {
+    val messageSizeBytes = config.getInt("message.size", 200)
+    val messageCount = config.getInt("message.count", 10000)
-      test.testAllWithDeletes(db, numLoops, messagesPerBatch, messageSizeBytes)
-      info("Cleaning up output directory for %s." format storeName)
-      Util.rm(output)
-    }
+    info("Using (message count, message size in bytes) => (%s, %s)" format (messageCount, messageSizeBytes))
+    new TestKeyValuePerformance().testMsgWritePerformance(db, messageCount, messageSizeBytes)
  }
+
}
class TestKeyValuePerformance extends Logging {
@@ -179,4 +199,27 @@ class TestKeyValuePerformance extends Logging {
    info("Total time: %ss" format ((System.currentTimeMillis - start) * .001))
  }
+
+
+  /**
+   * Writes `numMsgs` messages of `msgSizeInBytes` bytes each (keys "0" .. numMsgs-1) to the store and
+   * logs the elapsed wall time; the store is not flushed inside the timed section.
+   * @param store Key-Value store instance that is being tested
+   * @param numMsgs Total number of messages to write to the store
+   * @param msgSizeInBytes Size of each message in Bytes
+   */
+  def testMsgWritePerformance(
+    store: KeyValueStore[Array[Byte], Array[Byte]],
+    numMsgs: Int = 10000,
+    msgSizeInBytes: Int = 200) {
+
+    val msg = ("x" * msgSizeInBytes).getBytes(Encoding)
+
+    val start = System.nanoTime // monotonic clock: unaffected by wall-clock adjustments during the run
+    (0 until numMsgs).foreach(i => {
+      store.put(i.toString.getBytes(Encoding), msg)
+    })
+    val timeTaken = System.nanoTime - start
+    info("Total time to write %d msgs of size %d bytes : %s s" format (numMsgs, msgSizeInBytes, timeTaken / 1e9))
+  }
}
\ No newline at end of file