You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2021/10/18 14:10:12 UTC

[impala] 02/02: IMPALA-10921 Add script to compare TPCDS runs.

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit dc313b3e4cae3dd57c55cc40e9c2b824082918a7
Author: Amogh Margoor <am...@cloudera.com>
AuthorDate: Tue Sep 14 03:03:08 2021 -0700

    IMPALA-10921 Add script to compare TPCDS runs.
    
    This script compares 2 runs of TPCDS by parsing their respective
    Impala plain text query profiles. It currently outputs the peak
    memory comparison of both runs where:
    1. It compares average per-node peak memory and geo-mean
       per-node peak memory.
    2. It compares max peak memory reduction among Hash operators.
    
    It can be extended to other comparisons in the future.
    
    Example usage:
    
     tpcds_run_comparator.py <dir of base run profiles>
     <dir of new run profiles> [path to result csv file]
    
    Change-Id: Ib2e9ae1a2919156b0022072f47ff71d7775b20e6
    Reviewed-on: http://gerrit.cloudera.org:8080/17855
    Reviewed-by: Riza Suminto <ri...@cloudera.com>
    Reviewed-by: Zoltan Borok-Nagy <bo...@cloudera.com>
    Tested-by: Zoltan Borok-Nagy <bo...@cloudera.com>
---
 .../experimental/tpcds_run_comparator.py           | 297 +++++++++++++++++++++
 1 file changed, 297 insertions(+)

