You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2015/04/08 01:25:50 UTC

[18/36] samza git commit: SAMZA-543; disable WAL for rocksdb kv store

SAMZA-543; disable WAL for rocksdb kv store


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a6ada1f6
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a6ada1f6
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a6ada1f6

Branch: refs/heads/samza-sql
Commit: a6ada1f6fa4fa401201ad863eda3264c1782e276
Parents: 137d0d6
Author: Navina Ramesh <na...@gmail.com>
Authored: Wed Mar 18 18:47:08 2015 -0700
Committer: Chris Riccomini <cr...@apache.org>
Committed: Wed Mar 18 18:47:08 2015 -0700

----------------------------------------------------------------------
 .../RocksDbKeyValueStorageEngineFactory.scala   |   4 +-
 .../samza/storage/kv/RocksDbKeyValueStore.scala |  11 +-
 .../src/main/config/perf/kv-perf.properties     |  28 ++-
 .../performance/TestKeyValuePerformance.scala   | 175 ++++++++++++-------
 4 files changed, 142 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/a6ada1f6/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
index 14eeba5..5ab6859 100644
--- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
@@ -24,6 +24,7 @@ import org.apache.samza.container.SamzaContainerContext
 import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.storage.kv._
 import org.apache.samza.system.SystemStreamPartition
