You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2023/06/30 09:11:42 UTC

[doris] branch master updated: [FIX](datatype) Implement hash func with array/map/struct type (#21334)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b7d6a70868 [FIX](datatype) Implement hash func with array/map/struct type (#21334)
b7d6a70868 is described below

commit b7d6a7086819c0e6f8198c68cd8e5e1fd84e64b1
Author: amory <wa...@selectdb.com>
AuthorDate: Fri Jun 30 17:11:35 2023 +0800

    [FIX](datatype) Implement hash func with array/map/struct type (#21334)
    
    Array/map/struct columns did not implement any hash functions, so running SQL like the following makes the BE core dump:
    
    select * from (
            select
                bdp.nc_num,
                collect_list(distinct(bd.catalog_name)) as catalog_name,
                material_qty
            from
                dataease.bu_delivery_product bdp
                left join dataease.bu_trans_transfer btt on bdp.delivery_product_id = btt.delivery_product_id
                left join dataease.bu_delivery bd on bdp.delivery_id = bd.delivery_id
            where
                bd.val_status in ('10', '20', '30', '90')
                and bd.delivery_type in (0, 1, 2)
            group by
                nc_num,
                material_qty
            union
            ALL
            select
                bdp.nc_num,
                collect_list(distinct(bd.catalog_name)) as catalog_name,
                material_qty
            from
                dataease.bu_trans_transfer btt
                left join dataease.bu_delivery_product bdp on bdp.delivery_product_id = btt.delivery_product_id
                left join dataease.bu_delivery bd on bdp.delivery_id = bd.delivery_id
            where
                bd.val_status in ('10', '20', '30', '90')
                and bd.delivery_type in (0, 1, 2)
            group by
                nc_num,
                material_qty
    ) aa;
    core :
---
 be/src/vec/columns/column.h                   |  16 ++-
 be/src/vec/columns/column_array.cpp           |  63 +++++++++
 be/src/vec/columns/column_array.h             |  14 ++
 be/src/vec/columns/column_const.h             |  14 ++
 be/src/vec/columns/column_decimal.cpp         |  31 +++--
 be/src/vec/columns/column_decimal.h           |   3 +
 be/src/vec/columns/column_map.cpp             |  68 ++++++++-
 be/src/vec/columns/column_map.h               |  13 ++
 be/src/vec/columns/column_nullable.cpp        |  18 +++
 be/src/vec/columns/column_nullable.h          |   9 +-
 be/src/vec/columns/column_string.h            |  12 ++
 be/src/vec/columns/column_struct.cpp          |  31 +++++
 be/src/vec/columns/column_struct.h            |  13 ++
 be/src/vec/columns/column_vector.h            |  18 +++
 be/src/vec/sink/vdata_stream_sender.cpp       |  15 +-
 be/test/vec/columns/column_hash_func_test.cpp | 190 ++++++++++++++++++++++++++
 16 files changed, 502 insertions(+), 26 deletions(-)

diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h
index 00f1824b99..b291ba2443 100644
--- a/be/src/vec/columns/column.h
+++ b/be/src/vec/columns/column.h
@@ -365,7 +365,7 @@ public:
     /// On subsequent calls of this method for sequence of column values of arbitrary types,
     ///  passed bytes to hash must identify sequence of values unambiguously.
     virtual void update_hash_with_value(size_t n, SipHash& hash) const {
-        LOG(FATAL) << "update_hash_with_value siphash not supported";
+        LOG(FATAL) << get_name() << " update_hash_with_value siphash not supported";
     }
 
     /// Update state of hash function with value of n elements to avoid the virtual function call
@@ -374,7 +374,7 @@ public:
     /// do xxHash here, faster than other hash method
     virtual void update_hashes_with_value(std::vector<SipHash>& hashes,
                                           const uint8_t* __restrict null_data = nullptr) const {
-        LOG(FATAL) << "update_hashes_with_value siphash not supported";
+        LOG(FATAL) << get_name() << " update_hashes_with_value siphash not supported";
     }
 
     /// Update state of hash function with value of n elements to avoid the virtual function call
@@ -383,7 +383,11 @@ public:
     /// do xxHash here, faster than other sip hash
     virtual void update_hashes_with_value(uint64_t* __restrict hashes,
                                           const uint8_t* __restrict null_data = nullptr) const {
-        LOG(FATAL) << "update_hashes_with_value xxhash not supported";
+        LOG(FATAL) << get_name() << " update_hashes_with_value xxhash not supported";
+    }
+
+    virtual void update_xxHash_with_value(size_t n, uint64_t& hash) const {
+        LOG(FATAL) << get_name() << " update_xxHash_with_value not supported";
     }
 
     /// Update state of crc32 hash function with value of n elements to avoid the virtual function call
@@ -391,7 +395,11 @@ public:
     /// means all element need to do hash function, else only *null_data != 0 need to do hash func
     virtual void update_crcs_with_value(std::vector<uint64_t>& hash, PrimitiveType type,
                                         const uint8_t* __restrict null_data = nullptr) const {
-        LOG(FATAL) << "update_crcs_with_value not supported";
+        LOG(FATAL) << get_name() << " update_crcs_with_value not supported";
+    }
+
+    virtual void update_crc_with_value(size_t n, uint64_t& hash) const {
+        LOG(FATAL) << get_name() << " update_crc_with_value not supported";
     }
 
     /** Removes elements that don't match the filter.
diff --git a/be/src/vec/columns/column_array.cpp b/be/src/vec/columns/column_array.cpp
index 4215bd36bd..c5e35e5bb9 100644
--- a/be/src/vec/columns/column_array.cpp
+++ b/be/src/vec/columns/column_array.cpp
@@ -271,6 +271,69 @@ void ColumnArray::update_hash_with_value(size_t n, SipHash& hash) const {
     for (size_t i = 0; i < array_size; ++i) get_data().update_hash_with_value(offset + i, hash);
 }
 
+void ColumnArray::update_hashes_with_value(std::vector<SipHash>& hashes,
+                                           const uint8_t* __restrict null_data) const {
+    SIP_HASHES_FUNCTION_COLUMN_IMPL();
+}
+
+// Fold array row n into an xxHash64 state: the element count first, then each element in order.
+void ColumnArray::update_xxHash_with_value(size_t n, uint64_t& hash) const {
+    size_t elem_size = size_at(n);
+    size_t offset = offset_at(n);
+    hash = HashUtil::xxHash64WithSeed(reinterpret_cast<const char*>(&elem_size), sizeof(elem_size),
+                                      hash);
+    for (size_t i = 0; i < elem_size; ++i) { // size_t index: same type as the bound
+        get_data().update_xxHash_with_value(offset + i, hash);
+    }
+}
+
+// Fold array row n into a CRC state: the element count first, then each element in order.
+void ColumnArray::update_crc_with_value(size_t n, uint64_t& crc) const {
+    size_t elem_size = size_at(n);
+    size_t offset = offset_at(n);
+
+    crc = HashUtil::zlib_crc_hash(reinterpret_cast<const char*>(&elem_size), sizeof(elem_size),
+                                  crc);
+    for (size_t i = 0; i < elem_size; ++i) { // size_t index: same type as the bound
+        get_data().update_crc_with_value(offset + i, crc);
+    }
+}
+
+void ColumnArray::update_hashes_with_value(uint64_t* __restrict hashes,
+                                           const uint8_t* __restrict null_data) const {
+    auto s = size();
+    if (null_data) {
+        for (size_t i = 0; i < s; ++i) {
+            if (null_data[i] == 0) {
+                update_xxHash_with_value(i, hashes[i]);
+            }
+        }
+    } else {
+        for (size_t i = 0; i < s; ++i) {
+            update_xxHash_with_value(i, hashes[i]);
+        }
+    }
+}
+
+void ColumnArray::update_crcs_with_value(std::vector<uint64_t>& hash, PrimitiveType type,
+                                         const uint8_t* __restrict null_data) const {
+    auto s = hash.size();
+    DCHECK(s == size());
+
+    if (null_data) {
+        for (size_t i = 0; i < s; ++i) {
+            // every row
+            if (null_data[i] == 0) {
+                update_crc_with_value(i, hash[i]);
+            }
+        }
+    } else {
+        for (size_t i = 0; i < s; ++i) {
+            update_crc_with_value(i, hash[i]);
+        }
+    }
+}
+
 void ColumnArray::insert(const Field& x) {
     const Array& array = doris::vectorized::get<const Array&>(x);
     size_t size = array.size();
diff --git a/be/src/vec/columns/column_array.h b/be/src/vec/columns/column_array.h
index 4f08c269fb..2e1c96a2c5 100644
--- a/be/src/vec/columns/column_array.h
+++ b/be/src/vec/columns/column_array.h
@@ -139,6 +139,18 @@ public:
     StringRef serialize_value_into_arena(size_t n, Arena& arena, char const*& begin) const override;
     const char* deserialize_and_insert_from_arena(const char* pos) override;
     void update_hash_with_value(size_t n, SipHash& hash) const override;
+    void update_xxHash_with_value(size_t n, uint64_t& hash) const override;
+    void update_crc_with_value(size_t n, uint64_t& crc) const override;
+
+    void update_hashes_with_value(std::vector<SipHash>& hashes,
+                                  const uint8_t* __restrict null_data) const override;
+
+    void update_hashes_with_value(uint64_t* __restrict hashes,
+                                  const uint8_t* __restrict null_data = nullptr) const override;
+
+    void update_crcs_with_value(std::vector<uint64_t>& hash, PrimitiveType type,
+                                const uint8_t* __restrict null_data = nullptr) const override;
+
     void insert_range_from(const IColumn& src, size_t start, size_t length) override;
     void insert(const Field& x) override;
     void insert_from(const IColumn& src_, size_t n) override;
@@ -240,6 +252,8 @@ public:
     ColumnPtr index(const IColumn& indexes, size_t limit) const override;
 
 private:
+    // [[2,1,5,9,1], [1,2,4]] --> data column [2,1,5,9,1,1,2,4], offset[-1] = 0, offset[0] = 5, offset[1] = 8
+    // [[[2,1,5],[9,1]], [[1,2]]] --> data column [3 column array], offset[-1] = 0, offset[0] = 2, offset[1] = 3
     WrappedPtr data;
     WrappedPtr offsets;
 
diff --git a/be/src/vec/columns/column_const.h b/be/src/vec/columns/column_const.h
index bb17f7eb04..feeb0608a2 100644
--- a/be/src/vec/columns/column_const.h
+++ b/be/src/vec/columns/column_const.h
@@ -152,6 +152,19 @@ public:
         data->serialize_vec(keys, num_rows, max_row_byte_size);
     }
 
+    void update_xxHash_with_value(size_t n, uint64_t& hash) const override {
+        auto real_data = data->get_data_at(0);
+        if (real_data.data == nullptr) {
+            hash = HashUtil::xxHash64NullWithSeed(hash);
+        } else {
+            hash = HashUtil::xxHash64WithSeed(real_data.data, real_data.size, hash);
+        }
+    }
+
+    void update_crc_with_value(size_t n, uint64_t& crc) const override {
+        get_data_column_ptr()->update_crc_with_value(0, crc); // row 0, as the xxHash path reads
+    }
+
     void serialize_vec_with_null_map(std::vector<StringRef>& keys, size_t num_rows,
                                      const uint8_t* null_map,
                                      size_t max_row_byte_size) const override {
@@ -165,6 +178,7 @@ public:
     void update_hashes_with_value(std::vector<SipHash>& hashes,
                                   const uint8_t* __restrict null_data) const override;
 
+    // (TODO.Amory) here may not use column_const update hash, and PrimitiveType is not used.
     void update_crcs_with_value(std::vector<uint64_t>& hashes, PrimitiveType type,
                                 const uint8_t* __restrict null_data) const override;
 
diff --git a/be/src/vec/columns/column_decimal.cpp b/be/src/vec/columns/column_decimal.cpp
index 069f195c4a..e0b8fef056 100644
--- a/be/src/vec/columns/column_decimal.cpp
+++ b/be/src/vec/columns/column_decimal.cpp
@@ -137,6 +137,19 @@ void ColumnDecimal<T>::update_hashes_with_value(std::vector<SipHash>& hashes,
     SIP_HASHES_FUNCTION_COLUMN_IMPL();
 }
 
+template <typename T>
+void ColumnDecimal<T>::update_crc_with_value(size_t n, uint64_t& crc) const {
+    if constexpr (!IsDecimalV2<T>) {
+        crc = HashUtil::zlib_crc_hash(&data[n], sizeof(T), crc);
+    } else {
+        const DecimalV2Value& dec_val = (const DecimalV2Value&)data[n];
+        int64_t int_val = dec_val.int_value();
+        int32_t frac_val = dec_val.frac_value();
+        crc = HashUtil::zlib_crc_hash(&int_val, sizeof(int_val), crc);
+        crc = HashUtil::zlib_crc_hash(&frac_val, sizeof(frac_val), crc);
+    };
+}
+
 template <typename T>
 void ColumnDecimal<T>::update_crcs_with_value(std::vector<uint64_t>& hashes, PrimitiveType type,
                                               const uint8_t* __restrict null_data) const {
@@ -146,27 +159,23 @@ void ColumnDecimal<T>::update_crcs_with_value(std::vector<uint64_t>& hashes, Pri
     if constexpr (!IsDecimalV2<T>) {
         DO_CRC_HASHES_FUNCTION_COLUMN_IMPL()
     } else {
-        DCHECK(type == TYPE_DECIMALV2);
-        auto decimalv2_do_crc = [&](size_t i) {
-            const DecimalV2Value& dec_val = (const DecimalV2Value&)data[i];
-            int64_t int_val = dec_val.int_value();
-            int32_t frac_val = dec_val.frac_value();
-            hashes[i] = HashUtil::zlib_crc_hash(&int_val, sizeof(int_val), hashes[i]);
-            hashes[i] = HashUtil::zlib_crc_hash(&frac_val, sizeof(frac_val), hashes[i]);
-        };
-
         if (null_data == nullptr) {
             for (size_t i = 0; i < s; i++) {
-                decimalv2_do_crc(i);
+                update_crc_with_value(i, hashes[i]);
             }
         } else {
             for (size_t i = 0; i < s; i++) {
-                if (null_data[i] == 0) decimalv2_do_crc(i);
+                if (null_data[i] == 0) update_crc_with_value(i, hashes[i]);
             }
         }
     }
 }
 
+template <typename T>
+void ColumnDecimal<T>::update_xxHash_with_value(size_t n, uint64_t& hash) const {
+    hash = HashUtil::xxHash64WithSeed(reinterpret_cast<const char*>(&data[n]), sizeof(T), hash);
+}
+
 template <typename T>
 void ColumnDecimal<T>::update_hashes_with_value(uint64_t* __restrict hashes,
                                                 const uint8_t* __restrict null_data) const {
diff --git a/be/src/vec/columns/column_decimal.h b/be/src/vec/columns/column_decimal.h
index f935c0c826..973f0bea68 100644
--- a/be/src/vec/columns/column_decimal.h
+++ b/be/src/vec/columns/column_decimal.h
@@ -184,6 +184,9 @@ public:
     void update_crcs_with_value(std::vector<uint64_t>& hashes, PrimitiveType type,
                                 const uint8_t* __restrict null_data) const override;
 
+    void update_xxHash_with_value(size_t n, uint64_t& hash) const override;
+    void update_crc_with_value(size_t n, uint64_t& crc) const override;
+
     int compare_at(size_t n, size_t m, const IColumn& rhs_, int nan_direction_hint) const override;
     void get_permutation(bool reverse, size_t limit, int nan_direction_hint,
                          IColumn::Permutation& res) const override;
diff --git a/be/src/vec/columns/column_map.cpp b/be/src/vec/columns/column_map.cpp
index 4f06ee4dda..1924e2ba46 100644
--- a/be/src/vec/columns/column_map.cpp
+++ b/be/src/vec/columns/column_map.cpp
@@ -238,15 +238,79 @@ const char* ColumnMap::deserialize_and_insert_from_arena(const char* pos) {
 }
 
 void ColumnMap::update_hash_with_value(size_t n, SipHash& hash) const {
-    size_t array_size = size_at(n);
+    size_t kv_size = size_at(n);
     size_t offset = offset_at(n);
 
-    for (size_t i = 0; i < array_size; ++i) {
+    hash.update(reinterpret_cast<const char*>(&kv_size), sizeof(kv_size));
+    for (size_t i = 0; i < kv_size; ++i) {
         get_keys().update_hash_with_value(offset + i, hash);
         get_values().update_hash_with_value(offset + i, hash);
     }
 }
 
+void ColumnMap::update_hashes_with_value(std::vector<SipHash>& hashes,
+                                         const uint8_t* __restrict null_data) const {
+    SIP_HASHES_FUNCTION_COLUMN_IMPL();
+}
+
+void ColumnMap::update_xxHash_with_value(size_t n, uint64_t& hash) const {
+    size_t kv_size = size_at(n);
+    size_t offset = offset_at(n);
+    // Hash the entry count of row n first, then each key followed by its value, in order.
+    hash = HashUtil::xxHash64WithSeed(reinterpret_cast<const char*>(&kv_size), sizeof(kv_size),
+                                      hash);
+    for (size_t i = 0; i < kv_size; ++i) {
+        get_keys().update_xxHash_with_value(offset + i, hash);
+        get_values().update_xxHash_with_value(offset + i, hash);
+    }
+}
+
+void ColumnMap::update_crc_with_value(size_t n, uint64_t& crc) const {
+    size_t kv_size = size_at(n);
+    size_t offset = offset_at(n);
+
+    crc = HashUtil::zlib_crc_hash(reinterpret_cast<const char*>(&kv_size), sizeof(kv_size), crc);
+    for (size_t i = 0; i < kv_size; ++i) {
+        get_keys().update_crc_with_value(offset + i, crc);
+        get_values().update_crc_with_value(offset + i, crc);
+    }
+}
+
+void ColumnMap::update_hashes_with_value(uint64_t* hashes, const uint8_t* null_data) const {
+    size_t s = size();
+    if (null_data) {
+        for (size_t i = 0; i < s; ++i) {
+            // every row
+            if (null_data[i] == 0) {
+                update_xxHash_with_value(i, hashes[i]);
+            }
+        }
+    } else {
+        for (size_t i = 0; i < s; ++i) {
+            update_xxHash_with_value(i, hashes[i]);
+        }
+    }
+}
+
+void ColumnMap::update_crcs_with_value(std::vector<uint64_t>& hash, PrimitiveType type,
+                                       const uint8_t* __restrict null_data) const {
+    auto s = hash.size();
+    DCHECK(s == size());
+
+    if (null_data) {
+        for (size_t i = 0; i < s; ++i) {
+            // every row
+            if (null_data[i] == 0) {
+                update_crc_with_value(i, hash[i]);
+            }
+        }
+    } else {
+        for (size_t i = 0; i < s; ++i) {
+            update_crc_with_value(i, hash[i]);
+        }
+    }
+}
+
 void ColumnMap::insert_range_from(const IColumn& src, size_t start, size_t length) {
     if (length == 0) {
         return;
diff --git a/be/src/vec/columns/column_map.h b/be/src/vec/columns/column_map.h
index 6cb83adf25..0d7bb2d0a7 100644
--- a/be/src/vec/columns/column_map.h
+++ b/be/src/vec/columns/column_map.h
@@ -39,6 +39,7 @@
 #include "vec/columns/column_vector.h"
 #include "vec/common/assert_cast.h"
 #include "vec/common/cow.h"
+#include "vec/common/sip_hash.h"
 #include "vec/common/string_ref.h"
 #include "vec/core/field.h"
 #include "vec/core/types.h"
@@ -166,6 +167,18 @@ public:
     size_t allocated_bytes() const override;
     void protect() override;
 
+    void update_xxHash_with_value(size_t n, uint64_t& hash) const override;
+    void update_crc_with_value(size_t n, uint64_t& crc) const override;
+
+    void update_hashes_with_value(std::vector<SipHash>& hashes,
+                                  const uint8_t* __restrict null_data) const override;
+
+    void update_hashes_with_value(uint64_t* __restrict hashes,
+                                  const uint8_t* __restrict null_data = nullptr) const override;
+
+    void update_crcs_with_value(std::vector<uint64_t>& hash, PrimitiveType type,
+                                const uint8_t* __restrict null_data = nullptr) const override;
+
     /******************** keys and values ***************/
     const ColumnPtr& get_keys_ptr() const { return keys_column; }
     ColumnPtr& get_keys_ptr() { return keys_column; }
diff --git a/be/src/vec/columns/column_nullable.cpp b/be/src/vec/columns/column_nullable.cpp
index 42cda1d18d..ce5b68f3fb 100644
--- a/be/src/vec/columns/column_nullable.cpp
+++ b/be/src/vec/columns/column_nullable.cpp
@@ -65,6 +65,24 @@ MutableColumnPtr ColumnNullable::get_shrinked_column() {
                                   get_null_map_column_ptr());
 }
 
+void ColumnNullable::update_xxHash_with_value(size_t n, uint64_t& hash) const {
+    auto* __restrict real_null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data().data();
+    if (real_null_data[n] != 0) {
+        hash = HashUtil::xxHash64NullWithSeed(hash);
+    } else {
+        nested_column->update_xxHash_with_value(n, hash);
+    }
+}
+
+void ColumnNullable::update_crc_with_value(size_t n, uint64_t& crc) const {
+    auto* __restrict real_null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data().data();
+    if (real_null_data[n] != 0) { // row n is NULL: fold the dedicated null CRC
+        crc = HashUtil::zlib_crc_hash_null(crc);
+    } else {
+        nested_column->update_crc_with_value(n, crc); // non-null: CRC of the nested value
+    }
+}
+
 void ColumnNullable::update_hash_with_value(size_t n, SipHash& hash) const {
     if (is_null_at(n))
         hash.update(0);
diff --git a/be/src/vec/columns/column_nullable.h b/be/src/vec/columns/column_nullable.h
index d5ca7f844b..be9ba72399 100644
--- a/be/src/vec/columns/column_nullable.h
+++ b/be/src/vec/columns/column_nullable.h
@@ -98,11 +98,7 @@ public:
     const char* get_family_name() const override { return "Nullable"; }
     std::string get_name() const override { return "Nullable(" + nested_column->get_name() + ")"; }
     MutableColumnPtr clone_resized(size_t size) const override;
-    size_t size() const override {
-        return nested_column->size(
-
-        );
-    }
+    size_t size() const override { return nested_column->size(); }
     bool is_null_at(size_t n) const override {
         return assert_cast<const ColumnUInt8&>(*null_map).get_data()[n] != 0;
     }
@@ -219,6 +215,9 @@ public:
     ColumnPtr replicate(const Offsets& replicate_offsets) const override;
     void replicate(const uint32_t* counts, size_t target_size, IColumn& column, size_t begin = 0,
                    int count_sz = -1) const override;
+    void update_xxHash_with_value(size_t n, uint64_t& hash) const override;
+    void update_crc_with_value(size_t n, uint64_t& crc) const override;
+
     void update_hash_with_value(size_t n, SipHash& hash) const override;
     void update_hashes_with_value(std::vector<SipHash>& hashes,
                                   const uint8_t* __restrict null_data) const override;
diff --git a/be/src/vec/columns/column_string.h b/be/src/vec/columns/column_string.h
index efd90fd844..703826cd24 100644
--- a/be/src/vec/columns/column_string.h
+++ b/be/src/vec/columns/column_string.h
@@ -398,6 +398,18 @@ public:
     void deserialize_vec_with_null_map(std::vector<StringRef>& keys, const size_t num_rows,
                                        const uint8_t* null_map) override;
 
+    void update_xxHash_with_value(size_t n, uint64_t& hash) const override {
+        size_t string_size = size_at(n);
+        size_t offset = offset_at(n);
+        hash = HashUtil::xxHash64WithSeed(reinterpret_cast<const char*>(&chars[offset]),
+                                          string_size, hash);
+    }
+
+    void update_crc_with_value(size_t n, uint64_t& crc) const override {
+        auto data_ref = get_data_at(n);
+        crc = HashUtil::zlib_crc_hash(data_ref.data, data_ref.size, crc);
+    }
+
     void update_hash_with_value(size_t n, SipHash& hash) const override {
         size_t string_size = size_at(n);
         size_t offset = offset_at(n);
diff --git a/be/src/vec/columns/column_struct.cpp b/be/src/vec/columns/column_struct.cpp
index 2a5a505fb8..58f5a4abaf 100644
--- a/be/src/vec/columns/column_struct.cpp
+++ b/be/src/vec/columns/column_struct.cpp
@@ -191,6 +191,37 @@ void ColumnStruct::update_hash_with_value(size_t n, SipHash& hash) const {
     }
 }
 
+void ColumnStruct::update_hashes_with_value(std::vector<SipHash>& hashes,
+                                            const uint8_t* __restrict null_data) const {
+    SIP_HASHES_FUNCTION_COLUMN_IMPL();
+}
+
+void ColumnStruct::update_xxHash_with_value(size_t n, uint64_t& hash) const {
+    for (const auto& column : columns) {
+        column->update_xxHash_with_value(n, hash);
+    }
+}
+
+void ColumnStruct::update_crc_with_value(size_t n, uint64_t& crc) const {
+    for (const auto& column : columns) {
+        column->update_crc_with_value(n, crc);
+    }
+}
+
+void ColumnStruct::update_hashes_with_value(uint64_t* __restrict hashes,
+                                            const uint8_t* __restrict null_data) const {
+    for (const auto& column : columns) {
+        column->update_hashes_with_value(hashes, null_data);
+    }
+}
+
+void ColumnStruct::update_crcs_with_value(std::vector<uint64_t>& hash, PrimitiveType type,
+                                          const uint8_t* __restrict null_data) const {
+    for (const auto& column : columns) {
+        column->update_crcs_with_value(hash, type, null_data);
+    }
+}
+
 void ColumnStruct::insert_indices_from(const IColumn& src, const int* indices_begin,
                                        const int* indices_end) {
     const ColumnStruct& src_concrete = assert_cast<const ColumnStruct&>(src);
diff --git a/be/src/vec/columns/column_struct.h b/be/src/vec/columns/column_struct.h
index dfa8bd6f5d..3771d29e48 100644
--- a/be/src/vec/columns/column_struct.h
+++ b/be/src/vec/columns/column_struct.h
@@ -35,6 +35,7 @@
 #include "vec/columns/column.h"
 #include "vec/columns/column_impl.h"
 #include "vec/common/cow.h"
+#include "vec/common/sip_hash.h"
 #include "vec/common/string_ref.h"
 #include "vec/core/field.h"
 #include "vec/core/types.h"
@@ -103,7 +104,19 @@ public:
     void pop_back(size_t n) override;
     StringRef serialize_value_into_arena(size_t n, Arena& arena, char const*& begin) const override;
     const char* deserialize_and_insert_from_arena(const char* pos) override;
+
     void update_hash_with_value(size_t n, SipHash& hash) const override;
+    void update_xxHash_with_value(size_t n, uint64_t& hash) const override;
+    void update_crc_with_value(size_t n, uint64_t& crc) const override;
+
+    void update_hashes_with_value(std::vector<SipHash>& hashes,
+                                  const uint8_t* __restrict null_data) const override;
+
+    void update_hashes_with_value(uint64_t* __restrict hashes,
+                                  const uint8_t* __restrict null_data = nullptr) const override;
+
+    void update_crcs_with_value(std::vector<uint64_t>& hash, PrimitiveType type,
+                                const uint8_t* __restrict null_data = nullptr) const override;
 
     void insert_indices_from(const IColumn& src, const int* indices_begin,
                              const int* indices_end) override;
diff --git a/be/src/vec/columns/column_vector.h b/be/src/vec/columns/column_vector.h
index c30796792e..67f2827c92 100644
--- a/be/src/vec/columns/column_vector.h
+++ b/be/src/vec/columns/column_vector.h
@@ -274,6 +274,24 @@ public:
                                      const uint8_t* null_map,
                                      size_t max_row_byte_size) const override;
 
+    void update_xxHash_with_value(size_t n, uint64_t& hash) const override {
+        hash = HashUtil::xxHash64WithSeed(reinterpret_cast<const char*>(&data[n]), sizeof(T), hash);
+    }
+
+    void update_crc_with_value(size_t n, uint64_t& crc) const override {
+        if constexpr (!std::is_same_v<T, Int64>) {
+            crc = HashUtil::zlib_crc_hash(&data[n], sizeof(T), crc);
+        } else {
+            if (this->is_date_type() || this->is_datetime_type()) {
+                char buf[64];
+                const VecDateTimeValue& date_val = (const VecDateTimeValue&)data[n];
+                auto len = date_val.to_buffer(buf);
+                crc = HashUtil::zlib_crc_hash(buf, len, crc);
+            } else {
+                crc = HashUtil::zlib_crc_hash(&data[n], sizeof(T), crc);
+            }
+        }
+    }
     void update_hash_with_value(size_t n, SipHash& hash) const override;
 
     void update_hashes_with_value(std::vector<SipHash>& hashes,
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index df0bb396a3..b49c6ec92e 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -40,6 +40,7 @@
 #include "runtime/types.h"
 #include "util/proto_util.h"
 #include "util/telemetry/telemetry.h"
+#include "vec/columns/column_const.h"
 #include "vec/common/sip_hash.h"
 #include "vec/exprs/vexpr.h"
 #include "vec/runtime/vdata_stream_mgr.h"
@@ -629,7 +630,9 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
                 std::vector<SipHash> siphashs(rows);
                 // result[j] means column index, i means rows index
                 for (int j = 0; j < result_size; ++j) {
-                    block->get_by_position(result[j]).column->update_hashes_with_value(siphashs);
+                    // complex type most not implement get_data_at() method which column_const will call
+                    unpack_if_const(block->get_by_position(result[j]).column)
+                            .first->update_hashes_with_value(siphashs);
                 }
                 for (int i = 0; i < rows; i++) {
                     hashes[i] = siphashs[i].get64() % element_size;
@@ -638,7 +641,9 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
                 SCOPED_TIMER(_split_block_hash_compute_timer);
                 // result[j] means column index, i means rows index, here to calculate the xxhash value
                 for (int j = 0; j < result_size; ++j) {
-                    block->get_by_position(result[j]).column->update_hashes_with_value(hashes);
+                    // complex type most not implement get_data_at() method which column_const will call
+                    unpack_if_const(block->get_by_position(result[j]).column)
+                            .first->update_hashes_with_value(hashes);
                 }
 
                 for (int i = 0; i < rows; i++) {
@@ -654,8 +659,10 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
             RETURN_IF_ERROR(channel_add_rows(state, _channels, element_size, hashes, rows, block));
         } else {
             for (int j = 0; j < result_size; ++j) {
-                block->get_by_position(result[j]).column->update_crcs_with_value(
-                        hash_vals, _partition_expr_ctxs[j]->root()->type().type);
+                // complex type most not implement get_data_at() method which column_const will call
+                unpack_if_const(block->get_by_position(result[j]).column)
+                        .first->update_crcs_with_value(
+                                hash_vals, _partition_expr_ctxs[j]->root()->type().type);
             }
             element_size = _channel_shared_ptrs.size();
             for (int i = 0; i < rows; i++) {
diff --git a/be/test/vec/columns/column_hash_func_test.cpp b/be/test/vec/columns/column_hash_func_test.cpp
new file mode 100644
index 0000000000..0e409b4640
--- /dev/null
+++ b/be/test/vec/columns/column_hash_func_test.cpp
@@ -0,0 +1,190 @@
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+
+#include "gtest/gtest_pred_impl.h"
+#include "vec/columns/column_const.h"
+#include "vec/core/field.h"
+#include "vec/data_types/data_type.h"
+#include "vec/data_types/data_type_array.h"
+#include "vec/data_types/data_type_date.h"
+#include "vec/data_types/data_type_date_time.h"
+#include "vec/data_types/data_type_decimal.h"
+#include "vec/data_types/data_type_map.h"
+#include "vec/data_types/data_type_nullable.h"
+#include "vec/data_types/data_type_number.h"
+#include "vec/data_types/data_type_string.h"
+#include "vec/data_types/data_type_struct.h"
+
+namespace doris::vectorized {
+
+// Returns the nullable scalar types that the hash tests below nest inside
+// array, map and struct types. Order: DateTime, Date, create_decimal(10, 2, false),
+// Decimal128(27, 9), Int128, Int64, String.
+DataTypes create_scala_data_types() {
+    // Wraps `nested` into a DataTypeNullable.
+    const auto nullable_of = [](const DataTypePtr& nested) -> DataTypePtr {
+        return std::make_shared<DataTypeNullable>(nested);
+    };
+
+    DataTypes scalar_types;
+    scalar_types.push_back(nullable_of(std::make_shared<DataTypeDateTime>()));
+    scalar_types.push_back(nullable_of(std::make_shared<DataTypeDate>()));
+    scalar_types.push_back(nullable_of(vectorized::create_decimal(10, 2, false)));
+    scalar_types.push_back(
+            nullable_of(std::make_shared<DataTypeDecimal<vectorized::Decimal128>>(27, 9)));
+    scalar_types.push_back(nullable_of(std::make_shared<DataTypeInt128>()));
+    scalar_types.push_back(nullable_of(std::make_shared<DataTypeInt64>()));
+    scalar_types.push_back(nullable_of(std::make_shared<DataTypeString>()));
+    return scalar_types;
+}
+
+// Smoke test: for every scalar element type, sip/xx/crc hashing of the array
+// column returned by create_column_const_with_default_value(1) must finish
+// without a fatal failure. The hash values are only printed, never compared.
+TEST(HashFuncTest, ArrayTypeTest) {
+    DataTypes dataTypes = create_scala_data_types();
+
+    std::vector<uint64_t> sip_hash_vals(1);
+    std::vector<uint64_t> xx_hash_vals(1);
+    std::vector<uint64_t> crc_hash_vals(1);
+    auto* __restrict sip_hashes = sip_hash_vals.data();
+    auto* __restrict xx_hashes = xx_hash_vals.data();
+    auto* __restrict crc_hashes = crc_hash_vals.data();
+
+    // Bind by const reference so each iteration does not copy a DataTypePtr.
+    for (const auto& d : dataTypes) {
+        DataTypePtr a = std::make_shared<DataTypeArray>(d);
+        ColumnPtr col_a = a->create_column_const_with_default_value(1);
+        // sipHash: update exactly once, inside the guard (as the map and struct
+        // tests do), so the printed value reflects a single hashing pass.
+        std::vector<SipHash> siphashs(1);
+        EXPECT_NO_FATAL_FAILURE(col_a->update_hashes_with_value(siphashs));
+        sip_hashes[0] = siphashs[0].get64();
+        // std::endl flushes, so the value is visible even if a later call aborts.
+        std::cout << sip_hashes[0] << std::endl;
+        // xxHash
+        EXPECT_NO_FATAL_FAILURE(col_a->update_hashes_with_value(xx_hashes));
+        std::cout << xx_hashes[0] << std::endl;
+        // crcHash
+        EXPECT_NO_FATAL_FAILURE(
+                col_a->update_crcs_with_value(crc_hash_vals, PrimitiveType::TYPE_ARRAY));
+        std::cout << crc_hashes[0] << std::endl;
+    }
+}
+
+// Checks hash consistency on a three-row Array<Int64> column holding
+// [1,2,3], [1,2,3] and [11,12,13]: for each of the sip, xx and crc hashes,
+// rows 0 and 1 (equal arrays) must hash equal and rows 0 and 2 (different
+// arrays) must hash differently.
+TEST(HashFuncTest, ArrayCornerCaseTest) {
+    DataTypePtr d = std::make_shared<DataTypeInt64>();
+    DataTypePtr a = std::make_shared<DataTypeArray>(d);
+    MutableColumnPtr array_mutable_col = a->create_column();
+    Array a1, a2;
+    a1.push_back(Int64(1));
+    a1.push_back(Int64(2));
+    a1.push_back(Int64(3));
+    // a1 is inserted twice so that rows 0 and 1 are identical.
+    array_mutable_col->insert(a1);
+    array_mutable_col->insert(a1);
+    a2.push_back(Int64(11));
+    a2.push_back(Int64(12));
+    a2.push_back(Int64(13));
+    array_mutable_col->insert(a2);
+
+    EXPECT_EQ(array_mutable_col->size(), 3);
+
+    std::vector<uint64_t> sip_hash_vals(3);
+    std::vector<uint64_t> xx_hash_vals(3);
+    std::vector<uint64_t> crc_hash_vals(3);
+    auto* __restrict sip_hashes = sip_hash_vals.data();
+    auto* __restrict xx_hashes = xx_hash_vals.data();
+    auto* __restrict crc_hashes = crc_hash_vals.data();
+
+    // sipHash: update exactly once, inside the guard.
+    std::vector<SipHash> siphashs(3);
+    EXPECT_NO_FATAL_FAILURE(array_mutable_col->update_hashes_with_value(siphashs));
+    sip_hashes[0] = siphashs[0].get64();
+    sip_hashes[1] = siphashs[1].get64();
+    sip_hashes[2] = siphashs[2].get64();
+    EXPECT_EQ(sip_hashes[0], sip_hashes[1]);
+    EXPECT_NE(sip_hashes[0], sip_hashes[2]);
+    // xxHash
+    EXPECT_NO_FATAL_FAILURE(array_mutable_col->update_hashes_with_value(xx_hashes));
+    EXPECT_EQ(xx_hashes[0], xx_hashes[1]);
+    EXPECT_NE(xx_hashes[0], xx_hashes[2]);
+    // crcHash
+    EXPECT_NO_FATAL_FAILURE(
+            array_mutable_col->update_crcs_with_value(crc_hash_vals, PrimitiveType::TYPE_ARRAY));
+    EXPECT_EQ(crc_hashes[0], crc_hashes[1]);
+    // Compare the crc values here; the original re-checked the xx hashes by mistake.
+    EXPECT_NE(crc_hashes[0], crc_hashes[2]);
+}
+
+// Smoke test: for every adjacent (key, value) pair of scalar types, sip/xx/crc
+// hashing of the unwrapped const map column must finish without a fatal
+// failure. The hash values are only printed, never compared.
+TEST(HashFuncTest, MapTypeTest) {
+    DataTypes dataTypes = create_scala_data_types();
+
+    std::vector<uint64_t> sip_hash_vals(1);
+    std::vector<uint64_t> xx_hash_vals(1);
+    std::vector<uint64_t> crc_hash_vals(1);
+    auto* __restrict sip_hashes = sip_hash_vals.data();
+    auto* __restrict xx_hashes = xx_hash_vals.data();
+    auto* __restrict crc_hashes = crc_hash_vals.data();
+    // data_type_map
+    // `i + 1 < size` avoids both the signed/unsigned comparison and the
+    // unsigned wrap-around that `size() - 1` would produce on an empty list.
+    const int type_count = static_cast<int>(dataTypes.size());
+    for (int i = 0; i + 1 < type_count; ++i) {
+        DataTypePtr a = std::make_shared<DataTypeMap>(dataTypes[i], dataTypes[i + 1]);
+        ColumnPtr col_a = a->create_column_const_with_default_value(1);
+        // sipHash
+        std::vector<SipHash> siphashs(1);
+        EXPECT_NO_FATAL_FAILURE(unpack_if_const(col_a).first->update_hashes_with_value(siphashs));
+        sip_hashes[0] = siphashs[0].get64();
+        // std::endl flushes, so the value is visible even if a later call aborts.
+        std::cout << sip_hashes[0] << std::endl;
+        // xxHash
+        EXPECT_NO_FATAL_FAILURE(unpack_if_const(col_a).first->update_hashes_with_value(xx_hashes));
+        std::cout << xx_hashes[0] << std::endl;
+        // crcHash
+        EXPECT_NO_FATAL_FAILURE(unpack_if_const(col_a).first->update_crcs_with_value(
+                crc_hash_vals, PrimitiveType::TYPE_MAP));
+        std::cout << crc_hashes[0] << std::endl;
+    }
+}
+
+// Smoke test: sip/xx/crc hashing of a struct column built from all scalar
+// types must finish without a fatal failure. The const column is unwrapped
+// with unpack_if_const() before hashing; hash values are printed only.
+TEST(HashFuncTest, StructTypeTest) {
+    DataTypes dataTypes = create_scala_data_types();
+
+    std::vector<uint64_t> sip_hash_vals(1);
+    std::vector<uint64_t> xx_hash_vals(1);
+    std::vector<uint64_t> crc_hash_vals(1);
+
+    // data_type_struct
+    DataTypePtr struct_type = std::make_shared<DataTypeStruct>(dataTypes);
+    ColumnPtr const_col = struct_type->create_column_const_with_default_value(1);
+    // Unwrap once and reuse the nested column for all three hash flavours.
+    const ColumnPtr nested_col = unpack_if_const(const_col).first;
+
+    // sipHash
+    std::vector<SipHash> sip_states(1);
+    EXPECT_NO_FATAL_FAILURE(nested_col->update_hashes_with_value(sip_states));
+    sip_hash_vals[0] = sip_states[0].get64();
+    std::cout << sip_hash_vals[0] << std::endl;
+
+    // xxHash
+    uint64_t* __restrict xx_out = xx_hash_vals.data();
+    EXPECT_NO_FATAL_FAILURE(nested_col->update_hashes_with_value(xx_out));
+    std::cout << xx_hash_vals[0] << std::endl;
+
+    // crcHash
+    EXPECT_NO_FATAL_FAILURE(
+            nested_col->update_crcs_with_value(crc_hash_vals, PrimitiveType::TYPE_STRUCT));
+    std::cout << crc_hash_vals[0] << std::endl;
+}
+
+} // namespace doris::vectorized


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org