You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2022/07/27 09:08:37 UTC

[GitHub] [doris] mrhhsg opened a new pull request, #11257: [improvement]Use phmap::flat_hash_set in AggregateFunctionUniq

mrhhsg opened a new pull request, #11257:
URL: https://github.com/apache/doris/pull/11257

   # Proposed changes
   
   Issue Number: close #xxx
   
   ## Problem Summary:
   
   Tested on ClickBench with the following SQL:
   ```sql
   SELECT COUNT(DISTINCT SearchPhrase), count(distinct userid) FROM hits;
   ```
   Execution time was reduced from 9.73 sec to 7.60 sec.
   
   ## Checklist(Required)
   
   1. Type of your changes:
       - [x] Improvement
       - [ ] Fix
       - [ ] Feature-WIP
       - [ ] Feature
       - [ ] Doc
       - [ ] Refactor
       - [ ] Others: 
   2. Does it affect the original behavior: 
       - [ ] Yes
       - [ ] No
       - [ ] I don't know
   3. Has unit tests been added:
       - [ ] Yes
       - [ ] No
       - [ ] No Need
   4. Has document been added or modified:
       - [ ] Yes
       - [ ] No
       - [ ] No Need
   5. Does it need to update dependencies:
       - [ ] Yes
       - [ ] No
   6. Are there any changes that cannot be rolled back:
       - [ ] Yes
       - [ ] No
   
   ## Further comments
   
   If this is a relatively large or complex change, kick off the discussion at [dev@doris.apache.org](mailto:dev@doris.apache.org) by explaining why you chose the solution you did and what alternatives you considered, etc...
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] BiteTheDDDDt commented on a diff in pull request #11257: [improvement]Use phmap::flat_hash_set in AggregateFunctionUniq

Posted by GitBox <gi...@apache.org>.
BiteTheDDDDt commented on code in PR #11257:
URL: https://github.com/apache/doris/pull/11257#discussion_r930867953


##########
be/src/vec/aggregate_functions/aggregate_function.h:
##########
@@ -224,6 +228,9 @@ class IAggregateFunctionHelper : public IAggregateFunction {
                                                      arena);
         }
     }
+
+    virtual void deserialize_and_merge(AggregateDataPtr __restrict place, BufferReadable& buf,
+                                       Arena* arena) const override {}

Review Comment:
   Do we need to add some check here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] wangbo commented on a diff in pull request #11257: [improvement]Use phmap::flat_hash_set in AggregateFunctionUniq

Posted by GitBox <gi...@apache.org>.
wangbo commented on code in PR #11257:
URL: https://github.com/apache/doris/pull/11257#discussion_r932931508


##########
be/src/vec/aggregate_functions/aggregate_function_uniq.h:
##########
@@ -109,18 +102,96 @@ class AggregateFunctionUniq final
         detail::OneAdder<T, Data>::add(this->data(place), *columns[0], row_num);
     }
 
