You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/07/30 13:50:00 UTC

[doris] branch revert-11257-distinct_phmap created (now a355c02fb7)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a change to branch revert-11257-distinct_phmap
in repository https://gitbox.apache.org/repos/asf/doris.git


      at a355c02fb7 Revert "[improvement]Use phmap::flat_hash_set in AggregateFunctionUniq (#11257)"

This branch includes the following new commits:

     new a355c02fb7 Revert "[improvement]Use phmap::flat_hash_set in AggregateFunctionUniq (#11257)"

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[doris] 01/01: Revert "[improvement]Use phmap::flat_hash_set in AggregateFunctionUniq (#11257)"

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch revert-11257-distinct_phmap
in repository https://gitbox.apache.org/repos/asf/doris.git

commit a355c02fb7f43bbaa7b3b90afbaa7f3942f519e3
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Sat Jul 30 21:49:56 2022 +0800

    Revert "[improvement]Use phmap::flat_hash_set in AggregateFunctionUniq (#11257)"
    
    This reverts commit a7199fb98e18b925664b38460b667d04cbee8e01.
---
 .../vec/aggregate_functions/aggregate_function.h   |  15 ---
 .../aggregate_function_nothing.h                   |   3 -
 .../aggregate_functions/aggregate_function_null.h  |  12 --
 .../aggregate_functions/aggregate_function_uniq.h  | 137 +++++----------------
 be/src/vec/exec/vaggregation_node.cpp              |  12 +-
 5 files changed, 43 insertions(+), 136 deletions(-)

diff --git a/be/src/vec/aggregate_functions/aggregate_function.h b/be/src/vec/aggregate_functions/aggregate_function.h
index c7c7fc38ca..677c189002 100644
--- a/be/src/vec/aggregate_functions/aggregate_function.h
+++ b/be/src/vec/aggregate_functions/aggregate_function.h
@@ -107,10 +107,6 @@ public:
     virtual void deserialize_vec(AggregateDataPtr places, ColumnString* column, Arena* arena,
                                  size_t num_rows) const = 0;
 
-    /// Deserializes state and merge it with current aggregation function.
-    virtual void deserialize_and_merge(AggregateDataPtr __restrict place, BufferReadable& buf,
-                                       Arena* arena) const = 0;
-
     /// Returns true if a function requires Arena to handle own states (see add(), merge(), deserialize()).
     virtual bool allocates_memory_in_arena() const { return false; }
 
@@ -257,17 +253,6 @@ public:
     size_t align_of_data() const override { return alignof(Data); }
 
     void reset(AggregateDataPtr place) const override {}
-
-    void deserialize_and_merge(AggregateDataPtr __restrict place, BufferReadable& buf,
-                               Arena* arena) const override {
-        Data deserialized_data;
-        AggregateDataPtr deserialized_place = (AggregateDataPtr)&deserialized_data;
-
-        auto derived = static_cast<const Derived*>(this);
-        derived->create(deserialized_place);
-        derived->deserialize(deserialized_place, buf, arena);
-        derived->merge(place, deserialized_place, arena);
-    }
 };
 
 using AggregateFunctionPtr = std::shared_ptr<IAggregateFunction>;
diff --git a/be/src/vec/aggregate_functions/aggregate_function_nothing.h b/be/src/vec/aggregate_functions/aggregate_function_nothing.h
index 64af14a6cf..c0ae740be4 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_nothing.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_nothing.h
@@ -64,9 +64,6 @@ public:
     void insert_result_into(ConstAggregateDataPtr, IColumn& to) const override {
         to.insert_default();
     }
-
-    void deserialize_and_merge(AggregateDataPtr __restrict place, BufferReadable& buf,
-                               Arena* arena) const override {}
 };
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/aggregate_functions/aggregate_function_null.h b/be/src/vec/aggregate_functions/aggregate_function_null.h
index 89960bc9f0..5b804b82a7 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_null.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_null.h
@@ -151,18 +151,6 @@ public:
         }
     }
 
