You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2023/06/30 09:11:42 UTC
[doris] branch master updated: [FIX](datatype) Implement hash func with array/map/struct type (#21334)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b7d6a70868 [FIX](datatype) Implement hash func with array/map/struct type (#21334)
b7d6a70868 is described below
commit b7d6a7086819c0e6f8198c68cd8e5e1fd84e64b1
Author: amory <wa...@selectdb.com>
AuthorDate: Fri Jun 30 17:11:35 2023 +0800
[FIX](datatype) Implement hash func with array/map/struct type (#21334)
We did not implement any hash functions for the array/map/struct columns, so running SQL like the following makes the BE core dump:
select * from (
select
bdp.nc_num,
collect_list(distinct(bd.catalog_name)) as catalog_name,
material_qty
from
dataease.bu_delivery_product bdp
left join dataease.bu_trans_transfer btt on bdp.delivery_product_id = btt.delivery_product_id
left join dataease.bu_delivery bd on bdp.delivery_id = bd.delivery_id
where
bd.val_status in ('10', '20', '30', '90')
and bd.delivery_type in (0, 1, 2)
group by
nc_num,
material_qty
union
ALL
select
bdp.nc_num,
collect_list(distinct(bd.catalog_name)) as catalog_name,
material_qty
from
dataease.bu_trans_transfer btt
left join dataease.bu_delivery_product bdp on bdp.delivery_product_id = btt.delivery_product_id
left join dataease.bu_delivery bd on bdp.delivery_id = bd.delivery_id
where
bd.val_status in ('10', '20', '30', '90')
and bd.delivery_type in (0, 1, 2)
group by
nc_num,
material_qty
) aa;
core :
---
be/src/vec/columns/column.h | 16 ++-
be/src/vec/columns/column_array.cpp | 63 +++++++++
be/src/vec/columns/column_array.h | 14 ++
be/src/vec/columns/column_const.h | 14 ++
be/src/vec/columns/column_decimal.cpp | 31 +++--
be/src/vec/columns/column_decimal.h | 3 +
be/src/vec/columns/column_map.cpp | 68 ++++++++-
be/src/vec/columns/column_map.h | 13 ++
be/src/vec/columns/column_nullable.cpp | 18 +++
be/src/vec/columns/column_nullable.h | 9 +-
be/src/vec/columns/column_string.h | 12 ++
be/src/vec/columns/column_struct.cpp | 31 +++++
be/src/vec/columns/column_struct.h | 13 ++
be/src/vec/columns/column_vector.h | 18 +++
be/src/vec/sink/vdata_stream_sender.cpp | 15 +-
be/test/vec/columns/column_hash_func_test.cpp | 190 ++++++++++++++++++++++++++
16 files changed, 502 insertions(+), 26 deletions(-)
diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h
index 00f1824b99..b291ba2443 100644
--- a/be/src/vec/columns/column.h
+++ b/be/src/vec/columns/column.h
@@ -365,7 +365,7 @@ public:
/// On subsequent calls of this method for sequence of column values of arbitrary types,
/// passed bytes to hash must identify sequence of values unambiguously.
virtual void update_hash_with_value(size_t n, SipHash& hash) const {
- LOG(FATAL) << "update_hash_with_value siphash not supported";
+ LOG(FATAL) << get_name() << " update_hash_with_value siphash not supported";
}
/// Update state of hash function with value of n elements to avoid the virtual function call
@@ -374,7 +374,7 @@ public:
/// do xxHash here, faster than other hash method
virtual void update_hashes_with_value(std::vector<SipHash>& hashes,
const uint8_t* __restrict null_data = nullptr) const {
- LOG(FATAL) << "update_hashes_with_value siphash not supported";
+ LOG(FATAL) << get_name() << " update_hashes_with_value siphash not supported";
}
/// Update state of hash function with value of n elements to avoid the virtual function call
@@ -383,7 +383,11 @@ public:
/// do xxHash here, faster than other sip hash
virtual void update_hashes_with_value(uint64_t* __restrict hashes,
const uint8_t* __restrict null_data = nullptr) const {
- LOG(FATAL) << "update_hashes_with_value xxhash not supported";
+ LOG(FATAL) << get_name() << " update_hashes_with_value xxhash not supported";
+ }
+
+    /// Update the xxHash state `hash` with the value at row n.
+    /// This base implementation only logs at FATAL severity, prefixed with the column name;
+    /// columns that support xxHash row hashing override it.
+    virtual void update_xxHash_with_value(size_t n, uint64_t& hash) const {
+        LOG(FATAL) << get_name() << " update_xxHash_with_value not supported";
     }
/// Update state of crc32 hash function with value of n elements to avoid the virtual function call
@@ -391,7 +395,11 @@ public:
/// means all element need to do hash function, else only *null_data != 0 need to do hash func
virtual void update_crcs_with_value(std::vector<uint64_t>& hash, PrimitiveType type,
const uint8_t* __restrict null_data = nullptr) const {
- LOG(FATAL) << "update_crcs_with_value not supported";
+ LOG(FATAL) << get_name() << "update_crcs_with_value not supported";
+ }
+
+ virtual void update_crc_with_value(size_t n, uint64_t& hash) const {
+ LOG(FATAL) << get_name() << " update_crc_with_value not supported";
}
/** Removes elements that don't match the filter.
diff --git a/be/src/vec/columns/column_array.cpp b/be/src/vec/columns/column_array.cpp
index 4215bd36bd..c5e35e5bb9 100644
--- a/be/src/vec/columns/column_array.cpp
+++ b/be/src/vec/columns/column_array.cpp
@@ -271,6 +271,69 @@ void ColumnArray::update_hash_with_value(size_t n, SipHash& hash) const {
for (size_t i = 0; i < array_size; ++i) get_data().update_hash_with_value(offset + i, hash);
}
+void ColumnArray::update_hashes_with_value(std::vector<SipHash>& hashes,
+ const uint8_t* __restrict null_data) const {
+ SIP_HASHES_FUNCTION_COLUMN_IMPL();
+}
+
+// Folds array row n into the running xxHash `hash`: first the element count of the row,
+// then every nested element of the row in order.
+void ColumnArray::update_xxHash_with_value(size_t n, uint64_t& hash) const {
+    size_t elem_size = size_at(n);
+    size_t offset = offset_at(n);
+    hash = HashUtil::xxHash64WithSeed(reinterpret_cast<const char*>(&elem_size), sizeof(elem_size),
+                                      hash);
+    // size_t index: `auto i = 0` deduced int and was compared against the unsigned count.
+    for (size_t i = 0; i < elem_size; ++i) {
+        get_data().update_xxHash_with_value(offset + i, hash);
+    }
+}
+
+// Folds array row n into the running CRC `crc`: first the element count of the row,
+// then every nested element of the row in order.
+void ColumnArray::update_crc_with_value(size_t n, uint64_t& crc) const {
+    size_t elem_size = size_at(n);
+    size_t offset = offset_at(n);
+
+    crc = HashUtil::zlib_crc_hash(reinterpret_cast<const char*>(&elem_size), sizeof(elem_size),
+                                  crc);
+    // size_t index: `auto i = 0` deduced int and was compared against the unsigned count.
+    for (size_t i = 0; i < elem_size; ++i) {
+        get_data().update_crc_with_value(offset + i, crc);
+    }
+}
+
+// Batch xxHash: folds array row i into hashes[i] for every row of this column. When a null
+// map is supplied, rows whose null_data entry is non-zero are left untouched.
+void ColumnArray::update_hashes_with_value(uint64_t* __restrict hashes,
+                                           const uint8_t* __restrict null_data) const {
+    const auto row_count = size();
+    for (size_t row = 0; row < row_count; ++row) {
+        if (null_data != nullptr && null_data[row] != 0) {
+            continue;
+        }
+        update_xxHash_with_value(row, hashes[row]);
+    }
+}
+
+// Batch CRC: folds array row i into hash[i] for every entry of `hash`, which is expected
+// (DCHECK) to have one entry per row. When a null map is supplied, rows whose null_data
+// entry is non-zero are left untouched. `type` is not used by this implementation.
+void ColumnArray::update_crcs_with_value(std::vector<uint64_t>& hash, PrimitiveType type,
+                                         const uint8_t* __restrict null_data) const {
+    const auto row_count = hash.size();
+    DCHECK(row_count == size());
+
+    for (size_t row = 0; row < row_count; ++row) {
+        const bool is_null_row = null_data != nullptr && null_data[row] != 0;
+        if (!is_null_row) {
+            update_crc_with_value(row, hash[row]);
+        }
+    }
+}
+
void ColumnArray::insert(const Field& x) {
const Array& array = doris::vectorized::get<const Array&>(x);
size_t size = array.size();
diff --git a/be/src/vec/columns/column_array.h b/be/src/vec/columns/column_array.h
index 4f08c269fb..2e1c96a2c5 100644
--- a/be/src/vec/columns/column_array.h
+++ b/be/src/vec/columns/column_array.h
@@ -139,6 +139,18 @@ public:
StringRef serialize_value_into_arena(size_t n, Arena& arena, char const*& begin) const override;
const char* deserialize_and_insert_from_arena(const char* pos) override;
void update_hash_with_value(size_t n, SipHash& hash) const override;
+ void update_xxHash_with_value(size_t n, uint64_t& hash) const override;
+ void update_crc_with_value(size_t n, uint64_t& crc) const override;
+
+ void update_hashes_with_value(std::vector<SipHash>& hashes,
+ const uint8_t* __restrict null_data) const override;
+
+ void update_hashes_with_value(uint64_t* __restrict hashes,
+ const uint8_t* __restrict null_data = nullptr) const override;
+
+ void update_crcs_with_value(std::vector<uint64_t>& hash, PrimitiveType type,
+ const uint8_t* __restrict null_data = nullptr) const override;
+
void insert_range_from(const IColumn& src, size_t start, size_t length) override;
void insert(const Field& x) override;
void insert_from(const IColumn& src_, size_t n) override;
@@ -240,6 +252,8 @@ public:
ColumnPtr index(const IColumn& indexes, size_t limit) const override;
private:
+ // [[2,1,5,9,1], [1,2,4]] --> data column [2,1,5,9,1,1,2,4], offset[-1] = 0, offset[0] = 5, offset[1] = 8
+ // [[[2,1,5],[9,1]], [[1,2]]] --> data column [3 column array], offset[-1] = 0, offset[0] = 2, offset[1] = 3
WrappedPtr data;
WrappedPtr offsets;
diff --git a/be/src/vec/columns/column_const.h b/be/src/vec/columns/column_const.h
index bb17f7eb04..feeb0608a2 100644
--- a/be/src/vec/columns/column_const.h
+++ b/be/src/vec/columns/column_const.h
@@ -152,6 +152,19 @@ public:
data->serialize_vec(keys, num_rows, max_row_byte_size);
}
+ // Folds the constant's value into the running xxHash `hash`. The row index n is not used:
+ // the value is always read from row 0 of the nested data column.
+ // NOTE(review): this relies on the nested column implementing get_data_at(); confirm that
+ // holds for array/map/struct columns before calling this on a const complex column.
+ void update_xxHash_with_value(size_t n, uint64_t& hash) const override {
+ auto real_data = data->get_data_at(0);
+ if (real_data.data == nullptr) {
+ // no bytes to hash: fold in the dedicated null hash instead
+ hash = HashUtil::xxHash64NullWithSeed(hash);
+ } else {
+ hash = HashUtil::xxHash64WithSeed(real_data.data, real_data.size, hash);
+ }
+ }
+
+    // Folds the constant's value into the running CRC `crc`. The row index n is ignored and
+    // row 0 of the nested data column is hashed, matching update_xxHash_with_value, which
+    // also always reads row 0. Forwarding n would index past the stored value for n > 0.
+    // NOTE(review): assumes the nested data column holds exactly one row — confirm.
+    void update_crc_with_value(size_t n, uint64_t& crc) const override {
+        get_data_column_ptr()->update_crc_with_value(0, crc);
+    }
+
void serialize_vec_with_null_map(std::vector<StringRef>& keys, size_t num_rows,
const uint8_t* null_map,
size_t max_row_byte_size) const override {
@@ -165,6 +178,7 @@ public:
void update_hashes_with_value(std::vector<SipHash>& hashes,
const uint8_t* __restrict null_data) const override;
+ // (TODO.Amory) here may not use column_const update hash, and PrimitiveType is not used.
void update_crcs_with_value(std::vector<uint64_t>& hashes, PrimitiveType type,
const uint8_t* __restrict null_data) const override;
diff --git a/be/src/vec/columns/column_decimal.cpp b/be/src/vec/columns/column_decimal.cpp
index 069f195c4a..e0b8fef056 100644
--- a/be/src/vec/columns/column_decimal.cpp
+++ b/be/src/vec/columns/column_decimal.cpp
@@ -137,6 +137,19 @@ void ColumnDecimal<T>::update_hashes_with_value(std::vector<SipHash>& hashes,
SIP_HASHES_FUNCTION_COLUMN_IMPL();
}
+// Folds the decimal at row n into the running CRC `crc`. Non-V2 decimals hash their raw
+// sizeof(T) bytes; DecimalV2 hashes int_value() and then frac_value().
+template <typename T>
+void ColumnDecimal<T>::update_crc_with_value(size_t n, uint64_t& crc) const {
+    if constexpr (!IsDecimalV2<T>) {
+        crc = HashUtil::zlib_crc_hash(&data[n], sizeof(T), crc);
+    } else {
+        // Named cast instead of the C-style reference cast; same reinterpretation of the bytes.
+        const auto& dec_val = reinterpret_cast<const DecimalV2Value&>(data[n]);
+        int64_t int_val = dec_val.int_value();
+        int32_t frac_val = dec_val.frac_value();
+        crc = HashUtil::zlib_crc_hash(&int_val, sizeof(int_val), crc);
+        crc = HashUtil::zlib_crc_hash(&frac_val, sizeof(frac_val), crc);
+    }
+}
+
template <typename T>
void ColumnDecimal<T>::update_crcs_with_value(std::vector<uint64_t>& hashes, PrimitiveType type,
const uint8_t* __restrict null_data) const {
@@ -146,27 +159,23 @@ void ColumnDecimal<T>::update_crcs_with_value(std::vector<uint64_t>& hashes, Pri
if constexpr (!IsDecimalV2<T>) {
DO_CRC_HASHES_FUNCTION_COLUMN_IMPL()
} else {
- DCHECK(type == TYPE_DECIMALV2);
- auto decimalv2_do_crc = [&](size_t i) {
- const DecimalV2Value& dec_val = (const DecimalV2Value&)data[i];
- int64_t int_val = dec_val.int_value();
- int32_t frac_val = dec_val.frac_value();
- hashes[i] = HashUtil::zlib_crc_hash(&int_val, sizeof(int_val), hashes[i]);
- hashes[i] = HashUtil::zlib_crc_hash(&frac_val, sizeof(frac_val), hashes[i]);
- };
-
if (null_data == nullptr) {
for (size_t i = 0; i < s; i++) {
- decimalv2_do_crc(i);
+ update_crc_with_value(i, hashes[i]);
}
} else {
for (size_t i = 0; i < s; i++) {
- if (null_data[i] == 0) decimalv2_do_crc(i);
+ if (null_data[i] == 0) update_crc_with_value(i, hashes[i]);
}
}
}
}
+template <typename T>
+void ColumnDecimal<T>::update_xxHash_with_value(size_t n, uint64_t& hash) const {
+ hash = HashUtil::xxHash64WithSeed(reinterpret_cast<const char*>(&data[n]), sizeof(T), hash);
+}
+
template <typename T>
void ColumnDecimal<T>::update_hashes_with_value(uint64_t* __restrict hashes,
const uint8_t* __restrict null_data) const {
diff --git a/be/src/vec/columns/column_decimal.h b/be/src/vec/columns/column_decimal.h
index f935c0c826..973f0bea68 100644
--- a/be/src/vec/columns/column_decimal.h
+++ b/be/src/vec/columns/column_decimal.h
@@ -184,6 +184,9 @@ public:
void update_crcs_with_value(std::vector<uint64_t>& hashes, PrimitiveType type,
const uint8_t* __restrict null_data) const override;
+ void update_xxHash_with_value(size_t n, uint64_t& hash) const override;
+ void update_crc_with_value(size_t n, uint64_t& crc) const override;
+
int compare_at(size_t n, size_t m, const IColumn& rhs_, int nan_direction_hint) const override;
void get_permutation(bool reverse, size_t limit, int nan_direction_hint,
IColumn::Permutation& res) const override;
diff --git a/be/src/vec/columns/column_map.cpp b/be/src/vec/columns/column_map.cpp
index 4f06ee4dda..1924e2ba46 100644
--- a/be/src/vec/columns/column_map.cpp
+++ b/be/src/vec/columns/column_map.cpp
@@ -238,15 +238,79 @@ const char* ColumnMap::deserialize_and_insert_from_arena(const char* pos) {
}
void ColumnMap::update_hash_with_value(size_t n, SipHash& hash) const {
- size_t array_size = size_at(n);
+ size_t kv_size = size_at(n);
size_t offset = offset_at(n);
- for (size_t i = 0; i < array_size; ++i) {
+ hash.update(reinterpret_cast<const char*>(&kv_size), sizeof(kv_size));
+ for (size_t i = 0; i < kv_size; ++i) {
get_keys().update_hash_with_value(offset + i, hash);
get_values().update_hash_with_value(offset + i, hash);
}
}
+void ColumnMap::update_hashes_with_value(std::vector<SipHash>& hashes,
+ const uint8_t* __restrict null_data) const {
+ SIP_HASHES_FUNCTION_COLUMN_IMPL();
+}
+
+// Folds map row n into the running xxHash `hash`: first the number of key/value pairs in
+// the row, then each pair in order (key, then value).
+void ColumnMap::update_xxHash_with_value(size_t n, uint64_t& hash) const {
+    size_t kv_size = size_at(n);
+    size_t offset = offset_at(n);
+
+    hash = HashUtil::xxHash64WithSeed(reinterpret_cast<const char*>(&kv_size), sizeof(kv_size),
+                                      hash);
+    // size_t index, as in update_crc_with_value; `auto i = 0` deduced a signed int.
+    for (size_t i = 0; i < kv_size; ++i) {
+        get_keys().update_xxHash_with_value(offset + i, hash);
+        get_values().update_xxHash_with_value(offset + i, hash);
+    }
+}
+
+void ColumnMap::update_crc_with_value(size_t n, uint64_t& crc) const {
+ size_t kv_size = size_at(n);
+ size_t offset = offset_at(n);
+
+ crc = HashUtil::zlib_crc_hash(reinterpret_cast<const char*>(&kv_size), sizeof(kv_size), crc);
+ for (size_t i = 0; i < kv_size; ++i) {
+ get_keys().update_crc_with_value(offset + i, crc);
+ get_values().update_crc_with_value(offset + i, crc);
+ }
+}
+
+// Batch xxHash: folds map row i into hashes[i] for every row of this column. When a null
+// map is supplied, rows whose null_data entry is non-zero are left untouched.
+// The __restrict qualifiers mirror the declaration in column_map.h.
+void ColumnMap::update_hashes_with_value(uint64_t* __restrict hashes,
+                                         const uint8_t* __restrict null_data) const {
+    size_t s = size();
+    if (null_data) {
+        for (size_t i = 0; i < s; ++i) {
+            // every row
+            if (null_data[i] == 0) {
+                update_xxHash_with_value(i, hashes[i]);
+            }
+        }
+    } else {
+        for (size_t i = 0; i < s; ++i) {
+            update_xxHash_with_value(i, hashes[i]);
+        }
+    }
+}
+
+void ColumnMap::update_crcs_with_value(std::vector<uint64_t>& hash, PrimitiveType type,
+ const uint8_t* __restrict null_data) const {
+ auto s = hash.size();
+ DCHECK(s == size());
+
+ if (null_data) {
+ for (size_t i = 0; i < s; ++i) {
+ // every row
+ if (null_data[i] == 0) {
+ update_crc_with_value(i, hash[i]);
+ }
+ }
+ } else {
+ for (size_t i = 0; i < s; ++i) {
+ update_crc_with_value(i, hash[i]);
+ }
+ }
+}
+
void ColumnMap::insert_range_from(const IColumn& src, size_t start, size_t length) {
if (length == 0) {
return;
diff --git a/be/src/vec/columns/column_map.h b/be/src/vec/columns/column_map.h
index 6cb83adf25..0d7bb2d0a7 100644
--- a/be/src/vec/columns/column_map.h
+++ b/be/src/vec/columns/column_map.h
@@ -39,6 +39,7 @@
#include "vec/columns/column_vector.h"
#include "vec/common/assert_cast.h"
#include "vec/common/cow.h"
+#include "vec/common/sip_hash.h"
#include "vec/common/string_ref.h"
#include "vec/core/field.h"
#include "vec/core/types.h"
@@ -166,6 +167,18 @@ public:
size_t allocated_bytes() const override;
void protect() override;
+ void update_xxHash_with_value(size_t n, uint64_t& hash) const override;
+ void update_crc_with_value(size_t n, uint64_t& crc) const override;
+
+ void update_hashes_with_value(std::vector<SipHash>& hashes,
+ const uint8_t* __restrict null_data) const override;
+
+ void update_hashes_with_value(uint64_t* __restrict hashes,
+ const uint8_t* __restrict null_data = nullptr) const override;
+
+ void update_crcs_with_value(std::vector<uint64_t>& hash, PrimitiveType type,
+ const uint8_t* __restrict null_data = nullptr) const override;
+
/******************** keys and values ***************/
const ColumnPtr& get_keys_ptr() const { return keys_column; }
ColumnPtr& get_keys_ptr() { return keys_column; }
diff --git a/be/src/vec/columns/column_nullable.cpp b/be/src/vec/columns/column_nullable.cpp
index 42cda1d18d..ce5b68f3fb 100644
--- a/be/src/vec/columns/column_nullable.cpp
+++ b/be/src/vec/columns/column_nullable.cpp
@@ -65,6 +65,24 @@ MutableColumnPtr ColumnNullable::get_shrinked_column() {
get_null_map_column_ptr());
}
+// Folds row n into the running xxHash `hash`: a row flagged in the null map contributes
+// xxHash64NullWithSeed, any other row is delegated to the nested column.
+void ColumnNullable::update_xxHash_with_value(size_t n, uint64_t& hash) const {
+    const auto* __restrict null_flags =
+            assert_cast<const ColumnUInt8&>(*null_map).get_data().data();
+    if (null_flags[n] == 0) {
+        nested_column->update_xxHash_with_value(n, hash);
+        return;
+    }
+    hash = HashUtil::xxHash64NullWithSeed(hash);
+}
+
+// Folds row n into the running CRC `crc`: a row flagged in the null map contributes
+// zlib_crc_hash_null, any other row is delegated to the nested column's CRC update.
+void ColumnNullable::update_crc_with_value(size_t n, uint64_t& crc) const {
+    auto* __restrict real_null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data().data();
+    if (real_null_data[n] != 0) {
+        crc = HashUtil::zlib_crc_hash_null(crc);
+    } else {
+        // Must be the CRC update: calling update_xxHash_with_value here would fold an xxHash
+        // into the CRC accumulator and disagree with the CRC computed for non-nullable data.
+        nested_column->update_crc_with_value(n, crc);
+    }
+}
+
void ColumnNullable::update_hash_with_value(size_t n, SipHash& hash) const {
if (is_null_at(n))
hash.update(0);
diff --git a/be/src/vec/columns/column_nullable.h b/be/src/vec/columns/column_nullable.h
index d5ca7f844b..be9ba72399 100644
--- a/be/src/vec/columns/column_nullable.h
+++ b/be/src/vec/columns/column_nullable.h
@@ -98,11 +98,7 @@ public:
const char* get_family_name() const override { return "Nullable"; }
std::string get_name() const override { return "Nullable(" + nested_column->get_name() + ")"; }
MutableColumnPtr clone_resized(size_t size) const override;
- size_t size() const override {
- return nested_column->size(
-
- );
- }
+ size_t size() const override { return nested_column->size(); }
bool is_null_at(size_t n) const override {
return assert_cast<const ColumnUInt8&>(*null_map).get_data()[n] != 0;
}
@@ -219,6 +215,9 @@ public:
ColumnPtr replicate(const Offsets& replicate_offsets) const override;
void replicate(const uint32_t* counts, size_t target_size, IColumn& column, size_t begin = 0,
int count_sz = -1) const override;
+ void update_xxHash_with_value(size_t n, uint64_t& hash) const override;
+ void update_crc_with_value(size_t n, uint64_t& crc) const override;
+
void update_hash_with_value(size_t n, SipHash& hash) const override;
void update_hashes_with_value(std::vector<SipHash>& hashes,
const uint8_t* __restrict null_data) const override;
diff --git a/be/src/vec/columns/column_string.h b/be/src/vec/columns/column_string.h
index efd90fd844..703826cd24 100644
--- a/be/src/vec/columns/column_string.h
+++ b/be/src/vec/columns/column_string.h
@@ -398,6 +398,18 @@ public:
void deserialize_vec_with_null_map(std::vector<StringRef>& keys, const size_t num_rows,
const uint8_t* null_map) override;
+ void update_xxHash_with_value(size_t n, uint64_t& hash) const override {
+ size_t string_size = size_at(n);
+ size_t offset = offset_at(n);
+ hash = HashUtil::xxHash64WithSeed(reinterpret_cast<const char*>(&chars[offset]),
+ string_size, hash);
+ }
+
+ void update_crc_with_value(size_t n, uint64_t& crc) const override {
+ auto data_ref = get_data_at(n);
+ crc = HashUtil::zlib_crc_hash(data_ref.data, data_ref.size, crc);
+ }
+
void update_hash_with_value(size_t n, SipHash& hash) const override {
size_t string_size = size_at(n);
size_t offset = offset_at(n);
diff --git a/be/src/vec/columns/column_struct.cpp b/be/src/vec/columns/column_struct.cpp
index 2a5a505fb8..58f5a4abaf 100644
--- a/be/src/vec/columns/column_struct.cpp
+++ b/be/src/vec/columns/column_struct.cpp
@@ -191,6 +191,37 @@ void ColumnStruct::update_hash_with_value(size_t n, SipHash& hash) const {
}
}
+void ColumnStruct::update_hashes_with_value(std::vector<SipHash>& hashes,
+ const uint8_t* __restrict null_data) const {
+ SIP_HASHES_FUNCTION_COLUMN_IMPL();
+}
+
+void ColumnStruct::update_xxHash_with_value(size_t n, uint64_t& hash) const {
+ for (const auto& column : columns) {
+ column->update_xxHash_with_value(n, hash);
+ }
+}
+
+void ColumnStruct::update_crc_with_value(size_t n, uint64_t& crc) const {
+ for (const auto& column : columns) {
+ column->update_crc_with_value(n, crc);
+ }
+}
+
+void ColumnStruct::update_hashes_with_value(uint64_t* __restrict hashes,
+ const uint8_t* __restrict null_data) const {
+ for (const auto& column : columns) {
+ column->update_hashes_with_value(hashes, null_data);
+ }
+}
+
+void ColumnStruct::update_crcs_with_value(std::vector<uint64_t>& hash, PrimitiveType type,
+ const uint8_t* __restrict null_data) const {
+ for (const auto& column : columns) {
+ column->update_crcs_with_value(hash, type, null_data);
+ }
+}
+
void ColumnStruct::insert_indices_from(const IColumn& src, const int* indices_begin,
const int* indices_end) {
const ColumnStruct& src_concrete = assert_cast<const ColumnStruct&>(src);
diff --git a/be/src/vec/columns/column_struct.h b/be/src/vec/columns/column_struct.h
index dfa8bd6f5d..3771d29e48 100644
--- a/be/src/vec/columns/column_struct.h
+++ b/be/src/vec/columns/column_struct.h
@@ -35,6 +35,7 @@
#include "vec/columns/column.h"
#include "vec/columns/column_impl.h"
#include "vec/common/cow.h"
+#include "vec/common/sip_hash.h"
#include "vec/common/string_ref.h"
#include "vec/core/field.h"
#include "vec/core/types.h"
@@ -103,7 +104,19 @@ public:
void pop_back(size_t n) override;
StringRef serialize_value_into_arena(size_t n, Arena& arena, char const*& begin) const override;
const char* deserialize_and_insert_from_arena(const char* pos) override;
+
void update_hash_with_value(size_t n, SipHash& hash) const override;
+ void update_xxHash_with_value(size_t n, uint64_t& hash) const override;
+ void update_crc_with_value(size_t n, uint64_t& crc) const override;
+
+ void update_hashes_with_value(std::vector<SipHash>& hashes,
+ const uint8_t* __restrict null_data) const override;
+
+ void update_hashes_with_value(uint64_t* __restrict hashes,
+ const uint8_t* __restrict null_data = nullptr) const override;
+
+ void update_crcs_with_value(std::vector<uint64_t>& hash, PrimitiveType type,
+ const uint8_t* __restrict null_data = nullptr) const override;
void insert_indices_from(const IColumn& src, const int* indices_begin,
const int* indices_end) override;
diff --git a/be/src/vec/columns/column_vector.h b/be/src/vec/columns/column_vector.h
index c30796792e..67f2827c92 100644
--- a/be/src/vec/columns/column_vector.h
+++ b/be/src/vec/columns/column_vector.h
@@ -274,6 +274,24 @@ public:
const uint8_t* null_map,
size_t max_row_byte_size) const override;
+ void update_xxHash_with_value(size_t n, uint64_t& hash) const override {
+ hash = HashUtil::xxHash64WithSeed(reinterpret_cast<const char*>(&data[n]), sizeof(T), hash);
+ }
+
+    // Folds row n into the running CRC `crc`. An Int64 column flagged as date or datetime
+    // hashes the buffer written by VecDateTimeValue::to_buffer; every other case hashes the
+    // raw sizeof(T) bytes of the value.
+    void update_crc_with_value(size_t n, uint64_t& crc) const override {
+        if constexpr (std::is_same_v<T, Int64>) {
+            if (this->is_date_type() || this->is_datetime_type()) {
+                char date_buf[64];
+                const VecDateTimeValue& date_val = (const VecDateTimeValue&)data[n];
+                auto written = date_val.to_buffer(date_buf);
+                crc = HashUtil::zlib_crc_hash(date_buf, written, crc);
+                return;
+            }
+        }
+        crc = HashUtil::zlib_crc_hash(&data[n], sizeof(T), crc);
+    }
void update_hash_with_value(size_t n, SipHash& hash) const override;
void update_hashes_with_value(std::vector<SipHash>& hashes,
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index df0bb396a3..b49c6ec92e 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -40,6 +40,7 @@
#include "runtime/types.h"
#include "util/proto_util.h"
#include "util/telemetry/telemetry.h"
+#include "vec/columns/column_const.h"
#include "vec/common/sip_hash.h"
#include "vec/exprs/vexpr.h"
#include "vec/runtime/vdata_stream_mgr.h"
@@ -629,7 +630,9 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
std::vector<SipHash> siphashs(rows);
// result[j] means column index, i means rows index
for (int j = 0; j < result_size; ++j) {
- block->get_by_position(result[j]).column->update_hashes_with_value(siphashs);
+ // complex types mostly do not implement get_data_at(), which ColumnConst would call
+ unpack_if_const(block->get_by_position(result[j]).column)
+ .first->update_hashes_with_value(siphashs);
}
for (int i = 0; i < rows; i++) {
hashes[i] = siphashs[i].get64() % element_size;
@@ -638,7 +641,9 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
SCOPED_TIMER(_split_block_hash_compute_timer);
// result[j] means column index, i means rows index, here to calculate the xxhash value
for (int j = 0; j < result_size; ++j) {
- block->get_by_position(result[j]).column->update_hashes_with_value(hashes);
+ // complex types mostly do not implement get_data_at(), which ColumnConst would call
+ unpack_if_const(block->get_by_position(result[j]).column)
+ .first->update_hashes_with_value(hashes);
}
for (int i = 0; i < rows; i++) {
@@ -654,8 +659,10 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
RETURN_IF_ERROR(channel_add_rows(state, _channels, element_size, hashes, rows, block));
} else {
for (int j = 0; j < result_size; ++j) {
- block->get_by_position(result[j]).column->update_crcs_with_value(
- hash_vals, _partition_expr_ctxs[j]->root()->type().type);
+ // complex types mostly do not implement get_data_at(), which ColumnConst would call
+ unpack_if_const(block->get_by_position(result[j]).column)
+ .first->update_crcs_with_value(
+ hash_vals, _partition_expr_ctxs[j]->root()->type().type);
}
element_size = _channel_shared_ptrs.size();
for (int i = 0; i < rows; i++) {
diff --git a/be/test/vec/columns/column_hash_func_test.cpp b/be/test/vec/columns/column_hash_func_test.cpp
new file mode 100644
index 0000000000..0e409b4640
--- /dev/null
+++ b/be/test/vec/columns/column_hash_func_test.cpp
@@ -0,0 +1,190 @@
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+
+#include "gtest/gtest_pred_impl.h"
+#include "vec/columns/column_const.h"
+#include "vec/core/field.h"
+#include "vec/data_types/data_type.h"
+#include "vec/data_types/data_type_array.h"
+#include "vec/data_types/data_type_date.h"
+#include "vec/data_types/data_type_date_time.h"
+#include "vec/data_types/data_type_decimal.h"
+#include "vec/data_types/data_type_map.h"
+#include "vec/data_types/data_type_nullable.h"
+#include "vec/data_types/data_type_number.h"
+#include "vec/data_types/data_type_string.h"
+#include "vec/data_types/data_type_struct.h"
+
+namespace doris::vectorized {
+
+DataTypes create_scala_data_types() {
+ DataTypePtr dt = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeDateTime>());
+ DataTypePtr d = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeDate>());
+ DataTypePtr dc = std::make_shared<DataTypeNullable>(vectorized::create_decimal(10, 2, false));
+ DataTypePtr dcv2 = std::make_shared<DataTypeNullable>(
+ std::make_shared<DataTypeDecimal<vectorized::Decimal128>>(27, 9));
+ DataTypePtr n3 = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt128>());
+ DataTypePtr n1 = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt64>());
+ DataTypePtr s1 = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
+
+ DataTypes dataTypes;
+ dataTypes.push_back(dt);
+ dataTypes.push_back(d);
+ dataTypes.push_back(dc);
+ dataTypes.push_back(dcv2);
+ dataTypes.push_back(n3);
+ dataTypes.push_back(n1);
+ dataTypes.push_back(s1);
+
+ return dataTypes;
+}
+
+TEST(HashFuncTest, ArrayTypeTest) {
+ DataTypes dataTypes = create_scala_data_types();
+
+ std::vector<uint64_t> sip_hash_vals(1);
+ std::vector<uint64_t> xx_hash_vals(1);
+ std::vector<uint64_t> crc_hash_vals(1);
+ auto* __restrict sip_hashes = sip_hash_vals.data();
+ auto* __restrict xx_hashes = xx_hash_vals.data();
+ auto* __restrict crc_hashes = crc_hash_vals.data();
+
+ for (auto d : dataTypes) {
+ DataTypePtr a = std::make_shared<DataTypeArray>(d);
+ ColumnPtr col_a = a->create_column_const_with_default_value(1);
+ // sipHash
+ std::vector<SipHash> siphashs(1);
+ col_a->update_hashes_with_value(siphashs);
+ EXPECT_NO_FATAL_FAILURE(col_a->update_hashes_with_value(siphashs));
+ sip_hashes[0] = siphashs[0].get64();
+ std::cout << sip_hashes[0] << std::endl;
+ // xxHash
+ EXPECT_NO_FATAL_FAILURE(col_a->update_hashes_with_value(xx_hashes));
+ std::cout << xx_hashes[0] << std::endl;
+ // crcHash
+ EXPECT_NO_FATAL_FAILURE(
+ col_a->update_crcs_with_value(crc_hash_vals, PrimitiveType::TYPE_ARRAY));
+ std::cout << crc_hashes[0] << std::endl;
+ }
+}
+
+// Builds a 3-row Array(Int64) column ([1,2,3], [1,2,3], [11,12,13]) and checks, for each hash
+// family (sip, xx, crc), that the two equal rows hash equally and the third row differs.
+TEST(HashFuncTest, ArrayCornerCaseTest) {
+    DataTypePtr d = std::make_shared<DataTypeInt64>();
+    DataTypePtr a = std::make_shared<DataTypeArray>(d);
+    MutableColumnPtr array_mutable_col = a->create_column();
+    Array a1, a2;
+    a1.push_back(Int64(1));
+    a1.push_back(Int64(2));
+    a1.push_back(Int64(3));
+    array_mutable_col->insert(a1);
+    array_mutable_col->insert(a1);
+    a2.push_back(Int64(11));
+    a2.push_back(Int64(12));
+    a2.push_back(Int64(13));
+    array_mutable_col->insert(a2);
+
+    EXPECT_EQ(array_mutable_col->size(), 3);
+
+    std::vector<uint64_t> sip_hash_vals(3);
+    std::vector<uint64_t> xx_hash_vals(3);
+    std::vector<uint64_t> crc_hash_vals(3);
+    auto* __restrict sip_hashes = sip_hash_vals.data();
+    auto* __restrict xx_hashes = xx_hash_vals.data();
+    auto* __restrict crc_hashes = crc_hash_vals.data();
+
+    // sipHash
+    std::vector<SipHash> siphashs(3);
+    array_mutable_col->update_hashes_with_value(siphashs);
+    EXPECT_NO_FATAL_FAILURE(array_mutable_col->update_hashes_with_value(siphashs));
+    sip_hashes[0] = siphashs[0].get64();
+    sip_hashes[1] = siphashs[1].get64();
+    sip_hashes[2] = siphashs[2].get64();
+    EXPECT_EQ(sip_hashes[0], sip_hash_vals[1]);
+    EXPECT_NE(sip_hash_vals[0], sip_hash_vals[2]);
+    // xxHash
+    EXPECT_NO_FATAL_FAILURE(array_mutable_col->update_hashes_with_value(xx_hashes));
+    EXPECT_EQ(xx_hashes[0], xx_hashes[1]);
+    EXPECT_NE(xx_hashes[0], xx_hashes[2]);
+    // crcHash
+    EXPECT_NO_FATAL_FAILURE(
+            array_mutable_col->update_crcs_with_value(crc_hash_vals, PrimitiveType::TYPE_ARRAY));
+    EXPECT_EQ(crc_hashes[0], crc_hashes[1]);
+    // Assert on the CRC values here; the xxHash inequality is already covered above.
+    EXPECT_NE(crc_hashes[0], crc_hashes[2]);
+}
+
+TEST(HashFuncTest, MapTypeTest) {
+ DataTypes dataTypes = create_scala_data_types();
+
+ std::vector<uint64_t> sip_hash_vals(1);
+ std::vector<uint64_t> xx_hash_vals(1);
+ std::vector<uint64_t> crc_hash_vals(1);
+ auto* __restrict sip_hashes = sip_hash_vals.data();
+ auto* __restrict xx_hashes = xx_hash_vals.data();
+ auto* __restrict crc_hashes = crc_hash_vals.data();
+ // data_type_map
+ for (int i = 0; i < dataTypes.size() - 1; ++i) {
+ DataTypePtr a = std::make_shared<DataTypeMap>(dataTypes[i], dataTypes[i + 1]);
+ ColumnPtr col_a = a->create_column_const_with_default_value(1);
+ // sipHash
+ std::vector<SipHash> siphashs(1);
+ EXPECT_NO_FATAL_FAILURE(unpack_if_const(col_a).first->update_hashes_with_value(siphashs));
+ sip_hashes[0] = siphashs[0].get64();
+ std::cout << sip_hashes[0] << std::endl;
+ // xxHash
+ EXPECT_NO_FATAL_FAILURE(unpack_if_const(col_a).first->update_hashes_with_value(xx_hashes));
+ std::cout << xx_hashes[0] << std::endl;
+ // crcHash
+ EXPECT_NO_FATAL_FAILURE(unpack_if_const(col_a).first->update_crcs_with_value(
+ crc_hash_vals, PrimitiveType::TYPE_MAP));
+ std::cout << crc_hashes[0] << std::endl;
+ }
+}
+
+TEST(HashFuncTest, StructTypeTest) {
+ DataTypes dataTypes = create_scala_data_types();
+
+ std::vector<uint64_t> sip_hash_vals(1);
+ std::vector<uint64_t> xx_hash_vals(1);
+ std::vector<uint64_t> crc_hash_vals(1);
+ auto* __restrict sip_hashes = sip_hash_vals.data();
+ auto* __restrict xx_hashes = xx_hash_vals.data();
+ auto* __restrict crc_hashes = crc_hash_vals.data();
+
+ // data_type_struct
+ DataTypePtr a = std::make_shared<DataTypeStruct>(dataTypes);
+ ColumnPtr col_a = a->create_column_const_with_default_value(1);
+ // sipHash
+ std::vector<SipHash> siphashs(1);
+ EXPECT_NO_FATAL_FAILURE(unpack_if_const(col_a).first->update_hashes_with_value(siphashs));
+ sip_hashes[0] = siphashs[0].get64();
+ std::cout << sip_hashes[0] << std::endl;
+ // xxHash
+ EXPECT_NO_FATAL_FAILURE(unpack_if_const(col_a).first->update_hashes_with_value(xx_hashes));
+ std::cout << xx_hashes[0] << std::endl;
+ // crcHash
+ EXPECT_NO_FATAL_FAILURE(unpack_if_const(col_a).first->update_crcs_with_value(
+ crc_hash_vals, PrimitiveType::TYPE_STRUCT));
+ std::cout << crc_hashes[0] << std::endl;
+}
+
+} // namespace doris::vectorized
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org