You are viewing a plain text version of this content. The canonical link for it (a hyperlink on the original archive page) is not preserved in this plain-text copy.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/07/14 03:26:14 UTC

[doris] branch master updated: [improvement]Use phmap for aggregation with serialized key (#10821)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d1573e1a4a [improvement]Use phmap for aggregation with serialized key (#10821)
d1573e1a4a is described below

commit d1573e1a4a715e34e7886810e792fb47c7e0e15d
Author: Jerry Hu <mr...@gmail.com>
AuthorDate: Thu Jul 14 11:26:09 2022 +0800

    [improvement]Use phmap for aggregation with serialized key (#10821)
---
 be/src/vec/common/columns_hashing.h        |   1 +
 be/src/vec/common/columns_hashing_impl.h   |  50 +++++++++
 be/src/vec/common/hash_table/ph_hash_map.h | 170 +++++++++++++++++++++++++++++
 be/src/vec/exec/vaggregation_node.cpp      |  76 ++++++++++++-
 be/src/vec/exec/vaggregation_node.h        |   2 +-
 5 files changed, 294 insertions(+), 5 deletions(-)

diff --git a/be/src/vec/common/columns_hashing.h b/be/src/vec/common/columns_hashing.h
index 300f7d70e0..9e4ed78f4e 100644
--- a/be/src/vec/common/columns_hashing.h
+++ b/be/src/vec/common/columns_hashing.h
@@ -28,6 +28,7 @@
 #include "vec/common/columns_hashing_impl.h"
 #include "vec/common/hash_table/hash_table.h"
 #include "vec/common/hash_table/hash_table_key_holder.h"
+#include "vec/common/hash_table/ph_hash_map.h"
 #include "vec/common/unaligned.h"
 
 namespace doris::vectorized {
diff --git a/be/src/vec/common/columns_hashing_impl.h b/be/src/vec/common/columns_hashing_impl.h
index 3ba2f630f2..8aa1b1e96d 100644
--- a/be/src/vec/common/columns_hashing_impl.h
+++ b/be/src/vec/common/columns_hashing_impl.h
@@ -132,6 +132,13 @@ public:
         return emplaceImpl(key_holder, data);
     }
 
+    /// emplace_key() overload taking a precomputed hash of this row's key (e.g. data.hash()).
+    template <typename Data>
+    ALWAYS_INLINE EmplaceResult emplace_key(Data& data, size_t hash_value, size_t row,
+                                            Arena& pool) {
+        auto holder = static_cast<Derived&>(*this).get_key_holder(row, pool);
+        return emplaceImpl(holder, hash_value, data);
+    }
     template <typename Data>
     ALWAYS_INLINE FindResult find_key(Data& data, size_t row, Arena& pool) {
         auto key_holder = static_cast<Derived&>(*this).get_key_holder(row, pool);
@@ -207,6 +214,49 @@ protected:
             return EmplaceResult(inserted);
     }
 
+    template <typename Data, typename KeyHolder>
+    ALWAYS_INLINE EmplaceResult emplaceImpl(KeyHolder& key_holder, size_t hash_value, Data& data) {
+        if constexpr (Cache::consecutive_keys_optimization) {
+            if (cache.found && cache.check(key_holder_get_key(key_holder))) {
+                if constexpr (has_mapped)
+                    return EmplaceResult(cache.value.second, cache.value.second, false);
+                else
+                    return EmplaceResult(false);
+            }
+        }
+
+        typename Data::LookupResult it;
+        bool inserted = false;
+        data.emplace(key_holder, it, hash_value, inserted);
+
+        [[maybe_unused]] Mapped* cached = nullptr;
+        if constexpr (has_mapped) cached = lookup_result_get_mapped(it);
+
+        if (inserted) {
+            if constexpr (has_mapped) {
+                new (lookup_result_get_mapped(it)) Mapped();
+            }
+        }
+
+        if constexpr (consecutive_keys_optimization) {
+            cache.found = true;
+            cache.empty = false;
+
+            if constexpr (has_mapped) {
+                cache.value.first = *lookup_result_get_key(it);
+                cache.value.second = *lookup_result_get_mapped(it);
+                cached = &cache.value.second;
+            } else {
+                cache.value = *lookup_result_get_key(it);
+            }
+        }
+
+        if constexpr (has_mapped)
+            return EmplaceResult(*lookup_result_get_mapped(it), *cached, inserted);
+        else
+            return EmplaceResult(inserted);
+    }
+
     template <typename Data, typename Key>
     ALWAYS_INLINE FindResult find_key_impl(Key key, Data& data) {
         if constexpr (Cache::consecutive_keys_optimization) {
diff --git a/be/src/vec/common/hash_table/ph_hash_map.h b/be/src/vec/common/hash_table/ph_hash_map.h
new file mode 100644
index 0000000000..2883f58f17
--- /dev/null
+++ b/be/src/vec/common/hash_table/ph_hash_map.h
@@ -0,0 +1,203 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <boost/noncopyable.hpp>
+#include <parallel_hashmap/phmap.h>
+
+// NOTE: this header is not self-contained. ALWAYS_INLINE, StringRef, DefaultHash and the
+// key_holder_get_key / key_holder_persist_key helpers must be declared before it is included
+// (columns_hashing.h includes it after hash_table.h and hash_table_key_holder.h).
+
+/// Returns the address of the mapped value of a PHHashMap<StringRef, char*> lookup result, so
+/// generic ColumnsHashing code can construct or read the mapped value through it.
+ALWAYS_INLINE inline char** lookup_result_get_mapped(std::pair<const StringRef, char*>* it) {
+    return &(it->second);
+}
+
+/// Compile-time flag that is true only for PHHashMap instantiations (specialized below);
+/// callers use it to select the precomputed-hash + prefetch emplace path.
+template <typename Map>
+struct IsPhmapTraits {
+    constexpr static bool value = false;
+};
+
+/// Adapter that exposes a phmap::flat_hash_map through the hash-map interface used by the
+/// aggregation code: key-holder emplace (optionally with a precomputed hash), hash prefetching,
+/// iteration and size/capacity queries.
+template <typename Key, typename Mapped, typename Hash = DefaultHash<Key>>
+class PHHashMap : private boost::noncopyable {
+public:
+    using Self = PHHashMap;
+    using HashMapImpl = phmap::flat_hash_map<Key, Mapped, Hash>;
+
+    using key_type = Key;
+    using mapped_type = Mapped;
+    using value_type = Key;
+
+    /// Pointer to the stored std::pair<const Key, Mapped> element of the phmap table.
+    using LookupResult = typename HashMapImpl::value_type*;
+
+    using const_iterator_impl = typename HashMapImpl::const_iterator;
+    using iterator_impl = typename HashMapImpl::iterator;
+
+    /// Wraps a phmap iterator. Dereferencing yields the wrapper itself; element data is read
+    /// through get_first() (key) and get_second() (mapped value).
+    template <typename Derived, bool is_const>
+    class iterator_base {
+        using BaseIterator = std::conditional_t<is_const, const_iterator_impl, iterator_impl>;
+
+        BaseIterator base_iterator;
+        friend class PHHashMap;
+
+    public:
+        iterator_base() = default;
+        iterator_base(BaseIterator it) : base_iterator(it) {}
+
+        bool operator==(const iterator_base& rhs) const {
+            return base_iterator == rhs.base_iterator;
+        }
+        bool operator!=(const iterator_base& rhs) const {
+            return base_iterator != rhs.base_iterator;
+        }
+
+        Derived& operator++() {
+            ++base_iterator;
+            return static_cast<Derived&>(*this);
+        }
+
+        auto& operator*() const { return *this; }
+        auto* operator->() const { return this; }
+
+        auto& operator*() { return *this; }
+        auto* operator->() { return this; }
+
+        const auto& get_first() const { return base_iterator->first; }
+
+        const auto& get_second() const { return base_iterator->second; }
+
+        auto& get_second() { return base_iterator->second; }
+
+        // NOTE(review): despite its name this returns a copy of the stored pair, not a pointer.
+        auto get_ptr() const { return *base_iterator; }
+        // NOTE(review): std::pair has no get_hash(), so this only compiles while it is unused.
+        size_t get_hash() const { return base_iterator->get_hash(); }
+
+        /// Always 0: no collision-chain statistic is collected for this map.
+        size_t get_collision_chain_length() const { return 0; }
+    };
+
+    class iterator : public iterator_base<iterator, false> {
+    public:
+        using iterator_base<iterator, false>::iterator_base;
+    };
+
+    class const_iterator : public iterator_base<const_iterator, true> {
+    public:
+        using iterator_base<const_iterator, true>::iterator_base;
+    };
+
+    const_iterator begin() const { return const_iterator(_hash_map.cbegin()); }
+
+    const_iterator cbegin() const { return const_iterator(_hash_map.cbegin()); }
+
+    iterator begin() { return iterator(_hash_map.begin()); }
+
+    const_iterator end() const { return const_iterator(_hash_map.cend()); }
+    const_iterator cend() const { return const_iterator(_hash_map.cend()); }
+    iterator end() { return iterator(_hash_map.end()); }
+
+    /// Looks up the key of `key_holder`, inserting it if absent. On insertion the key holder is
+    /// persisted first and the element is built from the persisted key with a nullptr mapped value.
+    /// @param it        receives a pointer to the found or inserted element
+    /// @param inserted  set to true iff a new element was inserted
+    template <typename KeyHolder>
+    void ALWAYS_INLINE emplace(KeyHolder&& key_holder, LookupResult& it, bool& inserted) {
+        const auto& key = key_holder_get_key(key_holder);
+        inserted = false;
+        auto it_ = _hash_map.lazy_emplace(key, [&](const auto& ctor) {
+            inserted = true;
+            key_holder_persist_key(key_holder);
+            ctor(key_holder_get_key(key_holder), nullptr);
+        });
+        it = &*it_;
+    }
+
+    /// Same as emplace() above, but probes with a caller-supplied hash_value, which is expected
+    /// to equal hash(key) (e.g. computed for a whole batch ahead of prefetch_by_hash()).
+    template <typename KeyHolder>
+    void ALWAYS_INLINE emplace(KeyHolder&& key_holder, LookupResult& it, size_t hash_value,
+                               bool& inserted) {
+        const auto& key = key_holder_get_key(key_holder);
+        inserted = false;
+        auto it_ = _hash_map.lazy_emplace_with_hash(key, hash_value, [&](const auto& ctor) {
+            inserted = true;
+            key_holder_persist_key(key_holder);
+            // Re-read the key after persisting (as the overload above does) so the element is
+            // always built from the persisted key, not from the reference taken beforehand.
+            ctor(key_holder_get_key(key_holder), nullptr);
+        });
+        it = &*it_;
+    }
+
+    /// Hash of `x` as computed by the underlying phmap table; the value to pass as hash_value
+    /// to emplace() and prefetch_by_hash().
+    size_t hash(const Key& x) const { return _hash_map.hash(x); }
+
+    /// Forwards to phmap's prefetch_hash(): a cache hint ahead of a lookup with this hash_value.
+    void ALWAYS_INLINE prefetch_by_hash(size_t hash_value) { _hash_map.prefetch_hash(hash_value); }
+
+    /// Call func(const Key &, Mapped &) for each hash map element.
+    template <typename Func>
+    void for_each_value(Func&& func) {
+        for (auto& v : *this) func(v.get_first(), v.get_second());
+    }
+
+    /// Call func(Mapped &) for each hash map element.
+    template <typename Func>
+    void for_each_mapped(Func&& func) {
+        for (auto& v : *this) func(v.get_second());
+    }
+
+    /// Size of the slot array (capacity * slot size).
+    /// NOTE(review): phmap's control-byte array is presumably not included in this figure.
+    size_t get_buffer_size_in_bytes() const {
+        const auto capacity = _hash_map.capacity();
+        return capacity * sizeof(typename HashMapImpl::slot_type);
+    }
+
+    /// True if adding `row` elements to the current size would exceed 7/8 of the capacity.
+    bool add_elem_size_overflow(size_t row) const {
+        const auto capacity = _hash_map.capacity();
+        // phmap uses 7/8 as its maximum load factor.
+        return (_hash_map.size() + row) > (capacity * 7 / 8);
+    }
+
+    size_t size() const { return _hash_map.size(); }
+
+    /// No dedicated NULL-key storage: there is never separate null-key data to report.
+    char* get_null_key_data() { return nullptr; }
+    bool has_null_key_data() const { return false; }
+
+    HashMapImpl _hash_map;
+};
+
+template <typename Key, typename Mapped, typename Hash>
+struct IsPhmapTraits<PHHashMap<Key, Mapped, Hash>> {
+    constexpr static bool value = true;
+};
diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp
index cbecc624ad..e152e7d477 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -32,6 +32,9 @@
 
 namespace doris::vectorized {
 
+// Look-ahead distance, in rows, for hash-table prefetching; 16 was chosen empirically.
+constexpr size_t HASH_MAP_PREFETCH_DIST {16};
+
 /// The minimum reduction factor (input rows divided by output rows) to grow hash tables
 /// in a streaming preaggregation, given that the hash tables are currently the given
 /// size or above. The sizes roughly correspond to hash table sizes where the bucket
@@ -778,17 +781,38 @@ Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i
         std::visit(
                 [&](auto&& agg_method) -> void {
                     using HashMethodType = std::decay_t<decltype(agg_method)>;
+                    using HashTableType = std::decay_t<decltype(agg_method.data)>;
                     using AggState = typename HashMethodType::State;
                     AggState state(key_columns, _probe_key_sz, nullptr);
 
                     _pre_serialize_key_if_need(state, agg_method, key_columns, rows);
 
+                    std::vector<size_t> hash_values;
+
+                    if constexpr (IsPhmapTraits<HashTableType>::value) {
+                        if (hash_values.size() < rows) hash_values.resize(rows);
+                        for (size_t i = 0; i < rows; ++i) {
+                            hash_values[i] = agg_method.data.hash(agg_method.keys[i]);
+                        }
+                    }
+
                     /// For all rows.
                     for (size_t i = 0; i < rows; ++i) {
                         AggregateDataPtr aggregate_data = nullptr;
 
-                        auto emplace_result =
-                                state.emplace_key(agg_method.data, i, _agg_arena_pool);
+                        auto emplace_result = [&]() {
+                            if constexpr (IsPhmapTraits<HashTableType>::value) {
+                                if (LIKELY(i + HASH_MAP_PREFETCH_DIST < rows)) {
+                                    agg_method.data.prefetch_by_hash(
+                                            hash_values[i + HASH_MAP_PREFETCH_DIST]);
+                                }
+
+                                return state.emplace_key(agg_method.data, hash_values[i], i,
+                                                         _agg_arena_pool);
+                            } else {
+                                return state.emplace_key(agg_method.data, i, _agg_arena_pool);
+                            }
+                        }();
 
                         /// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key.
                         if (emplace_result.is_inserted()) {
@@ -842,16 +866,38 @@ Status AggregationNode::_execute_with_serialized_key(Block* block) {
     std::visit(
             [&](auto&& agg_method) -> void {
                 using HashMethodType = std::decay_t<decltype(agg_method)>;
+                using HashTableType = std::decay_t<decltype(agg_method.data)>;
                 using AggState = typename HashMethodType::State;
                 AggState state(key_columns, _probe_key_sz, nullptr);
 
                 _pre_serialize_key_if_need(state, agg_method, key_columns, rows);
 
+                std::vector<size_t> hash_values;
+
+                if constexpr (IsPhmapTraits<HashTableType>::value) {
+                    if (hash_values.size() < rows) hash_values.resize(rows);
+                    for (size_t i = 0; i < rows; ++i) {
+                        hash_values[i] = agg_method.data.hash(agg_method.keys[i]);
+                    }
+                }
+
                 /// For all rows.
                 for (size_t i = 0; i < rows; ++i) {
                     AggregateDataPtr aggregate_data = nullptr;
 
-                    auto emplace_result = state.emplace_key(agg_method.data, i, _agg_arena_pool);
+                    auto emplace_result = [&]() {
+                        if constexpr (IsPhmapTraits<HashTableType>::value) {
+                            if (LIKELY(i + HASH_MAP_PREFETCH_DIST < rows)) {
+                                agg_method.data.prefetch_by_hash(
+                                        hash_values[i + HASH_MAP_PREFETCH_DIST]);
+                            }
+
+                            return state.emplace_key(agg_method.data, hash_values[i], i,
+                                                     _agg_arena_pool);
+                        } else {
+                            return state.emplace_key(agg_method.data, i, _agg_arena_pool);
+                        }
+                    }();
 
                     /// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key.
                     if (emplace_result.is_inserted()) {
@@ -1064,16 +1110,38 @@ Status AggregationNode::_merge_with_serialized_key(Block* block) {
     std::visit(
             [&](auto&& agg_method) -> void {
                 using HashMethodType = std::decay_t<decltype(agg_method)>;
+                using HashTableType = std::decay_t<decltype(agg_method.data)>;
                 using AggState = typename HashMethodType::State;
                 AggState state(key_columns, _probe_key_sz, nullptr);
 
                 _pre_serialize_key_if_need(state, agg_method, key_columns, rows);
 
+                std::vector<size_t> hash_values;
+
+                if constexpr (IsPhmapTraits<HashTableType>::value) {
+                    if (hash_values.size() < rows) hash_values.resize(rows);
+                    for (size_t i = 0; i < rows; ++i) {
+                        hash_values[i] = agg_method.data.hash(agg_method.keys[i]);
+                    }
+                }
+
                 /// For all rows.
                 for (size_t i = 0; i < rows; ++i) {
                     AggregateDataPtr aggregate_data = nullptr;
 
-                    auto emplace_result = state.emplace_key(agg_method.data, i, _agg_arena_pool);
+                    auto emplace_result = [&]() {
+                        if constexpr (IsPhmapTraits<HashTableType>::value) {
+                            if (LIKELY(i + HASH_MAP_PREFETCH_DIST < rows)) {
+                                agg_method.data.prefetch_by_hash(
+                                        hash_values[i + HASH_MAP_PREFETCH_DIST]);
+                            }
+
+                            return state.emplace_key(agg_method.data, hash_values[i], i,
+                                                     _agg_arena_pool);
+                        } else {
+                            return state.emplace_key(agg_method.data, i, _agg_arena_pool);
+                        }
+                    }();
 
                     /// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key.
                     if (emplace_result.is_inserted()) {
diff --git a/be/src/vec/exec/vaggregation_node.h b/be/src/vec/exec/vaggregation_node.h
index 0da44ec5dd..c2111905dc 100644
--- a/be/src/vec/exec/vaggregation_node.h
+++ b/be/src/vec/exec/vaggregation_node.h
@@ -106,7 +106,7 @@ private:
 };
 
 using AggregatedDataWithoutKey = AggregateDataPtr;
-using AggregatedDataWithStringKey = HashMapWithSavedHash<StringRef, AggregateDataPtr>;
+using AggregatedDataWithStringKey = PHHashMap<StringRef, AggregateDataPtr>; // phmap, DefaultHash
 
 /// For the case where there is one numeric key.
 /// FieldType is UInt8/16/32/64 for any type with corresponding bit width.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org