You are viewing a plain-text version of this content. The canonical link to it (originally a hyperlink labelled "here") is not preserved in this plain-text rendering.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/07/14 03:26:14 UTC
[doris] branch master updated: [improvement]Use phmap for aggregation with serialized key (#10821)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d1573e1a4a [improvement]Use phmap for aggregation with serialized key (#10821)
d1573e1a4a is described below
commit d1573e1a4a715e34e7886810e792fb47c7e0e15d
Author: Jerry Hu <mr...@gmail.com>
AuthorDate: Thu Jul 14 11:26:09 2022 +0800
[improvement]Use phmap for aggregation with serialized key (#10821)
---
be/src/vec/common/columns_hashing.h | 1 +
be/src/vec/common/columns_hashing_impl.h | 50 +++++++++
be/src/vec/common/hash_table/ph_hash_map.h | 170 +++++++++++++++++++++++++++++
be/src/vec/exec/vaggregation_node.cpp | 76 ++++++++++++-
be/src/vec/exec/vaggregation_node.h | 2 +-
5 files changed, 294 insertions(+), 5 deletions(-)
diff --git a/be/src/vec/common/columns_hashing.h b/be/src/vec/common/columns_hashing.h
index 300f7d70e0..9e4ed78f4e 100644
--- a/be/src/vec/common/columns_hashing.h
+++ b/be/src/vec/common/columns_hashing.h
@@ -28,6 +28,7 @@
#include "vec/common/columns_hashing_impl.h"
#include "vec/common/hash_table/hash_table.h"
#include "vec/common/hash_table/hash_table_key_holder.h"
+#include "vec/common/hash_table/ph_hash_map.h"
#include "vec/common/unaligned.h"
namespace doris::vectorized {
diff --git a/be/src/vec/common/columns_hashing_impl.h b/be/src/vec/common/columns_hashing_impl.h
index 3ba2f630f2..8aa1b1e96d 100644
--- a/be/src/vec/common/columns_hashing_impl.h
+++ b/be/src/vec/common/columns_hashing_impl.h
@@ -132,6 +132,13 @@ public:
return emplaceImpl(key_holder, data);
}
+    /// Row-wise emplace that forwards a hash value the caller has already computed
+    /// for `row`'s key on to Data::emplace(key_holder, it, hash_value, inserted).
+    /// `hash_value` must be the hash of that row's key for `data`.
+    template <typename Data>
+    ALWAYS_INLINE EmplaceResult emplace_key(Data& data, size_t hash_value, size_t row,
+                                            Arena& pool) {
+        Derived& derived = static_cast<Derived&>(*this);
+        auto holder = derived.get_key_holder(row, pool);
+        return emplaceImpl(holder, hash_value, data);
+    }
+
template <typename Data>
ALWAYS_INLINE FindResult find_key(Data& data, size_t row, Arena& pool) {
auto key_holder = static_cast<Derived&>(*this).get_key_holder(row, pool);
@@ -207,6 +214,49 @@ protected:
return EmplaceResult(inserted);
}
+    /// emplaceImpl variant that passes a caller-computed `hash_value` through to
+    /// Data::emplace(key_holder, it, hash_value, inserted) instead of letting the
+    /// table hash the key itself.
+    /// NOTE(review): apart from that call this appears to duplicate the hash-less
+    /// emplaceImpl overload; keep the two in sync.
+    /// Returns an EmplaceResult that carries the mapped value (when has_mapped)
+    /// and whether the key was newly inserted.
+    template <typename Data, typename KeyHolder>
+    ALWAYS_INLINE EmplaceResult emplaceImpl(KeyHolder& key_holder, size_t hash_value, Data& data) {
+        // Consecutive-keys fast path: if the cache holds a matching key (presumably
+        // the key of the previous emplace), answer from the cache without touching
+        // the table; such a hit is never reported as an insertion.
+        if constexpr (Cache::consecutive_keys_optimization) {
+            if (cache.found && cache.check(key_holder_get_key(key_holder))) {
+                if constexpr (has_mapped)
+                    return EmplaceResult(cache.value.second, cache.value.second, false);
+                else
+                    return EmplaceResult(false);
+            }
+        }
+
+        typename Data::LookupResult it;
+        bool inserted = false;
+        data.emplace(key_holder, it, hash_value, inserted);
+
+        // Without the cache, the second EmplaceResult field refers to the table's own slot.
+        [[maybe_unused]] Mapped* cached = nullptr;
+        if constexpr (has_mapped) cached = lookup_result_get_mapped(it);
+
+        // A newly inserted key gets a value-initialized Mapped via placement new.
+        if (inserted) {
+            if constexpr (has_mapped) {
+                new (lookup_result_get_mapped(it)) Mapped();
+            }
+        }
+
+        // Remember this key/value so the next call can take the fast path above.
+        if constexpr (consecutive_keys_optimization) {
+            cache.found = true;
+            cache.empty = false;
+
+            if constexpr (has_mapped) {
+                cache.value.first = *lookup_result_get_key(it);
+                cache.value.second = *lookup_result_get_mapped(it);
+                // With caching on, the second EmplaceResult field refers to the cached copy.
+                cached = &cache.value.second;
+            } else {
+                cache.value = *lookup_result_get_key(it);
+            }
+        }
+
+        if constexpr (has_mapped)
+            return EmplaceResult(*lookup_result_get_mapped(it), *cached, inserted);
+        else
+            return EmplaceResult(inserted);
+    }
+
template <typename Data, typename Key>
ALWAYS_INLINE FindResult find_key_impl(Key key, Data& data) {
if constexpr (Cache::consecutive_keys_optimization) {
diff --git a/be/src/vec/common/hash_table/ph_hash_map.h b/be/src/vec/common/hash_table/ph_hash_map.h
new file mode 100644
index 0000000000..2883f58f17
--- /dev/null
+++ b/be/src/vec/common/hash_table/ph_hash_map.h
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <parallel_hashmap/phmap.h>
+
+/// lookup_result_get_mapped overload for lookup results of the form
+/// std::pair<const StringRef, char*>* (PHHashMap<StringRef, char*>::LookupResult):
+/// the mapped value is the pair's `.second`, returned by address.
+ALWAYS_INLINE inline char** lookup_result_get_mapped(std::pair<const StringRef, char*>* entry) {
+    char*& mapped = entry->second;
+    return &mapped;
+}
+
+/// Compile-time flag telling whether a hash-table type is PHHashMap, so callers
+/// can choose the precomputed-hash code path with `if constexpr`.
+/// False by default; PHHashMap opts in through a partial specialization.
+template <typename Map>
+struct IsPhmapTraits {
+    static constexpr bool value = false;
+};
+
+/// Adapter exposing a phmap::flat_hash_map through the subset of the Doris
+/// HashTable interface that aggregation uses: emplace (with or without a
+/// precomputed hash), iteration, for_each_*, and size/capacity queries.
+///
+/// LookupResult is a raw pointer to the stored std::pair<const Key, Mapped>.
+/// phmap's flat map keeps values inline in its slot array, so do not hold a
+/// LookupResult across an insertion that may rehash the table.
+template <typename Key, typename Mapped, typename Hash = DefaultHash<Key>>
+class PHHashMap : private boost::noncopyable {
+public:
+    using Self = PHHashMap;
+    using HashMapImpl = phmap::flat_hash_map<Key, Mapped, Hash>;
+
+    using key_type = Key;
+    using mapped_type = Mapped;
+    using value_type = Key;
+
+    using LookupResult = typename HashMapImpl::value_type*;
+
+    using const_iterator_impl = typename HashMapImpl::const_iterator;
+    using iterator_impl = typename HashMapImpl::iterator;
+
+    /// Wraps a phmap iterator. Dereferencing yields the iterator object itself,
+    /// so range-for loops reach key and value via get_first()/get_second().
+    template <typename Derived, bool is_const>
+    class iterator_base {
+        using BaseIterator = std::conditional_t<is_const, const_iterator_impl, iterator_impl>;
+
+        BaseIterator base_iterator;
+        friend class PHHashMap;
+
+    public:
+        iterator_base() = default;
+        iterator_base(BaseIterator it) : base_iterator(it) {}
+
+        bool operator==(const iterator_base& rhs) const {
+            return base_iterator == rhs.base_iterator;
+        }
+        bool operator!=(const iterator_base& rhs) const {
+            return base_iterator != rhs.base_iterator;
+        }
+
+        Derived& operator++() {
+            ++base_iterator;
+            return static_cast<Derived&>(*this);
+        }
+
+        auto& operator*() const { return *this; }
+        auto* operator->() const { return this; }
+
+        auto& operator*() { return *this; }
+        auto* operator->() { return this; }
+
+        const auto& get_first() const { return base_iterator->first; }
+
+        const auto& get_second() const { return base_iterator->second; }
+
+        auto& get_second() { return base_iterator->second; }
+
+        // NOTE(review): `auto` decays here, so this returns a copy of the stored
+        // pair rather than a pointer, as the name might suggest.
+        auto get_ptr() const { return *base_iterator; }
+
+        // NOTE(review): the stored std::pair has no get_hash() member; this only
+        // compiles as long as nothing instantiates it.
+        size_t get_hash() const { return base_iterator->get_hash(); }
+
+        // phmap exposes no collision-chain length; always reports 0.
+        size_t get_collision_chain_length() const { return 0; }
+    };
+
+    class iterator : public iterator_base<iterator, false> {
+    public:
+        using iterator_base<iterator, false>::iterator_base;
+    };
+
+    class const_iterator : public iterator_base<const_iterator, true> {
+    public:
+        using iterator_base<const_iterator, true>::iterator_base;
+    };
+
+    const_iterator begin() const { return const_iterator(_hash_map.cbegin()); }
+
+    const_iterator cbegin() const { return const_iterator(_hash_map.cbegin()); }
+
+    iterator begin() { return iterator(_hash_map.begin()); }
+
+    const_iterator end() const { return const_iterator(_hash_map.cend()); }
+    const_iterator cend() const { return const_iterator(_hash_map.cend()); }
+    iterator end() { return iterator(_hash_map.end()); }
+
+    /// Finds or inserts the key held by `key_holder`.
+    /// On return `it` points at the stored entry and `inserted` says whether the
+    /// key was new. The key holder is persisted only when an insertion happens,
+    /// and a new entry's mapped value is constructed from nullptr.
+    template <typename KeyHolder>
+    void ALWAYS_INLINE emplace(KeyHolder&& key_holder, LookupResult& it, bool& inserted) {
+        const auto& key = key_holder_get_key(key_holder);
+        inserted = false;
+        auto it_ = _hash_map.lazy_emplace(key, [&](const auto& ctor) {
+            inserted = true;
+            key_holder_persist_key(key_holder);
+            ctor(key_holder_get_key(key_holder), nullptr);
+        });
+        it = &*it_;
+    }
+
+    /// Same as emplace(key_holder, it, inserted), but looks the key up with the
+    /// caller-supplied `hash_value` instead of hashing it again.
+    /// `hash_value` must equal hash(key) for this map.
+    template <typename KeyHolder>
+    void ALWAYS_INLINE emplace(KeyHolder&& key_holder, LookupResult& it, size_t hash_value,
+                               bool& inserted) {
+        const auto& key = key_holder_get_key(key_holder);
+        inserted = false;
+        auto it_ = _hash_map.lazy_emplace_with_hash(key, hash_value, [&](const auto& ctor) {
+            inserted = true;
+            key_holder_persist_key(key_holder);
+            // Re-read the key after persisting so the table stores the persisted
+            // key, independent of whether `key` above aliases the holder or is a copy.
+            ctor(key_holder_get_key(key_holder), nullptr);
+        });
+        it = &*it_;
+    }
+
+    /// Hash of `x` as computed by the underlying map; suitable as the
+    /// `hash_value` argument of emplace() and prefetch_by_hash().
+    size_t hash(const Key& x) const { return _hash_map.hash(x); }
+
+    /// Asks phmap to prefetch the table memory that a lookup with `hash_value` would touch.
+    void ALWAYS_INLINE prefetch_by_hash(size_t hash_value) { _hash_map.prefetch_hash(hash_value); }
+
+    /// Call func(const Key &, Mapped &) for each hash map element.
+    template <typename Func>
+    void for_each_value(Func&& func) {
+        for (auto& v : *this) func(v.get_first(), v.get_second());
+    }
+
+    /// Call func(Mapped &) for each hash map element.
+    template <typename Func>
+    void for_each_mapped(Func&& func) {
+        for (auto& v : *this) func(v.get_second());
+    }
+
+    /// Bytes held by the slot array: capacity * sizeof(slot_type).
+    /// Only slot storage is counted.
+    size_t get_buffer_size_in_bytes() const {
+        const auto capacity = _hash_map.capacity();
+        return capacity * sizeof(typename HashMapImpl::slot_type);
+    }
+
+    /// True if adding `row` more elements would push the size past the
+    /// load-factor threshold, i.e. (approximately) would force a resize.
+    bool add_elem_size_overflow(size_t row) const {
+        const auto capacity = _hash_map.capacity();
+        // phmap uses 7/8 as its maximum load factor.
+        return (_hash_map.size() + row) > (capacity * 7 / 8);
+    }
+
+    size_t size() const { return _hash_map.size(); }
+
+    /// This map keeps no separate null-key slot.
+    char* get_null_key_data() { return nullptr; }
+    bool has_null_key_data() const { return false; }
+
+    HashMapImpl _hash_map;
+};
+
+/// Every PHHashMap instantiation is reported as a phmap-backed table.
+template <typename Key, typename Mapped, typename Hash>
+struct IsPhmapTraits<PHHashMap<Key, Mapped, Hash>> {
+    static constexpr bool value = true;
+};
diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp
index cbecc624ad..e152e7d477 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -32,6 +32,9 @@
namespace doris::vectorized {
+// How many rows ahead of the current one to prefetch hash-table memory for on the
+// phmap-based aggregation path. The value 16 was chosen empirically.
+static constexpr size_t HASH_MAP_PREFETCH_DIST {16};
+
/// The minimum reduction factor (input rows divided by output rows) to grow hash tables
/// in a streaming preaggregation, given that the hash tables are currently the given
/// size or above. The sizes roughly correspond to hash table sizes where the bucket
@@ -778,17 +781,38 @@ Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i
std::visit(
[&](auto&& agg_method) -> void {
using HashMethodType = std::decay_t<decltype(agg_method)>;
+ using HashTableType = std::decay_t<decltype(agg_method.data)>;
using AggState = typename HashMethodType::State;
AggState state(key_columns, _probe_key_sz, nullptr);
_pre_serialize_key_if_need(state, agg_method, key_columns, rows);
+ std::vector<size_t> hash_values;
+
+ if constexpr (IsPhmapTraits<HashTableType>::value) {
+ if (hash_values.size() < rows) hash_values.resize(rows);
+ for (size_t i = 0; i < rows; ++i) {
+ hash_values[i] = agg_method.data.hash(agg_method.keys[i]);
+ }
+ }
+
/// For all rows.
for (size_t i = 0; i < rows; ++i) {
AggregateDataPtr aggregate_data = nullptr;
- auto emplace_result =
- state.emplace_key(agg_method.data, i, _agg_arena_pool);
+ auto emplace_result = [&]() {
+ if constexpr (IsPhmapTraits<HashTableType>::value) {
+ if (LIKELY(i + HASH_MAP_PREFETCH_DIST < rows)) {
+ agg_method.data.prefetch_by_hash(
+ hash_values[i + HASH_MAP_PREFETCH_DIST]);
+ }
+
+ return state.emplace_key(agg_method.data, hash_values[i], i,
+ _agg_arena_pool);
+ } else {
+ return state.emplace_key(agg_method.data, i, _agg_arena_pool);
+ }
+ }();
/// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key.
if (emplace_result.is_inserted()) {
@@ -842,16 +866,38 @@ Status AggregationNode::_execute_with_serialized_key(Block* block) {
std::visit(
[&](auto&& agg_method) -> void {
using HashMethodType = std::decay_t<decltype(agg_method)>;
+ using HashTableType = std::decay_t<decltype(agg_method.data)>;
using AggState = typename HashMethodType::State;
AggState state(key_columns, _probe_key_sz, nullptr);
_pre_serialize_key_if_need(state, agg_method, key_columns, rows);
+ std::vector<size_t> hash_values;
+
+ if constexpr (IsPhmapTraits<HashTableType>::value) {
+ if (hash_values.size() < rows) hash_values.resize(rows);
+ for (size_t i = 0; i < rows; ++i) {
+ hash_values[i] = agg_method.data.hash(agg_method.keys[i]);
+ }
+ }
+
/// For all rows.
for (size_t i = 0; i < rows; ++i) {
AggregateDataPtr aggregate_data = nullptr;
- auto emplace_result = state.emplace_key(agg_method.data, i, _agg_arena_pool);
+ auto emplace_result = [&]() {
+ if constexpr (IsPhmapTraits<HashTableType>::value) {
+ if (LIKELY(i + HASH_MAP_PREFETCH_DIST < rows)) {
+ agg_method.data.prefetch_by_hash(
+ hash_values[i + HASH_MAP_PREFETCH_DIST]);
+ }
+
+ return state.emplace_key(agg_method.data, hash_values[i], i,
+ _agg_arena_pool);
+ } else {
+ return state.emplace_key(agg_method.data, i, _agg_arena_pool);
+ }
+ }();
/// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key.
if (emplace_result.is_inserted()) {
@@ -1064,16 +1110,38 @@ Status AggregationNode::_merge_with_serialized_key(Block* block) {
std::visit(
[&](auto&& agg_method) -> void {
using HashMethodType = std::decay_t<decltype(agg_method)>;
+ using HashTableType = std::decay_t<decltype(agg_method.data)>;
using AggState = typename HashMethodType::State;
AggState state(key_columns, _probe_key_sz, nullptr);
_pre_serialize_key_if_need(state, agg_method, key_columns, rows);
+ std::vector<size_t> hash_values;
+
+ if constexpr (IsPhmapTraits<HashTableType>::value) {
+ if (hash_values.size() < rows) hash_values.resize(rows);
+ for (size_t i = 0; i < rows; ++i) {
+ hash_values[i] = agg_method.data.hash(agg_method.keys[i]);
+ }
+ }
+
/// For all rows.
for (size_t i = 0; i < rows; ++i) {
AggregateDataPtr aggregate_data = nullptr;
- auto emplace_result = state.emplace_key(agg_method.data, i, _agg_arena_pool);
+ auto emplace_result = [&]() {
+ if constexpr (IsPhmapTraits<HashTableType>::value) {
+ if (LIKELY(i + HASH_MAP_PREFETCH_DIST < rows)) {
+ agg_method.data.prefetch_by_hash(
+ hash_values[i + HASH_MAP_PREFETCH_DIST]);
+ }
+
+ return state.emplace_key(agg_method.data, hash_values[i], i,
+ _agg_arena_pool);
+ } else {
+ return state.emplace_key(agg_method.data, i, _agg_arena_pool);
+ }
+ }();
/// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key.
if (emplace_result.is_inserted()) {
diff --git a/be/src/vec/exec/vaggregation_node.h b/be/src/vec/exec/vaggregation_node.h
index 0da44ec5dd..c2111905dc 100644
--- a/be/src/vec/exec/vaggregation_node.h
+++ b/be/src/vec/exec/vaggregation_node.h
@@ -106,7 +106,7 @@ private:
};
using AggregatedDataWithoutKey = AggregateDataPtr;
-using AggregatedDataWithStringKey = HashMapWithSavedHash<StringRef, AggregateDataPtr>;
+// Aggregation hash table keyed by StringRef (e.g. serialized group-by keys), backed by phmap.
+// PHHashMap's Hash parameter defaults to DefaultHash<StringRef> for this key type.
+using AggregatedDataWithStringKey = PHHashMap<StringRef, AggregateDataPtr>;
/// For the case where there is one numeric key.
/// FieldType is UInt8/16/32/64 for any type with corresponding bit width.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org