You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original archive page and is not preserved in this text.
Posted to commits@samza.apache.org by pm...@apache.org on 2019/11/21 18:05:45 UTC
[samza] branch master updated: SAMZA-2324: Adding maximum (serialized) record size metrics for rocksdb (#1209)
This is an automated email from the ASF dual-hosted git repository.
pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new be1b8ac SAMZA-2324: Adding maximum (serialized) record size metrics for rocksdb (#1209)
be1b8ac is described below
commit be1b8ac67d5f42c77d473c1c3d5c33a6d5e9d07e
Author: bkonold <bk...@users.noreply.github.com>
AuthorDate: Thu Nov 21 10:05:35 2019 -0800
SAMZA-2324: Adding maximum (serialized) record size metrics for rocksdb (#1209)
SAMZA-2324: Adding maximum (serialized) record size metrics for rocksdb
---
.../samza/storage/kv/SerializedKeyValueStore.scala | 21 +++++++++++++++++++--
.../storage/kv/SerializedKeyValueStoreMetrics.scala | 1 +
2 files changed, 20 insertions(+), 2 deletions(-)
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
index 5b3456c..51df502 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
@@ -59,23 +59,27 @@ class SerializedKeyValueStore[K, V](
}
def put(key: K, value: V) {
- metrics.puts.inc
val keyBytes = toBytesOrNull(key, keySerde)
val valBytes = toBytesOrNull(value, msgSerde)
store.put(keyBytes, valBytes)
+ val valSizeBytes = if (valBytes == null) 0 else valBytes.length
+ updatePutMetrics(1, valSizeBytes)
}
def putAll(entries: java.util.List[Entry[K, V]]) {
val list = new java.util.ArrayList[Entry[Array[Byte], Array[Byte]]](entries.size())
val iter = entries.iterator
+ var newMaxRecordSizeBytes = 0
while (iter.hasNext) {
val curr = iter.next
val keyBytes = toBytesOrNull(curr.getKey, keySerde)
val valBytes = toBytesOrNull(curr.getValue, msgSerde)
+ val valSizeBytes = if (valBytes == null) 0 else valBytes.length
+ newMaxRecordSizeBytes = Math.max(newMaxRecordSizeBytes, valSizeBytes)
list.add(new Entry(keyBytes, valBytes))
}
store.putAll(list)
- metrics.puts.inc(list.size)
+ updatePutMetrics(list.size, newMaxRecordSizeBytes)
}
def delete(key: K) {
@@ -153,6 +157,19 @@ class SerializedKeyValueStore[K, V](
bytes
}
+ /**
+ * Updates put metrics with the given batch and record sizes. The max record size metric is updated with a
+ * thread UN-SAFE read-then-write, so accuracy is not guaranteed; if multiple threads overlap in their invocation of
+ * this method, the last to write simply wins regardless of the value it read.
+ */
+ private def updatePutMetrics(batchSize: Long, newMaxRecordSizeBytes: Long) = {
+ metrics.puts.inc(batchSize)
+ val max = metrics.maxRecordSizeBytes.getValue
+ if (newMaxRecordSizeBytes > max) {
+ metrics.maxRecordSizeBytes.set(newMaxRecordSizeBytes)
+ }
+ }
+
override def snapshot(from: K, to: K): KeyValueSnapshot[K, V] = {
val fromBytes = toBytesOrNull(from, keySerde)
val toBytes = toBytesOrNull(to, keySerde)
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala
index 841e4a2..f7dc953 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStoreMetrics.scala
@@ -35,6 +35,7 @@ class SerializedKeyValueStoreMetrics(
val flushes = newCounter("flushes")
val bytesSerialized = newCounter("bytes-serialized")
val bytesDeserialized = newCounter("bytes-deserialized")
+ val maxRecordSizeBytes = newGauge("max-record-size-bytes", 0L)
override def getPrefix = storeName + "-"
}
\ No newline at end of file