You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/04/08 02:31:17 UTC

[incubator-doris] 03/14: [improvement](hll) Optimize Hyperloglog (#8829)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 7aace71325f1f0cbea446f14f7acc176741501b8
Author: zbtzbtzbt <35...@users.noreply.github.com>
AuthorDate: Fri Apr 8 09:06:08 2022 +0800

    [improvement](hll) Optimize Hyperloglog (#8829)
    
    At Meituan, PR #6625 was reverted due to an OOM problem.
    Currently, we are trying to improve the old HyperLogLog; building on PR #8555, we made several changes.
    In our tests, it performs better than both the old HLL and the HLL in apache:master.
    
    Changes summary:
    
    - use SIMD max to speed up the hot function _merge_registers
    - use phmap::flat_hash_set rather than std::set
    - replace std::max
    - other small changes
---
 be/src/exprs/aggregate_functions.cpp |   6 +-
 be/src/olap/hll.cpp                  | 143 ++++++++---------------------------
 be/src/olap/hll.h                    | 118 ++++++++++-------------------
 3 files changed, 75 insertions(+), 192 deletions(-)

diff --git a/be/src/exprs/aggregate_functions.cpp b/be/src/exprs/aggregate_functions.cpp
index 09d9b1b68f..9e442b3081 100644
--- a/be/src/exprs/aggregate_functions.cpp
+++ b/be/src/exprs/aggregate_functions.cpp
@@ -1108,11 +1108,9 @@ void AggregateFunctions::hll_merge(FunctionContext* ctx, const StringVal& src, S
     DCHECK(!src.is_null);
     DCHECK_EQ(dst->len, std::pow(2, HLL_COLUMN_PRECISION));
     DCHECK_EQ(src.len, std::pow(2, HLL_COLUMN_PRECISION));
-
-    auto dp = dst->ptr;
-    auto sp = src.ptr;
+    
     for (int i = 0; i < src.len; ++i) {
-        dp[i] = (dp[i] < sp[i] ? sp[i] : dp[i]);
+        dst->ptr[i] = (dst->ptr[i] < src.ptr[i] ? src.ptr[i] : dst->ptr[i]);
     }
 }
 
diff --git a/be/src/olap/hll.cpp b/be/src/olap/hll.cpp
index 12fb912b3a..a2ec6e39a0 100644
--- a/be/src/olap/hll.cpp
+++ b/be/src/olap/hll.cpp
@@ -43,15 +43,13 @@ HyperLogLog::HyperLogLog(const Slice& src) {
 void HyperLogLog::_convert_explicit_to_register() {
     DCHECK(_type == HLL_DATA_EXPLICIT)
             << "_type(" << _type << ") should be explicit(" << HLL_DATA_EXPLICIT << ")";
-    _registers = new uint8_t[HLL_REGISTERS_COUNT]();
-
-    for (uint32_t i = 0; i < _explicit_data_num; ++i) {
-        _update_registers(_explicit_data[i]);
+    _registers = new uint8_t[HLL_REGISTERS_COUNT];
+    memset(_registers, 0, HLL_REGISTERS_COUNT);
+    for (auto value : _hash_set) {
+        _update_registers(value);
     }
-
-    delete [] _explicit_data;
-    _explicit_data = nullptr;
-    _explicit_data_num = 0;
+    // clear _hash_set
+    phmap::flat_hash_set<uint64_t>().swap(_hash_set);
 }
 
 // Change HLL_DATA_EXPLICIT to HLL_DATA_FULL directly, because HLL_DATA_SPARSE
@@ -59,14 +57,12 @@ void HyperLogLog::_convert_explicit_to_register() {
 void HyperLogLog::update(uint64_t hash_value) {
     switch (_type) {
     case HLL_DATA_EMPTY:
-        _explicit_data = new uint64_t[HLL_EXPLICIT_INT64_NUM_DOUBLE];
-        _explicit_data[0] = hash_value;
-        _explicit_data_num = 1;
+        _hash_set.insert(hash_value);
         _type = HLL_DATA_EXPLICIT;
         break;
     case HLL_DATA_EXPLICIT:
-        if (_explicit_data_num < HLL_EXPLICIT_INT64_NUM) {
-            _explicit_data_insert(hash_value);
+        if (_hash_set.size() < HLL_EXPLICIT_INT64_NUM) {
+            _hash_set.insert(hash_value);
             break;
         }
         _convert_explicit_to_register();
@@ -90,10 +86,7 @@ void HyperLogLog::merge(const HyperLogLog& other) {
         _type = other._type;
         switch (other._type) {
         case HLL_DATA_EXPLICIT:
-            _explicit_data_num = other._explicit_data_num;
-            _explicit_data = new uint64_t[HLL_EXPLICIT_INT64_NUM_DOUBLE];
-            memcpy(_explicit_data, other._explicit_data,
-                   sizeof(*_explicit_data) * _explicit_data_num);
+            _hash_set = other._hash_set;
             break;
         case HLL_DATA_SPARSE:
         case HLL_DATA_FULL:
@@ -110,47 +103,8 @@ void HyperLogLog::merge(const HyperLogLog& other) {
         case HLL_DATA_EXPLICIT: {
             // Merge other's explicit values first, then check if the number is exceed
             // HLL_EXPLICIT_INT64_NUM. This is OK because the max value is 2 * 160.
-            if (other._explicit_data_num > HLL_EXPLICIT_INT64_NUM / 2) { //merge
-                uint64_t explicit_data[HLL_EXPLICIT_INT64_NUM * 2];
-                memcpy(explicit_data, _explicit_data, sizeof(*_explicit_data) * _explicit_data_num);
-                uint32_t explicit_data_num = _explicit_data_num;
-                _explicit_data_num = 0;
-
-                // merge _explicit_data and other's _explicit_data to _explicit_data
-                uint32_t i = 0, j = 0, k = 0;
-                while (i < explicit_data_num || j < other._explicit_data_num) {
-                    if (i == explicit_data_num) {
-                        uint32_t n = other._explicit_data_num - j;
-                        memcpy(_explicit_data + k, other._explicit_data + j,
-                               n * sizeof(*_explicit_data));
-                        k += n;
-                        break;
-                    } else if (j == other._explicit_data_num) {
-                        uint32_t n = explicit_data_num - i;
-                        memcpy(_explicit_data + k, explicit_data + i, n * sizeof(*_explicit_data));
-                        k += n;
-                        break;
-                    } else {
-                        if (explicit_data[i] < other._explicit_data[j]) {
-                            _explicit_data[k++] = explicit_data[i++];
-                        } else if (explicit_data[i] > other._explicit_data[j]) {
-                            _explicit_data[k++] = other._explicit_data[j++];
-                        } else {
-                            _explicit_data[k++] = explicit_data[i++];
-                            j++;
-                        }
-                    }
-                }
-                _explicit_data_num = k;
-            } else { //insert one by one
-                int32_t n = other._explicit_data_num;
-                const uint64_t* data = other._explicit_data;
-                for (int32_t i = 0; i < n; ++i) {
-                    _explicit_data_insert(data[i]);
-                }
-            }
-
-            if (_explicit_data_num > HLL_EXPLICIT_INT64_NUM) {
+            _hash_set.insert(other._hash_set.begin(), other._hash_set.end());
+            if (_hash_set.size() > HLL_EXPLICIT_INT64_NUM) {
                 _convert_explicit_to_register();
                 _type = HLL_DATA_FULL;
             }
@@ -170,8 +124,8 @@ void HyperLogLog::merge(const HyperLogLog& other) {
     case HLL_DATA_FULL: {
         switch (other._type) {
         case HLL_DATA_EXPLICIT:
-            for (int32_t i = 0; i < other._explicit_data_num; ++i) {
-                _update_registers(other._explicit_data[i]);
+            for (auto hash_value : other._hash_set) {
+                _update_registers(hash_value);
             }
             break;
         case HLL_DATA_SPARSE:
@@ -192,7 +146,7 @@ size_t HyperLogLog::max_serialized_size() const {
     default:
         return 1;
     case HLL_DATA_EXPLICIT:
-        return 2 + _explicit_data_num * 8;
+        return 2 + _hash_set.size() * 8;
     case HLL_DATA_SPARSE:
     case HLL_DATA_FULL:
         return 1 + HLL_REGISTERS_COUNT;
@@ -201,32 +155,24 @@ size_t HyperLogLog::max_serialized_size() const {
 
 size_t HyperLogLog::serialize(uint8_t* dst) const {
     uint8_t* ptr = dst;
-
     switch (_type) {
     case HLL_DATA_EMPTY:
     default: {
         // When the _type is unknown, which may not happen, we encode it as
         // Empty HyperLogLog object.
         *ptr++ = HLL_DATA_EMPTY;
-
         break;
     }
     case HLL_DATA_EXPLICIT: {
-        DCHECK(_explicit_data_num < HLL_EXPLICIT_INT64_NUM)
-                << "Number of explicit elements(" << _explicit_data_num
+        DCHECK(_hash_set.size() <= HLL_EXPLICIT_INT64_NUM)
+                << "Number of explicit elements(" << _hash_set.size()
                 << ") should be less or equal than " << HLL_EXPLICIT_INT64_NUM;
         *ptr++ = _type;
-        *ptr++ = (uint8_t)_explicit_data_num;
-
-#if __BYTE_ORDER == __LITTLE_ENDIAN
-        memcpy(ptr, _explicit_data, _explicit_data_num * sizeof(*_explicit_data));
-        ptr += _explicit_data_num * sizeof(*_explicit_data);
-#else
-        for (int32_t i = 0; i < _explicit_data_num; ++i) {
-            *(uint64_t*)ptr = (uint64_t)gbswap_64(_explicit_data[i]);
+        *ptr++ = (uint8_t)_hash_set.size();
+        for (auto hash_value : _hash_set) {
+            encode_fixed64_le(ptr, hash_value);
             ptr += 8;
         }
-#endif
         break;
     }
     case HLL_DATA_SPARSE:
@@ -249,39 +195,15 @@ size_t HyperLogLog::serialize(uint8_t* dst) const {
             encode_fixed32_le(ptr, num_non_zero_registers);
             ptr += 4;
 
-            for (uint32_t i = 0; i < HLL_REGISTERS_COUNT;) {
-                if (*(uint32_t*)(&_registers[i]) == 0) {
-                    i += 4;
+            for (uint32_t i = 0; i < HLL_REGISTERS_COUNT; ++i) {
+                if (_registers[i] == 0) {
                     continue;
                 }
-
-                if (UNLIKELY(_registers[i])) {
-                    encode_fixed16_le(ptr, i);
-                    ptr += 2;               // 2 bytes: register index
-                    *ptr++ = _registers[i]; // 1 byte: register value
-                }
-                ++i;
-
-                if (UNLIKELY(_registers[i])) {
-                    encode_fixed16_le(ptr, i);
-                    ptr += 2;               // 2 bytes: register index
-                    *ptr++ = _registers[i]; // 1 byte: register value
-                }
-                ++i;
-
-                if (UNLIKELY(_registers[i])) {
-                    encode_fixed16_le(ptr, i);
-                    ptr += 2;               // 2 bytes: register index
-                    *ptr++ = _registers[i]; // 1 byte: register value
-                }
-                ++i;
-
-                if (UNLIKELY(_registers[i])) {
-                    encode_fixed16_le(ptr, i);
-                    ptr += 2;               // 2 bytes: register index
-                    *ptr++ = _registers[i]; // 1 byte: register value
-                }
-                ++i;
+                // 2 bytes: register index
+                // 1 byte: register value
+                encode_fixed16_le(ptr, i);
+                ptr += 2;
+                *ptr++ = _registers[i];
             }
         }
         break;
@@ -355,24 +277,23 @@ bool HyperLogLog::deserialize(const Slice& slice) {
         // 2: number of explicit values
         // make sure that num_explicit is positive
         uint8_t num_explicits = *ptr++;
-        _explicit_data = new uint64_t[HLL_EXPLICIT_INT64_NUM_DOUBLE];
         // 3+: 8 bytes hash value
         for (int i = 0; i < num_explicits; ++i) {
-            _explicit_data_insert(decode_fixed64_le(ptr));
+            _hash_set.insert(decode_fixed64_le(ptr));
             ptr += 8;
         }
         break;
     }
     case HLL_DATA_SPARSE: {
-        _registers = new uint8_t[HLL_REGISTERS_COUNT]();
+        _registers = new uint8_t[HLL_REGISTERS_COUNT];
+        memset(_registers, 0, HLL_REGISTERS_COUNT);
         // 2-5(4 byte): number of registers
         uint32_t num_registers = decode_fixed32_le(ptr);
-        uint16_t register_idx = 0;
         ptr += 4;
         for (uint32_t i = 0; i < num_registers; ++i) {
             // 2 bytes: register index
             // 1 byte: register value
-            register_idx = decode_fixed16_le(ptr);
+            uint16_t register_idx = decode_fixed16_le(ptr);
             ptr += 2;
             _registers[register_idx] = *ptr++;
         }
@@ -397,7 +318,7 @@ int64_t HyperLogLog::estimate_cardinality() const {
         return 0;
     }
     if (_type == HLL_DATA_EXPLICIT) {
-        return _explicit_data_num;
+        return _hash_set.size();
     }
 
     const int num_streams = HLL_REGISTERS_COUNT;
diff --git a/be/src/olap/hll.h b/be/src/olap/hll.h
index 1ae56e2254..dc157f886e 100644
--- a/be/src/olap/hll.h
+++ b/be/src/olap/hll.h
@@ -24,6 +24,11 @@
 #include <map>
 #include <set>
 #include <string>
+#include <parallel_hashmap/phmap.h>
+
+#ifdef __x86_64__
+#include <immintrin.h>
+#endif
 
 #include "gutil/macros.h"
 
@@ -34,7 +39,6 @@ struct Slice;
 const static int HLL_COLUMN_PRECISION = 14;
 const static int HLL_ZERO_COUNT_BITS = (64 - HLL_COLUMN_PRECISION);
 const static int HLL_EXPLICIT_INT64_NUM = 160;
-const static int HLL_EXPLICIT_INT64_NUM_DOUBLE = HLL_EXPLICIT_INT64_NUM * 2;
 const static int HLL_SPARSE_THRESHOLD = 4096;
 const static int HLL_REGISTERS_COUNT = 16 * 1024;
 // maximum size in byte of serialized HLL: type(1) + registers (2^14)
@@ -83,10 +87,9 @@ class HyperLogLog {
 public:
     HyperLogLog() = default;
     explicit HyperLogLog(uint64_t hash_value) : _type(HLL_DATA_EXPLICIT) {
-        _explicit_data = new uint64_t[HLL_EXPLICIT_INT64_NUM_DOUBLE];
-        _explicit_data[0] = hash_value;
-        _explicit_data_num = 1;
+        _hash_set.emplace(hash_value);
     }
+    explicit HyperLogLog(const Slice& src);
 
     HyperLogLog(const HyperLogLog& other) {
         this->_type = other._type;
@@ -94,10 +97,7 @@ public:
         case HLL_DATA_EMPTY:
             break;
         case HLL_DATA_EXPLICIT: {
-            this->_explicit_data_num = other._explicit_data_num;
-            _explicit_data = new uint64_t[HLL_EXPLICIT_INT64_NUM_DOUBLE];
-            memcpy(_explicit_data, other._explicit_data,
-                   sizeof(*_explicit_data) * _explicit_data_num);
+            this->_hash_set = other._hash_set;
             break;
         }
         case HLL_DATA_SPARSE:
@@ -105,10 +105,10 @@ public:
             _registers = new uint8_t[HLL_REGISTERS_COUNT];
             memcpy(_registers, other._registers, HLL_REGISTERS_COUNT);
             break;
+        }
         default:
             break;
         }
-        }
     }
 
     HyperLogLog(HyperLogLog&& other) {
@@ -117,10 +117,7 @@ public:
         case HLL_DATA_EMPTY:
             break;
         case HLL_DATA_EXPLICIT: {
-            this->_explicit_data_num = other._explicit_data_num;
-            this->_explicit_data = other._explicit_data;
-            other._explicit_data_num = 0;
-            other._explicit_data = nullptr;
+            this->_hash_set = std::move(other._hash_set);
             other._type = HLL_DATA_EMPTY;
             break;
         }
@@ -130,33 +127,25 @@ public:
             other._registers = nullptr;
             other._type = HLL_DATA_EMPTY;
             break;
+        }
         default:
             break;
         }
-        }
     }
 
     HyperLogLog& operator=(HyperLogLog&& other) {
         if (this != &other) {
-            if (_registers) {
+            if (_registers != nullptr) {
                 delete[] _registers;
                 _registers = nullptr;
             }
-            if (_explicit_data) {
-                delete[] _explicit_data;
-                _explicit_data = nullptr;
-            }
 
-            _explicit_data_num = 0;
             this->_type = other._type;
             switch (other._type) {
             case HLL_DATA_EMPTY:
                 break;
             case HLL_DATA_EXPLICIT: {
-                this->_explicit_data_num = other._explicit_data_num;
-                this->_explicit_data = other._explicit_data;
-                other._explicit_data_num = 0;
-                other._explicit_data = nullptr;
+                this->_hash_set = std::move(other._hash_set);
                 other._type = HLL_DATA_EMPTY;
                 break;
             }
@@ -166,35 +155,27 @@ public:
                 other._registers = nullptr;
                 other._type = HLL_DATA_EMPTY;
                 break;
+            }
             default:
                 break;
             }
-            }
         }
         return *this;
     }
 
     HyperLogLog& operator=(const HyperLogLog& other) {
         if (this != &other) {
-            if (_registers) {
+            if (_registers != nullptr) {
                 delete[] _registers;
                 _registers = nullptr;
             }
-            if (_explicit_data) {
-                delete[] _explicit_data;
-                _explicit_data = nullptr;
-            }
 
-            _explicit_data_num = 0;
             this->_type = other._type;
             switch (other._type) {
             case HLL_DATA_EMPTY:
                 break;
             case HLL_DATA_EXPLICIT: {
-                this->_explicit_data_num = other._explicit_data_num;
-                _explicit_data = new uint64_t[HLL_EXPLICIT_INT64_NUM_DOUBLE];
-                memcpy(_explicit_data, other._explicit_data,
-                       sizeof(*_explicit_data) * _explicit_data_num);
+                this->_hash_set = other._hash_set;
                 break;
             }
             case HLL_DATA_SPARSE:
@@ -202,25 +183,20 @@ public:
                 _registers = new uint8_t[HLL_REGISTERS_COUNT];
                 memcpy(_registers, other._registers, HLL_REGISTERS_COUNT);
                 break;
+            }
             default:
                 break;
             }
-            }
         }
         return *this;
     }
 
-    explicit HyperLogLog(const Slice& src);
-
     ~HyperLogLog() { clear(); }
-
     void clear() {
         _type = HLL_DATA_EMPTY;
+        _hash_set.clear();
         delete[] _registers;
         _registers = nullptr;
-        delete[] _explicit_data;
-        _explicit_data = nullptr;
-        _explicit_data_num = 0;
     }
 
     typedef uint8_t SetTypeValueType;
@@ -239,8 +215,10 @@ public:
 
     size_t memory_consumed() const {
         size_t size = sizeof(*this);
-        if (_explicit_data) size += HLL_EXPLICIT_INT64_NUM_DOUBLE;
-        if (_registers) size += HLL_REGISTERS_COUNT;
+        if (_type == HLL_DATA_EXPLICIT) 
+            size += _hash_set.size() * sizeof(uint64_t);
+        else if (_type == HLL_DATA_SPARSE || _type == HLL_DATA_FULL) 
+            size += HLL_REGISTERS_COUNT;
         return size;
     }
 
@@ -277,7 +255,7 @@ public:
         case HLL_DATA_SPARSE:
         case HLL_DATA_FULL: {
             std::string str {"hash set size: "};
-            str.append(std::to_string((size_t)_explicit_data_num));
+            str.append(std::to_string(_hash_set.size()));
             str.append("\ncardinality:\t");
             str.append(std::to_string(estimate_cardinality()));
             str.append("\ntype:\t");
@@ -291,9 +269,7 @@ public:
 
 private:
     HllDataType _type = HLL_DATA_EMPTY;
-
-    uint32_t _explicit_data_num = 0;
-    uint64_t* _explicit_data = nullptr;
+    phmap::flat_hash_set<uint64_t> _hash_set;
 
     // This field is much space consuming(HLL_REGISTERS_COUNT), we create
     // it only when it is really needed.
@@ -312,40 +288,28 @@ private:
         // make sure max first_one_bit is HLL_ZERO_COUNT_BITS + 1
         hash_value |= ((uint64_t)1 << HLL_ZERO_COUNT_BITS);
         uint8_t first_one_bit = __builtin_ctzl(hash_value) + 1;
-        _registers[idx] = _registers[idx] > first_one_bit ? _registers[idx] : first_one_bit;
+        _registers[idx] = (_registers[idx] < first_one_bit ? first_one_bit : _registers[idx]);
     }
 
     // absorb other registers into this registers
-    void _merge_registers(const uint8_t* other) {
-        for (int i = 0; i < HLL_REGISTERS_COUNT; ++i) {
-            _registers[i] = _registers[i] < other[i] ? other[i] : _registers[i];
+    void _merge_registers(const uint8_t* other_registers) {
+#ifdef __AVX2__
+        int loop = HLL_REGISTERS_COUNT / 32; // 32 = 256/8
+        uint8_t* dst = _registers;
+        const uint8_t* src = other_registers;
+        for (int i = 0; i < loop; i++) {
+            __m256i xa = _mm256_loadu_si256((const __m256i*)dst);
+            __m256i xb = _mm256_loadu_si256((const __m256i*)src);
+            _mm256_storeu_si256((__m256i*)dst, _mm256_max_epu8(xa, xb));
+            src += 32;
+            dst += 32;
         }
-    }
-
-    bool _explicit_data_insert(uint64_t data) {
-        //find insert pos
-        int32_t i = (int32_t)_explicit_data_num - 1;
-        while (i >= 0) {
-            if (_explicit_data[i] == data) {
-                return false;
-            } else if (_explicit_data[i] < data) {
-                break;
-            } else {
-                --i;
-            }
-        }
-
-        ++i; //now, i is the insert position
-
-        size_t n = (_explicit_data_num - i) * sizeof(*_explicit_data);
-        if (n) {
-            memmove(_explicit_data + i + 1, _explicit_data + i, n);
+#else
+        for (int i = 0; i < HLL_REGISTERS_COUNT; ++i) {
+            _registers[i] =
+                    (_registers[i] < other_registers[i] ? other_registers[i] : _registers[i]);
         }
-
-        //insert data
-        _explicit_data[i] = data;
-        _explicit_data_num++;
-        return true;
+#endif
     }
 };
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org