+    static ALWAYS_INLINE const KeyType* get_keys(std::vector<KeyType>& keys_container,
+                                                 const IColumn& column, size_t batch_size) {
+        if constexpr (std::is_same_v<T, String>) {
+            keys_container.resize(batch_size);
+            for (size_t i = 0; i != batch_size; ++i) {
+                StringRef value = column.get_data_at(i);
+                keys_container[i] = Data::get_key(value);
+            }
+            return keys_container.data();
+        } else {
+            using ColumnType =
+                    std::conditional_t<IsDecimalNumber<T>, ColumnDecimal<T>, ColumnVector<T>>;
+            return assert_cast<const ColumnType&>(column).get_data().data();
+        }
+    }
+
+    void add_batch(size_t batch_size, AggregateDataPtr* places, size_t place_offset,
+                   const IColumn** columns, Arena* arena) const override {
+        std::vector<KeyType> keys_container;
+        const KeyType* keys = get_keys(keys_container, *columns[0], batch_size);
+
+        std::vector<typename Data::Set*> array_of_data_set(batch_size);
+
+        for (size_t i = 0; i != batch_size; ++i) {
+            array_of_data_set[i] = &(this->data(places[i] + place_offset).set);
+        }
+
+        for (size_t i = 0; i != batch_size; ++i) {
+            if (i + HASH_MAP_PREFETCH_DIST < batch_size) {
+                array_of_data_set[i + HASH_MAP_PREFETCH_DIST]->prefetch(
+                        keys[i + HASH_MAP_PREFETCH_DIST]);
+            }
+
+            array_of_data_set[i]->insert(keys[i]);
+        }
+    }
+
     void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs,
                Arena*) const override {
-        this->data(place).set.merge(this->data(rhs).set);
+        auto& rhs_set = this->data(rhs).set;
+        if (rhs_set.size() == 0) return;
+
+        auto& set = this->data(place).set;
+        set.rehash(set.size() + rhs_set.size());
+
+        for (auto elem : rhs_set) {
+            set.insert(elem);
+        }
+    }
+
+    void add_batch_single_place(size_t batch_size, AggregateDataPtr place, const IColumn** columns,
+                                Arena* arena) const override {
+        std::vector<KeyType> keys_container;
+        const KeyType* keys = get_keys(keys_container, *columns[0], batch_size);
+        auto& set = this->data(place).set;
+
+        for (size_t i = 0; i != batch_size; ++i) {
+            if (i + HASH_MAP_PREFETCH_DIST < batch_size) {
+                set.prefetch(keys[i + HASH_MAP_PREFETCH_DIST]);
+            }
+            set.insert(keys[i]);
+        }
     }
 
     void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& buf) const override {
-        this->data(place).set.write(buf);
+        auto& set = this->data(place).set;
+        write_var_uint(set.size(), buf);
+        for (const auto& elem : set) {
+            write_pod_binary(elem, buf);

Review Comment:
   How about adding a TODO here: after phmap is included in BE's code, we can serialize the phmap set by copying it directly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] mrhhsg commented on a diff in pull request #11257: [improvement]Use phmap::flat_hash_set in AggregateFunctionUniq

Posted by GitBox <gi...@apache.org>.
mrhhsg commented on code in PR #11257:
URL: https://github.com/apache/doris/pull/11257#discussion_r931021956


##########
be/src/vec/aggregate_functions/aggregate_function_uniq.h:
##########
@@ -111,16 +110,82 @@ class AggregateFunctionUniq final
 
     void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs,
                Arena*) const override {
-        this->data(place).set.merge(this->data(rhs).set);
+        auto& rhs_set = this->data(rhs).set;
+        if (rhs_set.size() == 0) return;
+
+        auto& set = this->data(place).set;
+        set.rehash(set.size() + rhs_set.size());
+
+        for (auto elem : rhs_set) {
+            set.insert(elem);
+        }
+    }
+
+    void add_batch_single_place(size_t batch_size, AggregateDataPtr place, const IColumn** columns,
+                                Arena* arena) const override {
+        auto& column = *columns[0];
+        std::vector<KeyType> keys(batch_size);
+        for (size_t i = 0; i != batch_size; ++i) {
+            if constexpr (std::is_same_v<T, String>) {
+                StringRef value = column.get_data_at(i);
+
+                UInt128 key;
+                SipHash hash;
+                hash.update(value.data, value.size);
+                hash.get128(key.low, key.high);
+
+                keys[i] = key;
+            } else if constexpr (std::is_same_v<T, Decimal128>) {
+                keys[i] = assert_cast<const ColumnDecimal<Decimal128>&>(column).get_data()[i];
+            } else {
+                keys[i] = assert_cast<const ColumnVector<T>&>(column).get_data()[i];
+            }
+        }
+
+        auto& set = this->data(place).set;
+        for (size_t i = 0; i != batch_size; ++i) {
+            if (i + 16 < batch_size) {
+                set.prefetch(keys[i + 16]);
+            }
+            set.insert(keys[i]);
+        }
     }
 
     void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& buf) const override {
-        this->data(place).set.write(buf);
+        auto& set = this->data(place).set;
+        write_var_uint(set.size(), buf);
+        for (const auto& elem : set) {
+            write_pod_binary(elem, buf);
+        }
+    }
+
+    void deserialize_and_merge(AggregateDataPtr __restrict place, BufferReadable& buf,
+                               Arena* arena) const override {
+        auto& set = this->data(place).set;
+        size_t size;
+        read_var_uint(size, buf);
+
+        set.rehash(size + set.size());
+
+        for (size_t i = 0; i < size; ++i) {
+            KeyType ref;
+            read_pod_binary(ref, buf);
+            set.insert(ref);
+        }
     }
 
     void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf,

