You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2017/10/16 23:06:33 UTC

samza git commit: SAMZA-1461; Expose RocksDB properties as metrics

Repository: samza
Updated Branches:
  refs/heads/master c93dd8f60 -> 343712e30


SAMZA-1461; Expose RocksDB properties as metrics

Automatically build gauges for RocksDB properties via configuration:
`stores.<storename>.rocksdb.metrics.list=<rocksDbProperty1>, <rocksDbProperty2>`

Author: Janek Lasocki-Biczysko <ja...@skyscanner.net>

Reviewers: Prateek Maheshwari <pm...@apache.org>

Closes #327 from janeklb/jlb_rocksDBPropertiesAsMetrics


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/343712e3
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/343712e3
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/343712e3

Branch: refs/heads/master
Commit: 343712e30c0a7201d6bb47f340187af51f395671
Parents: c93dd8f
Author: Janek Lasocki-Biczysko <ja...@skyscanner.net>
Authored: Mon Oct 16 16:06:28 2017 -0700
Committer: Prateek Maheshwari <pm...@linkedin.com>
Committed: Mon Oct 16 16:06:28 2017 -0700

----------------------------------------------------------------------
 .../versioned/jobs/configuration-table.html     |  8 +++++
 .../samza/storage/kv/RocksDbKeyValueStore.scala | 32 ++++++++++++-----
 .../storage/kv/TestRocksDbKeyValueStore.scala   | 38 ++++++++++++++++++--
 3 files changed, 66 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/343712e3/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index fc1e86d..fb8a97e 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -1781,6 +1781,14 @@
                 </tr>
 
                 <tr>
+                    <td class="property" id="stores-rocksdb-metrics">stores.<span class="store">store-name</span>.<br>rocksdb.metrics.list</td>
+                    <td class="default"></td>
+                    <td class="description">
+                        A list of RocksDB <a href="https://github.com/facebook/rocksdb/blob/master/include/rocksdb/db.h#L409">properties</a> to expose as metrics (gauges).
+                    </td>
+                </tr>
+
+                <tr>
                     <th colspan="3" class="section" id="cluster-manager">
                         Running Samza with a cluster manager<br>
                     </th>

http://git-wip-us.apache.org/repos/asf/samza/blob/343712e3/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
index c771788..023e4a8 100644
--- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
@@ -28,7 +28,7 @@ import org.rocksdb.TtlDB
 
 object RocksDbKeyValueStore extends Logging {
 
-  def openDB(dir: File, options: Options, storeConfig: Config, isLoggedStore: Boolean, storeName: String): RocksDB = {
+  def openDB(dir: File, options: Options, storeConfig: Config, isLoggedStore: Boolean, storeName: String, metrics: KeyValueStoreMetrics): RocksDB = {
     var ttl = 0L
     var useTTL = false
 
@@ -68,15 +68,29 @@ object RocksDbKeyValueStore extends Logging {
 
     try
     {
-      if (useTTL)
-      {
-        info("Opening RocksDB store with TTL value: %s" format ttl)
-        TtlDB.open(options, dir.toString, ttl.toInt, false)
-      }
-      else
+      val rocksDb =
+        if (useTTL)
+        {
+          info("Opening RocksDB store with TTL value: %s" format ttl)
+          TtlDB.open(options, dir.toString, ttl.toInt, false)
+        }
+        else
+        {
+          RocksDB.open(options, dir.toString)
+        }
+
+      if (storeConfig.containsKey("rocksdb.metrics.list"))
       {
-        RocksDB.open(options, dir.toString)
+        storeConfig
+          .get("rocksdb.metrics.list")
+          .split(",")
+          .map(property => property.trim)
+          .foreach(property =>
+            metrics.newGauge(property, () => rocksDb.getProperty(property))
+          )
       }
+
+      rocksDb
     }
     catch
       {
@@ -104,7 +118,7 @@ class RocksDbKeyValueStore(
 
   // lazy val here is important because the store directories do not exist yet, it can only be opened
   // after the directories are created, which happens much later from now.
-  private lazy val db = RocksDbKeyValueStore.openDB(dir, options, storeConfig, isLoggedStore, storeName)
+  private lazy val db = RocksDbKeyValueStore.openDB(dir, options, storeConfig, isLoggedStore, storeName, metrics)
   private val lexicographic = new LexicographicComparator()
 
   def get(key: Array[Byte]): Array[Byte] = {

http://git-wip-us.apache.org/repos/asf/samza/blob/343712e3/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
index 05d39ea..6f129be 100644
--- a/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
+++ b/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
@@ -24,6 +24,7 @@ import java.io.File
 import java.util
 
 import org.apache.samza.config.MapConfig
+import org.apache.samza.metrics.{Gauge, MetricsRegistryMap}
 import org.apache.samza.util.ExponentialSleepStrategy
 import org.junit.{Assert, Test}
 import org.rocksdb.{RocksIterator, RocksDB, FlushOptions, Options}
@@ -41,7 +42,8 @@ class TestRocksDbKeyValueStore
                                               options,
                                               config,
                                               false,
-                                              "someStore")
+                                              "someStore",
+                                              null)
     val key = "test".getBytes("UTF-8")
     rocksDB.put(key, "val".getBytes("UTF-8"))
     Assert.assertNotNull(rocksDB.get(key))
@@ -72,7 +74,8 @@ class TestRocksDbKeyValueStore
                                               options,
                                               config,
                                               false,
-                                              "dbStore")
+                                              "dbStore",
+                                              null)
     val key = "key".getBytes("UTF-8")
     rocksDB.put(key, "val".getBytes("UTF-8"))
     // SAMZA-836: Mysteriously,calling new FlushOptions() does not invoke the NativeLibraryLoader in rocksdbjni-3.13.1!
@@ -98,7 +101,8 @@ class TestRocksDbKeyValueStore
                                               options,
                                               config,
                                               false,
-                                              "dbStore")
+                                              "dbStore",
+                                              null)
 
     val key = "key".getBytes("UTF-8")
     val key1 = "key1".getBytes("UTF-8")
@@ -142,4 +146,32 @@ class TestRocksDbKeyValueStore
     rocksDB.close()
     rocksDBReadOnly.close()
   }
+
+  @Test
+  def testMetricsConfig(): Unit = {
+    val registry = new MetricsRegistryMap("registrymap")
+    val metrics = new KeyValueStoreMetrics("dbstore", registry)
+
+    val map = new util.HashMap[String, String]()
+    map.put("rocksdb.metrics.list", "rocksdb.estimate-num-keys, rocksdb.estimate-live-data-size")
+    val config = new MapConfig(map)
+    val options = new Options()
+    options.setCreateIfMissing(true)
+    val rocksDB = RocksDbKeyValueStore.openDB(
+      new File(System.getProperty("java.io.tmpdir")),
+      options,
+      config,
+      false,
+      "dbstore",
+      metrics)
+
+    val metricsGroup = registry.getGroup("org.apache.samza.storage.kv.KeyValueStoreMetrics")
+    assert(metricsGroup != null)
+
+    val estimateNumKeysMetric = metricsGroup.get("dbstore-rocksdb.estimate-num-keys")
+    assert(estimateNumKeysMetric.isInstanceOf[Gauge[String]])
+
+    val estimateLiveDataSizeMetric = metricsGroup.get("dbstore-rocksdb.estimate-live-data-size")
+    assert(estimateLiveDataSizeMetric.isInstanceOf[Gauge[String]])
+  }
 }