Posted to commits@kudu.apache.org by gr...@apache.org on 2020/03/02 20:15:08 UTC
[kudu] branch master updated: [scripts] Update parse_metrics_log.py to make it usable in other scripts
This is an automated email from the ASF dual-hosted git repository.
granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 386b04a [scripts] Update parse_metrics_log.py to make it usable in other scripts
386b04a is described below
commit 386b04a8b0c8a3b9ff2db8b13bf7bedd8800d14f
Author: Grant Henke <gr...@apache.org>
AuthorDate: Mon Mar 2 12:33:51 2020 -0600
[scripts] Update parse_metrics_log.py to make it usable in other scripts
This wraps the functionality in parse_metrics_log.py in a class
so that it can be used from another Python script.
It maintains the old behavior by moving the print statement
from the process function to the main function.
I will use this to incorporate the script into some of my
performance analysis work.
Change-Id: Ia107b54a7efc45f9877e7c0b9405cefb550c38f2
Reviewed-on: http://gerrit.cloudera.org:8080/15336
Reviewed-by: Andrew Wong <aw...@cloudera.com>
Tested-by: Kudu Jenkins
---
src/kudu/scripts/parse_metrics_log.py | 232 ++++++++++++++++++----------------
1 file changed, 121 insertions(+), 111 deletions(-)
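As a quick illustration of the new API (not part of the commit), here is a minimal
sketch of how another Python script could drive the parser; the import path and the
log file path are assumptions, and the reduced metric lists are just examples drawn
from the DEFAULT_* lists below:

from parse_metrics_log import MetricsLogParser  # assumes the script is importable

# Hypothetical diagnostics log path; any of the DEFAULT_* metric lists can be
# passed instead of these trimmed-down selections.
parser = MetricsLogParser(
    ["/path/to/kudu-tserver.diagnostics.log.gz"],  # hypothetical path
    simple_metrics=[("server.generic_current_allocated_bytes", "heap_allocated")],
    rate_metrics=[("tablet.rows_inserted", "inserts_per_sec")],
    histogram_metrics=[("tablet.log_append_latency", "log")])

# Header row, then one row per retained metrics snapshot.
print(" ".join(parser.column_names()))
for row in parser:
    # Each row is a tuple: (timestamp, cache_hit_ratio, <metric values...>).
    print(" ".join(str(v) for v in row))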
diff --git a/src/kudu/scripts/parse_metrics_log.py b/src/kudu/scripts/parse_metrics_log.py
index af8c8c8..a0bd562 100644
--- a/src/kudu/scripts/parse_metrics_log.py
+++ b/src/kudu/scripts/parse_metrics_log.py
@@ -23,7 +23,6 @@ and outputs a TSV file including some metrics.
This isn't meant to be used standalone as written, but rather as a template
which is edited based on whatever metrics you'd like to extract. The set
of metrics described below are just a starting point to work from.
-Uncomment the ones you are interested in, or add new ones.
"""
from collections import defaultdict
@@ -43,49 +42,43 @@ GRANULARITY_SECS = 30
# These metrics will be extracted "as-is" into the TSV.
# The first element of each tuple is the metric name.
# The second is the name that will be used in the TSV header line.
-SIMPLE_METRICS = [
- # ("server.generic_current_allocated_bytes", "heap_allocated"),
- # ("server.log_block_manager_bytes_under_management", "bytes_on_disk"),
- # ("tablet.memrowset_size", "mrs_size"),
- # ("server.block_cache_usage", "bc_usage"),
+DEFAULT_SIMPLE_METRICS = [
+ ("server.generic_current_allocated_bytes", "heap_allocated"),
+ ("server.log_block_manager_bytes_under_management", "bytes_on_disk"),
+ ("tablet.memrowset_size", "mrs_size"),
+ ("server.block_cache_usage", "bc_usage"),
]
# These metrics will be extracted as per-second rates into the TSV.
-RATE_METRICS = [
- # ("server.block_manager_total_bytes_read", "bytes_r_per_sec"),
- # ("server.block_manager_total_bytes_written", "bytes_w_per_sec"),
- # ("server.block_cache_lookups", "bc_lookups_per_sec"),
- # ("server.cpu_utime", "cpu_utime"),
- # ("server.cpu_stime", "cpu_stime"),
- # ("server.involuntary_context_switches", "invol_cs"),
- # ("server.voluntary_context_switches", "vol_cs"),
- # ("tablet.rows_inserted", "inserts_per_sec"),
- # ("tablet.rows_upserted", "upserts_per_sec"),
+DEFAULT_RATE_METRICS = [
+ ("server.block_manager_total_bytes_read", "bytes_r_per_sec"),
+ ("server.block_manager_total_bytes_written", "bytes_w_per_sec"),
+ ("server.block_cache_lookups", "bc_lookups_per_sec"),
+ ("server.cpu_utime", "cpu_utime"),
+ ("server.cpu_stime", "cpu_stime"),
+ ("server.involuntary_context_switches", "invol_cs"),
+ ("server.voluntary_context_switches", "vol_cs"),
+ ("tablet.rows_inserted", "inserts_per_sec"),
+ ("tablet.rows_upserted", "upserts_per_sec"),
+ ("tablet.leader_memory_pressure_rejections", "mem_rejections"),
]
# These metrics will be extracted as percentile metrics into the TSV.
# Each metric will generate several columns in the output TSV, with
# percentile numbers suffixed to the column name provided here (foo_p95,
# foo_p99, etc)
-HISTOGRAM_METRICS = [
- # ("server.op_apply_run_time", "apply_run_time"),
- # ("server.handler_latency_kudu_tserver_TabletServerService_Write", "write"),
- # ("server.handler_latency_kudu_consensus_ConsensusService_UpdateConsensus", "cons_update"),
- # ("server.handler_latency_kudu_consensus_ConsensusService_RequestVote", "vote"),
- # ("server.handler_latency_kudu_tserver_TabletCopyService_FetchData", "fetch_data"),
- # ("tablet.bloom_lookups_per_op", "bloom_lookups"),
- # ("tablet.log_append_latency", "log"),
- # ("tablet.op_prepare_run_time", "prep"),
- # ("tablet.write_op_duration_client_propagated_consistency", "op_dur")
+DEFAULT_HISTOGRAM_METRICS = [
+ ("server.op_apply_run_time", "apply_run_time"),
+ ("server.handler_latency_kudu_tserver_TabletServerService_Write", "write"),
+ ("server.handler_latency_kudu_consensus_ConsensusService_UpdateConsensus", "cons_update"),
+ ("server.handler_latency_kudu_consensus_ConsensusService_RequestVote", "vote"),
+ ("server.handler_latency_kudu_tserver_TabletCopyService_FetchData", "fetch_data"),
+ ("tablet.bloom_lookups_per_op", "bloom_lookups"),
+ ("tablet.log_append_latency", "log"),
+ ("tablet.op_prepare_run_time", "prep"),
+ ("tablet.write_op_duration_client_propagated_consistency", "op_dur")
]
-# Get the set of metrics we actuall want to bother parsing from the log.
-PARSE_METRIC_KEYS = set(key for (key, _) in (SIMPLE_METRICS + RATE_METRICS + HISTOGRAM_METRICS))
-
-# The script always reports cache-hit metrics.
-PARSE_METRIC_KEYS.add("server.block_cache_hits_caching")
-PARSE_METRIC_KEYS.add("server.block_cache_misses_caching")
-
NaN = float('nan')
UNKNOWN_PERCENTILES = dict(p50=0, p95=0, p99=0, p999=0, max=0)
@@ -103,7 +96,7 @@ def strip_metric(m):
return NaN
-def json_to_map(j):
+def json_to_map(j, parse_metric_keys):
"""
Parse the JSON structure in the log into a python dictionary of the form:
{ <entity>.<metric name>:String => { <entity id>:String => <values> } }
@@ -115,7 +108,7 @@ def json_to_map(j):
for m in entity['metrics']:
entity_id = entity['id']
metric_key = entity['type'] + "." + m['name']
- if metric_key not in PARSE_METRIC_KEYS:
+ if metric_key not in parse_metric_keys:
continue
# Add the metric_id to the metrics map.
ret[metric_key][entity_id] = strip_metric(m)
@@ -147,7 +140,7 @@ def aggregate_metrics(metric_to_eid_to_vals):
ret[metric_name] = vals.copy()
else:
# Otherwise, add the counts to what's there.
- for val, count in vals.iteritems():
+ for val, count in vals.items():
if val in ret[metric_name]:
ret[metric_name][val] += count
else:
@@ -173,8 +166,8 @@ def histogram_stats(aggregated_prev, aggregated_cur, m):
# Determine the total count we should expect between the current and previous
# snapshots.
- delta_total = sum([val for _, val in cur.iteritems()]) - \
- sum([val for _, val in prev.iteritems()])
+ delta_total = sum([val for _, val in cur.items()]) - \
+ sum([val for _, val in prev.items()])
if delta_total == 0:
return UNKNOWN_PERCENTILES
@@ -184,11 +177,10 @@ def histogram_stats(aggregated_prev, aggregated_cur, m):
# Iterate over all of the buckets for the current and previous snapshots,
# summing them up, and assigning percentiles to the bucket as appropriate.
- for cur_val, cur_count in sorted(aggregated_cur[m].iteritems()):
+ for cur_val, cur_count in sorted(aggregated_cur[m].items()):
prev_count = prev.get(cur_val, 0)
delta_count = cur_count - prev_count
cum_count += delta_count
- percentile = float(cum_count) / delta_total
# Determine which percentiles this bucket belongs to.
percentile = float(cum_count) / delta_total
@@ -217,94 +209,112 @@ def cache_hit_ratio(aggregated_prev, aggregated_cur):
cache_ratio = NaN
return cache_ratio
-def process(aggregated_prev, aggregated_cur):
+def process(aggregated_prev, aggregated_cur, simple_metrics, rate_metrics, histogram_metrics):
""" Process a pair of metric snapshots, outputting a line of TSV. """
if not aggregated_prev:
- aggregated_prev = aggregate_metrics(prev)
+ aggregated_prev = aggregate_metrics(aggregated_prev)
delta_ts = aggregated_cur['ts'] - aggregated_prev['ts']
calc_vals = []
cache_ratio = cache_hit_ratio(aggregated_prev, aggregated_cur)
- for metric, _ in SIMPLE_METRICS:
+ for metric, _ in simple_metrics:
if metric in aggregated_cur:
calc_vals.append(aggregated_cur[metric])
else:
calc_vals.append(0)
calc_vals.extend((delta(aggregated_prev, aggregated_cur, metric))/delta_ts \
- for metric, _ in RATE_METRICS)
- for metric, _ in HISTOGRAM_METRICS:
+ for metric, _ in rate_metrics)
+ for metric, _ in histogram_metrics:
stats = histogram_stats(aggregated_prev, aggregated_cur, metric)
calc_vals.extend([stats['p50'], stats['p95'], stats['p99'], stats['p999'], stats['max']])
- print((aggregated_cur['ts'] + aggregated_prev['ts'])/2,
- cache_ratio,
- " ".join(str(x) for x in calc_vals))
- return aggregated_cur
+ return tuple([(aggregated_cur['ts'] + aggregated_prev['ts'])/2, cache_ratio] + calc_vals)
+
+class MetricsLogParser(object):
+ def __init__(self, paths,
+ simple_metrics=DEFAULT_SIMPLE_METRICS,
+ rate_metrics=DEFAULT_RATE_METRICS,
+ histogram_metrics=DEFAULT_HISTOGRAM_METRICS):
+ self.paths = paths
+ self.simple_metrics = simple_metrics
+ self.rate_metrics = rate_metrics
+ self.histogram_metrics = histogram_metrics
+ # Get the set of metrics we actually want to bother parsing from the log.
+ self.parse_metric_keys = set(key for (key, _) in (simple_metrics + rate_metrics + histogram_metrics))
+ # The script always reports cache-hit metrics.
+ self.parse_metric_keys.add("server.block_cache_hits_caching")
+ self.parse_metric_keys.add("server.block_cache_misses_caching")
+
+ def column_names(self):
+ simple_headers = [header for _, header in self.simple_metrics + self.rate_metrics]
+ for _, header in self.histogram_metrics:
+ simple_headers.append(header + "_p50")
+ simple_headers.append(header + "_p95")
+ simple_headers.append(header + "_p99")
+ simple_headers.append(header + "_p999")
+ simple_headers.append(header + "_max")
+ return tuple(["time", "cache_hit_ratio"] + simple_headers)
+
+ def __iter__(self):
+ prev_data = None
+ aggregated_prev = None
+
+ for path in sorted(self.paths):
+ if path.endswith(".gz"):
+ f = gzip.GzipFile(path)
+ else:
+ f = open(path)
+ for line_number, line in enumerate(f, start=1):
+ # Only parse out the "metrics" lines.
+ try:
+ (_, _, log_type, ts, metrics_json) = line.split(" ")
+ except ValueError:
+ continue
+ if log_type != "metrics":
+ continue
+ ts = float(ts) / 1000000.0
+ prev_ts = prev_data['ts'] if prev_data else 0
+ # Enforce that the samples come in time-sorted order.
+ if ts <= prev_ts:
+ raise Exception("timestamps must be in ascending order (%f <= %f at %s:%d)"
+ % (ts, prev_ts, path, line_number))
+ if prev_data and ts < prev_ts + GRANULARITY_SECS:
+ continue
+
+ # Parse the metrics json into a map of the form:
+ # { metric key => { entity id => metric value } }
+ data = json_to_map(json.loads(metrics_json), self.parse_metric_keys)
+ data['ts'] = ts
+ if prev_data:
+ # Copy missing metrics from prev_data.
+ for m, prev_eid_to_vals in prev_data.items():
+ if m is 'ts':
+ continue
+ # The metric was missing entirely; copy it over.
+ if m not in data:
+ data[m] = prev_eid_to_vals
+ else:
+ # If the metric was missing for a specific entity, copy the metric
+ # from the previous snapshot.
+ for eid, prev_vals in prev_eid_to_vals.items():
+ if eid not in data[m]:
+ data[m][eid] = prev_vals
+
+ aggregated_cur = aggregate_metrics(data)
+ if prev_data:
+ if not aggregated_prev:
+ aggregated_prev = aggregate_metrics(prev_data)
+ yield process(aggregated_prev, aggregated_cur,
+ self.simple_metrics, self.rate_metrics, self.histogram_metrics)
+
+ prev_data = data
+ aggregated_prev = aggregated_cur
def main(argv):
- prev_data = None
- aggregated_prev = None
-
- simple_headers = [header for _, header in SIMPLE_METRICS + RATE_METRICS]
- for _, header in HISTOGRAM_METRICS:
- simple_headers.append(header + "_p50")
- simple_headers.append(header + "_p95")
- simple_headers.append(header + "_p99")
- simple_headers.append(header + "_p999")
- simple_headers.append(header + "_max")
-
- print("time cache_hit_ratio", " ".join(simple_headers))
-
- for path in sorted(argv[1:]):
- if path.endswith(".gz"):
- f = gzip.GzipFile(path)
- else:
- f = open(path)
- for line_number, line in enumerate(f, start=1):
- # Only parse out the "metrics" lines.
- try:
- (_, _, log_type, ts, metrics_json) = line.split(" ")
- except ValueError:
- continue
- if log_type != "metrics":
- continue
- ts = float(ts) / 1000000.0
- prev_ts = prev_data['ts'] if prev_data else 0
- # Enforce that the samples come in time-sorted order.
- if ts <= prev_ts:
- raise Exception("timestamps must be in ascending order (%f <= %f at %s:%d)"
- % (ts, prev_ts, path, line_number))
- if prev_data and ts < prev_ts + GRANULARITY_SECS:
- continue
-
- # Parse the metrics json into a map of the form:
- # { metric key => { entity id => metric value } }
- data = json_to_map(json.loads(metrics_json))
- data['ts'] = ts
- if prev_data:
- # Copy missing metrics from prev_data.
- for m, prev_eid_to_vals in prev_data.iteritems():
- if m is 'ts':
- continue
- # The metric was missing entirely; copy it over.
- if m not in data:
- data[m] = prev_eid_to_vals
- else:
- # If the metric was missing for a specific entity, copy the metric
- # from the previous snapshot.
- for eid, prev_vals in prev_eid_to_vals.iteritems():
- if eid not in data[m]:
- data[m][eid] = prev_vals
-
- aggregated_cur = aggregate_metrics(data)
- if prev_data:
- if not aggregated_prev:
- aggregated_prev = aggregate_metrics(prev_data)
- process(aggregated_prev, aggregated_cur)
-
- prev_data = data
- aggregated_prev = aggregated_cur
+ parser = MetricsLogParser(argv[1:], DEFAULT_SIMPLE_METRICS, DEFAULT_RATE_METRICS, DEFAULT_HISTOGRAM_METRICS)
+ for line in parser:
+ print(line)
if __name__ == "__main__":
main(sys.argv)
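For reference (not part of the commit), the rate metrics are reported as the delta
between two snapshots divided by the elapsed time, and snapshots closer together
than GRANULARITY_SECS (30 seconds) are skipped. For example, if tablet.rows_inserted
grows from 1,000,000 to 1,300,000 between two snapshots taken 30 seconds apart, the
inserts_per_sec column for that row is (1,300,000 - 1,000,000) / 30 = 10,000.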