You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2022/07/27 12:50:39 UTC

[GitHub] [doris] mrhhsg commented on a diff in pull request #11257: [improvement]Use phmap::flat_hash_set in AggregateFunctionUniq

mrhhsg commented on code in PR #11257:
URL: https://github.com/apache/doris/pull/11257#discussion_r931021956


##########
be/src/vec/aggregate_functions/aggregate_function_uniq.h:
##########
@@ -111,16 +110,82 @@ class AggregateFunctionUniq final
 
     void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs,
                Arena*) const override {
-        this->data(place).set.merge(this->data(rhs).set);
+        auto& rhs_set = this->data(rhs).set;
+        if (rhs_set.size() == 0) return;
+
+        auto& set = this->data(place).set;
+        set.rehash(set.size() + rhs_set.size());
+
+        for (auto elem : rhs_set) {
+            set.insert(elem);
+        }
+    }
+
+    void add_batch_single_place(size_t batch_size, AggregateDataPtr place, const IColumn** columns,
+                                Arena* arena) const override {
+        auto& column = *columns[0];
+        std::vector<KeyType> keys(batch_size);
+        for (size_t i = 0; i != batch_size; ++i) {
+            if constexpr (std::is_same_v<T, String>) {
+                StringRef value = column.get_data_at(i);
+
+                UInt128 key;
+                SipHash hash;
+                hash.update(value.data, value.size);
+                hash.get128(key.low, key.high);
+
+                keys[i] = key;
+            } else if constexpr (std::is_same_v<T, Decimal128>) {
+                keys[i] = assert_cast<const ColumnDecimal<Decimal128>&>(column).get_data()[i];
+            } else {
+                keys[i] = assert_cast<const ColumnVector<T>&>(column).get_data()[i];
+            }
+        }
+
+        auto& set = this->data(place).set;
+        for (size_t i = 0; i != batch_size; ++i) {
+            if (i + 16 < batch_size) {
+                set.prefetch(keys[i + 16]);
+            }
+            set.insert(keys[i]);
+        }
     }
 
     void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& buf) const override {
-        this->data(place).set.write(buf);
+        auto& set = this->data(place).set;
+        write_var_uint(set.size(), buf);
+        for (const auto& elem : set) {
+            write_pod_binary(elem, buf);
+        }
+    }
+
+    void deserialize_and_merge(AggregateDataPtr __restrict place, BufferReadable& buf,
+                               Arena* arena) const override {
+        auto& set = this->data(place).set;
+        size_t size;
+        read_var_uint(size, buf);
+
+        set.rehash(size + set.size());
+
+        for (size_t i = 0; i < size; ++i) {
+            KeyType ref;
+            read_pod_binary(ref, buf);
+            set.insert(ref);
+        }
     }
 
     void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf,

Review Comment:
   Yes, `deserialize` just does the same thing as `deserialize_and_merge`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org