You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2019/09/25 22:30:23 UTC

[impala] 01/03: IMPALA-7637: Add more hash table stats to profile

This is an automated email from the ASF dual-hosted git repository.

stakiar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 803323bb39aee3a3bd38e7cca544cef72500b8b6
Author: Yongzhi Chen <yc...@cloudera.com>
AuthorDate: Fri Sep 13 12:13:04 2019 -0400

    IMPALA-7637: Add more hash table stats to profile
    
    Add hash table counters (probes, travel and resizes) to the profile.
    Put the hash table stats into the child profile "Hash Table".
    
    Tests:
    Add new test test_query_profle_hashtable.
    Ran exhaustive tests.
    
    Profile Sample:
      Hash Join Builder (join_node_id=2):
            ...
            Runtime filters: 1 of 1 Runtime Filter Published
            - BuildRowsPartitionTime: 157.960us
            - BuildRowsPartitioned: 100 (100)
            - HashTablesBuildTime: 298.817us
            - LargestPartitionPercent: 7 (7)
            - MaxPartitionLevel: 0 (0)
            - NumRepartitions: 0 (0)
            - PartitionsCreated: 16 (16)
            - PeakMemoryUsage: 17.12 KB (17536)
            - RepartitionTime: 0.000ns
            - SpilledPartitions: 0 (0)
            Hash Table:
            - HashBuckets: 256 (256)
            - HashCollisions: 0 (0)
            - Probes: 2.52K (2520)
            - Resizes: 0 (0)
            - Travel: 1.79K (178
    
    Change-Id: I1fd875dd1af8031242fd5f5ff554d3a71aaa6f87
    Reviewed-on: http://gerrit.cloudera.org:8080/14234
    Reviewed-by: Sahil Takiar <st...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/grouping-aggregator-partition.cc |  5 +++-
 be/src/exec/grouping-aggregator.cc           |  5 ++--
 be/src/exec/grouping-aggregator.h            |  4 +--
 be/src/exec/hash-table.cc                    | 25 +++++++++++++++++
 be/src/exec/hash-table.h                     | 40 +++++++++++++++++++++++++---
 be/src/exec/partitioned-hash-join-builder.cc | 10 +++----
 be/src/exec/partitioned-hash-join-builder.h  | 11 ++++----
 tests/query_test/test_observability.py       | 28 +++++++++++++++++++
 8 files changed, 108 insertions(+), 20 deletions(-)

diff --git a/be/src/exec/grouping-aggregator-partition.cc b/be/src/exec/grouping-aggregator-partition.cc
index 03f54d8..10b24c5 100644
--- a/be/src/exec/grouping-aggregator-partition.cc
+++ b/be/src/exec/grouping-aggregator-partition.cc
@@ -214,7 +214,10 @@ void GroupingAggregator::Partition::Close(bool finalize_rows) {
     }
     aggregated_row_stream->Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
   }
-  if (hash_tbl.get() != nullptr) hash_tbl->Close();
+  if (hash_tbl.get() != nullptr) {
+    hash_tbl->StatsCountersAdd(parent->ht_stats_profile_.get());
+    hash_tbl->Close();
+  }
   if (unaggregated_row_stream.get() != nullptr) {
     unaggregated_row_stream->Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
   }