Review Comment:
   Yes, `deserialize` just does the same thing as `deserialize_and_merge`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] github-actions[bot] commented on pull request #11257: [improvement]Use phmap::flat_hash_set in AggregateFunctionUniq

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #11257:
URL: https://github.com/apache/doris/pull/11257#issuecomment-1199009969

   PR approved by at least one committer and no changes requested.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] github-actions[bot] commented on pull request #11257: [improvement]Use phmap::flat_hash_set in AggregateFunctionUniq

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #11257:
URL: https://github.com/apache/doris/pull/11257#issuecomment-1196802320

   PR approved by anyone and no changes requested.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] mrhhsg commented on a diff in pull request #11257: [improvement]Use phmap::flat_hash_set in AggregateFunctionUniq

Posted by GitBox <gi...@apache.org>.
mrhhsg commented on code in PR #11257:
URL: https://github.com/apache/doris/pull/11257#discussion_r932965068


##########
be/src/vec/aggregate_functions/aggregate_function_uniq.h:
##########
@@ -109,18 +102,96 @@ class AggregateFunctionUniq final
         detail::OneAdder<T, Data>::add(this->data(place), *columns[0], row_num);
     }
 
+    static ALWAYS_INLINE const KeyType* get_keys(std::vector<KeyType>& keys_container,
+                                                 const IColumn& column, size_t batch_size) {
+        if constexpr (std::is_same_v<T, String>) {
+            keys_container.resize(batch_size);
+            for (size_t i = 0; i != batch_size; ++i) {
+                StringRef value = column.get_data_at(i);
+                keys_container[i] = Data::get_key(value);
+            }
+            return keys_container.data();
+        } else {
+            using ColumnType =
+                    std::conditional_t<IsDecimalNumber<T>, ColumnDecimal<T>, ColumnVector<T>>;
+            return assert_cast<const ColumnType&>(column).get_data().data();
+        }
+    }
+
+    void add_batch(size_t batch_size, AggregateDataPtr* places, size_t place_offset,
+                   const IColumn** columns, Arena* arena) const override {
+        std::vector<KeyType> keys_container;
+        const KeyType* keys = get_keys(keys_container, *columns[0], batch_size);
+
+        std::vector<typename Data::Set*> array_of_data_set(batch_size);
+
+        for (size_t i = 0; i != batch_size; ++i) {
+            array_of_data_set[i] = &(this->data(places[i] + place_offset).set);
+        }
+
+        for (size_t i = 0; i != batch_size; ++i) {
+            if (i + HASH_MAP_PREFETCH_DIST < batch_size) {
+                array_of_data_set[i + HASH_MAP_PREFETCH_DIST]->prefetch(
+                        keys[i + HASH_MAP_PREFETCH_DIST]);
+            }
+
+            array_of_data_set[i]->insert(keys[i]);
+        }
+    }
+
     void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs,
                Arena*) const override {
-        this->data(place).set.merge(this->data(rhs).set);
+        auto& rhs_set = this->data(rhs).set;
+        if (rhs_set.size() == 0) return;
+
+        auto& set = this->data(place).set;
+        set.rehash(set.size() + rhs_set.size());
+
+        for (auto elem : rhs_set) {
+            set.insert(elem);

Review Comment:
   Yes, `phmap::merge` inserts elements one by one, and it requires that the source (rhs) is not const.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] BiteTheDDDDt commented on a diff in pull request #11257: [improvement]Use phmap::flat_hash_set in AggregateFunctionUniq

Posted by GitBox <gi...@apache.org>.
BiteTheDDDDt commented on code in PR #11257:
URL: https://github.com/apache/doris/pull/11257#discussion_r930881724


##########
be/src/vec/aggregate_functions/aggregate_function_uniq.h:
##########
@@ -111,16 +110,82 @@ class AggregateFunctionUniq final
 
     void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs,
                Arena*) const override {
-        this->data(place).set.merge(this->data(rhs).set);
+        auto& rhs_set = this->data(rhs).set;
+        if (rhs_set.size() == 0) return;
+
+        auto& set = this->data(place).set;
+        set.rehash(set.size() + rhs_set.size());
+
+        for (auto elem : rhs_set) {
+            set.insert(elem);
+        }
+    }
+
+    void add_batch_single_place(size_t batch_size, AggregateDataPtr place, const IColumn** columns,
+                                Arena* arena) const override {
+        auto& column = *columns[0];
+        std::vector<KeyType> keys(batch_size);
+        for (size_t i = 0; i != batch_size; ++i) {
+            if constexpr (std::is_same_v<T, String>) {
+                StringRef value = column.get_data_at(i);
+
+                UInt128 key;
+                SipHash hash;
+                hash.update(value.data, value.size);
+                hash.get128(key.low, key.high);
+
+                keys[i] = key;
+            } else if constexpr (std::is_same_v<T, Decimal128>) {
+                keys[i] = assert_cast<const ColumnDecimal<Decimal128>&>(column).get_data()[i];
+            } else {
+                keys[i] = assert_cast<const ColumnVector<T>&>(column).get_data()[i];
+            }
+        }
+
+        auto& set = this->data(place).set;
+        for (size_t i = 0; i != batch_size; ++i) {
+            if (i + 16 < batch_size) {
+                set.prefetch(keys[i + 16]);
+            }
+            set.insert(keys[i]);
+        }
     }
 
     void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& buf) const override {
-        this->data(place).set.write(buf);
+        auto& set = this->data(place).set;
+        write_var_uint(set.size(), buf);
+        for (const auto& elem : set) {
+            write_pod_binary(elem, buf);
+        }
+    }
+
+    void deserialize_and_merge(AggregateDataPtr __restrict place, BufferReadable& buf,
+                               Arena* arena) const override {
+        auto& set = this->data(place).set;
+        size_t size;
+        read_var_uint(size, buf);
+
+        set.rehash(size + set.size());
+
+        for (size_t i = 0; i < size; ++i) {
+            KeyType ref;
+            read_pod_binary(ref, buf);
+            set.insert(ref);
+        }
     }
 
     void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf,

