You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/01/17 17:38:35 UTC

samza git commit: SAMZA-1500: Added metrics for RocksDB state store memory usage

Repository: samza
Updated Branches:
  refs/heads/master 267dfc6ba -> 3a9e80642


SAMZA-1500: Added metrics for RocksDB state store memory usage

Approximate RocksDB memory usage = Configured Block Cache size + MemTable size + Indexes and Bloom Filters size =
rocksdb.block-cache-size + rocksdb.size-all-mem-tables + rocksdb.estimate-table-readers-mem

Author: Prateek Maheshwari <pm...@linkedin.com>

Reviewers: Jagadish <ja...@apache.org>

Closes #404 from prateekm/rocksdb-memory


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/3a9e8064
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/3a9e8064
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/3a9e8064

Branch: refs/heads/master
Commit: 3a9e8064211a79827a2a4793c1f159a689dfa256
Parents: 267dfc6
Author: Prateek Maheshwari <pm...@linkedin.com>
Authored: Wed Jan 17 09:39:26 2018 -0800
Committer: Jagadish <jv...@linkedin.com>
Committed: Wed Jan 17 09:39:26 2018 -0800

----------------------------------------------------------------------
 .../samza/storage/kv/RocksDbOptionsHelper.java  | 12 ++++++----
 .../RocksDbKeyValueStorageEngineFactory.scala   |  3 +++
 .../samza/storage/kv/RocksDbKeyValueStore.scala | 25 ++++++++++++++------
 .../storage/kv/TestRocksDbKeyValueStore.scala   |  6 ++---
 4 files changed, 32 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/3a9e8064/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java
index 9b8f44b..9389681 100644
--- a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java
+++ b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java
@@ -75,12 +75,10 @@ public class RocksDbOptionsHelper {
     }
     options.setCompressionType(compressionType);
 
-    Long cacheSize = storeConfig.getLong("container.cache.size.bytes", 100 * 1024 * 1024L);
-    Long cacheSizePerContainer = cacheSize / numTasks;
-
+    long blockCacheSize = getBlockCacheSize(storeConfig, containerContext);
     int blockSize = storeConfig.getInt(ROCKSDB_BLOCK_SIZE_BYTES, 4096);
     BlockBasedTableConfig tableOptions = new BlockBasedTableConfig();
-    tableOptions.setBlockCacheSize(cacheSizePerContainer).setBlockSize(blockSize);
+    tableOptions.setBlockCacheSize(blockCacheSize).setBlockSize(blockSize);
     options.setTableFormatConfig(tableOptions);
 
     CompactionStyle compactionStyle = CompactionStyle.UNIVERSAL;
@@ -110,4 +108,10 @@ public class RocksDbOptionsHelper {
 
     return options;
   }
+
+  public static Long getBlockCacheSize(Config storeConfig, SamzaContainerContext containerContext) {
+    int numTasks = containerContext.taskNames.size();
+    long cacheSize = storeConfig.getLong("container.cache.size.bytes", 100 * 1024 * 1024L);
+    return cacheSize / numTasks;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/3a9e8064/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
index a7b748f..2b7ffb5 100644
--- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
@@ -44,6 +44,9 @@ class RocksDbKeyValueStorageEngineFactory [K, V] extends BaseKeyValueStorageEngi
     val storageConfig = containerContext.config.subset("stores." + storeName + ".", true)
     val isLoggedStore = containerContext.config.getChangelogStream(storeName).isDefined
     val rocksDbMetrics = new KeyValueStoreMetrics(storeName, registry)
+    rocksDbMetrics.newGauge("rocksdb.block-cache-size",
+      () => RocksDbOptionsHelper.getBlockCacheSize(storageConfig, containerContext))
+
     val rocksDbOptions = RocksDbOptionsHelper.options(storageConfig, containerContext)
     val rocksDbWriteOptions = new WriteOptions().setDisableWAL(true)
     val rocksDbFlushOptions = new FlushOptions().setWaitForFlush(true)

http://git-wip-us.apache.org/repos/asf/samza/blob/3a9e8064/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
index 6aad45f..eae7da2 100644
--- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
@@ -72,13 +72,24 @@ object RocksDbKeyValueStore extends Logging {
           RocksDB.open(options, dir.toString)
         }
 
-      if (storeConfig.containsKey("rocksdb.metrics.list")) {
-        storeConfig
-          .get("rocksdb.metrics.list")
-          .split(",")
-          .map(property => property.trim)
-          .foreach(property => metrics.newGauge(property, () => rocksDb.getProperty(property)))
-      }
+      // See https://github.com/facebook/rocksdb/blob/master/include/rocksdb/db.h for available properties
+      val rocksDbMetrics = Set (
+        "rocksdb.estimate-table-readers-mem", // indexes and bloom filters
+        "rocksdb.cur-size-active-mem-table", // approximate active memtable size in bytes
+        "rocksdb.cur-size-all-mem-tables", // approximate active and unflushed memtable size in bytes
+        "rocksdb.size-all-mem-tables", // approximate active, unflushed and pinned memtable size in bytes
+        "rocksdb.estimate-num-keys" // approximate number keys in the active and unflushed memtable and storage
+      )
+
+      val configuredMetrics = storeConfig
+        .get("rocksdb.metrics.list", "")
+        .split(",")
+        .map(property => property.trim)
+        .filter(!_.isEmpty)
+        .toSet
+
+      (configuredMetrics ++ rocksDbMetrics)
+        .foreach(property => metrics.newGauge(property, () => rocksDb.getProperty(property)))
 
       rocksDb
     } catch {

http://git-wip-us.apache.org/repos/asf/samza/blob/3a9e8064/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
index 418e986..ca9c023 100644
--- a/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
+++ b/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
@@ -44,7 +44,7 @@ class TestRocksDbKeyValueStore
                                               config,
                                               false,
                                               "someStore",
-                                              null)
+                                              new KeyValueStoreMetrics())
     val key = "test".getBytes("UTF-8")
     rocksDB.put(key, "val".getBytes("UTF-8"))
     Assert.assertNotNull(rocksDB.get(key))
@@ -76,7 +76,7 @@ class TestRocksDbKeyValueStore
                                               config,
                                               false,
                                               "dbStore",
-                                              null)
+                                              new KeyValueStoreMetrics())
     val key = "key".getBytes("UTF-8")
     rocksDB.put(key, "val".getBytes("UTF-8"))
     // SAMZA-836: Mysteriously,calling new FlushOptions() does not invoke the NativeLibraryLoader in rocksdbjni-3.13.1!
@@ -136,7 +136,7 @@ class TestRocksDbKeyValueStore
                                               config,
                                               false,
                                               "dbStore",
-                                              null)
+                                              new KeyValueStoreMetrics())
 
     val key = "key".getBytes("UTF-8")
     val key1 = "key1".getBytes("UTF-8")