+import org.rocksdb.WriteOptions
 
 class RocksDbKeyValueStorageEngineFactory [K, V] extends BaseKeyValueStorageEngineFactory[K, V]
 {
@@ -44,7 +45,8 @@ class RocksDbKeyValueStorageEngineFactory [K, V] extends BaseKeyValueStorageEngi
     val storageConfig = containerContext.config.subset("stores." + storeName + ".", true)
     val rocksDbMetrics = new KeyValueStoreMetrics(storeName, registry)
     val rocksDbOptions = RocksDbKeyValueStore.options(storageConfig, containerContext)
-    val rocksDb = new RocksDbKeyValueStore(storeDir, rocksDbOptions, rocksDbMetrics)
+    val rocksDbWriteOptions = new WriteOptions().setDisableWAL(true)
+    val rocksDb = new RocksDbKeyValueStore(storeDir, rocksDbOptions, rocksDbWriteOptions, rocksDbMetrics)
     rocksDb
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/a6ada1f6/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
index eae2a5a..66c2a0d 100644
--- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
@@ -29,7 +29,7 @@ object RocksDbKeyValueStore extends Logging {
   def options(storeConfig: Config, containerContext: SamzaContainerContext) = {
     val cacheSize = storeConfig.getLong("container.cache.size.bytes", 100 * 1024 * 1024L)
     val writeBufSize = storeConfig.getLong("container.write.buffer.size.bytes", 32 * 1024 * 1024)
-    val options = new Options();
+    val options = new Options()
 
     // Cache size and write buffer size are specified on a per-container basis.
     val numTasks = containerContext.taskNames.size
@@ -77,6 +77,7 @@ object RocksDbKeyValueStore extends Logging {
 class RocksDbKeyValueStore(
   val dir: File,
   val options: Options,
+  val writeOptions: WriteOptions = new WriteOptions(),
   val metrics: KeyValueStoreMetrics = new KeyValueStoreMetrics) extends KeyValueStore[Array[Byte], Array[Byte]] with Logging {
 
   private lazy val db = RocksDB.open(options, dir.toString)
@@ -97,11 +98,11 @@ class RocksDbKeyValueStore(
     metrics.puts.inc
     require(key != null, "Null key not allowed.")
     if (value == null) {
-      db.remove(key)
+      db.remove(writeOptions, key)
       deletesSinceLastCompaction += 1
     } else {
       metrics.bytesWritten.inc(key.size + value.size)
-      db.put(key, value)
+      db.put(writeOptions, key, value)
     }
   }
 
@@ -115,12 +116,12 @@ class RocksDbKeyValueStore(
       val curr = iter.next()
       if (curr.getValue == null) {
         deletes += 1
-        db.remove(curr.getKey);
+        db.remove(writeOptions, curr.getKey)
       } else {
         val key = curr.getKey
         val value = curr.getValue
         metrics.bytesWritten.inc(key.size + value.size)
-        db.put(key, value)
+        db.put(writeOptions, key, value)
       }
     }
     metrics.puts.inc(wrote)

http://git-wip-us.apache.org/repos/asf/samza/blob/a6ada1f6/samza-test/src/main/config/perf/kv-perf.properties
----------------------------------------------------------------------
diff --git a/samza-test/src/main/config/perf/kv-perf.properties b/samza-test/src/main/config/perf/kv-perf.properties
index 0d487b1..33fcd8d 100644
--- a/samza-test/src/main/config/perf/kv-perf.properties
+++ b/samza-test/src/main/config/perf/kv-perf.properties
@@ -15,7 +15,27 @@
 # specific language governing permissions and limitations
 # under the License.
 
-stores.test.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
-stores.test.compaction.delete.threshold=1000
-test.partition.count=4
-test.num.loops=1000
+# Config for all-with-deletes
+test.all-with-deletes.stores.test-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
+
+test.all-with-deletes.partition.count=4
+
+test.all-with-deletes.set.count=1
+test.all-with-deletes.set-1.num.loops=1000
+
+# Config for rocksdb-write-performance
+test.rocksdb-write-performance.stores.test-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
+test.rocksdb-write-performance.partition.count=4
+
+test.rocksdb-write-performance.set.count=3
+test.rocksdb-write-performance.set-1.message.size=256
+test.rocksdb-write-performance.set-1.message.count=1000000
+
+test.rocksdb-write-performance.set-2.message.size=512
+test.rocksdb-write-performance.set-2.message.count=1000000
+
+test.rocksdb-write-performance.set-3.message.size=1024
+test.rocksdb-write-performance.set-3.message.count=1000000
+
+# Comma-separated list of tests to execute
+test.methods=rocksdb-write-performance

http://git-wip-us.apache.org/repos/asf/samza/blob/a6ada1f6/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
index b4fa7d3..0858b98 100644
--- a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
+++ b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
@@ -22,114 +22,134 @@ package org.apache.samza.test.performance
 import org.apache.samza.util.Logging
 import org.apache.samza.config.Config
 import org.apache.samza.config.StorageConfig._
-import org.apache.samza.container.{ TaskName, SamzaContainerContext }
+import org.apache.samza.container.{TaskName, SamzaContainerContext}
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.storage.kv.KeyValueStore
 import org.apache.samza.storage.kv.KeyValueStorageEngine
 import org.apache.samza.storage.StorageEngineFactory
 import org.apache.samza.util.CommandLine
 import org.apache.samza.util.Util
-import org.apache.samza.serializers.ByteSerde
+import org.apache.samza.serializers.{StringSerde, ByteSerde, SerdeManager}
 import org.apache.samza.Partition
 import org.apache.samza.SamzaException
 import org.apache.samza.task.TaskInstanceCollector
 import org.apache.samza.system.SystemProducers
 import org.apache.samza.system.SystemProducer
-import org.apache.samza.serializers.SerdeManager
 import java.io.File
 import java.util.UUID
+import java.util
 
 /**
  * A simple CLI-based tool for running various key-value performance tests.
+ *
+ * The KeyValuePerformance tests to run are listed in the 'test.methods' configuration as a comma-separated value
+ * (default: "rocksdb-write-performance,all-with-deletes"); the tool splits this list to determine which tests to run.
+ *
+ * Each test should define its own configuration (partition count, stores, etc.)
+ * using the "test.<test-name>.<config-string>=<config-value>" pattern.
+ *
+ * Each test may define one or more sets of test parameters (a single set if "test.<test-name>.set.count" is omitted).
+ * For example, test1 can define 2 sets of parameters by specifying "test.test1.set.count=2" and
+ * define each set as:
+ * "test.test1.set-1.<param-name>=<param-value>"
+ * "test.test1.set-2.<param-name>=<param-value>"
  */
+
 object TestKeyValuePerformance extends Logging {
   val Encoding = "UTF-8"
 
-  /**
-   * KeyValuePerformance job configs must define a 'test.method' configuration.
-   * This configuration must be the value of one of the keys in this map. The
-   * test uses this key to determine which test to run.
-   */
-  val testMethods: Map[String, (Config) => Unit] = Map("testAllWithDeletes" -> runTestAllWithDeletes)
+  val testMethods: Map[String, (KeyValueStorageEngine[Array[Byte], Array[Byte]], Config) => Unit] = Map(
+    "all-with-deletes" -> runTestAllWithDeletes,
+    "rocksdb-write-performance" -> runTestMsgWritePerformance
+  )
 
   def main(args: Array[String]) {
     val cmdline = new CommandLine
     val options = cmdline.parser.parse(args: _*)
     val config = cmdline.loadConfig(options)
-    val testMethod = config.get("test.method", "testAllWithDeletes")
-
-    info("Got arguments: %s" format args.toList)
-    info("Using config: %s" format config)
-    info("Using test method: %s" format testMethod)
-
-    if (testMethods.contains(testMethod)) {
-      testMethods(testMethod)(config)
-    } else {
-      error("Invalid test method. Valid methods are: %s" format testMethods.keys)
+    val tests = config.get("test.methods", "rocksdb-write-performance,all-with-deletes").split(",")
 
-      throw new SamzaException("Unknown test method: %s" format testMethod)
+    tests.foreach{ test =>
+      info("Running test: %s" format test)
+      if(testMethods.contains(test)) {
+        invokeTest(test, testMethods(test), config.subset("test." + test + ".", true))
+      } else {
+        error("Invalid test method. valid methods are: %s" format testMethods.keys)
+        throw new SamzaException("Unknown test method: %s" format test)
+      }
     }
   }
 
-  /**
-   * Do wiring for testAllWithDeletes, and run the test.
-   */
-  def runTestAllWithDeletes(config: Config) {
-    val test = new TestKeyValuePerformance
-    val serde = new ByteSerde
-    val partitionCount = config.getInt("test.partition.count", 1)
-    val numLoops = config.getInt("test.num.loops", 100)
-    val messagesPerBatch = config.getInt("test.messages.per.batch", 10000)
-    val messageSizeBytes = config.getInt("test.message.size.bytes", 200)
-    val taskNames = new java.util.ArrayList[TaskName]()
-
+  def invokeTest(testName: String, testMethod: (KeyValueStorageEngine[Array[Byte], Array[Byte]], Config) => Unit, config: Config) {
+    val taskNames = new util.ArrayList[TaskName]()
+    val partitionCount = config.getInt("partition.count", 1)
     (0 until partitionCount).map(p => taskNames.add(new TaskName(new Partition(p).toString)))
 
-    info("Using partition count: %s" format partitionCount)
-    info("Using num loops: %s" format numLoops)
-    info("Using messages per batch: %s" format messagesPerBatch)
-    info("Using message size: %s bytes" format messageSizeBytes)
-
+    val producerMultiplexer = new SystemProducers(
+      Map[String, SystemProducer](),
+      new SerdeManager
+    )
     // Build a Map[String, StorageEngineFactory]. The key is the store name.
-    val storageEngineFactories = config
+    val storageEngineMappings = config
       .getStoreNames
       .map(storeName => {
-        val storageFactoryClassName = config
-          .getStorageFactoryClassName(storeName)
-          .getOrElse(throw new SamzaException("Missing storage factory for %s." format storeName))
+        val storageFactoryClassName =
+          config.getStorageFactoryClassName(storeName)
+                .getOrElse(throw new SamzaException("Missing storage factory for %s." format storeName))
         (storeName, Util.getObj[StorageEngineFactory[Array[Byte], Array[Byte]]](storageFactoryClassName))
-      }).toMap
+    })
 
-    val producerMultiplexer = new SystemProducers(
-      Map[String, SystemProducer](),
-      new SerdeManager)
+    for((storeName, storageEngine) <- storageEngineMappings) {
+      val testSetCount = config.getInt("set.count", 1)
+      (1 to testSetCount).foreach(testSet => {
+        //Create a new DB instance for each test set
+        val output = new File("/tmp/" + UUID.randomUUID())
+        val byteSerde = new ByteSerde
+        info("Using output directory %s for %s using %s." format (output, storeName, storageEngine.getClass.getCanonicalName))
+        val engine = storageEngine.getStorageEngine(
+          storeName,
+          output,
+          byteSerde,
+          byteSerde,
+          new TaskInstanceCollector(producerMultiplexer),
+          new MetricsRegistryMap,
+          null,
+          new SamzaContainerContext(0, config, taskNames)
+        )
+
+        val db = if(!engine.isInstanceOf[KeyValueStorageEngine[_,_]]) {
+          throw new SamzaException("This test can only run with KeyValueStorageEngine configured as store factory.")
+        } else {
+          engine.asInstanceOf[KeyValueStorageEngine[Array[Byte], Array[Byte]]]
+        }
+
+        // Run the test method
+        testMethod(db, config.subset("set-" + testSet + ".", true))
+
+        info("Cleaning up output directory for %s." format storeName)
+        Util.rm(output)
+      })
+    }
+  }
 
-    for ((storeName, storageEngine) <- storageEngineFactories) {
-      val output = new File("/tmp/" + UUID.randomUUID)
+  def runTestAllWithDeletes(db: KeyValueStore[Array[Byte], Array[Byte]], config: Config) {
+    val numLoops = config.getInt("num.loops", 100)
+    val messagesPerBatch = config.getInt("messages.per.batch", 10000)
+    val messageSizeBytes = config.getInt("message.size.bytes", 200)
 
-      info("Using output directory %s for %s." format (output, storeName))
+    info("Using (num loops, messages per batch, message size in bytes) => (%s, %s, %s)" format (numLoops, messagesPerBatch, messageSizeBytes))
+    new TestKeyValuePerformance().testAllWithDeletes(db, numLoops, messagesPerBatch, messageSizeBytes)
 
-      val engine = storageEngine.getStorageEngine(
-        storeName,
-        output,
-        serde,
-        serde,
-        new TaskInstanceCollector(producerMultiplexer),
-        new MetricsRegistryMap,
-        null,
-        new SamzaContainerContext(0, config, taskNames))
+  }
 
-      val db = if (!engine.isInstanceOf[KeyValueStorageEngine[_, _]]) {
-        throw new SamzaException("This test can only run with KeyValueStorageEngine configured as store factory.")
-      } else {
-        engine.asInstanceOf[KeyValueStorageEngine[Array[Byte], Array[Byte]]]
-      }
+  def runTestMsgWritePerformance(db: KeyValueStore[Array[Byte], Array[Byte]], config: Config) {
+    val messageSizeBytes = config.getInt("message.size", 200)
+    val messageCount = config.getInt("message.count", 10000)
 
-      test.testAllWithDeletes(db, numLoops, messagesPerBatch, messageSizeBytes)
-      info("Cleaning up output directory for %s." format storeName)
-      Util.rm(output)
-    }
+    info("Using (message count, message size in bytes) => (%s, %s)" format (messageCount, messageSizeBytes))
+    new TestKeyValuePerformance().testMsgWritePerformance(db, messageCount, messageSizeBytes)
   }
+
 }
 
 class TestKeyValuePerformance extends Logging {
@@ -179,4 +199,27 @@ class TestKeyValuePerformance extends Logging {
 
     info("Total time: %ss" format ((System.currentTimeMillis - start) * .001))
   }
+
+
+  /**
+   * Writes numMsgs fixed-size messages to the KV store, one put per message, and
+   * logs the total time taken by the puts (no flush is issued inside the timed section).
+   * @param store Key-Value store instance that is being tested
+   * @param numMsgs Total number of messages to write to the store
+   * @param msgSizeInBytes Size of each message value in bytes
+   */
+  def testMsgWritePerformance(
+    store: KeyValueStore[Array[Byte], Array[Byte]],
+    numMsgs: Int = 10000,
+    msgSizeInBytes: Int = 200) {
+
+    val msg = (0 until msgSizeInBytes).map(i => "x").mkString.getBytes(Encoding)
+
+    val start = System.currentTimeMillis
+    (0 until numMsgs).foreach(i => {
+      store.put(i.toString.getBytes(Encoding), msg)
+    })
+    val timeTaken = System.currentTimeMillis - start
+    info("Total time to write %d msgs of size %d bytes : %s s" format (numMsgs, msgSizeInBytes, timeTaken * .001))
+  }
 }
\ No newline at end of file