You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/11/26 07:43:15 UTC

[impala] branch master updated: IMPALA-4618: Fixing #Hosts and adding #Instances in exec summary

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 2114fc6  IMPALA-4618: Fixing #Hosts and adding #Instances in exec summary
2114fc6 is described below

commit 2114fc6155e5bb21e398ded21c5bdfd1e7f49879
Author: norbert.luksa <no...@cloudera.com>
AuthorDate: Fri Nov 15 14:57:19 2019 +0100

    IMPALA-4618: Fixing #Hosts and adding #Instances in exec summary
    
    When mt_dop > 0, the summary is reporting the number of fragment
    instances, instead of the number of hosts as the header would
    imply.
    
    This commit fixes the issue so that the number of hosts is shown
    under the #Hosts column. It also adds an #Inst column that shows
    the number of instances (the value #Hosts used to report).
    
    Tests:
     * Changed profile tests with mt_dop > 0.
     * Updated benchmark tests and shell tests accordingly.
    
    Change-Id: I3bdf9a06d9bd842b2397cd16c28294b6bec7af69
    Reviewed-on: http://gerrit.cloudera.org:8080/14715
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/coordinator.cc                      | 13 ++++-
 be/src/util/summary-util.cc                        |  2 +
 common/thrift/ExecStats.thrift                     |  4 ++
 shell/impala_client.py                             |  6 +-
 shell/impala_shell.py                              |  5 +-
 .../QueryTest/mt-dop-parquet-scheduling.test       | 64 +++++++++++-----------
 tests/benchmark/report_benchmark_results.py        | 10 +++-
 tests/shell/test_shell_commandline.py              |  2 +-
 8 files changed, 65 insertions(+), 41 deletions(-)

diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 1d2e88f..2ba678d 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -225,8 +225,15 @@ void Coordinator::ExecSummary::Init(const QuerySchedule& schedule) {
 
       const TPlan& plan = fragment.plan;
       const TDataSink& output_sink = fragment.output_sink;
-      int num_instances =
-          schedule.GetFragmentExecParams(fragment.idx).instance_exec_params.size();
+      // Count the number of hosts and instances.
+      const vector<FInstanceExecParams>& instance_params =
+          schedule.GetFragmentExecParams(fragment.idx).instance_exec_params;
+      unordered_set<TNetworkAddress> host_set;
+      for (const FInstanceExecParams& instance: instance_params) {
+        host_set.insert(instance.host);
+      }
+      int num_hosts = host_set.size();
+      int num_instances = instance_params.size();
 
       // Add the data sink at the root of the fragment.
       data_sink_id_to_idx_map[fragment.idx] = thrift_exec_summary.nodes.size();
@@ -241,6 +248,7 @@ void Coordinator::ExecSummary::Init(const QuerySchedule& schedule) {
       node_summary.__set_num_children(1);
       DCHECK(output_sink.__isset.estimated_stats);
       node_summary.__set_estimated_stats(output_sink.estimated_stats);
+      node_summary.__set_num_hosts(num_hosts);
       node_summary.exec_stats.resize(num_instances);
 
       // We don't track rows returned from sinks, but some clients like impala-shell
@@ -262,6 +270,7 @@ void Coordinator::ExecSummary::Init(const QuerySchedule& schedule) {
         node_summary.__set_num_children(node.num_children);
         DCHECK(node.__isset.estimated_stats);
         node_summary.__set_estimated_stats(node.estimated_stats);
+        node_summary.__set_num_hosts(num_hosts);
         node_summary.exec_stats.resize(num_instances);
       }
 
diff --git a/be/src/util/summary-util.cc b/be/src/util/summary-util.cc
index ab90e87..aded383 100644
--- a/be/src/util/summary-util.cc
+++ b/be/src/util/summary-util.cc
@@ -73,6 +73,7 @@ void PrintExecSummary(const TExecSummary& exec_summary, int indent_level,
 
   vector<string> row;
   row.push_back(label_ss.str());
+  row.push_back(lexical_cast<string>(node.num_hosts));
   row.push_back(lexical_cast<string>(node.exec_stats.size())); // Num instances
   row.push_back(PrettyPrinter::Print(avg_time, TUnit::TIME_NS));
   row.push_back(PrettyPrinter::Print(max_stats.latency_ns, TUnit::TIME_NS));
@@ -121,6 +122,7 @@ string impala::PrintExecSummary(const TExecSummary& exec_summary) {
   printer.set_max_output_width(1000);
   printer.AddColumn("Operator", true);
   printer.AddColumn("#Hosts", false);
+  printer.AddColumn("#Inst", false);
   printer.AddColumn("Avg Time", false);
   printer.AddColumn("Max Time", false);
   printer.AddColumn("#Rows", false);
diff --git a/common/thrift/ExecStats.thrift b/common/thrift/ExecStats.thrift
index 60b6a18..30c1fcd 100644
--- a/common/thrift/ExecStats.thrift
+++ b/common/thrift/ExecStats.thrift
@@ -75,6 +75,10 @@ struct TPlanNodeExecSummary {
 
   // If true, this is an exchange node that is the receiver of a broadcast.
   8: optional bool is_broadcast
+
+  // The number of hosts. It cannot be inferred from exec_stats, since the length of the
+  // list can be greater when mt_dop > 0.
+  9: optional i32 num_hosts
 }
 
 // Progress counters for an in-flight query.
diff --git a/shell/impala_client.py b/shell/impala_client.py
index 4dd22a8..66eed04 100755
--- a/shell/impala_client.py
+++ b/shell/impala_client.py
@@ -560,12 +560,12 @@ class ImpalaClient(object):
     def prettyprint_time(time_val):
       return prettyprint(time_val, ["ns", "us", "ms", "s"], 1000.0)
 
-    hosts = 0
+    instances = 0
     if node.exec_stats is not None:
-      hosts = len(node.exec_stats)
+      instances = len(node.exec_stats)
     is_sink = node.node_id == -1
     row = [ label_prefix + node.label,
-            hosts,
+            node.num_hosts, instances,
             prettyprint_time(avg_time),
             prettyprint_time(max_stats.latency_ns),
             "" if is_sink else prettyprint_units(cardinality),
diff --git a/shell/impala_shell.py b/shell/impala_shell.py
index 554ae26..1e67415 100755
--- a/shell/impala_shell.py
+++ b/shell/impala_shell.py
@@ -1077,8 +1077,9 @@ class ImpalaShell(object, cmd.Cmd):
       self.last_summary = time.time()
 
   def _default_summary_table(self):
-    return self.construct_table_with_header(["Operator", "#Hosts", "Avg Time", "Max Time",
-                                             "#Rows", "Est. #Rows", "Peak Mem",
+    return self.construct_table_with_header(["Operator", "#Hosts", "#Inst",
+                                             "Avg Time", "Max Time", "#Rows",
+                                             "Est. #Rows", "Peak Mem",
                                              "Est. Peak Mem", "Detail"])
 
   def _execute_stmt(self, query_str, is_dml=False, print_web_link=False):
diff --git a/testdata/workloads/functional-query/queries/QueryTest/mt-dop-parquet-scheduling.test b/testdata/workloads/functional-query/queries/QueryTest/mt-dop-parquet-scheduling.test
index d0b1a2a..800fb67 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/mt-dop-parquet-scheduling.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/mt-dop-parquet-scheduling.test
@@ -41,12 +41,12 @@ select min(string_col) from (
 '0'
 ---- RUNTIME_PROFILE
 row_regex:.*AdmissionSlots: 4 .*
-row_regex:.*F04:ROOT * 1 .*
-row_regex:.*04:AGGREGATE * 12 .*
-row_regex:.*00:UNION * 12 *
-row_regex:.*02:SCAN HDFS * 12 .*alltypessmall.*
-row_regex:.*03:SCAN HDFS * 12 .*alltypestiny.*
-row_regex:.*01:SCAN HDFS * 12 .*alltypes.*
+row_regex:.*F04:ROOT * 1 * 1 .*
+row_regex:.*04:AGGREGATE * 3 * 12 .*
+row_regex:.*00:UNION * 3 * 12 *
+row_regex:.*02:SCAN HDFS * 3 * 12 .*alltypessmall.*
+row_regex:.*03:SCAN HDFS * 3 * 12 .*alltypestiny.*
+row_regex:.*01:SCAN HDFS * 3 * 12 .*alltypes.*
 ====
 ---- QUERY
 # Same idea, but with smallest scan first to check that the scheduler is taking the
@@ -61,12 +61,12 @@ select min(string_col) from (
 '0'
 ---- RUNTIME_PROFILE
 row_regex:.*AdmissionSlots: 4 .*
-row_regex:.*F04:ROOT * 1 .*
-row_regex:.*04:AGGREGATE * 12 .*
-row_regex:.*00:UNION * 12 *
-row_regex:.*02:SCAN HDFS * 12 .*alltypessmall.*
-row_regex:.*03:SCAN HDFS * 12 .*alltypes.*
-row_regex:.*01:SCAN HDFS * 12 .*alltypestiny.*
+row_regex:.*F04:ROOT * 1 * 1 .*
+row_regex:.*04:AGGREGATE * 3 * 12 .*
+row_regex:.*00:UNION * 3 * 12 *
+row_regex:.*02:SCAN HDFS * 3 * 12 .*alltypessmall.*
+row_regex:.*03:SCAN HDFS * 3 * 12 .*alltypes.*
+row_regex:.*01:SCAN HDFS * 3 * 12 .*alltypestiny.*
 ====
 ---- QUERY
 # This query should have one scan and one exchange in the interior fragment.
@@ -80,13 +80,13 @@ select min(string_col) from (
 '0'
 ---- RUNTIME_PROFILE
 row_regex:.*AdmissionSlots: 4.*
-row_regex:.*F04:ROOT * 1 .*
-row_regex:.*04:AGGREGATE * 12 .*
-row_regex:.*06:AGGREGATE * 12 .*
-row_regex:.*03:AGGREGATE * 12 .*
-row_regex:.*00:UNION * 12 *
-row_regex:.*02:SCAN HDFS * 12 .*alltypes.*
-row_regex:.*01:SCAN HDFS * 12 .*alltypestiny.*
+row_regex:.*F04:ROOT * 1 * 1 .*
+row_regex:.*04:AGGREGATE * 3 * 12 .*
+row_regex:.*06:AGGREGATE * 3 * 12 .*
+row_regex:.*03:AGGREGATE * 3 * 12 .*
+row_regex:.*00:UNION * 3 * 12 *
+row_regex:.*02:SCAN HDFS * 3 * 12 .*alltypes.*
+row_regex:.*01:SCAN HDFS * 3 * 12 .*alltypestiny.*
 ====
 ---- QUERY
 # This query should have one scan and one exchange in the interior fragment.
@@ -102,13 +102,13 @@ select min(string_col) from (
 '0'
 ---- RUNTIME_PROFILE
 row_regex:.*AdmissionSlots: 4.*
-row_regex:.*F04:ROOT * 1 .*
-row_regex:.*04:AGGREGATE * 12 .*
-row_regex:.*06:AGGREGATE * 12 .*
-row_regex:.*03:AGGREGATE * 4 .*
-row_regex:.*00:UNION * 12 *
-row_regex:.*02:SCAN HDFS * 4 .*alltypestiny.*
-row_regex:.*01:SCAN HDFS * 12 .*alltypes.*
+row_regex:.*F04:ROOT * 1 * 1 .*
+row_regex:.*04:AGGREGATE * 3 * 12 .*
+row_regex:.*06:AGGREGATE * 3 * 12 .*
+row_regex:.*03:AGGREGATE * 3 * 4 .*
+row_regex:.*00:UNION * 3 * 12 *
+row_regex:.*02:SCAN HDFS * 3 * 4 .*alltypestiny.*
+row_regex:.*01:SCAN HDFS * 3 * 12 .*alltypes.*
 ====
 ---- QUERY
 # This query should have one scan and two exchanges in the interior fragment.
@@ -123,11 +123,11 @@ select min(string_col) from (
 ---- RESULTS
 '0'
 ---- RUNTIME_PROFILE
-row_regex:.*F06:ROOT * 1 .*
+row_regex:.*F06:ROOT * 1 * 1 .*
 row_regex:.*AdmissionSlots: 2.*
-row_regex:.*00:UNION * 6 .*
-row_regex:.*08:AGGREGATE * 6 .*
-row_regex:.*03:AGGREGATE * 6 .*
-row_regex:.*04:SCAN HDFS * 6 .*
-row_regex:.*01:SCAN HDFS * 6 .*
+row_regex:.*00:UNION * 3 * 6 .*
+row_regex:.*08:AGGREGATE * 3 * 6 .*
+row_regex:.*03:AGGREGATE * 3 * 6 .*
+row_regex:.*04:SCAN HDFS * 3 * 6 .*
+row_regex:.*01:SCAN HDFS * 3 * 6 .*
 ====
diff --git a/tests/benchmark/report_benchmark_results.py b/tests/benchmark/report_benchmark_results.py
index 5312f01..0274e64 100755
--- a/tests/benchmark/report_benchmark_results.py
+++ b/tests/benchmark/report_benchmark_results.py
@@ -66,6 +66,7 @@ MAX_TIME = 'max_time'
 NAME = 'name'
 NUM_CLIENTS = 'num_clients'
 NUM_HOSTS = 'num_hosts'
+NUM_INSTANCES = 'num_instances'
 NUM_ROWS = 'num_rows'
 OPERATOR = 'operator'
 PEAK_MEM = 'peak_mem'
@@ -687,6 +688,7 @@ class CombinedExecSummaries(object):
     table = prettytable.PrettyTable(
         ["Operator",
           "#Hosts",
+          "#Inst",
           "Avg Time",
           "Std Dev",
           "Max Time",
@@ -698,6 +700,7 @@ class CombinedExecSummaries(object):
     for row in self.rows:
       table_row = [ row[PREFIX] + row[OPERATOR],
           prettyprint_values(row[NUM_HOSTS]),
+          prettyprint_values(row[NUM_INSTANCES]),
           prettyprint_time(row[AVG_TIME]),
           prettyprint_time(row[STDDEV_TIME]),
           prettyprint_time(row[MAX_TIME]),
@@ -805,7 +808,7 @@ class ExecSummaryComparison(object):
         ref_row = self.ref_combined_summary.rows[i]
 
         comparison_row = {}
-        for key in [PREFIX, OPERATOR, NUM_HOSTS, AVG_TIME, STDDEV_TIME,
+        for key in [PREFIX, OPERATOR, NUM_HOSTS, NUM_INSTANCES, AVG_TIME, STDDEV_TIME,
             MAX_TIME, PEAK_MEM, NUM_ROWS, EST_NUM_ROWS, EST_PEAK_MEM, DETAIL]:
           comparison_row[key] = row[key]
 
@@ -859,6 +862,7 @@ class ExecSummaryComparison(object):
             comparison_row[RSTD], comparison_row[REF_RSTD])
 
         comparison_row[NUM_HOSTS] = row[NUM_HOSTS]
+        comparison_row[NUM_INSTANCES] = row[NUM_INSTANCES]
         comparison_row[NUM_ROWS] = row[NUM_ROWS]
         comparison_row[EST_NUM_ROWS] = row[EST_NUM_ROWS]
 
@@ -891,6 +895,7 @@ class ExecSummaryComparison(object):
           'Base StdDev(%)',
           'Delta(StdDev(%))',
           '#Hosts',
+          '#Inst',
           '#Rows',
           'Est #Rows'])
     table.align = 'l'
@@ -903,6 +908,7 @@ class ExecSummaryComparison(object):
           '{0:.2%}'.format(row[REF_RSTD]),
           '{0:+.2%}'.format(row[DELTA_RSTD]),
           prettyprint_values(row[NUM_HOSTS]),
+          prettyprint_values(row[NUM_INSTANCES]),
           prettyprint_values(row[NUM_ROWS]),
           prettyprint_values(row[EST_NUM_ROWS]) ]
 
@@ -943,6 +949,7 @@ class ExecSummaryComparison(object):
           'Base Max',
           'Delta(Max)',
           '#Hosts',
+          '#Inst',
           '#Rows',
           'Est #Rows'])
     table.align = 'l'
@@ -961,6 +968,7 @@ class ExecSummaryComparison(object):
             prettyprint_time(row[BASELINE_MAX]),
             prettyprint_percent(row[DELTA_MAX]),
             prettyprint_values(row[NUM_HOSTS]),
+            prettyprint_values(row[NUM_INSTANCES]),
             prettyprint_values(row[NUM_ROWS]),
             prettyprint_values(row[EST_NUM_ROWS])]
 
diff --git a/tests/shell/test_shell_commandline.py b/tests/shell/test_shell_commandline.py
index 718d934..3090cf7 100644
--- a/tests/shell/test_shell_commandline.py
+++ b/tests/shell/test_shell_commandline.py
@@ -327,7 +327,7 @@ class TestImpalaShell(ImpalaTestSuite):
     args = ['-p', '-q', 'select 1; profile;']
     result_set = run_impala_shell_cmd(vector, args)
     # This regex helps us uniquely identify a profile.
-    regex = re.compile("Operator\s+#Hosts\s+Avg\s+Time")
+    regex = re.compile("Operator\s+#Hosts\s+#Inst\s+Avg\s+Time")
     # We expect two query profiles.
     assert len(re.findall(regex, result_set.stdout)) == 2, \
         "Could not detect two profiles, stdout: %s" % result_set.stdout