-    void deserialize_and_merge(AggregateDataPtr __restrict place, BufferReadable& buf,
-                               Arena* arena) const override {
-        bool flag = true;
-        if (result_is_nullable) {
-            read_binary(flag, buf);
-        }
-        if (flag) {
-            set_flag(place);
-            nested_function->deserialize_and_merge(nested_place(place), buf, arena);
-        }
-    }
-
     void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& to) const override {
         if constexpr (result_is_nullable) {
             ColumnNullable& to_concrete = assert_cast<ColumnNullable&>(to);
diff --git a/be/src/vec/aggregate_functions/aggregate_function_uniq.h b/be/src/vec/aggregate_functions/aggregate_function_uniq.h
index 988e9bdb01..c717307c72 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_uniq.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_uniq.h
@@ -20,8 +20,6 @@
 
 #pragma once
 
-#include <parallel_hashmap/phmap.h>
-
 #include <type_traits>
 
 #include "gutil/hash/city.h"
@@ -36,26 +34,29 @@
 
 namespace doris::vectorized {
 
-// Here is an empirical value.
-static constexpr size_t HASH_MAP_PREFETCH_DIST = 16;
-
 /// uniqExact
 
 template <typename T>
 struct AggregateFunctionUniqExactData {
-    static constexpr bool is_string_key = std::is_same_v<T, String>;
-    using Key = std::conditional_t<is_string_key, UInt128, T>;
-    using Hash = std::conditional_t<is_string_key, UInt128TrivialHash, HashCRC32<Key>>;
-
-    using Set = phmap::flat_hash_set<Key, Hash>;
-
-    static UInt128 ALWAYS_INLINE get_key(const StringRef& value) {
-        UInt128 key;
-        SipHash hash;
-        hash.update(value.data, value.size);
-        hash.get128(key.low, key.high);
-        return key;
-    }
+    using Key = T;
+
+    /// When creating, the hash table must be small.
+    using Set = HashSet<Key, HashCRC32<Key>, HashTableGrower<4>,
+                        HashTableAllocatorWithStackMemory<sizeof(Key) * (1 << 4)>>;
+
+    Set set;
+
+    static String get_name() { return "uniqExact"; }
+};
+
+/// For rows, we put the SipHash values (128 bits) into the hash table.
+template <>
+struct AggregateFunctionUniqExactData<String> {
+    using Key = UInt128;
+
+    /// When creating, the hash table must be small.
+    using Set = HashSet<Key, UInt128TrivialHash, HashTableGrower<3>,
+                        HashTableAllocatorWithStackMemory<sizeof(Key) * (1 << 3)>>;
 
     Set set;
 
@@ -72,9 +73,16 @@ struct OneAdder {
     static void ALWAYS_INLINE add(Data& data, const IColumn& column, size_t row_num) {
         if constexpr (std::is_same_v<T, String>) {
             StringRef value = column.get_data_at(row_num);
-            data.set.insert(Data::get_key(value));
-        } else if constexpr (IsDecimalNumber<T>) {
-            data.set.insert(assert_cast<const ColumnDecimal<T>&>(column).get_data()[row_num]);
+
+            UInt128 key;
+            SipHash hash;
+            hash.update(value.data, value.size);
+            hash.get128(key.low, key.high);
+
+            data.set.insert(key);
+        } else if constexpr (std::is_same_v<T, Decimal128>) {
+            data.set.insert(
+                    assert_cast<const ColumnDecimal<Decimal128>&>(column).get_data()[row_num]);
         } else {
             data.set.insert(assert_cast<const ColumnVector<T>&>(column).get_data()[row_num]);
         }
@@ -88,7 +96,6 @@ template <typename T, typename Data>
 class AggregateFunctionUniq final
         : public IAggregateFunctionDataHelper<Data, AggregateFunctionUniq<T, Data>> {
 public:
-    using KeyType = std::conditional_t<std::is_same_v<T, String>, UInt128, T>;
     AggregateFunctionUniq(const DataTypes& argument_types_)
             : IAggregateFunctionDataHelper<Data, AggregateFunctionUniq<T, Data>>(argument_types_,
                                                                                  {}) {}
@@ -102,96 +109,18 @@ public:
         detail::OneAdder<T, Data>::add(this->data(place), *columns[0], row_num);
     }
 
-    static ALWAYS_INLINE const KeyType* get_keys(std::vector<KeyType>& keys_container,
-                                                 const IColumn& column, size_t batch_size) {
-        if constexpr (std::is_same_v<T, String>) {
-            keys_container.resize(batch_size);
-            for (size_t i = 0; i != batch_size; ++i) {
-                StringRef value = column.get_data_at(i);
-                keys_container[i] = Data::get_key(value);
-            }
-            return keys_container.data();
-        } else {
-            using ColumnType =
-                    std::conditional_t<IsDecimalNumber<T>, ColumnDecimal<T>, ColumnVector<T>>;
-            return assert_cast<const ColumnType&>(column).get_data().data();
-        }
-    }
-
-    void add_batch(size_t batch_size, AggregateDataPtr* places, size_t place_offset,
-                   const IColumn** columns, Arena* arena) const override {
-        std::vector<KeyType> keys_container;
-        const KeyType* keys = get_keys(keys_container, *columns[0], batch_size);
-
-        std::vector<typename Data::Set*> array_of_data_set(batch_size);
-
-        for (size_t i = 0; i != batch_size; ++i) {
-            array_of_data_set[i] = &(this->data(places[i] + place_offset).set);
-        }
-
-        for (size_t i = 0; i != batch_size; ++i) {
-            if (i + HASH_MAP_PREFETCH_DIST < batch_size) {
-                array_of_data_set[i + HASH_MAP_PREFETCH_DIST]->prefetch(
-                        keys[i + HASH_MAP_PREFETCH_DIST]);
-            }
-
-            array_of_data_set[i]->insert(keys[i]);
-        }
-    }
-
     void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs,
                Arena*) const override {
-        auto& rhs_set = this->data(rhs).set;
-        if (rhs_set.size() == 0) return;
-
-        auto& set = this->data(place).set;
-        set.rehash(set.size() + rhs_set.size());
-
-        for (auto elem : rhs_set) {
-            set.insert(elem);
-        }
-    }
-
-    void add_batch_single_place(size_t batch_size, AggregateDataPtr place, const IColumn** columns,
-                                Arena* arena) const override {
-        std::vector<KeyType> keys_container;
-        const KeyType* keys = get_keys(keys_container, *columns[0], batch_size);
-        auto& set = this->data(place).set;
-
-        for (size_t i = 0; i != batch_size; ++i) {
-            if (i + HASH_MAP_PREFETCH_DIST < batch_size) {
-                set.prefetch(keys[i + HASH_MAP_PREFETCH_DIST]);
-            }
-            set.insert(keys[i]);
-        }
+        this->data(place).set.merge(this->data(rhs).set);
     }
 
     void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& buf) const override {
-        auto& set = this->data(place).set;
-        write_var_uint(set.size(), buf);
-        for (const auto& elem : set) {
-            write_pod_binary(elem, buf);
-        }
-    }
-
-    void deserialize_and_merge(AggregateDataPtr __restrict place, BufferReadable& buf,
-                               Arena* arena) const override {
-        auto& set = this->data(place).set;
-        size_t size;
-        read_var_uint(size, buf);
-
-        set.rehash(size + set.size());
-
-        for (size_t i = 0; i < size; ++i) {
-            KeyType ref;
-            read_pod_binary(ref, buf);
-            set.insert(ref);
-        }
+        this->data(place).set.write(buf);
     }
 
     void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf,
-                     Arena* arena) const override {
-        deserialize_and_merge(place, buf, arena);
+                     Arena*) const override {
+        this->data(place).set.read(buf);
     }
 
     void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& to) const override {
diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp
index 54f6b8d15a..d4325bc17e 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -618,10 +618,18 @@ Status AggregationNode::_merge_without_key(Block* block) {
 
             for (int j = 0; j < rows; ++j) {
                 VectorBufferReader buffer_reader(((ColumnString*)(column.get()))->get_data_at(j));
+                _create_agg_status(deserialize_buffer.get());
 
-                _aggregate_evaluators[i]->function()->deserialize_and_merge(
-                        _agg_data.without_key + _offsets_of_aggregate_states[i], buffer_reader,
+                _aggregate_evaluators[i]->function()->deserialize(
+                        deserialize_buffer.get() + _offsets_of_aggregate_states[i], buffer_reader,
                         &_agg_arena_pool);
+
+                _aggregate_evaluators[i]->function()->merge(
+                        _agg_data.without_key + _offsets_of_aggregate_states[i],
+                        deserialize_buffer.get() + _offsets_of_aggregate_states[i],
+                        &_agg_arena_pool);
+
+                _destroy_agg_status(deserialize_buffer.get());
             }
         } else {
             _aggregate_evaluators[i]->execute_single_add(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org