You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/04/30 21:31:19 UTC

git commit: SAMZA-254; improve store.all call time in cases where there are a lot of deletes

Repository: incubator-samza
Updated Branches:
  refs/heads/master 92f387f13 -> f3cb10924


SAMZA-254; improve store.all call time in cases where there are a lot of deletes


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/f3cb1092
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/f3cb1092
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/f3cb1092

Branch: refs/heads/master
Commit: f3cb109245693d4b08c92e21de85b507bd4d3f70
Parents: 92f387f
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Authored: Wed Apr 30 12:25:30 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Wed Apr 30 12:25:30 2014 -0700

----------------------------------------------------------------------
 README.md                                       |   4 +
 build.gradle                                    |   9 +
 .../kv/KeyValueStorageEngineFactory.scala       |   3 +-
 .../samza/storage/kv/LevelDbKeyValueStore.scala |  41 ++++-
 .../samza/storage/kv/TestKeyValueStores.scala   |   1 -
 .../src/main/resources/perf/kv-perf.properties  |  21 +++
 .../performance/TestKeyValuePerformance.scala   | 178 +++++++++++++++++++
 7 files changed, 251 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f3cb1092/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index a930cf0..a1d71b1 100644
--- a/README.md
+++ b/README.md
@@ -34,6 +34,10 @@ To run a single test:
 
     ./gradlew clean :samza-test:test -Dtest.single=TestStatefulTask
 
