You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2017/10/16 23:06:33 UTC
samza git commit: SAMZA-1461; Expose RocksDB properties as metrics
Repository: samza
Updated Branches:
refs/heads/master c93dd8f60 -> 343712e30
SAMZA-1461; Expose RocksDB properties as metrics
Automatically build gauges for RocksDB properties via configuration:
`stores.<storename>.rocksdb.metrics.list=<rocksDbProperty1>, <rocksDbProperty2>`
Author: Janek Lasocki-Biczysko <ja...@skyscanner.net>
Reviewers: Prateek Maheshwari <pm...@apache.org>
Closes #327 from janeklb/jlb_rocksDBPropertiesAsMetrics
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/343712e3
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/343712e3
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/343712e3
Branch: refs/heads/master
Commit: 343712e30c0a7201d6bb47f340187af51f395671
Parents: c93dd8f
Author: Janek Lasocki-Biczysko <ja...@skyscanner.net>
Authored: Mon Oct 16 16:06:28 2017 -0700
Committer: Prateek Maheshwari <pm...@linkedin.com>
Committed: Mon Oct 16 16:06:28 2017 -0700
----------------------------------------------------------------------
.../versioned/jobs/configuration-table.html | 8 +++++
.../samza/storage/kv/RocksDbKeyValueStore.scala | 32 ++++++++++++-----
.../storage/kv/TestRocksDbKeyValueStore.scala | 38 ++++++++++++++++++--
3 files changed, 66 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/343712e3/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index fc1e86d..fb8a97e 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -1781,6 +1781,14 @@
</tr>
<tr>
+ <td class="property" id="stores-rocksdb-metrics">stores.<span class="store">store-name</span>.<br>rocksdb.metrics.list</td>
+ <td class="default"></td>
+ <td class="description">
+ A list of RocksDB <a href="https://github.com/facebook/rocksdb/blob/master/include/rocksdb/db.h#L409">properties</a> to expose as metrics (gauges).
+ </td>
+ </tr>
+
+ <tr>
<th colspan="3" class="section" id="cluster-manager">
Running Samza with a cluster manager<br>
</th>
http://git-wip-us.apache.org/repos/asf/samza/blob/343712e3/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
index c771788..023e4a8 100644
--- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
@@ -28,7 +28,7 @@ import org.rocksdb.TtlDB
object RocksDbKeyValueStore extends Logging {
- def openDB(dir: File, options: Options, storeConfig: Config, isLoggedStore: Boolean, storeName: String): RocksDB = {
+ def openDB(dir: File, options: Options, storeConfig: Config, isLoggedStore: Boolean, storeName: String, metrics: KeyValueStoreMetrics): RocksDB = {
var ttl = 0L
var useTTL = false
@@ -68,15 +68,29 @@ object RocksDbKeyValueStore extends Logging {
try
{
- if (useTTL)
- {
- info("Opening RocksDB store with TTL value: %s" format ttl)
- TtlDB.open(options, dir.toString, ttl.toInt, false)
- }
- else
+ val rocksDb =
+ if (useTTL)
+ {
+ info("Opening RocksDB store with TTL value: %s" format ttl)
+ TtlDB.open(options, dir.toString, ttl.toInt, false)
+ }
+ else
+ {
+ RocksDB.open(options, dir.toString)
+ }
+
+ if (storeConfig.containsKey("rocksdb.metrics.list"))
{
- RocksDB.open(options, dir.toString)
+ storeConfig
+ .get("rocksdb.metrics.list")
+ .split(",")
+ .map(property => property.trim)
+ .foreach(property =>
+ metrics.newGauge(property, () => rocksDb.getProperty(property))
+ )
}
+
+ rocksDb
}
catch
{
@@ -104,7 +118,7 @@ class RocksDbKeyValueStore(
// lazy val here is important because the store directories do not exist yet, it can only be opened
// after the directories are created, which happens much later from now.
- private lazy val db = RocksDbKeyValueStore.openDB(dir, options, storeConfig, isLoggedStore, storeName)
+ private lazy val db = RocksDbKeyValueStore.openDB(dir, options, storeConfig, isLoggedStore, storeName, metrics)
private val lexicographic = new LexicographicComparator()
def get(key: Array[Byte]): Array[Byte] = {
http://git-wip-us.apache.org/repos/asf/samza/blob/343712e3/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
index 05d39ea..6f129be 100644
--- a/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
+++ b/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
@@ -24,6 +24,7 @@ import java.io.File
import java.util
import org.apache.samza.config.MapConfig
+import org.apache.samza.metrics.{Gauge, MetricsRegistryMap}
import org.apache.samza.util.ExponentialSleepStrategy
import org.junit.{Assert, Test}
import org.rocksdb.{RocksIterator, RocksDB, FlushOptions, Options}
@@ -41,7 +42,8 @@ class TestRocksDbKeyValueStore
options,
config,
false,
- "someStore")
+ "someStore",
+ null)
val key = "test".getBytes("UTF-8")
rocksDB.put(key, "val".getBytes("UTF-8"))
Assert.assertNotNull(rocksDB.get(key))
@@ -72,7 +74,8 @@ class TestRocksDbKeyValueStore
options,
config,
false,
- "dbStore")
+ "dbStore",
+ null)
val key = "key".getBytes("UTF-8")
rocksDB.put(key, "val".getBytes("UTF-8"))
// SAMZA-836: Mysteriously,calling new FlushOptions() does not invoke the NativeLibraryLoader in rocksdbjni-3.13.1!
@@ -98,7 +101,8 @@ class TestRocksDbKeyValueStore
options,
config,
false,
- "dbStore")
+ "dbStore",
+ null)
val key = "key".getBytes("UTF-8")
val key1 = "key1".getBytes("UTF-8")
@@ -142,4 +146,32 @@ class TestRocksDbKeyValueStore
rocksDB.close()
rocksDBReadOnly.close()
}
+
+ @Test
+ def testMetricsConfig(): Unit = {
+ val registry = new MetricsRegistryMap("registrymap")
+ val metrics = new KeyValueStoreMetrics("dbstore", registry)
+
+ val map = new util.HashMap[String, String]()
+ map.put("rocksdb.metrics.list", "rocksdb.estimate-num-keys, rocksdb.estimate-live-data-size")
+ val config = new MapConfig(map)
+ val options = new Options()
+ options.setCreateIfMissing(true)
+ val rocksDB = RocksDbKeyValueStore.openDB(
+ new File(System.getProperty("java.io.tmpdir")),
+ options,
+ config,
+ false,
+ "dbstore",
+ metrics)
+
+ val metricsGroup = registry.getGroup("org.apache.samza.storage.kv.KeyValueStoreMetrics")
+ assert(metricsGroup != null)
+
+ val estimateNumKeysMetric = metricsGroup.get("dbstore-rocksdb.estimate-num-keys")
+ assert(estimateNumKeysMetric.isInstanceOf[Gauge[String]])
+
+ val estimateLiveDataSizeMetric = metricsGroup.get("dbstore-rocksdb.estimate-live-data-size")
+ assert(estimateLiveDataSizeMetric.isInstanceOf[Gauge[String]])
+ }
}