You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by gr...@apache.org on 2020/03/02 20:15:08 UTC

[kudu] branch master updated: [scripts] Update parse_metrics_log.py to make it usable in other scripts

This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 386b04a  [scripts] Update parse_metrics_log.py to make it usable in other scripts
386b04a is described below

commit 386b04a8b0c8a3b9ff2db8b13bf7bedd8800d14f
Author: Grant Henke <gr...@apache.org>
AuthorDate: Mon Mar 2 12:33:51 2020 -0600

    [scripts] Update parse_metrics_log.py to make it usable in other scripts
    
    This wraps the functionality in parse_metrics_log.py in a class
    so that it can be used from another python script.
    
    It maintains the old functionality by moving the print statement
    to the main function, from the process function.
    
    I will use this so I can wrap this script in some of my performance
    analysis work.
    
    Change-Id: Ia107b54a7efc45f9877e7c0b9405cefb550c38f2
    Reviewed-on: http://gerrit.cloudera.org:8080/15336
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
    Tested-by: Kudu Jenkins
---
 src/kudu/scripts/parse_metrics_log.py | 232 ++++++++++++++++++----------------
 1 file changed, 121 insertions(+), 111 deletions(-)

diff --git a/src/kudu/scripts/parse_metrics_log.py b/src/kudu/scripts/parse_metrics_log.py
index af8c8c8..a0bd562 100644
--- a/src/kudu/scripts/parse_metrics_log.py
+++ b/src/kudu/scripts/parse_metrics_log.py
@@ -23,7 +23,6 @@ and outputs a TSV file including some metrics.
 This isn't meant to be used standalone as written, but rather as a template
 which is edited based on whatever metrics you'd like to extract. The set
 of metrics described below are just a starting point to work from.