diff --git a/bin/diagnostics/experimental/tpcds_run_comparator.py b/bin/diagnostics/experimental/tpcds_run_comparator.py
new file mode 100755
index 0000000..3c635cd
--- /dev/null
+++ b/bin/diagnostics/experimental/tpcds_run_comparator.py
@@ -0,0 +1,297 @@
+#!/usr/bin/env python3
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License
+
+import argparse
+import csv
+import math
+import re
+import statistics
+import os
+
+from enum import Enum
+from collections import namedtuple
+
+# This script compares 2 runs of TPCDS by parsing their respective Impala plain text
+# query profiles. It currently outputs the peak memory comparison of both runs where:
+# 1. It compares average per-node peak memory and geo-mean per-node peak memory.
+# 2. It compares max peak memory reduction among Hash operators.
+#
+# It can be extended to other comparisons in the future.
+#
+# Example usage (profiles are matched between the two directories by file name):
+#
+#   tpcds_run_comparator.py <dir of base run profiles> <dir of new run profiles>
+#                           [path to result csv file]
+#
+
+
+class Task(Enum):
+  """Action that ProfileParser.parse() applies to each line of a profile section."""
+  # Ignore the line.
+  DISCARD = 0
+  # Hand the line to ProfileParser.parse_exec_summ().
+  PARSE_EXEC_SUMM = 3
+  # Hand the line to ProfileParser.parse_peak_mem().
+  PARSE_PEAK_MEMORY = 4
+
+
+# One entry of SECTIONS: the Task to apply to the lines of a profile section and the
+# delimiter line at which that section ends.
+ParseTask = namedtuple('ParseTask', 'task delimiter')
+
+# One operator row of the ExecSummary table, as built by
+# ProfileParser.parse_exec_summ(). Every field is kept as the string found in the
+# profile; columns that are missing from a row hold "-".
+ExecSumm = namedtuple('ExecSumm', (
+    'id',
+    'name',
+    'num_host',
+    'num_inst',
+    'avg_time',
+    'max_time',
+    'num_rows',
+    'est_rows',
+    'peak_mem',
+    'est_peak_mem',
+    'detail',
+))
+
+# Matches one pretty-printed memory value such as "1.23 GB" or "512.00 B".
+# Written as a raw string so that "\d" and "\." reach the regex engine unchanged;
+# in a normal string literal they are invalid escape sequences, which newer Python
+# versions warn about.
+RE_PEAK_MEM = re.compile(r"\d+\.\d\d [GMK]?B")
+
+# Ordered list of the profile sections that ProfileParser.parse() walks through.
+# Each entry is ParseTask(task, delimiter): `task` is what to do with the lines of the
+# section, and `delimiter` is the line prefix that ends the section, i.e. the first
+# line of the next section in the list.
+SECTIONS = [
+    ParseTask(task=Task.DISCARD, delimiter="    ExecSummary:"),
+    ParseTask(task=Task.PARSE_EXEC_SUMM, delimiter="    Errors:"),
+    ParseTask(task=Task.DISCARD, delimiter="    Per Node Peak Memory Usage:"),
+    ParseTask(task=Task.PARSE_PEAK_MEMORY, delimiter="    Per Node Bytes Read:"),
+]
+
+
+# Set to True to print trace messages while parsing and comparing.
+DEBUG = False
+
+
+def debug(s):
+  """Print `s`, but only when the module-level DEBUG flag is set."""
+  if not DEBUG:
+    return
+  print(s)
+
+
+def geo_mean(bytes):
+  """Return the geometric mean of a non-empty sequence of non-negative numbers.
+
+  The mean is computed as exp(mean(log(x))) so that large values do not overflow
+  the way a plain product could.
+
+  Returns 0.0 if any value is zero, since the product of the values is then zero.
+  Raises ValueError if `bytes` is empty or contains a negative value.
+  """
+  # NOTE: the parameter name shadows the builtin `bytes`; it is kept unchanged so that
+  # existing keyword callers keep working.
+  values = list(bytes)
+  if not values:
+    raise ValueError("geo_mean requires at least one value")
+  if any(x < 0 for x in values):
+    raise ValueError("geo_mean requires non-negative values")
+  # log(0) is undefined, so handle the zero case before taking logarithms.
+  if any(x == 0 for x in values):
+    return 0.0
+  return math.exp(math.fsum(math.log(x) for x in values) / len(values))
+
+
+def str_to_byte(line):
+  """Parse a pretty-printed size such as "1.23 GB" into a whole number of bytes.
+
+  `line` must start with "<number> <unit>", where the unit is one of B, KB, MB or GB
+  (powers of 1024). Anything after the unit is ignored. The result is truncated
+  towards zero.
+
+  Raises ValueError if the number or the unit is missing, the number cannot be
+  parsed, or the unit is unknown.
+  """
+  multipliers = {'B': 1, 'KB': 1024, 'MB': 1024**2, 'GB': 1024**3}
+  # split() without arguments tolerates leading, trailing and repeated whitespace.
+  parts = line.split()
+  if len(parts) < 2:
+    raise ValueError("Invalid byte string: {!r}".format(line))
+  value = float(parts[0])
+  unit = parts[1]
+  if unit not in multipliers:
+    raise ValueError("Invalid unit for byte string: {}".format(unit))
+  return math.trunc(value * multipliers[unit])
+
+
+class ProfileParser:
+  """Parses one Impala plain text query profile and compares it with another one.
+
+  Usage: create one parser per profile, pass the open profile file to parse(), then
+  call compare_hash_table_mem() and compare_peak_mem() on the parser of the base run,
+  passing the parser of the new run as `pp`.
+  """
+
+  # compare_hash_table_mem() only looks at operators whose base peak memory is larger
+  # than this many bytes (10 MB).
+  MIN_OPERATOR_BYTES = 10 * 1024 * 1024
+  # Bytes per megabyte, used to report sizes in MB.
+  BYTES_PER_MB = 1024 * 1024
+  # Operator names that compare_hash_table_mem() takes into account.
+  HASH_OPERATORS = frozenset(['HASH JOIN', 'JOIN BUILD', 'AGGREGATE'])
+
+  def __init__(self):
+    # Execution summary: number of section lines seen so far, and the parsed rows
+    # (ExecSumm tuples) keyed by operator id.
+    self.exec_summ_ct = 0
+    self.exec_summ_map = {}
+    # Per-node peak memory: number of lines the values were taken from, and the
+    # values themselves in bytes.
+    self.peak_mem_ct = 0
+    self.peak_mem = []
+    # Not used by the methods of this class; kept so that existing readers of these
+    # attributes keep working.
+    self.ht_mem_res = []
+    self.op_mem_res = []
+
+  @staticmethod
+  def _reduction_percent(base, new):
+    """Return by how many percent `new` is smaller than `base` (negative if larger).
+
+    With a zero `base` the ratio is undefined: 0.0 is returned when `new` is zero as
+    well, NaN otherwise.
+    """
+    if base == 0:
+      return 0.0 if new == 0 else float('nan')
+    return float(base - new) / base * 100
+
+  def parse_peak_mem(self, line):
+    """Extract the per-node peak memory values (in bytes) from `line`.
+
+    parse() calls this for every line of the peak memory section. Only the first line
+    that contains memory values is used; later lines are ignored so that they cannot
+    overwrite the values already found.
+    """
+    debug('Parsing Peak memory')
+    if self.peak_mem:
+      return
+    mems = RE_PEAK_MEM.findall(line)
+    if not mems:
+      return
+    self.peak_mem_ct += 1
+    self.peak_mem = [str_to_byte(i) for i in mems]
+    debug('Peak memories: ' + ",".join([str(i) for i in self.peak_mem]))
+
+  def parse_exec_summ(self, line):
+    """Parse one line of the execution summary section into an ExecSumm row.
+
+    The section begins with the 'ExecSummary:' line of the query profile. The first
+    three lines handed to this method (the 'ExecSummary:' line itself and,
+    presumably, the column header and its separator) are skipped, as are blank lines.
+    Every other line is stored in `exec_summ_map` under its operator id.
+
+    Raises ValueError if a line does not have the expected number of columns or its
+    first column is not of the form '<id>:<name>'.
+    """
+    self.exec_summ_ct += 1
+    if self.exec_summ_ct <= 3:
+      return
+    if not line.strip():
+      return
+
+    # Replace the tree-drawing characters by spaces, then split on double spaces:
+    # a single space can be part of a value (e.g. "HASH JOIN" or "1.23 MB").
+    cleaned = line.strip().replace("|", " ").replace("--", "  ")
+    parts = [p.strip() for p in cleaned.split("  ") if p]
+
+    # 3 columns in ExecSummary are optional and may be empty for few operators.
+    # These are #Rows, Est. #Rows and Detail.
+    # Normalize all the entries to 10 parts below.
+    if len(parts) == 7:
+      parts.insert(5, "-")
+      parts.insert(6, "-")
+    if len(parts) == 9:
+      parts.append("-")
+    # Explicit check instead of `assert`, which is removed when Python runs with -O.
+    if len(parts) != 10:
+      raise ValueError(
+          "Unexpected ExecSummary line ({} columns): {!r}".format(len(parts), line))
+
+    # The first column is '<id>:<name>'; split it into two separate fields.
+    tok = parts[0].split(':')
+    if len(tok) < 2:
+      raise ValueError("ExecSummary line without operator id: {!r}".format(line))
+
+    es = ExecSumm._make([tok[0], tok[1]] + parts[1:])
+    debug(es)
+    self.exec_summ_map[es.id] = es
+
+  def parse(self, inf):
+    """Walk through the profile lines of `inf` and parse each section accordingly.
+
+    `inf` is an iterable of lines, e.g. an open text file. The sections and what to
+    do with them are given by SECTIONS. A section ends when its delimiter is found;
+    the delimiter line itself is already handled by the task of the next section.
+    Reading stops at the delimiter of the last section.
+    """
+    i = 0
+    for line in inf:
+      stripped = line.rstrip()
+      delimiter = SECTIONS[i].delimiter
+      # An empty delimiter means that the section ends at the first blank line.
+      if stripped.startswith(delimiter) or (not delimiter and not stripped):
+        debug('Found delimiter ' + delimiter + ' in ' + line)
+        i += 1
+      if i == len(SECTIONS):
+        break
+      task = SECTIONS[i].task
+      if task == Task.PARSE_EXEC_SUMM:
+        self.parse_exec_summ(line)
+      elif task == Task.PARSE_PEAK_MEMORY:
+        self.parse_peak_mem(line)
+      # Task.DISCARD: nothing to do for this line.
+
+  def filter_exec_join(self, op):
+    """Return True if the ExecSumm row `op` is one of the hash based operators."""
+    return op.name in self.HASH_OPERATORS
+
+  def compare_hash_table_mem(self, pp, path, ht_mem_res):
+    """Append to `ht_mem_res` the hash operator whose peak memory changed the most.
+
+    Every hash based operator of this (base) profile whose peak memory exceeds
+    MIN_OPERATOR_BYTES is compared with the operator of the same id in `pp` (the
+    parser of the new run). Operators that do not exist in `pp` are skipped. The
+    operator with the largest absolute percentage change wins.
+
+    The appended row is [profile file name, base peak memory in MB, new peak memory in
+    MB, reduction in percent], all as strings. The reduction is negative when the new
+    run used more memory. If no operator qualifies the three numbers are '0.0'.
+    """
+    result = [os.path.basename(path), '0.0', '0.0', '0.0']
+    max_hash_percent = 0.0
+    for j in filter(self.filter_exec_join, self.exec_summ_map.values()):
+      other = pp.exec_summ_map.get(j.id)
+      if other is None:
+        debug('Operator ' + j.id + ' not found in the new profile, skipping')
+        continue
+      bytes1 = str_to_byte(j.peak_mem)
+      bytes2 = str_to_byte(other.peak_mem)
+      if bytes1 > self.MIN_OPERATOR_BYTES and bytes2 != bytes1:
+        percent = self._reduction_percent(bytes1, bytes2)
+        if abs(percent) > max_hash_percent:
+          result = [os.path.basename(path),
+            str(round(bytes1 / float(self.BYTES_PER_MB))),
+            str(round(bytes2 / float(self.BYTES_PER_MB))),
+            str(round(percent, 1))]
+          # Remember the magnitude: a stored negative value would let every later
+          # operator win the comparison above.
+          max_hash_percent = abs(percent)
+    ht_mem_res.append(result)
+
+  def compare_peak_mem(self, pp, path, op_mem_res):
+    """Append to `op_mem_res` the per-node peak memory comparison with `pp`.
+
+    The appended row is [profile file name, base avg (MB), new avg (MB), avg reduction
+    %, base geomean (MB), new geomean (MB), geomean reduction %, max reduction %],
+    all as strings rounded to one decimal.
+
+    Raises statistics.StatisticsError (a ValueError) if either profile has no
+    per-node peak memory values.
+    """
+    if not self.peak_mem or not pp.peak_mem:
+      raise statistics.StatisticsError(
+          "No per-node peak memory found for profile {}".format(path))
+    avg1 = statistics.mean(self.peak_mem)
+    debug('average1=' + str(avg1))
+    avg2 = statistics.mean(pp.peak_mem)
+    debug('average2=' + str(avg2))
+    geo_mean1 = geo_mean(self.peak_mem)
+    debug('geomean1=' + str(geo_mean1))
+    geo_mean2 = geo_mean(pp.peak_mem)
+    debug('geomean2=' + str(geo_mean2))
+    max1 = max(self.peak_mem)
+    max2 = max(pp.peak_mem)
+    res = [
+      avg1 / self.BYTES_PER_MB,
+      avg2 / self.BYTES_PER_MB,
+      self._reduction_percent(avg1, avg2),
+      geo_mean1 / self.BYTES_PER_MB,
+      geo_mean2 / self.BYTES_PER_MB,
+      self._reduction_percent(geo_mean1, geo_mean2),
+      self._reduction_percent(max1, max2),
+    ]
+    fres = [os.path.basename(path)] + [str(round(i, 1)) for i in res]
+    op_mem_res.append(fres)
+    debug(",".join(fres))
+
+
+def print_results(ht_mem_res, op_mem_res):
+  """Print both comparison tables to stdout as comma separated rows.
+
+  `ht_mem_res` holds the rows of the per-operator table (4 columns each) and
+  `op_mem_res` the rows of the per-node table (8 columns each); every row is a list
+  of strings. Each table is preceded by a header line naming all of its columns.
+  """
+  ht_header = ["TPCDS query profile", "base peak memory (MB)", "new peak memory (MB)",
+    "reduction %"]
+  op_header = ["TPCDS query profile", "base avg (MB)", "new avg (MB)",
+    "avg reduction %", "base geomean (MB)", "new geomean (MB)", "geomean reduction %",
+    "max reduction %"]
+  print("Peak Memory Comparison")
+  print("----------------------")
+  print("")
+  print("1. Maximum Reduction in Per-operator Peak Memory")
+  print(", ".join(ht_header))
+  print("")
+  for ht in ht_mem_res:
+    print(",".join(ht))
+  print("")
+  print("2. Reduction in Per-Node Peak Memory")
+  print(", ".join(op_header))
+  for mem in op_mem_res:
+    print(",".join(mem))
+
+
+def results_to_csv(ht_mem_res, op_mem_res, csv_path):
+  """Write both comparison tables to the CSV file `csv_path`, replacing its content.
+
+  The per-operator table (`ht_mem_res`, 4 columns per row) comes first, followed by
+  the per-node table (`op_mem_res`, 8 columns per row). Each table is preceded by a
+  header row naming all of its columns.
+  """
+  header1 = ['TPCDS query profile', 'base peak memory (MB)', 'new peak memory (MB)',
+    'reduction %']
+  header2 = ['TPCDS query profile', 'base avg (MB)', 'new avg (MB)',
+    'avg reduction %', 'base geomean (MB)', 'new geomean (MB)', 'geomean reduction %',
+    'max reduction %']
+  # newline='' lets the csv module control line endings itself, as its documentation
+  # requires; otherwise extra blank lines can appear on some platforms.
+  with open(csv_path, 'w', newline='') as csv_f:
+    writer = csv.writer(csv_f)
+    writer.writerow(header1)
+    writer.writerows(ht_mem_res)
+    writer.writerow(header2)
+    writer.writerows(op_mem_res)
+
+
+def is_dir(path):
+  """argparse `type` callback: return `path` unchanged if it is an existing directory.
+
+  Raises argparse.ArgumentTypeError for anything else.
+  """
+  if not os.path.isdir(path):
+    raise argparse.ArgumentTypeError("{} is not a valid path".format(path))
+  return path
+
+
+def main():
+  """Command line entry point.
+
+  Every file of the base directory that has a file of the same name in the new
+  directory is parsed as a query profile and compared with its counterpart. The
+  result is written to the CSV file given as third argument, or printed to stdout
+  if there is none.
+  """
+  parser = argparse.ArgumentParser(
+      description="""This script reads Impala plain text query profiles of two TPCDS
+        runs and compare peak memories""")
+  parser.add_argument("basedir", type=is_dir,
+      help='Impala baseline query profiles directory.')
+  parser.add_argument("newdir", type=is_dir,
+      help='Impala new query profiles directory to be compared with baseline.')
+  parser.add_argument("result", nargs='?', default=None,
+      help="""Specify output CSV file path to be used to write results,
+        else it would be printed out to stdout""")
+  args = parser.parse_args()
+  op_mem_res = []
+  ht_mem_res = []
+  # os.listdir() returns the names in arbitrary order; sort them so that the rows of
+  # the report come out in the same order on every run.
+  for filename in sorted(os.listdir(args.basedir)):
+    path1 = os.path.join(args.basedir, filename)
+    path2 = os.path.join(args.newdir, filename)
+    if not os.path.isfile(path1):
+      continue
+    if not os.path.isfile(path2):
+      debug("{} has no counterpart in {}, skipping".format(path1, args.newdir))
+      continue
+    debug("{} is being parsed ".format(path1))
+    with open(path1, "r") as f1, open(path2, "r") as f2:
+      pp1 = ProfileParser()
+      pp1.parse(f1)
+      pp2 = ProfileParser()
+      pp2.parse(f2)
+    # The rows are labelled with the file name of the base profile.
+    pp1.compare_hash_table_mem(pp2, path1, ht_mem_res)
+    pp1.compare_peak_mem(pp2, path1, op_mem_res)
+  if args.result is None:
+    print_results(ht_mem_res, op_mem_res)
+  else:
+    results_to_csv(ht_mem_res, op_mem_res, args.result)
+
+
+if __name__ == "__main__":
+  main()