diff --git a/be/src/exec/grouping-aggregator.cc b/be/src/exec/grouping-aggregator.cc
index 5238c43..d94604c 100644
--- a/be/src/exec/grouping-aggregator.cc
+++ b/be/src/exec/grouping-aggregator.cc
@@ -134,7 +134,7 @@ Status GroupingAggregator::Prepare(RuntimeState* state) {
 
   ht_resize_timer_ = ADD_TIMER(runtime_profile(), "HTResizeTime");
   get_results_timer_ = ADD_TIMER(runtime_profile(), "GetResultsTime");
-  num_hash_buckets_ = ADD_COUNTER(runtime_profile(), "HashBuckets", TUnit::UNIT);
+  ht_stats_profile_ = HashTable::AddHashTableCounters(runtime_profile());
   partitions_created_ = ADD_COUNTER(runtime_profile(), "PartitionsCreated", TUnit::UNIT);
   largest_partition_percent_ =
       runtime_profile()->AddHighWaterMarkCounter("LargestPartitionPercent", TUnit::UNIT);
@@ -722,7 +722,8 @@ Status GroupingAggregator::NextPartition() {
 
   output_partition_ = partition;
   output_iterator_ = output_partition_->hash_tbl->Begin(ht_ctx_.get());
-  COUNTER_ADD(num_hash_buckets_, output_partition_->hash_tbl->num_buckets());
+  COUNTER_ADD(this->ht_stats_profile_->num_hash_buckets_,
+      output_partition_->hash_tbl->num_buckets());
   return Status::OK();
 }
 
diff --git a/be/src/exec/grouping-aggregator.h b/be/src/exec/grouping-aggregator.h
index dea7e3f..bdcae14 100644
--- a/be/src/exec/grouping-aggregator.h
+++ b/be/src/exec/grouping-aggregator.h
@@ -245,8 +245,8 @@ class GroupingAggregator : public Aggregator {
   /// Time spent returning the aggregated rows
   RuntimeProfile::Counter* get_results_timer_ = nullptr;
 
-  /// Total number of hash buckets across all partitions.
-  RuntimeProfile::Counter* num_hash_buckets_ = nullptr;
+  /// Counters and profile objects for HashTable stats
+  std::unique_ptr<HashTableStatsProfile> ht_stats_profile_;
 
   /// Total number of partitions created.
   RuntimeProfile::Counter* partitions_created_ = nullptr;
diff --git a/be/src/exec/hash-table.cc b/be/src/exec/hash-table.cc
index 556434b..6ea5fdf 100644
--- a/be/src/exec/hash-table.cc
+++ b/be/src/exec/hash-table.cc
@@ -416,6 +416,24 @@ Status HashTable::Init(bool* got_memory) {
   return Status::OK();
 }
 
+unique_ptr<HashTableStatsProfile> HashTable::AddHashTableCounters(
+    RuntimeProfile* parent_profile) {
+  // All hash table counters are grouped under a "Hash Table" child profile so they
+  // are reported as a separate section of 'parent_profile'.
+  unique_ptr<HashTableStatsProfile> stats(new HashTableStatsProfile());
+  RuntimeProfile* ht_profile = parent_profile->CreateChild("Hash Table", true, true);
+  stats->hashtable_profile = ht_profile;
+  // Probes/Travel/HashCollisions/Resizes are fed by HashTable::StatsCountersAdd();
+  // HashBuckets is updated directly by the node that owns the hash tables.
+  stats->num_hash_probes_ = ADD_COUNTER(ht_profile, "Probes", TUnit::UNIT);
+  stats->num_hash_travels_ = ADD_COUNTER(ht_profile, "Travel", TUnit::UNIT);
+  stats->num_hash_collisions_ =
+      ADD_COUNTER(ht_profile, "HashCollisions", TUnit::UNIT);
+  stats->num_hash_buckets_ = ADD_COUNTER(ht_profile, "HashBuckets", TUnit::UNIT);
+  stats->num_hash_resizes_ = ADD_COUNTER(ht_profile, "Resizes", TUnit::UNIT);
+  return stats;
+}
+
 void HashTable::Close() {
   // Print statistics only for the large or heavily used hash tables.
   // TODO: Tweak these numbers/conditions, or print them always?
@@ -428,6 +446,13 @@ void HashTable::Close() {
   if (bucket_allocation_ != nullptr) allocator_->Free(move(bucket_allocation_));
 }
 
+void HashTable::StatsCountersAdd(HashTableStatsProfile* profile) {
+  COUNTER_ADD(profile->num_hash_probes_, num_probes_);
+  COUNTER_ADD(profile->num_hash_travels_, travel_length_);
+  COUNTER_ADD(profile->num_hash_collisions_, num_hash_collisions_);
+  COUNTER_ADD(profile->num_hash_resizes_, num_resizes_);
+}
+
 Status HashTable::CheckAndResize(
     uint64_t buckets_to_fill, const HashTableCtx* ht_ctx, bool* got_memory) {
   uint64_t shift = 0;
diff --git a/be/src/exec/hash-table.h b/be/src/exec/hash-table.h
index b4a6905..c4df52c 100644
--- a/be/src/exec/hash-table.h
+++ b/be/src/exec/hash-table.h
@@ -34,6 +34,7 @@
 #include "runtime/tuple-row.h"
 #include "util/bitmap.h"
 #include "util/hash-util.h"
+#include "util/runtime-profile.h"
 
 namespace llvm {
   class Function;
@@ -528,6 +529,30 @@ class HashTableCtx {
   MemPool* probe_expr_results_pool_;
 };
 
+/// HashTableStatsProfile encapsulates hash table stats. It tracks the aggregated stats
+/// of all the hash tables created by a node. It should be created and stored by the
+/// node, and released when the node is released.
+struct HashTableStatsProfile {
+  /// Child profile (named "Hash Table") that holds the counters below.
+  RuntimeProfile* hashtable_profile = nullptr;
+
+  /// Number of hash collisions - unequal rows that have identical hash values.
+  RuntimeProfile::Counter* num_hash_collisions_ = nullptr;
+
+  /// Number of hash table probes.
+  RuntimeProfile::Counter* num_hash_probes_ = nullptr;
+
+  /// Total distance traveled by hash table probes.
+  RuntimeProfile::Counter* num_hash_travels_ = nullptr;
+
+  /// Number of times the hash tables were resized.
+  RuntimeProfile::Counter* num_hash_resizes_ = nullptr;
+
+  /// Total number of hash buckets across all partitions.
+  RuntimeProfile::Counter* num_hash_buckets_ = nullptr;
+
+};
+
 /// The hash table consists of a contiguous array of buckets that contain a pointer to the
 /// data, the hash value and three flags: whether this bucket is filled, whether this
 /// entry has been matched (used in right and full joins) and whether this entry has
@@ -618,9 +643,21 @@ class HashTable {
   /// enough memory for the initial buckets was allocated from the Suballocator.
   Status Init(bool* got_memory) WARN_UNUSED_RESULT;
 
+  /// Creates a "Hash Table" child profile of 'parent_profile' containing the Probes,
+  /// Travel, HashCollisions, HashBuckets and Resizes counters. Returns a new
+  /// HashTableStatsProfile pointing at them; the caller owns the returned object.
+  static std::unique_ptr<HashTableStatsProfile> AddHashTableCounters(
+      RuntimeProfile* parent_profile);
+
   /// Call to cleanup any resources. Must be called once.
   void Close();
 
+  /// Adds this hash table's operation stats to the counters in 'profile'. Should be
+  /// called exactly once per HashTable, by its owner right before Close(); repeated
+  /// calls would double-count. Only the Probes, Travel, HashCollisions and Resizes
+  /// counters are updated; HashBuckets is maintained separately by the owner.
+  void StatsCountersAdd(HashTableStatsProfile* profile);
+
   /// Inserts the row to the hash table. The caller is responsible for ensuring that the
   /// table has free buckets. Returns true if the insertion was successful. Always
   /// returns true if the table has free buckets and the key is not a duplicate. If the
@@ -737,9 +774,6 @@ class HashTable {
   /// Update and print some statistics that can be used for performance debugging.
   std::string PrintStats() const;
 
-  /// Number of hash collisions so far in the lifetime of this object
-  int64_t NumHashCollisions() const { return num_hash_collisions_; }
-
   /// stl-like iterator interface.
   class Iterator {
    private:
diff --git a/be/src/exec/partitioned-hash-join-builder.cc b/be/src/exec/partitioned-hash-join-builder.cc
index 175d7d8..2f9017d 100644
--- a/be/src/exec/partitioned-hash-join-builder.cc
+++ b/be/src/exec/partitioned-hash-join-builder.cc
@@ -71,8 +71,6 @@ PhjBuilder::PhjBuilder(int join_node_id, const string& join_node_label,
     largest_partition_percent_(NULL),
     max_partition_level_(NULL),
     num_build_rows_partitioned_(NULL),
-    num_hash_collisions_(NULL),
-    num_hash_buckets_(NULL),
     num_spilled_partitions_(NULL),
     num_repartitions_(NULL),
     partition_build_rows_timer_(NULL),
@@ -133,8 +131,7 @@ Status PhjBuilder::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker)
       profile()->AddHighWaterMarkCounter("MaxPartitionLevel", TUnit::UNIT);
   num_build_rows_partitioned_ =
       ADD_COUNTER(profile(), "BuildRowsPartitioned", TUnit::UNIT);
-  num_hash_collisions_ = ADD_COUNTER(profile(), "HashCollisions", TUnit::UNIT);
-  num_hash_buckets_ = ADD_COUNTER(profile(), "HashBuckets", TUnit::UNIT);
+  ht_stats_profile_ = HashTable::AddHashTableCounters(profile());
   num_spilled_partitions_ = ADD_COUNTER(profile(), "SpilledPartitions", TUnit::UNIT);
   num_repartitions_ = ADD_COUNTER(profile(), "NumRepartitions", TUnit::UNIT);
   partition_build_rows_timer_ = ADD_TIMER(profile(), "BuildRowsPartitionTime");
@@ -609,7 +606,7 @@ void PhjBuilder::Partition::Close(RowBatch* batch) {
   if (IsClosed()) return;
 
   if (hash_tbl_ != NULL) {
-    COUNTER_ADD(parent_->num_hash_collisions_, hash_tbl_->NumHashCollisions());
+    hash_tbl_->StatsCountersAdd(parent_->ht_stats_profile_.get());
     hash_tbl_->Close();
   }
 
@@ -711,7 +708,8 @@ Status PhjBuilder::Partition::BuildHashTable(bool* built) {
   DCHECK(*built);
   DCHECK(hash_tbl_ != NULL);
   is_spilled_ = false;
-  COUNTER_ADD(parent_->num_hash_buckets_, hash_tbl_->num_buckets());
+  COUNTER_ADD(parent_->ht_stats_profile_->num_hash_buckets_,
+      hash_tbl_->num_buckets());
   return Status::OK();
 
 not_built:
diff --git a/be/src/exec/partitioned-hash-join-builder.h b/be/src/exec/partitioned-hash-join-builder.h
index 9acc7a7..bbdc472 100644
--- a/be/src/exec/partitioned-hash-join-builder.h
+++ b/be/src/exec/partitioned-hash-join-builder.h
@@ -133,6 +133,8 @@ class PhjBuilder : public DataSink {
   /// depends on the join type and the equijoin conjuncts.
   bool HashTableStoresNulls() const;
 
+  /// NOTE(review): this patch adds no definition or caller for this method; remove it?
+  void AddHashTableStatsToProfile(RuntimeProfile* profile);
   /// Accessor functions, mainly required to expose state to PartitionedHashJoinNode.
   inline bool non_empty_build() const { return non_empty_build_; }
   inline const std::vector<bool>& is_not_distinct_from() const {
@@ -423,6 +425,9 @@ class PhjBuilder : public DataSink {
   /// The level is set to the same level as 'hash_partitions_'.
   boost::scoped_ptr<HashTableCtx> ht_ctx_;
 
+  /// Counters and profile objects for HashTable stats
+  std::unique_ptr<HashTableStatsProfile> ht_stats_profile_;
+
   /// Total number of partitions created.
   RuntimeProfile::Counter* partitions_created_;
 
@@ -436,12 +441,6 @@ class PhjBuilder : public DataSink {
   /// Number of build rows that have been partitioned.
   RuntimeProfile::Counter* num_build_rows_partitioned_;
 
-  /// Number of hash collisions - unequal rows that have identical hash values
-  RuntimeProfile::Counter* num_hash_collisions_;
-
-  /// Total number of hash buckets across all partitions.
-  RuntimeProfile::Counter* num_hash_buckets_;
-
   /// Number of partitions that have been spilled.
   RuntimeProfile::Counter* num_spilled_partitions_;
 
diff --git a/tests/query_test/test_observability.py b/tests/query_test/test_observability.py
index 09674fd..463f7a5 100644
--- a/tests/query_test/test_observability.py
+++ b/tests/query_test/test_observability.py
@@ -709,3 +709,31 @@ class TestObservability(ImpalaTestSuite):
     # Call the second time, no metastore loading needed.
     runtime_profile = self.execute_query(query).runtime_profile
     assert storageLoadTime not in runtime_profile
+
+  def __verify_hashtable_stats_profile(self, runtime_profile):
+    """Asserts that 'runtime_profile' contains the "Hash Table" child profile and its
+    Probes, Travel, HashCollisions and Resizes counters."""
+    for label in ["Hash Table", "Probes:", "Travel:", "HashCollisions:", "Resizes:"]:
+      assert label in runtime_profile
+    # Probes and travel can be 0. The displayed value may be abbreviated (e.g. 2.52K),
+    # so parse the exact integer printed in parentheses and compare it as an int.
+    nprobes = re.search(r'Probes:.*\((\d+)\)', runtime_profile)
+    assert nprobes and len(nprobes.groups()) == 1 and int(nprobes.group(1)) >= 0
+    ntravel = re.search(r'Travel:.*\((\d+)\)', runtime_profile)
+    assert ntravel and len(ntravel.groups()) == 1 and int(ntravel.group(1)) >= 0
+
+  def test_query_profle_hashtable(self):
+    """Test that the runtime profiles of a hash join and of a grouping aggregation
+    both contain the hash table stats published under the 'Hash Table' child
+    profile."""
+    join_query = """select a.int_col, a.string_col from functional.alltypes a
+        inner join functional.alltypessmall b on a.id = b.id"""
+    agg_query = """select year, count(*) from
+        functional.alltypesagg where int_col < 7 and year = 2010 group by year"""
+
+    # Cover both operators that register hash table stats: the hash join builder
+    # and the grouping aggregator.
+    for query in (join_query, agg_query):
+      result = self.execute_query(query)
+      assert result.success
+      self.__verify_hashtable_stats_profile(result.runtime_profile)