-Uncomment the ones you are interested in, or add new ones.
 """
 
 from collections import defaultdict
@@ -43,49 +42,43 @@ GRANULARITY_SECS = 30
 # These metrics will be extracted "as-is" into the TSV.
 # The first element of each tuple is the metric name.
 # The second is the name that will be used in the TSV header line.
-SIMPLE_METRICS = [
-  #  ("server.generic_current_allocated_bytes", "heap_allocated"),
-  #  ("server.log_block_manager_bytes_under_management", "bytes_on_disk"),
-  #  ("tablet.memrowset_size", "mrs_size"),
-  #  ("server.block_cache_usage", "bc_usage"),
+DEFAULT_SIMPLE_METRICS = [
+   ("server.generic_current_allocated_bytes", "heap_allocated"),
+   ("server.log_block_manager_bytes_under_management", "bytes_on_disk"),
+   ("tablet.memrowset_size", "mrs_size"),
+   ("server.block_cache_usage", "bc_usage"),
 ]
 
 # These metrics will be extracted as per-second rates into the TSV.
-RATE_METRICS = [
-  #  ("server.block_manager_total_bytes_read", "bytes_r_per_sec"),
-  #  ("server.block_manager_total_bytes_written", "bytes_w_per_sec"),
-  #  ("server.block_cache_lookups", "bc_lookups_per_sec"),
-  #  ("server.cpu_utime", "cpu_utime"),
-  #  ("server.cpu_stime", "cpu_stime"),
-  #  ("server.involuntary_context_switches", "invol_cs"),
-  #  ("server.voluntary_context_switches", "vol_cs"),
-  #  ("tablet.rows_inserted", "inserts_per_sec"),
-  #  ("tablet.rows_upserted", "upserts_per_sec"),
+DEFAULT_RATE_METRICS = [
+   ("server.block_manager_total_bytes_read", "bytes_r_per_sec"),
+   ("server.block_manager_total_bytes_written", "bytes_w_per_sec"),
+   ("server.block_cache_lookups", "bc_lookups_per_sec"),
+   ("server.cpu_utime", "cpu_utime"),
+   ("server.cpu_stime", "cpu_stime"),
+   ("server.involuntary_context_switches", "invol_cs"),
+   ("server.voluntary_context_switches", "vol_cs"),
+   ("tablet.rows_inserted", "inserts_per_sec"),
+   ("tablet.rows_upserted", "upserts_per_sec"),
+   ("tablet.leader_memory_pressure_rejections", "mem_rejections"),
 ]
 
 # These metrics will be extracted as percentile metrics into the TSV.
 # Each metric will generate several columns in the output TSV, with
 # percentile numbers suffixed to the column name provided here (foo_p95,
 # foo_p99, etc)
-HISTOGRAM_METRICS = [
-  #  ("server.op_apply_run_time", "apply_run_time"),
-  #  ("server.handler_latency_kudu_tserver_TabletServerService_Write", "write"),
-  #  ("server.handler_latency_kudu_consensus_ConsensusService_UpdateConsensus", "cons_update"),
-  #  ("server.handler_latency_kudu_consensus_ConsensusService_RequestVote", "vote"),
-  #  ("server.handler_latency_kudu_tserver_TabletCopyService_FetchData", "fetch_data"),
-  #  ("tablet.bloom_lookups_per_op", "bloom_lookups"),
-  #  ("tablet.log_append_latency", "log"),
-  #  ("tablet.op_prepare_run_time", "prep"),
-  #  ("tablet.write_op_duration_client_propagated_consistency", "op_dur")
+DEFAULT_HISTOGRAM_METRICS = [
+   ("server.op_apply_run_time", "apply_run_time"),
+   ("server.handler_latency_kudu_tserver_TabletServerService_Write", "write"),
+   ("server.handler_latency_kudu_consensus_ConsensusService_UpdateConsensus", "cons_update"),
+   ("server.handler_latency_kudu_consensus_ConsensusService_RequestVote", "vote"),
+   ("server.handler_latency_kudu_tserver_TabletCopyService_FetchData", "fetch_data"),
+   ("tablet.bloom_lookups_per_op", "bloom_lookups"),
+   ("tablet.log_append_latency", "log"),
+   ("tablet.op_prepare_run_time", "prep"),
+   ("tablet.write_op_duration_client_propagated_consistency", "op_dur")
 ]
 
-# Get the set of metrics we actuall want to bother parsing from the log.
-PARSE_METRIC_KEYS = set(key for (key, _) in (SIMPLE_METRICS + RATE_METRICS + HISTOGRAM_METRICS))
-
-# The script always reports cache-hit metrics.
-PARSE_METRIC_KEYS.add("server.block_cache_hits_caching")
-PARSE_METRIC_KEYS.add("server.block_cache_misses_caching")
-
 NaN = float('nan')
 UNKNOWN_PERCENTILES = dict(p50=0, p95=0, p99=0, p999=0, max=0)
 
@@ -103,7 +96,7 @@ def strip_metric(m):
   return NaN
 
 
-def json_to_map(j):
+def json_to_map(j, parse_metric_keys):
   """
   Parse the JSON structure in the log into a python dictionary of the form:
     { <entity>.<metric name>:String => { <entity id>:String => <values> } }
@@ -115,7 +108,7 @@ def json_to_map(j):
     for m in entity['metrics']:
       entity_id = entity['id']
       metric_key = entity['type'] + "." + m['name']
-      if metric_key not in PARSE_METRIC_KEYS:
+      if metric_key not in parse_metric_keys:
         continue
         # Add the metric_id to the metrics map.
       ret[metric_key][entity_id] = strip_metric(m)
@@ -147,7 +140,7 @@ def aggregate_metrics(metric_to_eid_to_vals):
           ret[metric_name] = vals.copy()
         else:
           # Otherwise, add the counts to what's there.
-          for val, count in vals.iteritems():
+          for val, count in vals.items():
             if val in ret[metric_name]:
               ret[metric_name][val] += count
             else:
@@ -173,8 +166,8 @@ def histogram_stats(aggregated_prev, aggregated_cur, m):
 
   # Determine the total count we should expect between the current and previous
   # snapshots.
-  delta_total = sum([val for _, val in cur.iteritems()]) - \
-      sum([val for _, val in prev.iteritems()])
+  delta_total = sum([val for _, val in cur.items()]) - \
+      sum([val for _, val in prev.items()])
 
   if delta_total == 0:
     return UNKNOWN_PERCENTILES
@@ -184,11 +177,10 @@ def histogram_stats(aggregated_prev, aggregated_cur, m):
 
   # Iterate over all of the buckets for the current and previous snapshots,
   # summing them up, and assigning percentiles to the bucket as appropriate.
