You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by bo...@apache.org on 2024/01/20 16:56:15 UTC

(impala) 01/02: IMPALA-12038: Switch report_benchmark_results.py to python 3

This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 3af3b2c8ab25ac132e78a69cd1e0a4bed1c99bb3
Author: Joe McDonnell <jo...@cloudera.com>
AuthorDate: Mon Apr 3 21:58:57 2023 -0700

    IMPALA-12038: Switch report_benchmark_results.py to python 3
    
    When using TPC-DS with a large number of iterations, the
    results JSON files are enormous. Using Python2,
    report_benchmark_results.py runs out of memory and fails to
    produce the report. Python 3 is more efficient in how it
    processes Unicode inputs (see Python PEP-0393), so it's
    memory usage is much lower. It is able to handle generating
    reports that Python 2 cannot.
    
    As a general cleanup, this fixes all the flake8 issues for this file.
    
    Testing:
     - Processed very large JSON results (4+GB each for both baseline
       result and new result). Python 3 completes successfully when
       Python 2 failed.
    
    Change-Id: Idbde17f720b18d38dc2c2104ecf3fec807c1839d
    Reviewed-on: http://gerrit.cloudera.org:8080/20918
    Reviewed-by: Riza Suminto <ri...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 tests/benchmark/report_benchmark_results.py | 122 +++++++++++++++-------------
 1 file changed, 66 insertions(+), 56 deletions(-)

diff --git a/tests/benchmark/report_benchmark_results.py b/tests/benchmark/report_benchmark_results.py
index 6a447d5ed..e7ddcc123 100755
--- a/tests/benchmark/report_benchmark_results.py
+++ b/tests/benchmark/report_benchmark_results.py
@@ -1,4 +1,4 @@
-#!/usr/bin/env impala-python
+#!/usr/bin/env impala-python3
 #
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
@@ -38,7 +38,7 @@ import os
 import prettytable
 import re
 from collections import defaultdict
-from datetime import date, datetime
+from datetime import date
 from optparse import OptionParser
 from tests.util.calculation_util import (
     calculate_tval, calculate_avg, calculate_stddev, calculate_geomean, calculate_mwu)
@@ -113,30 +113,30 @@ parser.add_option("--output_all_summary_nodes", dest="output_all_summary_nodes",
 parser.add_option("--build_version", dest="build_version", default='UNKNOWN',
                  help="Build/version info about the Impalad instance results are from.")
 parser.add_option("--lab_run_info", dest="lab_run_info", default='UNKNOWN',
-                 help="Information about the lab run (name/id) that published "\
+                 help="Information about the lab run (name/id) that published "
                  "the results.")
 parser.add_option("--run_user_name", dest="run_user_name", default='anonymous',
                  help="User name that this run is associated with in the perf database")
 parser.add_option("--tval_threshold", dest="tval_threshold", default=3.0,
-                 type="float", help="The ttest t-value at which a performance change "\
+                 type="float", help="The ttest t-value at which a performance change "
                  "will be flagged as sigificant.")
 parser.add_option("--zval_threshold", dest="zval_threshold", default=3.0, type="float",
                   help="The Mann-Whitney Z-value at which a performance change will be "
                   "flagged as sigificant.")
 parser.add_option("--min_percent_change_threshold",
                  dest="min_percent_change_threshold", default=5.0,
-                 type="float", help="Any performance changes below this threshold" \
-                 " will not be classified as significant. If the user specifies an" \
+                 type="float", help="Any performance changes below this threshold"
+                 " will not be classified as significant. If the user specifies an"
                  " empty value, the threshold will be set to 0")
 parser.add_option("--max_percent_change_threshold",
                  dest="max_percent_change_threshold", default=float("inf"),
-                 type="float", help="Any performance changes above this threshold"\
-                 " will be classified as significant. If the user specifies an" \
+                 type="float", help="Any performance changes above this threshold"
+                 " will be classified as significant. If the user specifies an"
                  " empty value, the threshold will be set to positive infinity")
 parser.add_option("--allowed_latency_diff_secs",
                  dest="allowed_latency_diff_secs", default=0.0, type="float",
-                 help="If specified, only a timing change that differs by more than\
-                 this value will be considered significant.")
+                 help="If specified, only a timing change that differs by more than"
+                 " this value will be considered significant.")
 options, args = parser.parse_args()
 
 
