You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/04/08 02:31:17 UTC
[incubator-doris] 03/14: [improvement](hll) Optimize Hyperloglog (#8829)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit 7aace71325f1f0cbea446f14f7acc176741501b8
Author: zbtzbtzbt <35...@users.noreply.github.com>
AuthorDate: Fri Apr 8 09:06:08 2022 +0800
[improvement](hll) Optimize Hyperloglog (#8829)
In meituan, pr #6625 was reverted due to the OOM problem.
Currently, we are trying to modify the old HyperLogLog; based on pr #8555, we did some work.
Via some tests, we find it is better than the old hll, and better than the apache:master hll.
Changes summary:
- use SIMD max to speed up heavy function _merge_registers
- use phmap::flat_hash_set rather than std::set
- replace std::max
- other small changes
---
be/src/exprs/aggregate_functions.cpp | 6 +-
be/src/olap/hll.cpp | 143 ++++++++---------------------------
be/src/olap/hll.h | 118 ++++++++++-------------------
3 files changed, 75 insertions(+), 192 deletions(-)
diff --git a/be/src/exprs/aggregate_functions.cpp b/be/src/exprs/aggregate_functions.cpp
index 09d9b1b68f..9e442b3081 100644
--- a/be/src/exprs/aggregate_functions.cpp
+++ b/be/src/exprs/aggregate_functions.cpp
@@ -1108,11 +1108,9 @@ void AggregateFunctions::hll_merge(FunctionContext* ctx, const StringVal& src, S
DCHECK(!src.is_null);
DCHECK_EQ(dst->len, std::pow(2, HLL_COLUMN_PRECISION));
DCHECK_EQ(src.len, std::pow(2, HLL_COLUMN_PRECISION));
-
- auto dp = dst->ptr;
- auto sp = src.ptr;
+
for (int i = 0; i < src.len; ++i) {
- dp[i] = (dp[i] < sp[i] ? sp[i] : dp[i]);
+ dst->ptr[i] = (dst->ptr[i] < src.ptr[i] ? src.ptr[i] : dst->ptr[i]);
}
}
diff --git a/be/src/olap/hll.cpp b/be/src/olap/hll.cpp
index 12fb912b3a..a2ec6e39a0 100644
--- a/be/src/olap/hll.cpp
+++ b/be/src/olap/hll.cpp
@@ -43,15 +43,13 @@ HyperLogLog::HyperLogLog(const Slice& src) {
void HyperLogLog::_convert_explicit_to_register() {
DCHECK(_type == HLL_DATA_EXPLICIT)
<< "_type(" << _type << ") should be explicit(" << HLL_DATA_EXPLICIT << ")";
- _registers = new uint8_t[HLL_REGISTERS_COUNT]();
-
- for (uint32_t i = 0; i < _explicit_data_num; ++i) {
- _update_registers(_explicit_data[i]);
+ _registers = new uint8_t[HLL_REGISTERS_COUNT];
+ memset(_registers, 0, HLL_REGISTERS_COUNT);
+ for (auto value : _hash_set) {
+ _update_registers(value);
}
-
- delete [] _explicit_data;
- _explicit_data = nullptr;
- _explicit_data_num = 0;
+ // clear _hash_set
+ phmap::flat_hash_set<uint64_t>().swap(_hash_set);
}
// Change HLL_DATA_EXPLICIT to HLL_DATA_FULL directly, because HLL_DATA_SPARSE
@@ -59,14 +57,12 @@ void HyperLogLog::_convert_explicit_to_register() {
void HyperLogLog::update(uint64_t hash_value) {
switch (_type) {
case HLL_DATA_EMPTY:
- _explicit_data = new uint64_t[HLL_EXPLICIT_INT64_NUM_DOUBLE];
- _explicit_data[0] = hash_value;
- _explicit_data_num = 1;
+ _hash_set.insert(hash_value);
_type = HLL_DATA_EXPLICIT;
break;
case HLL_DATA_EXPLICIT:
- if (_explicit_data_num < HLL_EXPLICIT_INT64_NUM) {
- _explicit_data_insert(hash_value);
+ if (_hash_set.size() < HLL_EXPLICIT_INT64_NUM) {
+ _hash_set.insert(hash_value);
break;
}
_convert_explicit_to_register();
@@ -90,10 +86,7 @@ void HyperLogLog::merge(const HyperLogLog& other) {
_type = other._type;
switch (other._type) {
case HLL_DATA_EXPLICIT:
- _explicit_data_num = other._explicit_data_num;
- _explicit_data = new uint64_t[HLL_EXPLICIT_INT64_NUM_DOUBLE];
- memcpy(_explicit_data, other._explicit_data,
- sizeof(*_explicit_data) * _explicit_data_num);
+ _hash_set = other._hash_set;
break;
case HLL_DATA_SPARSE:
case HLL_DATA_FULL:
@@ -110,47 +103,8 @@ void HyperLogLog::merge(const HyperLogLog& other) {
case HLL_DATA_EXPLICIT: {
// Merge other's explicit values first, then check if the number is exceed
// HLL_EXPLICIT_INT64_NUM. This is OK because the max value is 2 * 160.
- if (other._explicit_data_num > HLL_EXPLICIT_INT64_NUM / 2) { //merge
- uint64_t explicit_data[HLL_EXPLICIT_INT64_NUM * 2];
- memcpy(explicit_data, _explicit_data, sizeof(*_explicit_data) * _explicit_data_num);
- uint32_t explicit_data_num = _explicit_data_num;
- _explicit_data_num = 0;
-
- // merge _explicit_data and other's _explicit_data to _explicit_data
- uint32_t i = 0, j = 0, k = 0;
- while (i < explicit_data_num || j < other._explicit_data_num) {
- if (i == explicit_data_num) {
- uint32_t n = other._explicit_data_num - j;
- memcpy(_explicit_data + k, other._explicit_data + j,
- n * sizeof(*_explicit_data));
- k += n;
- break;
- } else if (j == other._explicit_data_num) {
- uint32_t n = explicit_data_num - i;
- memcpy(_explicit_data + k, explicit_data + i, n * sizeof(*_explicit_data));
- k += n;
- break;
- } else {
- if (explicit_data[i] < other._explicit_data[j]) {
- _explicit_data[k++] = explicit_data[i++];
- } else if (explicit_data[i] > other._explicit_data[j]) {
- _explicit_data[k++] = other._explicit_data[j++];
- } else {
- _explicit_data[k++] = explicit_data[i++];
- j++;
- }
- }
- }
- _explicit_data_num = k;
- } else { //insert one by one
- int32_t n = other._explicit_data_num;
- const uint64_t* data = other._explicit_data;
- for (int32_t i = 0; i < n; ++i) {
- _explicit_data_insert(data[i]);
- }
- }
-
- if (_explicit_data_num > HLL_EXPLICIT_INT64_NUM) {
+ _hash_set.insert(other._hash_set.begin(), other._hash_set.end());
+ if (_hash_set.size() > HLL_EXPLICIT_INT64_NUM) {
_convert_explicit_to_register();
_type = HLL_DATA_FULL;
}
@@ -170,8 +124,8 @@ void HyperLogLog::merge(const HyperLogLog& other) {
case HLL_DATA_FULL: {
switch (other._type) {
case HLL_DATA_EXPLICIT:
- for (int32_t i = 0; i < other._explicit_data_num; ++i) {
- _update_registers(other._explicit_data[i]);
+ for (auto hash_value : other._hash_set) {
+ _update_registers(hash_value);
}
break;
case HLL_DATA_SPARSE:
@@ -192,7 +146,7 @@ size_t HyperLogLog::max_serialized_size() const {
default:
return 1;
case HLL_DATA_EXPLICIT:
- return 2 + _explicit_data_num * 8;
+ return 2 + _hash_set.size() * 8;
case HLL_DATA_SPARSE:
case HLL_DATA_FULL:
return 1 + HLL_REGISTERS_COUNT;
@@ -201,32 +155,24 @@ size_t HyperLogLog::max_serialized_size() const {
size_t HyperLogLog::serialize(uint8_t* dst) const {
uint8_t* ptr = dst;
-
switch (_type) {
case HLL_DATA_EMPTY:
default: {
// When the _type is unknown, which may not happen, we encode it as
// Empty HyperLogLog object.
*ptr++ = HLL_DATA_EMPTY;
-
break;
}
case HLL_DATA_EXPLICIT: {
- DCHECK(_explicit_data_num < HLL_EXPLICIT_INT64_NUM)
- << "Number of explicit elements(" << _explicit_data_num
+ DCHECK(_hash_set.size() <= HLL_EXPLICIT_INT64_NUM)
+ << "Number of explicit elements(" << _hash_set.size()
<< ") should be less or equal than " << HLL_EXPLICIT_INT64_NUM;
*ptr++ = _type;
- *ptr++ = (uint8_t)_explicit_data_num;
-
-#if __BYTE_ORDER == __LITTLE_ENDIAN
- memcpy(ptr, _explicit_data, _explicit_data_num * sizeof(*_explicit_data));
- ptr += _explicit_data_num * sizeof(*_explicit_data);
-#else
- for (int32_t i = 0; i < _explicit_data_num; ++i) {
- *(uint64_t*)ptr = (uint64_t)gbswap_64(_explicit_data[i]);
+ *ptr++ = (uint8_t)_hash_set.size();
+ for (auto hash_value : _hash_set) {
+ encode_fixed64_le(ptr, hash_value);
ptr += 8;
}
-#endif
break;
}
case HLL_DATA_SPARSE:
@@ -249,39 +195,15 @@ size_t HyperLogLog::serialize(uint8_t* dst) const {
encode_fixed32_le(ptr, num_non_zero_registers);
ptr += 4;
- for (uint32_t i = 0; i < HLL_REGISTERS_COUNT;) {
- if (*(uint32_t*)(&_registers[i]) == 0) {
- i += 4;
+ for (uint32_t i = 0; i < HLL_REGISTERS_COUNT; ++i) {
+ if (_registers[i] == 0) {
continue;
}
-
- if (UNLIKELY(_registers[i])) {
- encode_fixed16_le(ptr, i);
- ptr += 2; // 2 bytes: register index
- *ptr++ = _registers[i]; // 1 byte: register value
- }
- ++i;
-
- if (UNLIKELY(_registers[i])) {
- encode_fixed16_le(ptr, i);
- ptr += 2; // 2 bytes: register index
- *ptr++ = _registers[i]; // 1 byte: register value
- }
- ++i;
-
- if (UNLIKELY(_registers[i])) {
- encode_fixed16_le(ptr, i);
- ptr += 2; // 2 bytes: register index
- *ptr++ = _registers[i]; // 1 byte: register value
- }
- ++i;
-
- if (UNLIKELY(_registers[i])) {
- encode_fixed16_le(ptr, i);
- ptr += 2; // 2 bytes: register index
- *ptr++ = _registers[i]; // 1 byte: register value
- }
- ++i;
+ // 2 bytes: register index
+ // 1 byte: register value
+ encode_fixed16_le(ptr, i);
+ ptr += 2;
+ *ptr++ = _registers[i];
}
}
break;
@@ -355,24 +277,23 @@ bool HyperLogLog::deserialize(const Slice& slice) {
// 2: number of explicit values
// make sure that num_explicit is positive
uint8_t num_explicits = *ptr++;
- _explicit_data = new uint64_t[HLL_EXPLICIT_INT64_NUM_DOUBLE];
// 3+: 8 bytes hash value
for (int i = 0; i < num_explicits; ++i) {
- _explicit_data_insert(decode_fixed64_le(ptr));
+ _hash_set.insert(decode_fixed64_le(ptr));
ptr += 8;
}
break;
}
case HLL_DATA_SPARSE: {
- _registers = new uint8_t[HLL_REGISTERS_COUNT]();
+ _registers = new uint8_t[HLL_REGISTERS_COUNT];
+ memset(_registers, 0, HLL_REGISTERS_COUNT);
// 2-5(4 byte): number of registers
uint32_t num_registers = decode_fixed32_le(ptr);
- uint16_t register_idx = 0;
ptr += 4;
for (uint32_t i = 0; i < num_registers; ++i) {
// 2 bytes: register index
// 1 byte: register value
- register_idx = decode_fixed16_le(ptr);
+ uint16_t register_idx = decode_fixed16_le(ptr);
ptr += 2;
_registers[register_idx] = *ptr++;
}
@@ -397,7 +318,7 @@ int64_t HyperLogLog::estimate_cardinality() const {
return 0;
}
if (_type == HLL_DATA_EXPLICIT) {
- return _explicit_data_num;
+ return _hash_set.size();
}
const int num_streams = HLL_REGISTERS_COUNT;
diff --git a/be/src/olap/hll.h b/be/src/olap/hll.h
index 1ae56e2254..dc157f886e 100644
--- a/be/src/olap/hll.h
+++ b/be/src/olap/hll.h
@@ -24,6 +24,11 @@
#include <map>
#include <set>
#include <string>
+#include <parallel_hashmap/phmap.h>
+
+#ifdef __x86_64__
+#include <immintrin.h>
+#endif
#include "gutil/macros.h"
@@ -34,7 +39,6 @@ struct Slice;
const static int HLL_COLUMN_PRECISION = 14;
const static int HLL_ZERO_COUNT_BITS = (64 - HLL_COLUMN_PRECISION);
const static int HLL_EXPLICIT_INT64_NUM = 160;
-const static int HLL_EXPLICIT_INT64_NUM_DOUBLE = HLL_EXPLICIT_INT64_NUM * 2;
const static int HLL_SPARSE_THRESHOLD = 4096;
const static int HLL_REGISTERS_COUNT = 16 * 1024;
// maximum size in byte of serialized HLL: type(1) + registers (2^14)
@@ -83,10 +87,9 @@ class HyperLogLog {
public:
HyperLogLog() = default;
explicit HyperLogLog(uint64_t hash_value) : _type(HLL_DATA_EXPLICIT) {
- _explicit_data = new uint64_t[HLL_EXPLICIT_INT64_NUM_DOUBLE];
- _explicit_data[0] = hash_value;
- _explicit_data_num = 1;
+ _hash_set.emplace(hash_value);
}
+ explicit HyperLogLog(const Slice& src);
HyperLogLog(const HyperLogLog& other) {
this->_type = other._type;
@@ -94,10 +97,7 @@ public:
case HLL_DATA_EMPTY:
break;
case HLL_DATA_EXPLICIT: {
- this->_explicit_data_num = other._explicit_data_num;
- _explicit_data = new uint64_t[HLL_EXPLICIT_INT64_NUM_DOUBLE];
- memcpy(_explicit_data, other._explicit_data,
- sizeof(*_explicit_data) * _explicit_data_num);
+ this->_hash_set = other._hash_set;
break;
}
case HLL_DATA_SPARSE:
@@ -105,10 +105,10 @@ public:
_registers = new uint8_t[HLL_REGISTERS_COUNT];
memcpy(_registers, other._registers, HLL_REGISTERS_COUNT);
break;
+ }
default:
break;
}
- }
}
HyperLogLog(HyperLogLog&& other) {
@@ -117,10 +117,7 @@ public:
case HLL_DATA_EMPTY:
break;
case HLL_DATA_EXPLICIT: {
- this->_explicit_data_num = other._explicit_data_num;
- this->_explicit_data = other._explicit_data;
- other._explicit_data_num = 0;
- other._explicit_data = nullptr;
+ this->_hash_set = std::move(other._hash_set);
other._type = HLL_DATA_EMPTY;
break;
}
@@ -130,33 +127,25 @@ public:
other._registers = nullptr;
other._type = HLL_DATA_EMPTY;
break;
+ }
default:
break;
}
- }
}
HyperLogLog& operator=(HyperLogLog&& other) {
if (this != &other) {
- if (_registers) {
+ if (_registers != nullptr) {
delete[] _registers;
_registers = nullptr;
}
- if (_explicit_data) {
- delete[] _explicit_data;
- _explicit_data = nullptr;
- }
- _explicit_data_num = 0;
this->_type = other._type;
switch (other._type) {
case HLL_DATA_EMPTY:
break;
case HLL_DATA_EXPLICIT: {
- this->_explicit_data_num = other._explicit_data_num;
- this->_explicit_data = other._explicit_data;
- other._explicit_data_num = 0;
- other._explicit_data = nullptr;
+ this->_hash_set = std::move(other._hash_set);
other._type = HLL_DATA_EMPTY;
break;
}
@@ -166,35 +155,27 @@ public:
other._registers = nullptr;
other._type = HLL_DATA_EMPTY;
break;
+ }
default:
break;
}
- }
}
return *this;
}
HyperLogLog& operator=(const HyperLogLog& other) {
if (this != &other) {
- if (_registers) {
+ if (_registers != nullptr) {
delete[] _registers;
_registers = nullptr;
}
- if (_explicit_data) {
- delete[] _explicit_data;
- _explicit_data = nullptr;
- }
- _explicit_data_num = 0;
this->_type = other._type;
switch (other._type) {
case HLL_DATA_EMPTY:
break;
case HLL_DATA_EXPLICIT: {
- this->_explicit_data_num = other._explicit_data_num;
- _explicit_data = new uint64_t[HLL_EXPLICIT_INT64_NUM_DOUBLE];
- memcpy(_explicit_data, other._explicit_data,
- sizeof(*_explicit_data) * _explicit_data_num);
+ this->_hash_set = other._hash_set;
break;
}
case HLL_DATA_SPARSE:
@@ -202,25 +183,20 @@ public:
_registers = new uint8_t[HLL_REGISTERS_COUNT];
memcpy(_registers, other._registers, HLL_REGISTERS_COUNT);
break;
+ }
default:
break;
}
- }
}
return *this;
}
- explicit HyperLogLog(const Slice& src);
-
~HyperLogLog() { clear(); }
-
void clear() {
_type = HLL_DATA_EMPTY;
+ _hash_set.clear();
delete[] _registers;
_registers = nullptr;
- delete[] _explicit_data;
- _explicit_data = nullptr;
- _explicit_data_num = 0;
}
typedef uint8_t SetTypeValueType;
@@ -239,8 +215,10 @@ public:
size_t memory_consumed() const {
size_t size = sizeof(*this);
- if (_explicit_data) size += HLL_EXPLICIT_INT64_NUM_DOUBLE;
- if (_registers) size += HLL_REGISTERS_COUNT;
+ if (_type == HLL_DATA_EXPLICIT)
+ size += _hash_set.size() * sizeof(uint64_t);
+ else if (_type == HLL_DATA_SPARSE || _type == HLL_DATA_FULL)
+ size += HLL_REGISTERS_COUNT;
return size;
}
@@ -277,7 +255,7 @@ public:
case HLL_DATA_SPARSE:
case HLL_DATA_FULL: {
std::string str {"hash set size: "};
- str.append(std::to_string((size_t)_explicit_data_num));
+ str.append(std::to_string(_hash_set.size()));
str.append("\ncardinality:\t");
str.append(std::to_string(estimate_cardinality()));
str.append("\ntype:\t");
@@ -291,9 +269,7 @@ public:
private:
HllDataType _type = HLL_DATA_EMPTY;
-
- uint32_t _explicit_data_num = 0;
- uint64_t* _explicit_data = nullptr;
+ phmap::flat_hash_set<uint64_t> _hash_set;
// This field is much space consuming(HLL_REGISTERS_COUNT), we create
// it only when it is really needed.
@@ -312,40 +288,28 @@ private:
// make sure max first_one_bit is HLL_ZERO_COUNT_BITS + 1
hash_value |= ((uint64_t)1 << HLL_ZERO_COUNT_BITS);
uint8_t first_one_bit = __builtin_ctzl(hash_value) + 1;
- _registers[idx] = _registers[idx] > first_one_bit ? _registers[idx] : first_one_bit;
+ _registers[idx] = (_registers[idx] < first_one_bit ? first_one_bit : _registers[idx]);
}
// absorb other registers into this registers
- void _merge_registers(const uint8_t* other) {
- for (int i = 0; i < HLL_REGISTERS_COUNT; ++i) {
- _registers[i] = _registers[i] < other[i] ? other[i] : _registers[i];
+ void _merge_registers(const uint8_t* other_registers) {
+#ifdef __AVX2__
+ int loop = HLL_REGISTERS_COUNT / 32; // 32 = 256/8
+ uint8_t* dst = _registers;
+ const uint8_t* src = other_registers;
+ for (int i = 0; i < loop; i++) {
+ __m256i xa = _mm256_loadu_si256((const __m256i*)dst);
+ __m256i xb = _mm256_loadu_si256((const __m256i*)src);
+ _mm256_storeu_si256((__m256i*)dst, _mm256_max_epu8(xa, xb));
+ src += 32;
+ dst += 32;
}
- }
-
- bool _explicit_data_insert(uint64_t data) {
- //find insert pos
- int32_t i = (int32_t)_explicit_data_num - 1;
- while (i >= 0) {
- if (_explicit_data[i] == data) {
- return false;
- } else if (_explicit_data[i] < data) {
- break;
- } else {
- --i;
- }
- }
-
- ++i; //now, i is the insert position
-
- size_t n = (_explicit_data_num - i) * sizeof(*_explicit_data);
- if (n) {
- memmove(_explicit_data + i + 1, _explicit_data + i, n);
+#else
+ for (int i = 0; i < HLL_REGISTERS_COUNT; ++i) {
+ _registers[i] =
+ (_registers[i] < other_registers[i] ? other_registers[i] : _registers[i]);
}
-
- //insert data
- _explicit_data[i] = data;
- _explicit_data_num++;
- return true;
+#endif
}
};
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org