-  for cur_val, cur_count in sorted(aggregated_cur[m].iteritems()):
+  for cur_val, cur_count in sorted(aggregated_cur[m].items()):
     prev_count = prev.get(cur_val, 0)
     delta_count = cur_count - prev_count
     cum_count += delta_count
-    percentile = float(cum_count) / delta_total
 
     # Determine which percentiles this bucket belongs to.
     percentile = float(cum_count) / delta_total
@@ -217,94 +209,112 @@ def cache_hit_ratio(aggregated_prev, aggregated_cur):
     cache_ratio = NaN
   return cache_ratio
 
-def process(aggregated_prev, aggregated_cur):
+def process(aggregated_prev, aggregated_cur, simple_metrics, rate_metrics, histogram_metrics):
   """ Process a pair of metric snapshots, outputting a line of TSV. """
   if not aggregated_prev:
-    aggregated_prev = aggregate_metrics(prev)
+    aggregated_prev = aggregate_metrics(aggregated_prev)
 
   delta_ts = aggregated_cur['ts'] - aggregated_prev['ts']
   calc_vals = []
   cache_ratio = cache_hit_ratio(aggregated_prev, aggregated_cur)
-  for metric, _ in SIMPLE_METRICS:
+  for metric, _ in simple_metrics:
     if metric in aggregated_cur:
       calc_vals.append(aggregated_cur[metric])
     else:
       calc_vals.append(0)
 
   calc_vals.extend((delta(aggregated_prev, aggregated_cur, metric))/delta_ts \
-      for metric, _ in RATE_METRICS)
-  for metric, _ in HISTOGRAM_METRICS:
+      for metric, _ in rate_metrics)
+  for metric, _ in histogram_metrics:
     stats = histogram_stats(aggregated_prev, aggregated_cur, metric)
     calc_vals.extend([stats['p50'], stats['p95'], stats['p99'], stats['p999'], stats['max']])
 
-  print((aggregated_cur['ts'] + aggregated_prev['ts'])/2,
-        cache_ratio,
-        " ".join(str(x) for x in calc_vals))
-  return aggregated_cur
+  return tuple([(aggregated_cur['ts'] + aggregated_prev['ts'])/2, cache_ratio] + calc_vals)
+
+class MetricsLogParser(object):
+  def __init__(self, paths,
+               simple_metrics=DEFAULT_SIMPLE_METRICS,
+               rate_metrics=DEFAULT_RATE_METRICS,
+               histogram_metrics=DEFAULT_HISTOGRAM_METRICS):
+    self.paths = paths
+    self.simple_metrics = simple_metrics
+    self.rate_metrics = rate_metrics
+    self.histogram_metrics = histogram_metrics
+    # Get the set of metrics we actually want to bother parsing from the log.
+    self.parse_metric_keys = set(key for (key, _) in (simple_metrics + rate_metrics + histogram_metrics))
+    # The script always reports cache-hit metrics.
+    self.parse_metric_keys.add("server.block_cache_hits_caching")
+    self.parse_metric_keys.add("server.block_cache_misses_caching")
+
+  def column_names(self):
+    simple_headers = [header for _, header in self.simple_metrics + self.rate_metrics]
+    for _, header in self.histogram_metrics:
+      simple_headers.append(header + "_p50")
+      simple_headers.append(header + "_p95")
+      simple_headers.append(header + "_p99")
+      simple_headers.append(header + "_p999")
+      simple_headers.append(header + "_max")
+    return tuple(["time", "cache_hit_ratio"] + simple_headers)
+
+  def __iter__(self):
+    prev_data = None
+    aggregated_prev = None
+
+    for path in sorted(self.paths):
+      if path.endswith(".gz"):
+        f = gzip.GzipFile(path)
+      else:
+        f = open(path)
+      for line_number, line in enumerate(f, start=1):
+        # Only parse out the "metrics" lines.
+        try:
+          (_, _, log_type, ts, metrics_json) = line.split(" ")
+        except ValueError:
+          continue
+        if log_type != "metrics":
+          continue
+        ts = float(ts) / 1000000.0
+        prev_ts = prev_data['ts'] if prev_data else 0
+        # Enforce that the samples come in time-sorted order.
+        if ts <= prev_ts:
+          raise Exception("timestamps must be in ascending order (%f <= %f at %s:%d)"
+                          % (ts, prev_ts, path, line_number))
+        if prev_data and ts < prev_ts + GRANULARITY_SECS:
+          continue
+
+        # Parse the metrics json into a map of the form:
+        #   { metric key => { entity id => metric value } }
+        data = json_to_map(json.loads(metrics_json), self.parse_metric_keys)
+        data['ts'] = ts
+        if prev_data:
+          # Copy missing metrics from prev_data.
+          for m, prev_eid_to_vals in prev_data.items():
+            if m is 'ts':
+              continue
+            # The metric was missing entirely; copy it over.
+            if m not in data:
+              data[m] = prev_eid_to_vals
+            else:
+              # If the metric was missing for a specific entity, copy the metric
+              # from the previous snapshot.
+              for eid, prev_vals in prev_eid_to_vals.items():
+                if eid not in data[m]:
+                  data[m][eid] = prev_vals
+
+        aggregated_cur = aggregate_metrics(data)
+        if prev_data:
+          if not aggregated_prev:
+            aggregated_prev = aggregate_metrics(prev_data)
+          yield process(aggregated_prev, aggregated_cur,
+                        self.simple_metrics, self.rate_metrics, self.histogram_metrics)
+
+        prev_data = data
+        aggregated_prev = aggregated_cur
 
 def main(argv):
