You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2020/10/21 01:57:41 UTC

[impala] branch master updated: IMPALA-10178 Run-time profile shall report skews

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 65a0325  IMPALA-10178 Run-time profile shall report skews
65a0325 is described below

commit 65a0325572a5a66e4ed017d291145f567f02419c
Author: Qifan Chen <qc...@cloudera.com>
AuthorDate: Fri Sep 18 16:56:17 2020 -0400

    IMPALA-10178 Run-time profile shall report skews
    
    This fix addresses a limitation of the runtime profile: skew in
    certain operator counters, such as the rows read counter (RowsRead)
    of the scan operators, is not reported. Skew exists when the number
    of rows processed differs substantially across the instances of an
    operator, and it can be detected through the coefficient of variation
    (CoV = stddev / mean). A high CoV (say > 1.0) usually implies the
    existence of skew.
    
    With the fix, such skew is detected for the following counters
      1. RowsRead in HDFS_SCAN_NODE and KUDU_SCAN_NODE
      2. ProbeRows and BuildRows in HASH_JOIN_NODE
      3. RowsReturned in GroupingAggregator, EXCHANGE and SORT_NODE
    
    and reported as follows:
      1. In execution profile, a new skew summary that lists the names
         of the operators with skews;
      2. In the averaged profile for the corresponding operator, the list
         of values of the counter across all fragment instances in the
         backend processes;
      3. Skew detection formula: CoV > limit and mean > 5,000
      4. A new query option 'report_skew_limit'
         < 0: disable skew reporting
         >= 0: enable skew reporting and supply the CoV limit
    
    Examples of skews reported for a hash join and an hdfs scan.
    
    In execution profile:
    
      ... ...
      skew(s) found at: HASH_JOIN_NODE (id=4), HDFS_SCAN_NODE (id=0)
    
      Per Node Peak Memory Usage: ...
      ... ...
    
    In averaged profiles:
    
      HDFS_SCAN_NODE (id=2): ...
              Skew details: RowsRead ([2004992,1724693,2001351],
                                      CoV=0.07, mean=1910345)
    
    Testing:
    1. Added test_skew_reporting_in_runtime_profile in
       test_observability.py to verify that the skews are reported.
    2. Ran Core tests successfully.
    
    Change-Id: I91041f2856eef8293ea78f1721f97469062589a1
    Reviewed-on: http://gerrit.cloudera.org:8080/16474
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/coordinator.cc              |   6 ++
 be/src/service/query-options.cc            |  13 ++++
 be/src/service/query-options.h             |   4 +-
 be/src/util/runtime-profile-counters.h     |  38 ++++++++++
 be/src/util/runtime-profile.cc             | 107 +++++++++++++++++++++++++++++
 be/src/util/runtime-profile.h              |  17 +++++
 be/src/util/stat-util.h                    |  32 ++++++---
 common/thrift/ImpalaInternalService.thrift |   3 +
 common/thrift/ImpalaService.thrift         |   4 ++
 tests/query_test/test_hash_join_timer.py   |   4 ++
 tests/query_test/test_observability.py     |  31 ++++++++-
 11 files changed, 248 insertions(+), 11 deletions(-)

diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 4dedd92..38c988a 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -1219,6 +1219,12 @@ void Coordinator::ComputeQuerySummary() {
   COUNTER_SET(PROFILE_InnerNodeSelectivityRatio.Instantiate(query_profile_),
       inner_node_ratio);
 
+  double skew_threshold = query_state_->query_options().report_skew_limit;
+  if (skew_threshold >= 0) {
+    // Add skews info (if any)
+    query_profile_->AddSkewInfo(query_profile_, skew_threshold);
+  }
+
   // TODO(IMPALA-8126): Move to host profiles
   query_profile_->AddInfoString("Per Node Peak Memory Usage", mem_info.str());
   query_profile_->AddInfoString("Per Node Bytes Read", bytes_read_info.str());
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 63e1b73..900fe76 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -972,6 +972,19 @@ Status impala::SetQueryOption(const string& key, const string& value,
               query_options->__set_targeted_kudu_scan_range_length(scan_length);
               break;
       }
+      case TImpalaQueryOptions::REPORT_SKEW_LIMIT: {
+        StringParser::ParseResult result;
+        const double skew_threshold =
+            StringParser::StringToFloat<double>(value.c_str(), value.length(), &result);
+        if (result != StringParser::PARSE_SUCCESS) {
+          return Status(
+              Substitute("$0 is not valid for REPORT_SKEW_LIMIT. Only numeric "
+                         "values (such as 1.0) are allowed.",
+                  value));
+        }
+        query_options->__set_report_skew_limit(skew_threshold);
+        break;
+      }
       default:
         if (IsRemovedQueryOption(key)) {
           LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'";
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 3b13f3b..ade7950 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -47,7 +47,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // time we add or remove a query option to/from the enum TImpalaQueryOptions.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::TARGETED_KUDU_SCAN_RANGE_LENGTH + 1);\
+      TImpalaQueryOptions::REPORT_SKEW_LIMIT + 1);\
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -221,6 +221,8 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
       ENABLE_OUTER_JOIN_TO_INNER_TRANSFORMATION, TQueryOptionLevel::ADVANCED)\
   QUERY_OPT_FN(targeted_kudu_scan_range_length, TARGETED_KUDU_SCAN_RANGE_LENGTH,\
       TQueryOptionLevel::ADVANCED)\
+  QUERY_OPT_FN(report_skew_limit, REPORT_SKEW_LIMIT,\
+      TQueryOptionLevel::ADVANCED)\
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query state.
diff --git a/be/src/util/runtime-profile-counters.h b/be/src/util/runtime-profile-counters.h
index 472ef16..7f05763 100644
--- a/be/src/util/runtime-profile-counters.h
+++ b/be/src/util/runtime-profile-counters.h
@@ -30,6 +30,7 @@
 #include "common/logging.h"
 #include "gutil/singleton.h"
 #include "util/arithmetic-util.h"
+#include "util/stat-util.h"
 #include "util/runtime-profile.h"
 #include "util/stopwatch.h"
 #include "util/streaming-sampler.h"
