You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/01/17 17:38:35 UTC
samza git commit: SAMZA-1500: Added metrics for RocksDB state store
memory usage
Repository: samza
Updated Branches:
refs/heads/master 267dfc6ba -> 3a9e80642
SAMZA-1500: Added metrics for RocksDB state store memory usage
Approximate RocksDB memory usage = Configured Block Cache size + MemTable size + Indexes and Bloom Filters size =
rocksdb.block-cache-size + rocksdb.size-all-mem-tables + rocksdb.estimate-table-readers-mem
Author: Prateek Maheshwari <pm...@linkedin.com>
Reviewers: Jagadish <ja...@apache.org>
Closes #404 from prateekm/rocksdb-memory
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/3a9e8064
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/3a9e8064
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/3a9e8064
Branch: refs/heads/master
Commit: 3a9e8064211a79827a2a4793c1f159a689dfa256
Parents: 267dfc6
Author: Prateek Maheshwari <pm...@linkedin.com>
Authored: Wed Jan 17 09:39:26 2018 -0800
Committer: Jagadish <jv...@linkedin.com>
Committed: Wed Jan 17 09:39:26 2018 -0800
----------------------------------------------------------------------
.../samza/storage/kv/RocksDbOptionsHelper.java | 12 ++++++----
.../RocksDbKeyValueStorageEngineFactory.scala | 3 +++
.../samza/storage/kv/RocksDbKeyValueStore.scala | 25 ++++++++++++++------
.../storage/kv/TestRocksDbKeyValueStore.scala | 6 ++---
4 files changed, 32 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/3a9e8064/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java
index 9b8f44b..9389681 100644
--- a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java
+++ b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java
@@ -75,12 +75,10 @@ public class RocksDbOptionsHelper {
}
options.setCompressionType(compressionType);
- Long cacheSize = storeConfig.getLong("container.cache.size.bytes", 100 * 1024 * 1024L);
- Long cacheSizePerContainer = cacheSize / numTasks;
-
+ long blockCacheSize = getBlockCacheSize(storeConfig, containerContext);
int blockSize = storeConfig.getInt(ROCKSDB_BLOCK_SIZE_BYTES, 4096);
BlockBasedTableConfig tableOptions = new BlockBasedTableConfig();
- tableOptions.setBlockCacheSize(cacheSizePerContainer).setBlockSize(blockSize);
+ tableOptions.setBlockCacheSize(blockCacheSize).setBlockSize(blockSize);
options.setTableFormatConfig(tableOptions);
CompactionStyle compactionStyle = CompactionStyle.UNIVERSAL;
@@ -110,4 +108,10 @@ public class RocksDbOptionsHelper {
return options;
}
+
+ public static Long getBlockCacheSize(Config storeConfig, SamzaContainerContext containerContext) {
+ int numTasks = containerContext.taskNames.size();
+ long cacheSize = storeConfig.getLong("container.cache.size.bytes", 100 * 1024 * 1024L);
+ return cacheSize / numTasks;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/3a9e8064/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
index a7b748f..2b7ffb5 100644
--- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala
@@ -44,6 +44,9 @@ class RocksDbKeyValueStorageEngineFactory [K, V] extends BaseKeyValueStorageEngi
val storageConfig = containerContext.config.subset("stores." + storeName + ".", true)
val isLoggedStore = containerContext.config.getChangelogStream(storeName).isDefined
val rocksDbMetrics = new KeyValueStoreMetrics(storeName, registry)
+ rocksDbMetrics.newGauge("rocksdb.block-cache-size",
+ () => RocksDbOptionsHelper.getBlockCacheSize(storageConfig, containerContext))
+
val rocksDbOptions = RocksDbOptionsHelper.options(storageConfig, containerContext)
val rocksDbWriteOptions = new WriteOptions().setDisableWAL(true)
val rocksDbFlushOptions = new FlushOptions().setWaitForFlush(true)
http://git-wip-us.apache.org/repos/asf/samza/blob/3a9e8064/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
index 6aad45f..eae7da2 100644
--- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
@@ -72,13 +72,24 @@ object RocksDbKeyValueStore extends Logging {
RocksDB.open(options, dir.toString)
}
- if (storeConfig.containsKey("rocksdb.metrics.list")) {
- storeConfig
- .get("rocksdb.metrics.list")
- .split(",")
- .map(property => property.trim)
- .foreach(property => metrics.newGauge(property, () => rocksDb.getProperty(property)))
- }
+ // See https://github.com/facebook/rocksdb/blob/master/include/rocksdb/db.h for available properties
+ val rocksDbMetrics = Set (
+ "rocksdb.estimate-table-readers-mem", // indexes and bloom filters
+ "rocksdb.cur-size-active-mem-table", // approximate active memtable size in bytes
+ "rocksdb.cur-size-all-mem-tables", // approximate active and unflushed memtable size in bytes
+ "rocksdb.size-all-mem-tables", // approximate active, unflushed and pinned memtable size in bytes
+ "rocksdb.estimate-num-keys" // approximate number keys in the active and unflushed memtable and storage
+ )
+
+ val configuredMetrics = storeConfig
+ .get("rocksdb.metrics.list", "")
+ .split(",")
+ .map(property => property.trim)
+ .filter(!_.isEmpty)
+ .toSet
+
+ (configuredMetrics ++ rocksDbMetrics)
+ .foreach(property => metrics.newGauge(property, () => rocksDb.getProperty(property)))
rocksDb
} catch {
http://git-wip-us.apache.org/repos/asf/samza/blob/3a9e8064/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
index 418e986..ca9c023 100644
--- a/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
+++ b/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
@@ -44,7 +44,7 @@ class TestRocksDbKeyValueStore
config,
false,
"someStore",
- null)
+ new KeyValueStoreMetrics())
val key = "test".getBytes("UTF-8")
rocksDB.put(key, "val".getBytes("UTF-8"))
Assert.assertNotNull(rocksDB.get(key))
@@ -76,7 +76,7 @@ class TestRocksDbKeyValueStore
config,
false,
"dbStore",
- null)
+ new KeyValueStoreMetrics())
val key = "key".getBytes("UTF-8")
rocksDB.put(key, "val".getBytes("UTF-8"))
// SAMZA-836: Mysteriously,calling new FlushOptions() does not invoke the NativeLibraryLoader in rocksdbjni-3.13.1!
@@ -136,7 +136,7 @@ class TestRocksDbKeyValueStore
config,
false,
"dbStore",
- null)
+ new KeyValueStoreMetrics())
val key = "key".getBytes("UTF-8")
val key1 = "key1".getBytes("UTF-8")