+To run key-value performance tests:
+
+    ./gradlew samza-shell:kvPerformanceTest -PconfigPath=file://$PWD/samza-test/src/main/resources/perf/kv-perf.properties
+
 ### Job Management
 
 To run a job (defined in a properties file):

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f3cb1092/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 65d95d4..055ef9b 100644
--- a/build.gradle
+++ b/build.gradle
@@ -167,6 +167,7 @@ project(":samza-shell") {
   dependencies {
     gradleShell project(":samza-core_$scalaVersion")
     gradleShell project(":samza-kafka_$scalaVersion")
+    gradleShell project(":samza-test_$scalaVersion")
     gradleShell project(":samza-yarn_$scalaVersion")
     gradleShell "org.slf4j:slf4j-simple:$slf4jVersion"
   }
@@ -193,6 +194,14 @@ project(":samza-shell") {
     if (project.hasProperty('configPath')) args += ['--config-path', configPath]
     if (project.hasProperty('newOffsets')) args += ['--new-offsets', newOffsets]
   }
+
+  // Runs the key-value performance tool (TestKeyValuePerformance). Usage: ./gradlew samza-shell:kvPerformanceTest
+  //    -PconfigPath=file:///path/to/config.properties (see samza-test/src/main/resources/perf/kv-perf.properties)
+  task kvPerformanceTest(type:JavaExec) {
+    main = 'org.apache.samza.test.performance.TestKeyValuePerformance'
+    classpath = configurations.gradleShell
+    if (project.hasProperty('configPath')) args += ['--config-path', configPath]
+  }
 }
 
 project(":samza-kv_$scalaVersion") {

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f3cb1092/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala
index 81fe861..36b7e97 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala
@@ -44,6 +44,7 @@ class KeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V] {
     val storageConfig = containerContext.config.subset("stores." + storeName + ".", true)
     val batchSize = storageConfig.getInt("write.batch.size", 500)
     val cacheSize = storageConfig.getInt("object.cache.size", math.max(batchSize, 1000))
+    val deleteCompactionThreshold = storageConfig.getInt("compaction.delete.threshold", -1)
     val enableCache = cacheSize > 0
 
     if (cacheSize > 0 && cacheSize < batchSize) {
@@ -60,7 +61,7 @@ class KeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V] {
 
     val levelDbMetrics = new LevelDbKeyValueStoreMetrics(storeName, registry)
     val levelDbOptions = LevelDbKeyValueStore.options(storageConfig, containerContext)
-    val levelDb = new LevelDbKeyValueStore(storeDir, levelDbOptions, levelDbMetrics)
+    val levelDb = new LevelDbKeyValueStore(storeDir, levelDbOptions, deleteCompactionThreshold, levelDbMetrics)
     val maybeLoggedStore = if (changeLogSystemStreamPartition == null) {
       levelDb
     } else {

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f3cb1092/samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
index 8602a32..dae3c2c 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
@@ -28,7 +28,7 @@ import java.util.Iterator
 import java.lang.Iterable
 import org.apache.samza.config.Config
 import org.apache.samza.container.SamzaContainerContext
-import grizzled.slf4j.{Logger, Logging}
+import grizzled.slf4j.{ Logger, Logging }
 
 object LevelDbKeyValueStore {
   private lazy val logger = Logger(classOf[LevelDbKeyValueStore])
@@ -45,12 +45,11 @@ object LevelDbKeyValueStore {
     options.compressionType(
       storeConfig.get("leveldb.compression", "snappy") match {
         case "snappy" => CompressionType.SNAPPY
-        case "none"   => CompressionType.NONE
+        case "none" => CompressionType.NONE
         case _ =>
           logger.warn("Unknown leveldb.compression codec %s, defaulting to Snappy" format storeConfig.get("leveldb.compression", "snappy"))
           CompressionType.SNAPPY
-      }
-    )
+      })
     options.createIfMissing(true)
     options.errorIfExists(true)
     options
@@ -60,12 +59,21 @@ object LevelDbKeyValueStore {
 class LevelDbKeyValueStore(
   val dir: File,
   val options: Options,
+
+  /**
+   * How many deletes must occur before we will force a compaction. This is to
+   * get around performance issues discovered in SAMZA-254. A value of -1 
+   * disables this feature.
+   */
+  val deleteCompactionThreshold: Int = -1,
   val metrics: LevelDbKeyValueStoreMetrics = new LevelDbKeyValueStoreMetrics) extends KeyValueStore[Array[Byte], Array[Byte]] with Logging {
 
   private lazy val db = factory.open(dir, options)
   private val lexicographic = new LexicographicComparator()
+  private var deletesSinceLastCompaction = 0
 
   def get(key: Array[Byte]): Array[Byte] = {
+    maybeCompact
     metrics.gets.inc
     require(key != null, "Null key not allowed.")
     val found = db.get(key)
@@ -80,6 +88,7 @@ class LevelDbKeyValueStore(
     require(key != null, "Null key not allowed.")
     if (value == null) {
       db.delete(key)
+      deletesSinceLastCompaction += 1
     } else {
       metrics.bytesWritten.inc(key.size + value.size)
       db.put(key, value)
@@ -108,6 +117,7 @@ class LevelDbKeyValueStore(
     batch.close
     metrics.puts.inc(wrote)
     metrics.deletes.inc(deletes)
+    deletesSinceLastCompaction += deletes
   }
 
   def delete(key: Array[Byte]) {
@@ -116,18 +126,41 @@ class LevelDbKeyValueStore(
   }
 
   def range(from: Array[Byte], to: Array[Byte]): KeyValueIterator[Array[Byte], Array[Byte]] = {
+    maybeCompact // Compact first once the delete threshold is hit; presumably range scans also slow down after many deletes (SAMZA-254).
     metrics.ranges.inc
     require(from != null && to != null, "Null bound not allowed.")
     new LevelDbRangeIterator(db.iterator, from, to)
   }
 
   def all(): KeyValueIterator[Array[Byte], Array[Byte]] = {
+    maybeCompact // Compact first once the delete threshold is hit; many deletes made all() slow (SAMZA-254).
     metrics.alls.inc
     val iter = db.iterator()
     iter.seekToFirst()
     new LevelDbIterator(iter)
   }
 
+  /**
+   * Trigger a complete compaction after at least deleteCompactionThreshold deletes, and at least one,
+   * since the last compaction (so a threshold of 0 cannot compact on every read). Negative thresholds disable this.
+   */
+  def maybeCompact {
+    if (deleteCompactionThreshold >= 0 && deletesSinceLastCompaction > 0 && deletesSinceLastCompaction >= deleteCompactionThreshold) {
+      compact
+    }
+  }
+
+  /**
+   * Force a complete compaction of the LevelDB store and reset the delete counter.
+   */
+  def compact {
+    // LevelDB treats a null begin as before all keys and a null end as after all keys.
+    val unboundedBegin: Array[Byte] = null
+    val unboundedEnd: Array[Byte] = null
+    db.compactRange(unboundedBegin, unboundedEnd)
+    deletesSinceLastCompaction = 0
+  }
+
   def flush {
     metrics.flushes.inc
     // TODO can't find a flush for leveldb

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f3cb1092/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
index 85ba11a..bed9f84 100644
--- a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
+++ b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
@@ -307,7 +307,6 @@ class TestKeyValueStores(typeOfStore: String) {
    */
   def s(b: Array[Byte]) =
     new String(b)
-
 }
 
 object TestKeyValueStores {

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f3cb1092/samza-test/src/main/resources/perf/kv-perf.properties
----------------------------------------------------------------------
diff --git a/samza-test/src/main/resources/perf/kv-perf.properties b/samza-test/src/main/resources/perf/kv-perf.properties
new file mode 100644
index 0000000..dcc223f
--- /dev/null
+++ b/samza-test/src/main/resources/perf/kv-perf.properties
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+stores.test.factory=org.apache.samza.storage.kv.KeyValueStorageEngineFactory
+stores.test.compaction.delete.threshold=1000
+test.partition.count=4
+test.num.loops=1000

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f3cb1092/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
new file mode 100644
index 0000000..5b9b926
--- /dev/null
+++ b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.performance
+
+import grizzled.slf4j.Logging
+import org.apache.samza.config.Config
+import org.apache.samza.config.StorageConfig._
+import org.apache.samza.container.SamzaContainerContext
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.storage.kv.KeyValueStore
+import org.apache.samza.storage.kv.KeyValueStorageEngine
+import org.apache.samza.storage.kv.KeyValueStorageEngineFactory
+import org.apache.samza.storage.StorageEngineFactory
+import org.apache.samza.task.ReadableCollector
+import org.apache.samza.util.CommandLine
+import org.apache.samza.util.Util
+import org.apache.samza.serializers.Serde
+import org.apache.samza.serializers.ByteSerde
+import org.apache.samza.Partition
+import org.apache.samza.SamzaException
+import java.io.File
+import scala.collection.JavaConversions._
+import java.util.UUID
+
+/**
+ * A simple CLI-based tool for running various key-value performance tests.
+ */
+object TestKeyValuePerformance extends Logging {
+  val Encoding = "UTF-8"
+
+  /**
+   * KeyValuePerformance job configs may define a 'test.method' configuration
+   * (it defaults to testAllWithDeletes). Its value must be one of the keys in
+   * this map; the tool uses it to determine which test to run.
+   */
+  val testMethods: Map[String, (Config) => Unit] = Map("testAllWithDeletes" -> runTestAllWithDeletes)
+
+  def main(args: Array[String]) {
+    val cmdline = new CommandLine
+    val options = cmdline.parser.parse(args: _*)
+    val config = cmdline.loadConfig(options)
+    val testMethod = config.get("test.method", "testAllWithDeletes")
+
+    info("Got arguments: %s" format args.toList)
+    info("Using config: %s" format config)
+    info("Using test method: %s" format testMethod)
+
+    if (testMethods.contains(testMethod)) {
+      testMethods(testMethod)(config)
+    } else {
+      error("Invalid test method. Valid methods are: %s" format testMethods.keys)
+
+      throw new SamzaException("Unknown test method: %s" format testMethod)
+    }
+  }
+
+  /**
+   * Do wiring for testAllWithDeletes and run it once against each configured store.
+   */
+  def runTestAllWithDeletes(config: Config) {
+    val test = new TestKeyValuePerformance
+    val serde = new ByteSerde
+    val partitionCount = config.getInt("test.partition.count", 1)
+    val numLoops = config.getInt("test.num.loops", 100)
+    val messagesPerBatch = config.getInt("test.messages.per.batch", 10000)
+    val messageSizeBytes = config.getInt("test.message.size.bytes", 200)
+    val partitions = (0 until partitionCount).map(new Partition(_))
+
+    info("Using partition count: %s" format partitionCount)
+    info("Using num loops: %s" format numLoops)
+    info("Using messages per batch: %s" format messagesPerBatch)
+    info("Using message size: %s bytes" format messageSizeBytes)
+
+    // Build a Map[String, StorageEngineFactory]. The key is the store name.
+    val storageEngineFactories = config
+      .getStoreNames
+      .map(storeName => {
+        val storageFactoryClassName = config
+          .getStorageFactoryClassName(storeName)
+          .getOrElse(throw new SamzaException("Missing storage factory for %s." format storeName))
+        (storeName, Util.getObj[StorageEngineFactory[Array[Byte], Array[Byte]]](storageFactoryClassName))
+      }).toMap
+
+    for ((storeName, storageEngine) <- storageEngineFactories) {
+      // Honor java.io.tmpdir instead of assuming a /tmp directory exists.
+      val output = new File(System.getProperty("java.io.tmpdir"), UUID.randomUUID.toString)
+      info("Using output directory %s for %s." format (output, storeName))
+      val engine = storageEngine.getStorageEngine(
+        storeName,
+        output,
+        serde,
+        serde,
+        new ReadableCollector,
+        new MetricsRegistryMap,
+        null,
+        new SamzaContainerContext("test", config, partitions))
+
+      try {
+        val db = engine match {
+          case kv: KeyValueStorageEngine[_, _] => kv.asInstanceOf[KeyValueStorageEngine[Array[Byte], Array[Byte]]]
+          case _ => throw new SamzaException("This test can only run with KeyValueStorageEngine configured as store factory.")
+        }
+        test.testAllWithDeletes(db, numLoops, messagesPerBatch, messageSizeBytes)
+      } finally {
+        // Close the store and remove its files even if the test failed.
+        engine.stop
+        info("Cleaning up output directory for %s." format storeName)
+        Util.rm(output)
+      }
+    }
+  }
+}
+
+class TestKeyValuePerformance extends Logging {
+  import TestKeyValuePerformance._
+
+  /**
+   * For each of numLoops batches, writes and immediately deletes
+   * messagesPerBatch messages under keys that are unique across the whole
+   * run, then times a store.all call. Useful for troubleshooting LevelDB
+   * issues such as the slow all() after many deletes from SAMZA-254.
+   */
+  def testAllWithDeletes(
+    store: KeyValueStore[Array[Byte], Array[Byte]],
+
+    /**
+     * How many times a batch of messages should be written and deleted.
+     */
+    numLoops: Int = 100,
+
+    /**
+     * The number of messages to write and delete per-batch.
+     */
+    messagesPerBatch: Int = 10000,
+
+    /**
+     * The size of the messages to write.
+     */
+    messageSizeBytes: Int = 200) {
+
+    val stuff = ("a" * messageSizeBytes).getBytes(Encoding)
+    val start = System.currentTimeMillis
+
+    (0 until numLoops).foreach(i => {
+      info("(%sms) Total written to store: %s" format (System.currentTimeMillis - start, i.toLong * messagesPerBatch))
+
+      (0 until messagesPerBatch).foreach(j => {
+        val k = (i.toLong * messagesPerBatch + j).toString.getBytes(Encoding)
+        store.put(k, stuff)
+        store.delete(k)
+      })
+
+      val allStart = System.currentTimeMillis
+      val iter = store.all
+      info("(%sms) all() took %sms." format (System.currentTimeMillis - start, System.currentTimeMillis - allStart))
+      iter.close
+    })
+
+    info("Total time: %ss" format ((System.currentTimeMillis - start) * .001))
+  }
+}
\ No newline at end of file