You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ca...@apache.org on 2019/10/09 21:12:32 UTC

[samza] branch master updated: SAMZA-2324: Adding KV store metrics for rocksdb (#1158)

This is an automated email from the ASF dual-hosted git repository.

cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new a07e386  SAMZA-2324: Adding KV store metrics for rocksdb (#1158)
a07e386 is described below

commit a07e386d1fd0a0ff560a4a22918d27cfe28a6249
Author: bkonold <bk...@users.noreply.github.com>
AuthorDate: Wed Oct 9 14:12:27 2019 -0700

    SAMZA-2324: Adding KV store metrics for rocksdb (#1158)
    
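    1) size of the store on disk (total size of all SST files, as reported by the RocksDB property "rocksdb.total-sst-files-size")
    2) maximum record size (largest serialized value, in bytes, written via put/putAll; keys are not counted)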
---
 .../apache/samza/storage/kv/RocksDbKeyValueStore.scala   |  3 ++-
 .../samza/storage/kv/SerializedKeyValueStore.scala       | 16 ++++++++++++++--
 .../storage/kv/SerializedKeyValueStoreMetrics.scala      |  1 +
 3 files changed, 17 insertions(+), 3 deletions(-)

diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
index a2ae8b0..f1ae004 100644
--- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
@@ -83,7 +83,8 @@ object RocksDbKeyValueStore extends Logging {
         "rocksdb.cur-size-active-mem-table", // approximate active memtable size in bytes
         "rocksdb.cur-size-all-mem-tables", // approximate active and unflushed memtable size in bytes
         "rocksdb.size-all-mem-tables", // approximate active, unflushed and pinned memtable size in bytes
-        "rocksdb.estimate-num-keys" // approximate number keys in the active and unflushed memtable and storage
+        "rocksdb.estimate-num-keys", // approximate number keys in the active and unflushed memtable and storage
+        "rocksdb.total-sst-files-size" // size of all sst files on disk
       )
 
       val configuredMetrics = storeConfig
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
index 169452c..7136b60 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
@@ -57,23 +57,27 @@ class SerializedKeyValueStore[K, V](
   }
 
   def put(key: K, value: V) {
-    metrics.puts.inc
     val keyBytes = toBytesOrNull(key, keySerde)
     val valBytes = toBytesOrNull(value, msgSerde)
     store.put(keyBytes, valBytes)
+    val valSizeBytes = if (valBytes == null) 0 else valBytes.length
+    updatePutMetrics(1, valSizeBytes)
   }
 
   def putAll(entries: java.util.List[Entry[K, V]]) {
     val list = new java.util.ArrayList[Entry[Array[Byte], Array[Byte]]](entries.size())
     val iter = entries.iterator
+    var newMaxRecordSizeBytes = 0
     while (iter.hasNext) {
       val curr = iter.next
       val keyBytes = toBytesOrNull(curr.getKey, keySerde)
       val valBytes = toBytesOrNull(curr.getValue, msgSerde)
+      val valSizeBytes = if (valBytes == null) 0 else valBytes.length
+      newMaxRecordSizeBytes = Math.max(newMaxRecordSizeBytes, valSizeBytes)
       list.add(new Entry(keyBytes, valBytes))
     }
     store.putAll(list)
-    metrics.puts.inc(list.size)
+    updatePutMetrics(list.size, newMaxRecordSizeBytes)
   }
 
   def delete(key: K) {
@@ -151,6 +155,14 @@ class SerializedKeyValueStore[K, V](
     bytes
   }
 
+  private def updatePutMetrics(batchSize: Long, newMaxRecordSizeBytes: Long) = {
+    metrics.puts.inc(batchSize)
+    var max = metrics.maxRecordSizeBytes.getValue
+    while (newMaxRecordSizeBytes > max && !metrics.maxRecordSizeBytes.compareAndSet(max, newMaxRecordSizeBytes)) {
+      max = metrics.maxRecordSizeBytes.getValue
+    }
+  }
+
   override def snapshot(from: K, to: K): KeyValueSnapshot[K, V] = {
     val fromBytes = toBytesOrNull(from, keySerde)
     val toBytes = toBytesOrNull(to, keySerde)
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala
index 841e4a2..f7dc953 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala
@@ -35,6 +35,7 @@ class SerializedKeyValueStoreMetrics(
   val flushes = newCounter("flushes")
   val bytesSerialized = newCounter("bytes-serialized")
   val bytesDeserialized = newCounter("bytes-deserialized")
+  val maxRecordSizeBytes = newGauge("max-record-size-bytes", 0L)
 
   override def getPrefix = storeName + "-"
 }
\ No newline at end of file