@@ -187,8 +187,8 @@ def get_dict_from_json(filename):
       level.append([('query', 'workload_name'), ('query', 'scale_factor')])
       # In the middle layer, we group by file format and compression type
       level.append([('query', 'test_vector', 'file_format'),
-      ('query', 'test_vector', 'compression_codec'),
-      ('query', 'test_vector', 'compression_type')])
+                    ('query', 'test_vector', 'compression_codec'),
+                    ('query', 'test_vector', 'compression_type')])
       # In the bottom layer, we group by query name
       level.append([('query', 'name')])
 
@@ -219,9 +219,9 @@ def get_dict_from_json(filename):
       cur = cur[get_key(level_num)]
     cur[RESULT_LIST].append(query_result)
 
-  with open(filename, "r") as f:
+  with open(filename, "rb") as f:
     data = json.loads(f.read().decode("utf-8", "ignore"))
-    grouped = defaultdict( lambda: defaultdict(
+    grouped = defaultdict(lambda: defaultdict(
         lambda: defaultdict(lambda: defaultdict(list))))
     for workload_name, workload in data.items():
       for query_result in workload:
@@ -230,6 +230,7 @@ def get_dict_from_json(filename):
     calculate_time_stats(grouped)
     return grouped
 
+
 def all_query_results(grouped):
   for workload_scale, workload in grouped.items():
     for file_format, queries in workload.items():
@@ -249,19 +250,21 @@ def get_commit_date(commit_sha):
     response = urlopen(request).read()
     data = json.loads(response.decode('utf8'))
     return data['commit']['committer']['date'][:10]
-  except:
+  except Exception:
     return ''
 
+
 def get_impala_version(grouped):
   """Figure out Impala version by looking at query profile."""
   first_result = next(all_query_results(grouped))
   profile = first_result['result_list'][0]['runtime_profile']
-  match = re.search('Impala Version:\s(.*)\s\(build\s(.*)\)', profile)
+  match = re.search(r'Impala Version:\s(.*)\s\(build\s(.*)\)', profile)
   version = match.group(1)
   commit_sha = match.group(2)
   commit_date = get_commit_date(commit_sha)
   return '{0} ({1})'.format(version, commit_date)
 
+
 def calculate_time_stats(grouped):
   """
   Add statistics to the nested dictionary.
@@ -298,6 +301,7 @@ def calculate_time_stats(grouped):
         results[SORTED] = [query_results[TIME_TAKEN] for query_results in result_list]
         results[SORTED].sort()
 
+
 class Report(object):
 
   significant_perf_change = False
@@ -322,7 +326,6 @@ class Report(object):
           for ref_query_results in ref_results[RESULT_LIST]:
             ref_time_list.append(ref_query_results[TIME_TAKEN])
 
-
       self.workload_name = '{0}({1})'.format(
           workload_scale[0][1].upper(), workload_scale[1][1])
 
@@ -437,15 +440,15 @@ class Report(object):
                  "({ref_avg:.2f}s -> {avg:.2f}s [{delta:+.2%}])\n")
 
       perf_change_str = template.format(
-          perf_change_type = perf_change_type,
-          workload_name = workload_name,
-          query_name = query_name,
-          file_format = file_format,
-          compression_codec = compression_codec,
-          compression_type = compression_type,
-          ref_avg = ref_result[AVG],
-          avg = result[AVG],
-          delta = calculate_change(result[AVG], ref_result[AVG]))
+          perf_change_type=perf_change_type,
+          workload_name=workload_name,
+          query_name=query_name,
+          file_format=file_format,
+          compression_codec=compression_codec,
+          compression_type=compression_type,
+          ref_avg=ref_result[AVG],
+          avg=result[AVG],
+          delta=calculate_change(result[AVG], ref_result[AVG]))
 
       perf_change_str += build_exec_summary_str(result, ref_result)
 
@@ -460,7 +463,7 @@ class Report(object):
         self.base_rel_stddev = float('inf')
       else:
         self.base_rel_stddev = ref_results[STDDEV] / ref_results[AVG]\
-            if ref_results > 0 else 0.0
+            if ref_results[AVG] > 0 else 0.0
 
       self.workload_name = '{0}({1})'.format(
           results[RESULT_LIST][0][QUERY][WORKLOAD_NAME].upper(),
@@ -479,17 +482,17 @@ class Report(object):
                  "({base_rel_stddev:.2%} -> {rel_stddev:.2%})\n")
 
       if self.significant_variability and ref_results:
-        #If ref_results do not exist, variability analysis will not be conducted
+        # If ref_results do not exist, variability analysis will not be conducted
         self.variability_str = variability_template.format(
-            workload_name = self.workload_name,
-            query_name = self.query_name,
-            file_format = self.file_format,
-            compression = self.compression,
-            base_rel_stddev = self.base_rel_stddev,
-            rel_stddev = self.rel_stddev)
+            workload_name=self.workload_name,
+            query_name=self.query_name,
+            file_format=self.file_format,
+            compression=self.compression,
+            base_rel_stddev=self.base_rel_stddev,
+            rel_stddev=self.rel_stddev)
 
         self.exec_summary_str = build_exec_summary_str(
-            results, ref_results, for_variability = True)
+            results, ref_results, for_variability=True)
       else:
         self.variability_str = str()
         self.exec_summary_str = str()
@@ -511,12 +514,12 @@ class Report(object):
     for workload_scale, workload in self.grouped.items():
       for file_format, queries in workload.items():
         if self.ref_grouped is not None and workload_scale in self.ref_grouped and\
-            file_format in self.ref_grouped[ workload_scale]:
+            file_format in self.ref_grouped[workload_scale]:
           ref_queries = self.ref_grouped[workload_scale][file_format]
           self.file_format_comparison_rows.append(Report.FileFormatComparisonRow(
             workload_scale, file_format, queries, ref_queries))
         else:
-          #If not present in reference results, set to None
+          # If not present in reference results, set to None
           ref_queries = None
         for query_name, results in queries.items():
           if self.ref_grouped is not None and workload_scale in self.ref_grouped and\
@@ -529,19 +532,19 @@ class Report(object):
             query_variability_row = Report.QueryVariabilityRow(results, ref_results)
             self.query_variability_rows.append(query_variability_row)
           else:
-            #If not present in reference results, set to None
+            # If not present in reference results, set to None
             ref_results = None
 
   def __str__(self):
     output = str()
 
-    #per file format analysis overview table
+    # per file format analysis overview table
     table = prettytable.PrettyTable(['Workload', 'File Format', 'Avg (s)', 'Delta(Avg)',
                                      'GeoMean(s)', 'Delta(GeoMean)'])
     table.float_format = '.2'
     table.align = 'l'
     self.file_format_comparison_rows.sort(
-        key = lambda row: row.delta_geomean, reverse = True)
+        key=lambda row: row.delta_geomean, reverse=True)
     for row in self.file_format_comparison_rows:
       table_row = [
           row.workload_name,
@@ -554,7 +557,7 @@ class Report(object):
 
     output += str(table) + '\n\n'
 
-    #main comparison table
+    # main comparison table
     detailed_performance_change_analysis_str = str()
     table = prettytable.PrettyTable(['Workload', 'Query', 'File Format', 'Avg(s)',
                                      'Base Avg(s)', 'Delta(Avg)', 'StdDev(%)',
@@ -562,7 +565,7 @@ class Report(object):
                                      'MW Zval', 'Tval'])
     table.float_format = '.2'
     table.align = 'l'
-    #Sort table from worst to best regression
+    # Sort table from worst to best regression
     self.query_comparison_rows.sort(key=lambda row: row.delta_avg + row.median_diff,
                                     reverse=True)
     for row in self.query_comparison_rows:
@@ -593,7 +596,7 @@ class Report(object):
     output += detailed_performance_change_analysis_str
 
     variability_analysis_str = str()
-    self.query_variability_rows.sort(key = lambda row: row.rel_stddev, reverse = True)
+    self.query_variability_rows.sort(key=lambda row: row.rel_stddev, reverse=True)
     for row in self.query_variability_rows:
       variability_analysis_str += str(row)
 
@@ -608,6 +611,7 @@ class Report(object):
 
     return output
 
+
 class CombinedExecSummaries(object):
   """All execution summaries for each query are combined into this object.
 
@@ -714,7 +718,7 @@ class CombinedExecSummaries(object):
     table.float_format = '.2'
 
     for row in self.rows:
-      table_row = [ row[PREFIX] + row[OPERATOR],
+      table_row = [row[PREFIX] + row[OPERATOR],
           prettyprint_values(row[NUM_HOSTS]),
           prettyprint_values(row[NUM_INSTANCES]),
           prettyprint_time(row[AVG_TIME]),
@@ -806,7 +810,7 @@ class ExecSummaryComparison(object):
   CombinedExecSummaries.compare(reference).
   """
 
-  def __init__(self, combined_summary, ref_combined_summary, for_variability = False):
+  def __init__(self, combined_summary, ref_combined_summary, for_variability=False):
 
     # Store the original summaries, in case we can't build a comparison
     self.combined_summary = combined_summary
@@ -927,7 +931,7 @@ class ExecSummaryComparison(object):
           prettyprint_values(row[NUM_HOSTS]),
           prettyprint_values(row[NUM_INSTANCES]),
           prettyprint_values(row[NUM_ROWS]),
-          prettyprint_values(row[EST_NUM_ROWS]) ]
+          prettyprint_values(row[EST_NUM_ROWS])]
 
       table_contains_at_least_one_row = True
       table.add_row(table_row)
@@ -942,8 +946,8 @@ class ExecSummaryComparison(object):
     def is_significant(row):
       """Check if the performance change in the row was significant"""
       return options.output_all_summary_nodes or (
-        row[MAX_TIME] > 100000000 and
-        row[PERCENT_OF_QUERY] > 0.02)
+        row[MAX_TIME] > 100000000
+        and row[PERCENT_OF_QUERY] > 0.02)
 
     self.__build_rows()
     if self.error_str:
@@ -993,10 +997,12 @@ class ExecSummaryComparison(object):
 
     return str(table)
 
+
 def calculate_change(val, ref_val):
   """Calculate how big the change in val compared to ref_val is compared to total"""
   return (val - ref_val) / ref_val if ref_val != 0 else 0.0
 
+
 def prettyprint(val, units, divisor):
   """ Print a value in human readable format along with it's unit.
 
@@ -1016,15 +1022,19 @@ def prettyprint(val, units, divisor):
         return "%3.2f%s" % (val, unit)
     val /= divisor
 
+
 def prettyprint_bytes(byte_val):
   return prettyprint(byte_val, ['B', 'KB', 'MB', 'GB', 'TB'], 1024.0)
 
+
 def prettyprint_values(unit_val):
   return prettyprint(unit_val, ["", "K", "M", "B"], 1000.0)
 
+
 def prettyprint_time(time_val):
   return prettyprint(time_val, ["ns", "us", "ms", "s"], 1000.0)
 
+
 def prettyprint_percent(percent_val):
   return '{0:+.2%}'.format(percent_val)
 
@@ -1056,12 +1066,12 @@ def save_runtime_diffs(results, ref_results, change_significant, zval, tval):
       prefix = 'imp'
 
   runtime_profile_file_name = template.format(
-      prefix = prefix,
-      query_name = query[NAME],
-      scale_factor = query[SCALE_FACTOR],
-      file_format = query[TEST_VECTOR][FILE_FORMAT],
-      compression_codec = query[TEST_VECTOR][COMPRESSION_CODEC],
-      compression_type = query[TEST_VECTOR][COMPRESSION_TYPE])
+      prefix=prefix,
+      query_name=query[NAME],
+      scale_factor=query[SCALE_FACTOR],
+      file_format=query[TEST_VECTOR][FILE_FORMAT],
+      compression_codec=query[TEST_VECTOR][COMPRESSION_CODEC],
+      compression_type=query[TEST_VECTOR][COMPRESSION_TYPE])
 
   # Go into results dir
   dir_path = os.path.join(os.environ["IMPALA_HOME"], 'results')
@@ -1076,8 +1086,8 @@ def save_runtime_diffs(results, ref_results, change_significant, zval, tval):
   runtime_profile_diff = diff.make_file(
       ref_runtime_profile.splitlines(),
       runtime_profile.splitlines(),
-      fromdesc = "Baseline Runtime Profile",
-      todesc = "Current Runtime Profile")
+      fromdesc="Baseline Runtime Profile",
+      todesc="Current Runtime Profile")
 
   with open(runtime_profile_file_path, 'w+') as f:
     f.write(runtime_profile_diff)