-  prev_data = None
-  aggregated_prev = None
-
-  simple_headers = [header for _, header in SIMPLE_METRICS + RATE_METRICS]
-  for _, header in HISTOGRAM_METRICS:
-    simple_headers.append(header + "_p50")
-    simple_headers.append(header + "_p95")
-    simple_headers.append(header + "_p99")
-    simple_headers.append(header + "_p999")
-    simple_headers.append(header + "_max")
-
-  print("time cache_hit_ratio", " ".join(simple_headers))
-
-  for path in sorted(argv[1:]):
-    if path.endswith(".gz"):
-      f = gzip.GzipFile(path)
-    else:
-      f = open(path)
-    for line_number, line in enumerate(f, start=1):
-      # Only parse out the "metrics" lines.
-      try:
-        (_, _, log_type, ts, metrics_json) = line.split(" ")
-      except ValueError:
-        continue
-      if log_type != "metrics":
-        continue
-      ts = float(ts) / 1000000.0
-      prev_ts = prev_data['ts'] if prev_data else 0
-      # Enforce that the samples come in time-sorted order.
-      if ts <= prev_ts:
-        raise Exception("timestamps must be in ascending order (%f <= %f at %s:%d)"
-                        % (ts, prev_ts, path, line_number))
-      if prev_data and ts < prev_ts + GRANULARITY_SECS:
-        continue
-
-      # Parse the metrics json into a map of the form:
-      #   { metric key => { entity id => metric value } }
-      data = json_to_map(json.loads(metrics_json))
-      data['ts'] = ts
-      if prev_data:
-        # Copy missing metrics from prev_data.
-        for m, prev_eid_to_vals in prev_data.iteritems():
-          if m is 'ts':
-            continue
-          # The metric was missing entirely; copy it over.
-          if m not in data:
-            data[m] = prev_eid_to_vals
-          else:
-            # If the metric was missing for a specific entity, copy the metric
-            # from the previous snapshot.
-            for eid, prev_vals in prev_eid_to_vals.iteritems():
-              if eid not in data[m]:
-                data[m][eid] = prev_vals
-
-      aggregated_cur = aggregate_metrics(data)
-      if prev_data:
-        if not aggregated_prev:
-          aggregated_prev = aggregate_metrics(prev_data)
-        process(aggregated_prev, aggregated_cur)
-
-      prev_data = data
-      aggregated_prev = aggregated_cur
+  parser = MetricsLogParser(argv[1:], DEFAULT_SIMPLE_METRICS, DEFAULT_RATE_METRICS, DEFAULT_HISTOGRAM_METRICS)
+  for line in parser:
+    print(line)
 
 if __name__ == "__main__":
   main(sys.argv)