You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this rendering.
Posted to commits@samza.apache.org by ca...@apache.org on 2019/10/09 21:12:32 UTC
[samza] branch master updated: SAMZA-2324: Adding KV store metrics for rocksdb (#1158)
This is an automated email from the ASF dual-hosted git repository.
cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new a07e386 SAMZA-2324: Adding KV store metrics for rocksdb (#1158)
a07e386 is described below
commit a07e386d1fd0a0ff560a4a22918d27cfe28a6249
Author: bkonold <bk...@users.noreply.github.com>
AuthorDate: Wed Oct 9 14:12:27 2019 -0700
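SAMZA-2324: Adding KV store metrics for RocksDB (#1158)
1) Size of the store on disk: the total size of all SST files, read from the RocksDB property "rocksdb.total-sst-files-size".
2) Maximum record size: the largest serialized value size in bytes written through put/putAll (key bytes are not counted), exposed as the "max-record-size-bytes" gauge.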
---
.../apache/samza/storage/kv/RocksDbKeyValueStore.scala | 3 ++-
.../samza/storage/kv/SerializedKeyValueStore.scala | 16 ++++++++++++++--
.../storage/kv/SerializedKeyValueStoreMetrics.scala | 1 +
3 files changed, 17 insertions(+), 3 deletions(-)
diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
index a2ae8b0..f1ae004 100644
--- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
@@ -83,7 +83,8 @@ object RocksDbKeyValueStore extends Logging {
"rocksdb.cur-size-active-mem-table", // approximate active memtable size in bytes
"rocksdb.cur-size-all-mem-tables", // approximate active and unflushed memtable size in bytes
"rocksdb.size-all-mem-tables", // approximate active, unflushed and pinned memtable size in bytes
- "rocksdb.estimate-num-keys" // approximate number keys in the active and unflushed memtable and storage
+ "rocksdb.estimate-num-keys", // approximate number keys in the active and unflushed memtable and storage
+ "rocksdb.total-sst-files-size" // size of all sst files on disk
)
val configuredMetrics = storeConfig
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
index 169452c..7136b60 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
@@ -57,23 +57,27 @@ class SerializedKeyValueStore[K, V](
}
def put(key: K, value: V) {
- metrics.puts.inc
val keyBytes = toBytesOrNull(key, keySerde)
val valBytes = toBytesOrNull(value, msgSerde)
store.put(keyBytes, valBytes)
+ val valSizeBytes = if (valBytes == null) 0 else valBytes.length
+ updatePutMetrics(1, valSizeBytes)
}
def putAll(entries: java.util.List[Entry[K, V]]) {
val list = new java.util.ArrayList[Entry[Array[Byte], Array[Byte]]](entries.size())
val iter = entries.iterator
+ var newMaxRecordSizeBytes = 0
while (iter.hasNext) {
val curr = iter.next
val keyBytes = toBytesOrNull(curr.getKey, keySerde)
val valBytes = toBytesOrNull(curr.getValue, msgSerde)
+ val valSizeBytes = if (valBytes == null) 0 else valBytes.length
+ newMaxRecordSizeBytes = Math.max(newMaxRecordSizeBytes, valSizeBytes)
list.add(new Entry(keyBytes, valBytes))
}
store.putAll(list)
- metrics.puts.inc(list.size)
+ updatePutMetrics(list.size, newMaxRecordSizeBytes)
}
def delete(key: K) {
@@ -151,6 +155,14 @@ class SerializedKeyValueStore[K, V](
bytes
}
+ private def updatePutMetrics(batchSize: Long, newMaxRecordSizeBytes: Long) = {
+ metrics.puts.inc(batchSize)
+ var max = metrics.maxRecordSizeBytes.getValue
+ while (newMaxRecordSizeBytes > max && !metrics.maxRecordSizeBytes.compareAndSet(max, newMaxRecordSizeBytes)) {
+ max = metrics.maxRecordSizeBytes.getValue
+ }
+ }
+
override def snapshot(from: K, to: K): KeyValueSnapshot[K, V] = {
val fromBytes = toBytesOrNull(from, keySerde)
val toBytes = toBytesOrNull(to, keySerde)
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala
index 841e4a2..f7dc953 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala
@@ -35,6 +35,7 @@ class SerializedKeyValueStoreMetrics(
val flushes = newCounter("flushes")
val bytesSerialized = newCounter("bytes-serialized")
val bytesDeserialized = newCounter("bytes-deserialized")
+ val maxRecordSizeBytes = newGauge("max-record-size-bytes", 0L)
override def getPrefix = storeName + "-"
}
\ No newline at end of file