You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/04/30 21:31:19 UTC
git commit: SAMZA-254;
improve store.all call time in cases where there are a lot of deletes
Repository: incubator-samza
Updated Branches:
refs/heads/master 92f387f13 -> f3cb10924
SAMZA-254; improve store.all call time in cases where there are a lot of deletes
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/f3cb1092
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/f3cb1092
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/f3cb1092
Branch: refs/heads/master
Commit: f3cb109245693d4b08c92e21de85b507bd4d3f70
Parents: 92f387f
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Authored: Wed Apr 30 12:25:30 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Wed Apr 30 12:25:30 2014 -0700
----------------------------------------------------------------------
README.md | 4 +
build.gradle | 9 +
.../kv/KeyValueStorageEngineFactory.scala | 3 +-
.../samza/storage/kv/LevelDbKeyValueStore.scala | 41 ++++-
.../samza/storage/kv/TestKeyValueStores.scala | 1 -
.../src/main/resources/perf/kv-perf.properties | 21 +++
.../performance/TestKeyValuePerformance.scala | 178 +++++++++++++++++++
7 files changed, 251 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f3cb1092/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index a930cf0..a1d71b1 100644
--- a/README.md
+++ b/README.md
@@ -34,6 +34,10 @@ To run a single test:
./gradlew clean :samza-test:test -Dtest.single=TestStatefulTask
+To run key-value performance tests:
+
+ ./gradlew samza-shell:kvPerformanceTest -PconfigPath=file://$PWD/samza-test/src/main/resources/perf/kv-perf.properties
+
### Job Management
To run a job (defined in a properties file):
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f3cb1092/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 65d95d4..055ef9b 100644
--- a/build.gradle
+++ b/build.gradle
@@ -167,6 +167,7 @@ project(":samza-shell") {
dependencies {
gradleShell project(":samza-core_$scalaVersion")
gradleShell project(":samza-kafka_$scalaVersion")
+ gradleShell project(":samza-test_$scalaVersion")
gradleShell project(":samza-yarn_$scalaVersion")
gradleShell "org.slf4j:slf4j-simple:$slf4jVersion"
}
@@ -193,6 +194,14 @@ project(":samza-shell") {
if (project.hasProperty('configPath')) args += ['--config-path', configPath]
if (project.hasProperty('newOffsets')) args += ['--new-offsets', newOffsets]
}
+
+ // Usage: ./gradlew samza-shell:kvPerformanceTest
+ // -PconfigPath=file:///path/to/job/config.properties
+ task kvPerformanceTest(type:JavaExec) {
+ main = 'org.apache.samza.test.performance.TestKeyValuePerformance'
+ classpath = configurations.gradleShell
+ if (project.hasProperty('configPath')) args += ['--config-path', configPath]
+ }
}
project(":samza-kv_$scalaVersion") {
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f3cb1092/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala
index 81fe861..36b7e97 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineFactory.scala
@@ -44,6 +44,7 @@ class KeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V] {
val storageConfig = containerContext.config.subset("stores." + storeName + ".", true)
val batchSize = storageConfig.getInt("write.batch.size", 500)
val cacheSize = storageConfig.getInt("object.cache.size", math.max(batchSize, 1000))
+ val deleteCompactionThreshold = storageConfig.getInt("compaction.delete.threshold", -1)
val enableCache = cacheSize > 0
if (cacheSize > 0 && cacheSize < batchSize) {
@@ -60,7 +61,7 @@ class KeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V] {
val levelDbMetrics = new LevelDbKeyValueStoreMetrics(storeName, registry)
val levelDbOptions = LevelDbKeyValueStore.options(storageConfig, containerContext)
- val levelDb = new LevelDbKeyValueStore(storeDir, levelDbOptions, levelDbMetrics)
+ val levelDb = new LevelDbKeyValueStore(storeDir, levelDbOptions, deleteCompactionThreshold, levelDbMetrics)
val maybeLoggedStore = if (changeLogSystemStreamPartition == null) {
levelDb
} else {
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f3cb1092/samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
index 8602a32..dae3c2c 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LevelDbKeyValueStore.scala
@@ -28,7 +28,7 @@ import java.util.Iterator
import java.lang.Iterable
import org.apache.samza.config.Config
import org.apache.samza.container.SamzaContainerContext
-import grizzled.slf4j.{Logger, Logging}
+import grizzled.slf4j.{ Logger, Logging }
object LevelDbKeyValueStore {
private lazy val logger = Logger(classOf[LevelDbKeyValueStore])
@@ -45,12 +45,11 @@ object LevelDbKeyValueStore {
options.compressionType(
storeConfig.get("leveldb.compression", "snappy") match {
case "snappy" => CompressionType.SNAPPY
- case "none" => CompressionType.NONE
+ case "none" => CompressionType.NONE
case _ =>
logger.warn("Unknown leveldb.compression codec %s, defaulting to Snappy" format storeConfig.get("leveldb.compression", "snappy"))
CompressionType.SNAPPY
- }
- )
+ })
options.createIfMissing(true)
options.errorIfExists(true)
options
@@ -60,12 +59,21 @@ object LevelDbKeyValueStore {
class LevelDbKeyValueStore(
val dir: File,
val options: Options,
+
+ /**
+ * How many deletes must occur before we will force a compaction. This is to
+ * get around performance issues discovered in SAMZA-254. A value of -1
+ * disables this feature.
+ */
+ val deleteCompactionThreshold: Int = -1,
val metrics: LevelDbKeyValueStoreMetrics = new LevelDbKeyValueStoreMetrics) extends KeyValueStore[Array[Byte], Array[Byte]] with Logging {
private lazy val db = factory.open(dir, options)
private val lexicographic = new LexicographicComparator()
+ private var deletesSinceLastCompaction = 0
def get(key: Array[Byte]): Array[Byte] = {
+ maybeCompact
metrics.gets.inc
require(key != null, "Null key not allowed.")
val found = db.get(key)
@@ -80,6 +88,7 @@ class LevelDbKeyValueStore(
require(key != null, "Null key not allowed.")
if (value == null) {
db.delete(key)
+ deletesSinceLastCompaction += 1
} else {
metrics.bytesWritten.inc(key.size + value.size)
db.put(key, value)
@@ -108,6 +117,7 @@ class LevelDbKeyValueStore(
batch.close
metrics.puts.inc(wrote)
metrics.deletes.inc(deletes)
+ deletesSinceLastCompaction += deletes
}
def delete(key: Array[Byte]) {
@@ -116,18 +126,41 @@ class LevelDbKeyValueStore(
}
def range(from: Array[Byte], to: Array[Byte]): KeyValueIterator[Array[Byte], Array[Byte]] = {
+ maybeCompact
metrics.ranges.inc
require(from != null && to != null, "Null bound not allowed.")
new LevelDbRangeIterator(db.iterator, from, to)
}
def all(): KeyValueIterator[Array[Byte], Array[Byte]] = {
+ maybeCompact
metrics.alls.inc
val iter = db.iterator()
iter.seekToFirst()
new LevelDbIterator(iter)
}
+ /**
+ * Trigger a complete compaction on the LevelDB store if there have been at
+ * least deleteCompactionThreshold deletes since the last compaction.
+ */
+ def maybeCompact = {
+ if (deleteCompactionThreshold >= 0 && deletesSinceLastCompaction >= deleteCompactionThreshold) {
+ compact
+ }
+ }
+
+ /**
+ * Trigger a complete compaction of the LevelDB store.
+ */
+ def compact {
+ // According to LevelDB's docs:
+ // begin==NULL is treated as a key before all keys in the database.
+ // end==NULL is treated as a key after all keys in the database.
+ db.compactRange(null, null)
+ deletesSinceLastCompaction = 0
+ }
+
def flush {
metrics.flushes.inc
// TODO can't find a flush for leveldb
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f3cb1092/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
index 85ba11a..bed9f84 100644
--- a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
+++ b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
@@ -307,7 +307,6 @@ class TestKeyValueStores(typeOfStore: String) {
*/
def s(b: Array[Byte]) =
new String(b)
-
}
object TestKeyValueStores {
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f3cb1092/samza-test/src/main/resources/perf/kv-perf.properties
----------------------------------------------------------------------
diff --git a/samza-test/src/main/resources/perf/kv-perf.properties b/samza-test/src/main/resources/perf/kv-perf.properties
new file mode 100644
index 0000000..dcc223f
--- /dev/null
+++ b/samza-test/src/main/resources/perf/kv-perf.properties
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+stores.test.factory=org.apache.samza.storage.kv.KeyValueStorageEngineFactory
+stores.test.compaction.delete.threshold=1000
+test.partition.count=4
+test.num.loops=1000
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/f3cb1092/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
new file mode 100644
index 0000000..5b9b926
--- /dev/null
+++ b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.performance
+
+import grizzled.slf4j.Logging
+import org.apache.samza.config.Config
+import org.apache.samza.config.StorageConfig._
+import org.apache.samza.container.SamzaContainerContext
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.storage.kv.KeyValueStore
+import org.apache.samza.storage.kv.KeyValueStorageEngine
+import org.apache.samza.storage.kv.KeyValueStorageEngineFactory
+import org.apache.samza.storage.StorageEngineFactory
+import org.apache.samza.task.ReadableCollector
+import org.apache.samza.util.CommandLine
+import org.apache.samza.util.Util
+import org.apache.samza.serializers.Serde
+import org.apache.samza.serializers.ByteSerde
+import org.apache.samza.Partition
+import org.apache.samza.SamzaException
+import java.io.File
+import scala.collection.JavaConversions._
+import java.util.UUID
+
+/**
+ * A simple CLI-based tool for running various key-value performance tests.
+ */
+object TestKeyValuePerformance extends Logging {
+ val Encoding = "UTF-8"
+
+ /**
+ * KeyValuePerformance job configs may define a 'test.method' configuration;
+ * it defaults to 'testAllWithDeletes' when unset. Its value must be one of the
+ * keys in this map. The test uses this key to determine which test to run.
+ */
+ val testMethods: Map[String, (Config) => Unit] = Map("testAllWithDeletes" -> runTestAllWithDeletes)
+
+ def main(args: Array[String]) {
+ val cmdline = new CommandLine
+ val options = cmdline.parser.parse(args: _*)
+ val config = cmdline.loadConfig(options)
+ val testMethod = config.get("test.method", "testAllWithDeletes")
+
+ info("Got arguments: %s" format args.toList)
+ info("Using config: %s" format config)
+ info("Using test method: %s" format testMethod)
+
+ if (testMethods.contains(testMethod)) {
+ testMethods(testMethod)(config)
+ } else {
+ error("Invalid test method. Valid methods are: %s" format testMethods.keys)
+
+ throw new SamzaException("Unknown test method: %s" format testMethod)
+ }
+ }
+
+ /**
+ * Do wiring for testAllWithDeletes, and run the test.
+ */
+ def runTestAllWithDeletes(config: Config) {
+ val test = new TestKeyValuePerformance
+ val serde = new ByteSerde
+ val partitionCount = config.getInt("test.partition.count", 1)
+ val numLoops = config.getInt("test.num.loops", 100)
+ val messagesPerBatch = config.getInt("test.messages.per.batch", 10000)
+ val messageSizeBytes = config.getInt("test.message.size.bytes", 200)
+ val partitions = (0 until partitionCount).map(new Partition(_))
+
+ info("Using partition count: %s" format partitionCount)
+ info("Using num loops: %s" format numLoops)
+ info("Using messages per batch: %s" format messagesPerBatch)
+ info("Using message size: %s bytes" format messageSizeBytes)
+
+ // Build a Map[String, StorageEngineFactory]. The key is the store name.
+ val storageEngineFactories = config
+ .getStoreNames
+ .map(storeName => {
+ val storageFactoryClassName = config
+ .getStorageFactoryClassName(storeName)
+ .getOrElse(throw new SamzaException("Missing storage factory for %s." format storeName))
+ (storeName, Util.getObj[StorageEngineFactory[Array[Byte], Array[Byte]]](storageFactoryClassName))
+ }).toMap
+
+ for ((storeName, storageEngine) <- storageEngineFactories) {
+ val output = new File("/tmp/" + UUID.randomUUID)
+
+ info("Using output directory %s for %s." format (output, storeName))
+
+ val engine = storageEngine.getStorageEngine(
+ storeName,
+ output,
+ serde,
+ serde,
+ new ReadableCollector,
+ new MetricsRegistryMap,
+ null,
+ new SamzaContainerContext("test", config, partitions))
+
+ val db = if (!engine.isInstanceOf[KeyValueStorageEngine[Array[Byte], Array[Byte]]]) {
+ throw new SamzaException("This test can only run with KeyValueStorageEngine configured as store factory.")
+ } else {
+ engine.asInstanceOf[KeyValueStorageEngine[Array[Byte], Array[Byte]]]
+ }
+
+ test.testAllWithDeletes(db, numLoops, messagesPerBatch, messageSizeBytes)
+
+ info("Cleaning up output directory for %s." format storeName)
+
+ Util.rm(output)
+ }
+ }
+}
+
+class TestKeyValuePerformance extends Logging {
+ import TestKeyValuePerformance._
+
+ /**
+ * A test that writes messagesPerBatch messages, deletes them all, then calls
+ * store.all. The test periodically outputs the time it takes to complete
+ * these operations. This test is useful to troubleshoot issues with LevelDB
+ * such as the issue documented in SAMZA-254.
+ */
+ def testAllWithDeletes(
+ store: KeyValueStore[Array[Byte], Array[Byte]],
+
+ /**
+ * How many times a batch of messages should be written and deleted.
+ */
+ numLoops: Int = 100,
+
+ /**
+ * The number of messages to write and delete per-batch.
+ */
+ messagesPerBatch: Int = 10000,
+
+ /**
+ * The size of the messages to write.
+ */
+ messageSizeBytes: Int = 200) {
+
+ val stuff = (0 until messageSizeBytes).map(i => "a").mkString.getBytes(Encoding)
+ val start = System.currentTimeMillis
+
+ (0 until numLoops).foreach(i => {
+ info("(%sms) Total written to store: %s" format (System.currentTimeMillis - start, i * messagesPerBatch))
+
+ (0 until messagesPerBatch).foreach(j => {
+ val k = (i * j).toString.getBytes(Encoding)
+ store.put(k, stuff)
+ store.delete(k)
+ })
+
+ val allStart = System.currentTimeMillis
+ val iter = store.all
+ info("(%sms) all() took %sms." format (System.currentTimeMillis - start, System.currentTimeMillis - allStart))
+ iter.close
+ })
+
+ info("Total time: %ss" format ((System.currentTimeMillis - start) * .001))
+ }
+}
\ No newline at end of file