You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2019/11/21 18:05:45 UTC

[samza] branch master updated: SAMZA-2324: Adding maximum (serialized) record size metrics for rocksdb (#1209)

This is an automated email from the ASF dual-hosted git repository.

pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new be1b8ac  SAMZA-2324: Adding maximum (serialized) record size metrics for rocksdb (#1209)
be1b8ac is described below

commit be1b8ac67d5f42c77d473c1c3d5c33a6d5e9d07e
Author: bkonold <bk...@users.noreply.github.com>
AuthorDate: Thu Nov 21 10:05:35 2019 -0800

    SAMZA-2324: Adding maximum (serialized) record size metrics for rocksdb (#1209)
    
    SAMZA-2324: Adding maximum (serialized) record size metrics for rocksdb
---
 .../samza/storage/kv/SerializedKeyValueStore.scala  | 21 +++++++++++++++++++--
 .../storage/kv/SerializedKeyValueStoreMetrics.scala |  1 +
 2 files changed, 20 insertions(+), 2 deletions(-)

diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
index 5b3456c..51df502 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
@@ -59,23 +59,27 @@ class SerializedKeyValueStore[K, V](
   }
 
   def put(key: K, value: V) {
-    metrics.puts.inc
     val keyBytes = toBytesOrNull(key, keySerde)
     val valBytes = toBytesOrNull(value, msgSerde)
     store.put(keyBytes, valBytes)
+    val valSizeBytes = if (valBytes == null) 0 else valBytes.length
+    updatePutMetrics(1, valSizeBytes)
   }
 
   def putAll(entries: java.util.List[Entry[K, V]]) {
     val list = new java.util.ArrayList[Entry[Array[Byte], Array[Byte]]](entries.size())
     val iter = entries.iterator
+    var newMaxRecordSizeBytes = 0
     while (iter.hasNext) {
       val curr = iter.next
       val keyBytes = toBytesOrNull(curr.getKey, keySerde)
       val valBytes = toBytesOrNull(curr.getValue, msgSerde)
+      val valSizeBytes = if (valBytes == null) 0 else valBytes.length
+      newMaxRecordSizeBytes = Math.max(newMaxRecordSizeBytes, valSizeBytes)
       list.add(new Entry(keyBytes, valBytes))
     }
     store.putAll(list)
-    metrics.puts.inc(list.size)
+    updatePutMetrics(list.size, newMaxRecordSizeBytes)
   }
 
   def delete(key: K) {
@@ -153,6 +157,19 @@ class SerializedKeyValueStore[K, V](
     bytes
   }
 
+  /**
+   * Updates put metrics with the given batch and record sizes. The max record size metric is updated with a
+   * non-atomic (not thread-safe) read-then-write, so accuracy is not guaranteed; if multiple threads overlap in their
+   * invocation of this method, the last to write simply wins, even if that overwrites a larger value written meanwhile.
+   */
+  private def updatePutMetrics(batchSize: Long, newMaxRecordSizeBytes: Long) = {
+    metrics.puts.inc(batchSize)
+    val max = metrics.maxRecordSizeBytes.getValue
+    if (newMaxRecordSizeBytes > max) {
+      metrics.maxRecordSizeBytes.set(newMaxRecordSizeBytes)
+    }
+  }
+
   override def snapshot(from: K, to: K): KeyValueSnapshot[K, V] = {
     val fromBytes = toBytesOrNull(from, keySerde)
     val toBytes = toBytesOrNull(to, keySerde)
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala
index 841e4a2..f7dc953 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala
@@ -35,6 +35,7 @@ class SerializedKeyValueStoreMetrics(
   val flushes = newCounter("flushes")
   val bytesSerialized = newCounter("bytes-serialized")
   val bytesDeserialized = newCounter("bytes-deserialized")
+  val maxRecordSizeBytes = newGauge("max-record-size-bytes", 0L)
 
   override def getPrefix = storeName + "-"
 }
\ No newline at end of file