@@ -409,6 +410,21 @@ class RuntimeProfileBase::AveragedCounter : public RuntimeProfileBase::Counter {
   /// It is safe for it to be read at the same time as it is updated.
   void UpdateCounter(Counter* new_counter, int idx);
 
+  /// Answer the question whether there exists skew among all valid raw values
+  /// backing this average counter.
+  ///
+  ///  Input argument threshold: the threshold used to evaluate skews.
+  ///
+  /// Return true if skew is detected, in which case 'details' is populated with a
+  /// list of all valid raw values plus the CoV and the mean, in the form of:
+  /// ([raw_value,raw_value,...], <skew-detection-formula>).
+  ///  <skew-detection-formula> ::=
+  ///      CoV=coefficient_of_variation_value, mean=mean_value
+  ///
+  /// Return false if no skew is detected and 'details' argument is not altered.
+  ///
+  bool HasSkew(double threshold, std::string* details);
+
   /// The value for this counter should be updated through UpdateCounter().
   /// Set() and Add() should not be used.
   void Set(double value) override { DCHECK(false); }
@@ -465,6 +481,28 @@ class RuntimeProfileBase::AveragedCounter : public RuntimeProfileBase::Counter {
   /// Returns the mean value, or the double bit pattern stored in an int64_t.
   template <typename T>
   int64_t ComputeMean() const;
+
+  /// Decide whether skew exists among all valid raw values V in this counter with the
+  /// following formula:
+  ///
+  ///   CoV(V) > threshold && mean(V) > 5000
+  ///
+  /// where CoV(V) is the coefficient of variation, defined as
+  /// stddev_population(V) / mean(V). CoV measures the variability in relation to the
+  /// mean. When values in V are identical (no skew), CoV(V) = 0. When values in V
+  /// differ, CoV(V) > 0. The above formula excludes small skew cases when the average
+  /// raw value is no greater than 5000.
+  ///
+  /// Returns true if the above formula is evaluated to true, false otherwise.
+  ///
+  static const int ROW_AVERAGE_LIMIT = 5000;
+  template <typename T>
+  bool EvaluateSkewWithCoV(double threshold, std::stringstream* details);
+
+  /// Compute the population stddev from all valid values (of type T) in
+  /// values_[] and collect these values in a comma separated list through 'details'.
+  template <typename T>
+  void ComputeStddevPForValidValues(std::string* details, double* stddev);
 };
 
 /// This counter records multiple values and keeps a track of the minimum, maximum and
diff --git a/be/src/util/runtime-profile.cc b/be/src/util/runtime-profile.cc
index 798a4a3..3d0e1f7 100644
--- a/be/src/util/runtime-profile.cc
+++ b/be/src/util/runtime-profile.cc
@@ -27,6 +27,7 @@
 
 #include <boost/algorithm/string/join.hpp>
 #include <boost/bind.hpp>
+#include <boost/range/adaptor/transformed.hpp>
 
 #include "common/object-pool.h"
 #include "gutil/strings/strip.h"
@@ -71,6 +72,9 @@ const string RuntimeProfileBase::TOTAL_TIME_COUNTER_NAME = "TotalTime";
 const string RuntimeProfileBase::LOCAL_TIME_COUNTER_NAME = "LocalTime";
 const string RuntimeProfileBase::INACTIVE_TIME_COUNTER_NAME = "InactiveTotalTime";
 
+const string RuntimeProfileBase::SKEW_SUMMARY = "skew(s) found at";
+const string RuntimeProfileBase::SKEW_DETAILS = "Skew details";
+
 constexpr ProfileEntryPrototype::Significance ProfileEntryPrototype::ALLSIGNIFICANCE[];
 
 /// Helper to interpret the bit pattern of 'val' as T, which can either be an int64_t or
@@ -665,6 +669,50 @@ int RuntimeProfileBase::num_counters() const {
   return counter_map_.size();
 }
 
+void RuntimeProfileBase::AddSkewInfo(RuntimeProfileBase* root, double threshold) {
+  DCHECK(root);
+  lock_guard<SpinLock> l(children_lock_);
+  // Specs for the profiles whose averaged counters can harbor skew. Each entry maps a
+  // profile name prefix to the counter names to check. The table is tiny, so every
+  // prefix is simply tested against this profile's name.
+  typedef string NodeNamePrefix;
+  typedef vector<string> CounterNames;
+  static const unordered_map<NodeNamePrefix, CounterNames> skew_profile_specs = {
+      {"KUDU_SCAN_NODE", {"RowsRead"}}, {"HDFS_SCAN_NODE", {"RowsRead"}},
+      {"HASH_JOIN_NODE", {"ProbeRows", "BuildRows"}},
+      {"GroupingAggregator", {"RowsReturned"}}, {"EXCHANGE_NODE", {"RowsReturned"}},
+      {"SORT_NODE", {"RowsReturned"}}};
+  // Set once this profile is named in the 'root' summary, so that a profile with
+  // several skewed counters (e.g. ProbeRows and BuildRows) is listed only once.
+  bool in_summary = false;
+  // Check the counters named by every spec whose prefix starts this profile's name.
+  for (const auto& spec : skew_profile_specs) {
+    const string& prefix = spec.first;
+    if (name().compare(0, prefix.size(), prefix) != 0) continue;
+    for (const string& counter_name : spec.second) {
+      auto counter_it = counter_map_.find(counter_name);
+      // Skip counters that this profile does not have.
+      if (counter_it == counter_map_.end()) continue;
+      // Skew detection can only be done for an average counter.
+      auto average_counter =
+          dynamic_cast<RuntimeProfile::AveragedCounter*>(counter_it->second);
+      string details;
+      if (average_counter == nullptr) continue;
+      if (!average_counter->HasSkew(threshold, &details)) continue;
+      // Skew detected: name this profile in the 'root' profile (first time only) ...
+      if (!in_summary) root->AddInfoStringInternal(SKEW_SUMMARY, name(), true);
+      in_summary = true;
+      // ... and log the counter name and skew details in 'this' profile.
+      this->AddInfoStringInternal(
+          SKEW_DETAILS, counter_name + " " + details, true);
+    }
+  }
+  // Keep looking from within each child profile.
+  for (const auto& child : children_) {
+    child.first->AddSkewInfo(root, threshold);
+  }
+}
+
 void RuntimeProfile::SortChildrenByTotalTime() {
   lock_guard<SpinLock> l(children_lock_);
   // Create a snapshot of total time values so that they don't change while we're
@@ -1859,6 +1907,65 @@ void RuntimeProfileBase::AveragedCounter::ToThrift(
   }
 }
 
+bool RuntimeProfile::AveragedCounter::HasSkew(double threshold, string* details) {
+  // Evaluate as double for DOUBLE_VALUE counters and as int64_t for every other
+  // unit.
+  stringstream formatted;
+  const bool skewed = unit_ == TUnit::DOUBLE_VALUE ?
+      EvaluateSkewWithCoV<double>(threshold, &formatted) :
+      EvaluateSkewWithCoV<int64_t>(threshold, &formatted);
+  // Skew is only reported when the caller supplied somewhere to put the details;
+  // otherwise 'details' is left untouched and false is returned.
+  if (!skewed || details == nullptr) return false;
+
+  *details = formatted.str();
+  return true;
+}
+
+template <typename T>
+bool RuntimeProfile::AveragedCounter::EvaluateSkewWithCoV(
+    double threshold, stringstream* details) {
+  // NOTE(review): value() is converted to T here; for T = double confirm that it
+  // yields the mean itself and not its bit pattern stored in an int64_t.
+  const T mean = value();
+  // Averages that do not exceed ROW_AVERAGE_LIMIT are never reported as skewed.
+  if (!(mean > ROW_AVERAGE_LIMIT)) return false;
+  double stddev = 0.0;
+  string raw_values;
+  ComputeStddevPForValidValues<T>(&raw_values, &stddev);
+  // Coefficient of variation: population stddev relative to the mean.
+  const double cov = stddev / mean;
+  if (!(cov > threshold)) return false;
+  DCHECK(details);
+  *details << "(" << raw_values << ", CoV=" << std::fixed << std::setprecision(2)
+           << cov << ", mean=" << std::fixed << std::setprecision(2) << mean << ")";
+  return true;
+}
+
+template <typename T>
+void RuntimeProfile::AveragedCounter::ComputeStddevPForValidValues(
+    string* valid_value_list, double* stddev) {
+  DCHECK(valid_value_list);
+  DCHECK(stddev);
+  // Gather the raw value of every slot flagged in has_value_[], decoding it from
+  // the int64_t it is held in.
+  vector<T> raw;
+  for (int idx = 0; idx < num_values_; ++idx) {
+    if (!has_value_[idx].Load()) continue;
+    raw.emplace_back(BitcastFromInt64<T>(values_[idx].Load()));
+  }
+
+  // Render the collected values as "[v1,v2,...]".
+  const auto as_text = [](T v) { return std::to_string(v); };
+  const string joined =
+      boost::algorithm::join(raw | boost::adaptors::transformed(as_text), ",");
+  *valid_value_list = "[" + joined + "]";
+
+  // Population stddev of those values around the counter's current value().
+  const T mean = value();
+  StatUtil::ComputeStddevP<T>(raw.data(), raw.size(), mean, stddev);
+}
+
 void RuntimeProfileBase::SummaryStatsCounter::ToThrift(
     TSummaryStatsCounter* counter, const std::string& name) {
   lock_guard<SpinLock> l(lock_);
diff --git a/be/src/util/runtime-profile.h b/be/src/util/runtime-profile.h
index db8f6e6..905c1ce 100644
--- a/be/src/util/runtime-profile.h
+++ b/be/src/util/runtime-profile.h
@@ -199,6 +199,16 @@ class RuntimeProfileBase {
   /// Always 1 for non-aggregated profiles.
   virtual int GetNumInputProfiles() const = 0;
 
+  /// Identify skews in certain counters for certain operators from children_ profiles
+  /// recursively. Add the skew info (if identified) as follows.
+  ///  1. a list of profile names to the 'root' profile with key SKEW_SUMMARY;
+  ///  2. a list of counter names plus the details in those child profiles with key
+  ///     SKEW_DETAILS.
+  static const std::string SKEW_SUMMARY;
+  static const std::string SKEW_DETAILS;
+  /// Argument 'threshold' provides the threshold used in the formula to detect skew.
+  void AddSkewInfo(RuntimeProfileBase* root, double threshold);
+
  protected:
   /// Name of the counter maintaining the total time.
   static const std::string TOTAL_TIME_COUNTER_NAME;
@@ -404,6 +414,13 @@ class RuntimeProfile : public RuntimeProfileBase {
   /// invalidate pointers to profiles.
   void SortChildrenByTotalTime();
 
+  /// Updates the AveragedCounter counters in this profile with the counters from the
+  /// 'src' profile. If a counter is present in 'src' but missing in this profile, a new
+  /// AveragedCounter is created with the same name. This method should not be invoked
+  /// if is_average_profile_ is false. Obtains locks on the counter maps and child counter
+  /// maps in both this and 'src' profiles.
+  void UpdateAverage(RuntimeProfile* src);
+
   /// Updates this profile w/ the thrift profile.
   /// Counters and child profiles in thrift_profile that already exist in this profile
   /// are updated. Counters that do not already exist are created.
diff --git a/be/src/util/stat-util.h b/be/src/util/stat-util.h
index 5b932eb..a88e7a1 100644
--- a/be/src/util/stat-util.h
+++ b/be/src/util/stat-util.h
@@ -25,25 +25,39 @@ namespace impala {
 
 class StatUtil {
  public:
-  /// Computes mean and standard deviation
+  /// Computes the standard deviation (population) from an array of values in 'values'.
+  /// The total number of such values is 'N' and the mean is already computed in 'mean'.
+  /// On return:
+  ///  *stddev is the stddev
   template <typename T>
-  static void ComputeMeanStddev(const T* values, int N, double* mean, double* stddev) {
-    *mean = 0;
+  static void ComputeStddevP(const T* values, int N, T mean, double* stddev) {
     *stddev = 0;
 
     for (int i = 0; i < N; ++i) {
-      *mean += values[i];
-    }
-    *mean /= N;
-
-    for (int i = 0; i < N; ++i) {
-      double d = values[i] - *mean;
+      double d = values[i] - mean;
       *stddev += d*d;
     }
 
     *stddev /= N;
     *stddev = sqrt(*stddev);
   }
+
+  /// Computes the mean and the standard deviation (population) from an array of
+  /// values in 'values'. The total number of such values is 'N', which must be > 0
+  /// because the sum is divided by it. On return:
+  ///  *mean is the mean (truncated by T's division when T is an integer type)
+  ///  *stddev is the population stddev computed around *mean
+  template <typename T>
+  static void ComputeMeanStddevP(const T* values, int N, T* mean, double* stddev) {
+    *mean = 0;
+
+    for (int i = 0; i < N; ++i) {
+      *mean += values[i];
+    }
+    *mean /= N;
+
+    ComputeStddevP<T>(values, N, *mean, stddev);
+  }
 };
 
 }
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index bb22ab3..9f7aade 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -460,6 +460,9 @@ struct TQueryOptions {
   // Initialized with -1 to indicate it is unspecified.
   // See comment in ImpalaService.thrift
   115: optional i64 targeted_kudu_scan_range_length = -1;
+
+  // See comment in ImpalaService.thrift
+  116: optional double report_skew_limit = 1.0;
 }
 
 // Impala currently has two types of sessions: Beeswax and HiveServer2
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 3e5e5af..7a18523 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -590,6 +590,10 @@ enum TImpalaQueryOptions {
   // does not guarantee a limit on the size of the scan range. If unspecified or
   // set to 0 disables this feature.
   TARGETED_KUDU_SCAN_RANGE_LENGTH = 114
+
+  // Enable (>=0) or disable(<0) reporting of skews for a query in runtime profile.
+  // When enabled, used as the CoV threshold value in the skew detection formula.
+  REPORT_SKEW_LIMIT = 115
 }
 
 // The summary of a DML statement.
diff --git a/tests/query_test/test_hash_join_timer.py b/tests/query_test/test_hash_join_timer.py
index 24b4cc0..a6e408e 100644
--- a/tests/query_test/test_hash_join_timer.py
+++ b/tests/query_test/test_hash_join_timer.py
@@ -135,6 +135,10 @@ class TestHashJoinTimer(ImpalaTestSuite):
     check_fragment_count = 0
     asyn_build = False
     for line in profile.split("\n"):
+      if ("skew(s)" in line):
+        # Sample line:
+        # skew(s) found at: HASH_JOIN_NODE (id=3), EXCHANGE_NODE (id=8)
+        continue
       if ("(id=3)" in line):
         # Sample line:
         # HASH_JOIN_NODE (id=3):(Total: 3s580ms, non-child: 11.89ms, % non-child: 0.31%)
diff --git a/tests/query_test/test_observability.py b/tests/query_test/test_observability.py
index fe5d2f8..f5f1e7e 100644
--- a/tests/query_test/test_observability.py
+++ b/tests/query_test/test_observability.py
@@ -241,10 +241,12 @@ class TestObservability(ImpalaTestSuite):
     """IMPALA-6081: Test that the expected number of fragment instances and their exec
     nodes appear in the runtime profile, even when fragments may be quickly cancelled when
     all results are already returned."""
+    query_opts = {'report_skew_limit': -1}
     results = self.execute_query("""
         with l as (select * from tpch.lineitem UNION ALL select * from tpch.lineitem)
         select STRAIGHT_JOIN count(*) from (select * from tpch.lineitem a LIMIT 1) a
-        join (select * from l LIMIT 2000000) b on a.l_orderkey = -b.l_orderkey;""")
+        join (select * from l LIMIT 2000000) b on a.l_orderkey = -b.l_orderkey;""",
+        query_opts)
     # There are 3 scan nodes and each appears in the profile n+1 times (for n fragment
     # instances + the averaged fragment). n depends on how data is loaded and scheduler's
     # decision.
@@ -730,6 +732,33 @@ class TestObservability(ImpalaTestSuite):
     assert result.success
     self.__verify_hashtable_stats_profile(result.runtime_profile)
 
+  @SkipIfNotHdfsMinicluster.tuned_for_minicluster
+  def test_skew_reporting_in_runtime_profile(self):
+    """Test that the skew summary and skew details are reported in runtime profile
+    correctly"""
+    query = """select ca_state, count(*) from tpcds_parquet.store_sales,
+            tpcds_parquet.customer, tpcds_parquet.customer_address
+            where ss_customer_sk = c_customer_sk and
+            c_current_addr_sk = ca_address_sk
+            group by ca_state
+            order by ca_state
+            """
+    # Set up the skew threshold to 0.02
+    query_opts = {'report_skew_limit': 0.02}
+    results = self.execute_query(query, query_opts)
+    assert results.success
+
+    # Expect to see the skew summary (raw string: '\(' is a regex escape).
+    skews_found = r'skew\(s\) found at:.*HASH_JOIN.*HASH_JOIN.*HDFS_SCAN_NODE'
+    assert len(re.findall(skews_found, results.runtime_profile, re.M)) == 1
+
+    # Expect to see skew details twice at the hash join nodes.
+    probe_rows_at_hj = r'HASH_JOIN_NODE.*\n.*Skew details: ProbeRows'
+    assert len(re.findall(probe_rows_at_hj, results.runtime_profile, re.M)) == 2
+
+    # Expect to see skew details once at the scan node.
+    rows_read_at_hdfs_scan = r'HDFS_SCAN_NODE.*\n.*Skew details: RowsRead'
+    assert len(re.findall(rows_read_at_hdfs_scan, results.runtime_profile, re.M)) == 1
 
 class TestQueryStates(ImpalaTestSuite):
   """Test that the 'Query State' and 'Impala Query State' are set correctly in the