Review Comment:
   It seems the only difference from `deserialize_and_merge` is `set.rehash`; maybe we can abstract the common part here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] yiguolei merged pull request #11257: [improvement]Use phmap::flat_hash_set in AggregateFunctionUniq

Posted by GitBox <gi...@apache.org>.
yiguolei merged PR #11257:
URL: https://github.com/apache/doris/pull/11257


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [doris] wangbo commented on a diff in pull request #11257: [improvement]Use phmap::flat_hash_set in AggregateFunctionUniq

Posted by GitBox <gi...@apache.org>.
wangbo commented on code in PR #11257:
URL: https://github.com/apache/doris/pull/11257#discussion_r932930559


##########
be/src/vec/aggregate_functions/aggregate_function_uniq.h:
##########
@@ -109,18 +102,96 @@ class AggregateFunctionUniq final
         detail::OneAdder<T, Data>::add(this->data(place), *columns[0], row_num);
     }
 
+    static ALWAYS_INLINE const KeyType* get_keys(std::vector<KeyType>& keys_container,
+                                                 const IColumn& column, size_t batch_size) {
+        if constexpr (std::is_same_v<T, String>) {
+            keys_container.resize(batch_size);
+            for (size_t i = 0; i != batch_size; ++i) {
+                StringRef value = column.get_data_at(i);
+                keys_container[i] = Data::get_key(value);
+            }
+            return keys_container.data();
+        } else {
+            using ColumnType =
+                    std::conditional_t<IsDecimalNumber<T>, ColumnDecimal<T>, ColumnVector<T>>;
+            return assert_cast<const ColumnType&>(column).get_data().data();
+        }
+    }
+
+    void add_batch(size_t batch_size, AggregateDataPtr* places, size_t place_offset,
+                   const IColumn** columns, Arena* arena) const override {
+        std::vector<KeyType> keys_container;
+        const KeyType* keys = get_keys(keys_container, *columns[0], batch_size);
+
+        std::vector<typename Data::Set*> array_of_data_set(batch_size);
+
+        for (size_t i = 0; i != batch_size; ++i) {
+            array_of_data_set[i] = &(this->data(places[i] + place_offset).set);
+        }
+
+        for (size_t i = 0; i != batch_size; ++i) {
+            if (i + HASH_MAP_PREFETCH_DIST < batch_size) {
+                array_of_data_set[i + HASH_MAP_PREFETCH_DIST]->prefetch(
+                        keys[i + HASH_MAP_PREFETCH_DIST]);
+            }
+
+            array_of_data_set[i]->insert(keys[i]);
+        }
+    }
+
     void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs,
                Arena*) const override {
-        this->data(place).set.merge(this->data(rhs).set);
+        auto& rhs_set = this->data(rhs).set;
+        if (rhs_set.size() == 0) return;
+
+        auto& set = this->data(place).set;
+        set.rehash(set.size() + rhs_set.size());
+
+        for (auto elem : rhs_set) {
+            set.insert(elem);

Review Comment:
   phmap has a merge method; is that the same as inserting elements one by one?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org