You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/05/26 00:41:43 UTC

[incubator-doris] branch master updated: [feature-wip][array-type] Support more sub types. (#9466)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a11a4ab99 [feature-wip][array-type] Support more sub types. (#9466)
2a11a4ab99 is described below

commit 2a11a4ab99206ebf9cf6c62c885088b8ead5bc05
Author: Adonis Ling <ad...@gmail.com>
AuthorDate: Thu May 26 08:41:34 2022 +0800

    [feature-wip][array-type] Support more sub types. (#9466)
    
    Please refer to #9465
---
 be/src/exprs/array_functions.cpp                   |   5 +-
 be/src/exprs/literal.cpp                           |  13 +-
 be/src/olap/aggregate_func.cpp                     |  16 +-
 be/src/olap/rowset/segment_v2/segment_writer.cpp   |   2 +-
 be/src/olap/types.h                                |  15 +-
 be/src/runtime/collection_value.cpp                | 636 ++++++++++++---------
 be/src/runtime/collection_value.h                  | 176 ++++--
 be/src/runtime/mysql_result_writer.cpp             |  22 +-
 be/src/runtime/raw_value.cpp                       |   9 +-
 be/src/runtime/row_batch.cpp                       |   8 +-
 be/src/runtime/tuple.h                             |   2 +-
 be/src/udf/udf.h                                   |   2 +-
 be/src/util/array_parser.h                         |  65 ++-
 be/src/vec/core/block.cpp                          |  28 +-
 be/src/vec/olap/olap_data_convertor.cpp            | 266 ++++-----
 be/src/vec/olap/olap_data_convertor.h              |  85 +--
 be/src/vec/sink/mysql_result_writer.cpp            |  12 +
 be/test/exprs/array_functions_test.cpp             |   2 +-
 be/test/runtime/array_test.cpp                     | 417 ++++++++++----
 be/test/runtime/collection_value_test.cpp          |  13 +-
 be/test/testutil/array_utils.cpp                   |  26 +
 be/test/testutil/array_utils.h                     |   2 +-
 be/test/util/array_parser_test.cpp                 |  54 ++
 .../java/org/apache/doris/analysis/TypeDef.java    |   4 -
 .../java/org/apache/doris/catalog/ArrayType.java   |   3 +
 25 files changed, 1207 insertions(+), 676 deletions(-)

diff --git a/be/src/exprs/array_functions.cpp b/be/src/exprs/array_functions.cpp
index b9b7fcf8a4..be9f151b22 100644
--- a/be/src/exprs/array_functions.cpp
+++ b/be/src/exprs/array_functions.cpp
@@ -30,8 +30,9 @@ void ArrayFunctions::init() {}
         DCHECK_EQ(context->get_return_type().children.size(), 1);                   \
         CollectionValue v;                                                          \
         CollectionValue::init_collection(context, num_children, PRIMARY_TYPE, &v);  \
-        for (int i = 0; i < num_children; ++i) {                                    \
-            v.set(i, PRIMARY_TYPE, values + i);                                     \
+        auto iterator = v.iterator(PRIMARY_TYPE);                                   \
+        for (int i = 0; i < num_children; ++i, iterator.next()) {                   \
+            iterator.set(values + i);                                               \
         }                                                                           \
         CollectionVal ret;                                                          \
         v.to_collection_val(&ret);                                                  \
diff --git a/be/src/exprs/literal.cpp b/be/src/exprs/literal.cpp
index 5e1bbb9846..64401b7fe2 100644
--- a/be/src/exprs/literal.cpp
+++ b/be/src/exprs/literal.cpp
@@ -183,13 +183,14 @@ Status Literal::prepare(RuntimeState* state, const RowDescriptor& row_desc, Expr
     if (type().type == TYPE_ARRAY) {
         DCHECK_EQ(type().children.size(), 1) << "array children type not 1";
         // init array value
-        auto td = type().children.at(0).type;
-        RETURN_IF_ERROR(CollectionValue::init_collection(state->obj_pool(), get_num_children(), td,
-                                                         &_value.array_val));
+        auto child_type = type().children.at(0).type;
+        RETURN_IF_ERROR(CollectionValue::init_collection(state->obj_pool(), get_num_children(),
+                                                         child_type, &_value.array_val));
+        auto iterator = _value.array_val.iterator(child_type);
         // init every item
-        for (int i = 0; i < get_num_children(); ++i) {
-            Expr* children = get_child(i);
-            RETURN_IF_ERROR(_value.array_val.set(i, td, children->get_const_val(context)));
+        for (int i = 0; i < get_num_children() && iterator.has_next(); ++i, iterator.next()) {
+            Expr* child = get_child(i);
+            iterator.set(child->get_const_val(context));
         }
     }
 
diff --git a/be/src/olap/aggregate_func.cpp b/be/src/olap/aggregate_func.cpp
index 4d583d476b..781ab39082 100644
--- a/be/src/olap/aggregate_func.cpp
+++ b/be/src/olap/aggregate_func.cpp
@@ -105,6 +105,8 @@ AggregateFuncResolver::AggregateFuncResolver() {
     add_aggregate_mapping<OLAP_FIELD_AGGREGATION_NONE, OLAP_FIELD_TYPE_STRING>();
     add_aggregate_mapping<OLAP_FIELD_AGGREGATION_NONE, OLAP_FIELD_TYPE_BOOL>();
     // array types has sub type like array<int>  field type is array, subtype is int
+    add_aggregate_mapping<OLAP_FIELD_AGGREGATION_NONE, OLAP_FIELD_TYPE_ARRAY,
+                          OLAP_FIELD_TYPE_BOOL>();
     add_aggregate_mapping<OLAP_FIELD_AGGREGATION_NONE, OLAP_FIELD_TYPE_ARRAY,
                           OLAP_FIELD_TYPE_TINYINT>();
     add_aggregate_mapping<OLAP_FIELD_AGGREGATION_NONE, OLAP_FIELD_TYPE_ARRAY,
@@ -116,9 +118,21 @@ AggregateFuncResolver::AggregateFuncResolver() {
     add_aggregate_mapping<OLAP_FIELD_AGGREGATION_NONE, OLAP_FIELD_TYPE_ARRAY,
                           OLAP_FIELD_TYPE_LARGEINT>();
     add_aggregate_mapping<OLAP_FIELD_AGGREGATION_NONE, OLAP_FIELD_TYPE_ARRAY,
-                          OLAP_FIELD_TYPE_VARCHAR>();
+                          OLAP_FIELD_TYPE_FLOAT>();
+    add_aggregate_mapping<OLAP_FIELD_AGGREGATION_NONE, OLAP_FIELD_TYPE_ARRAY,
+                          OLAP_FIELD_TYPE_DOUBLE>();
     add_aggregate_mapping<OLAP_FIELD_AGGREGATION_NONE, OLAP_FIELD_TYPE_ARRAY,
                           OLAP_FIELD_TYPE_CHAR>();
+    add_aggregate_mapping<OLAP_FIELD_AGGREGATION_NONE, OLAP_FIELD_TYPE_ARRAY,
+                          OLAP_FIELD_TYPE_VARCHAR>();
+    add_aggregate_mapping<OLAP_FIELD_AGGREGATION_NONE, OLAP_FIELD_TYPE_ARRAY,
+                          OLAP_FIELD_TYPE_STRING>();
+    add_aggregate_mapping<OLAP_FIELD_AGGREGATION_NONE, OLAP_FIELD_TYPE_ARRAY,
+                          OLAP_FIELD_TYPE_DATE>();
+    add_aggregate_mapping<OLAP_FIELD_AGGREGATION_NONE, OLAP_FIELD_TYPE_ARRAY,
+                          OLAP_FIELD_TYPE_DATETIME>();
+    add_aggregate_mapping<OLAP_FIELD_AGGREGATION_NONE, OLAP_FIELD_TYPE_ARRAY,
+                          OLAP_FIELD_TYPE_DECIMAL>();
     add_aggregate_mapping<OLAP_FIELD_AGGREGATION_NONE, OLAP_FIELD_TYPE_ARRAY,
                           OLAP_FIELD_TYPE_ARRAY>();
 
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index 6963676f44..707e152902 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -127,7 +127,7 @@ Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_po
     }
 
     // convert column data from engine format to storage layer format
-    std::vector<vectorized::IOlapColumnDataAccessorSPtr> short_key_columns;
+    std::vector<vectorized::IOlapColumnDataAccessor*> short_key_columns;
     size_t num_key_columns = _tablet_schema->num_short_key_columns();
     for (size_t cid = 0; cid < _column_writers.size(); ++cid) {
         auto converted_result = _olap_data_convertor.convert_column_data(cid);
diff --git a/be/src/olap/types.h b/be/src/olap/types.h
index 2d486ed4be..9c31c44405 100644
--- a/be/src/olap/types.h
+++ b/be/src/olap/types.h
@@ -309,15 +309,15 @@ public:
     }
 
     void direct_copy(void* dest, const void* src) const override {
-        auto dest_value = reinterpret_cast<CollectionValue*>(dest);
+        auto dest_value = static_cast<CollectionValue*>(dest);
         // NOTICE: The address pointed by null_signs of the dest_value can NOT be modified here.
         auto base = reinterpret_cast<uint8_t*>(dest_value->mutable_null_signs());
         direct_copy(&base, dest, src);
     }
 
     void direct_copy(uint8_t** base, void* dest, const void* src) const {
-        auto dest_value = reinterpret_cast<CollectionValue*>(dest);
-        auto src_value = reinterpret_cast<const CollectionValue*>(src);
+        auto dest_value = static_cast<CollectionValue*>(dest);
+        auto src_value = static_cast<const CollectionValue*>(src);
 
         auto nulls_size = src_value->has_null() ? src_value->length() : 0;
         dest_value->set_data(src_value->length() ? (*base + nulls_size) : nullptr);
@@ -330,17 +330,22 @@ public:
                         src_value->length());
         }
         *base += nulls_size + src_value->length() * _item_type_info->size();
+
         // Direct copy item.
         if (_item_type_info->type() == OLAP_FIELD_TYPE_ARRAY) {
             for (uint32_t i = 0; i < src_value->length(); ++i) {
-                if (dest_value->is_null_at(i)) continue;
+                if (dest_value->is_null_at(i)) {
+                    continue;
+                }
                 dynamic_cast<const ArrayTypeInfo*>(_item_type_info.get())
                         ->direct_copy(base, (uint8_t*)(dest_value->mutable_data()) + i * _item_size,
                                       (uint8_t*)(src_value->data()) + i * _item_size);
             }
         } else {
             for (uint32_t i = 0; i < src_value->length(); ++i) {
-                if (dest_value->is_null_at(i)) continue;
+                if (dest_value->is_null_at(i)) {
+                    continue;
+                }
                 auto dest_address = (uint8_t*)(dest_value->mutable_data()) + i * _item_size;
                 auto src_address = (uint8_t*)(src_value->data()) + i * _item_size;
                 if (is_olap_string_type(_item_type_info->type())) {
diff --git a/be/src/runtime/collection_value.cpp b/be/src/runtime/collection_value.cpp
index f3f30e3cbe..138b33c0c9 100644
--- a/be/src/runtime/collection_value.cpp
+++ b/be/src/runtime/collection_value.cpp
@@ -17,59 +17,392 @@
 
 #include "runtime/collection_value.h"
 
-#include <functional>
-
-#include "common/logging.h"
 #include "common/utils.h"
-#include "runtime/descriptors.h"
-#include "util//mem_util.hpp"
+#include "runtime/raw_value.h"
+#include "runtime/types.h"
+#include "util/mem_util.hpp"
 
 namespace doris {
 
-using AllocateMemFunc = std::function<uint8_t*(size_t size)>;
-static Status init_collection(CollectionValue* value, const AllocateMemFunc& allocate,
-                              uint32_t size, PrimitiveType child_type);
+template <PrimitiveType>
+struct CollectionValueSubTypeTrait;
+
+template <>
+struct CollectionValueSubTypeTrait<TYPE_NULL> {
+    using CppType = int8_t; // slot size : 1
+};
+
+template <>
+struct CollectionValueSubTypeTrait<TYPE_BOOLEAN> {
+    using CppType = bool;
+    using AnyValType = BooleanVal;
+};
+
+template <>
+struct CollectionValueSubTypeTrait<TYPE_TINYINT> {
+    using CppType = int8_t;
+    using AnyValType = TinyIntVal;
+};
+
+template <>
+struct CollectionValueSubTypeTrait<TYPE_SMALLINT> {
+    using CppType = int16_t;
+    using AnyValType = SmallIntVal;
+};
+
+template <>
+struct CollectionValueSubTypeTrait<TYPE_INT> {
+    using CppType = int32_t;
+    using AnyValType = IntVal;
+};
+
+template <>
+struct CollectionValueSubTypeTrait<TYPE_BIGINT> {
+    using CppType = int64_t;
+    using AnyValType = BigIntVal;
+};
+
+template <>
+struct CollectionValueSubTypeTrait<TYPE_LARGEINT> {
+    using CppType = __int128_t;
+    using AnyValType = LargeIntVal;
+};
+
+template <>
+struct CollectionValueSubTypeTrait<TYPE_FLOAT> {
+    using CppType = float;
+    using AnyValType = FloatVal;
+};
+
+template <>
+struct CollectionValueSubTypeTrait<TYPE_DOUBLE> {
+    using CppType = double;
+    using AnyValType = DoubleVal;
+};
+
+template <>
+struct CollectionValueSubTypeTrait<TYPE_CHAR> {
+    using CppType = StringValue;
+    using AnyValType = StringVal;
+};
+
+template <>
+struct CollectionValueSubTypeTrait<TYPE_VARCHAR> {
+    using CppType = StringValue;
+    using AnyValType = StringVal;
+};
+
+template <>
+struct CollectionValueSubTypeTrait<TYPE_STRING> {
+    using CppType = StringValue;
+    using AnyValType = StringVal;
+};
+
+template <>
+struct CollectionValueSubTypeTrait<TYPE_DATE> {
+    using CppType = uint24_t;
+    using AnyValType = DateTimeVal;
+};
+
+template <>
+struct CollectionValueSubTypeTrait<TYPE_DATETIME> {
+    using CppType = uint64_t;
+    using AnyValType = DateTimeVal;
+};
+
+template <>
+struct CollectionValueSubTypeTrait<TYPE_DECIMALV2> {
+    using CppType = decimal12_t;
+    using AnyValType = DecimalV2Val;
+};
+
+template <>
+struct CollectionValueSubTypeTrait<TYPE_ARRAY> {
+    using CppType = CollectionValue;
+    using AnyValType = CollectionVal;
+};
+
+struct ArrayIteratorFunctionsBase {};
+
+template <PrimitiveType type>
+struct GenericArrayIteratorFunctions : public ArrayIteratorFunctionsBase {
+    using CppType = typename CollectionValueSubTypeTrait<type>::CppType;
+    using AnyValType = typename CollectionValueSubTypeTrait<type>::AnyValType;
+
+    constexpr static int get_type_size() { return sizeof(CppType); }
+    static void shallow_set(void* item, const AnyVal* value) {
+        *static_cast<CppType*>(item) = static_cast<const AnyValType*>(value)->val;
+    }
+    static void shallow_get(AnyVal* value, const void* item) {
+        static_cast<AnyValType*>(value)->val = *static_cast<const CppType*>(item);
+    }
+    static void self_deep_copy(void* item, const TypeDescriptor& type_desc,
+                               const GenMemFootprintFunc& gen_mem_footprint, bool convert_ptrs) {}
+    static void deserialize(void* item, const char* tuple_data, const TypeDescriptor& type_desc) {}
+    static size_t get_byte_size(const void* item, const TypeDescriptor& type_desc) { return 0; }
+    static void raw_value_write(void* item, const void* value, const TypeDescriptor& type_desc,
+                                MemPool* pool) {
+        RawValue::write(value, item, type_desc, pool);
+    }
+};
 
-int sizeof_type(PrimitiveType type) {
-    switch (type) {
+template <PrimitiveType type>
+struct ArrayIteratorFunctions : public GenericArrayIteratorFunctions<type> {};
+
+template <PrimitiveType type>
+struct ArrayIteratorFunctionsForString : public GenericArrayIteratorFunctions<type> {
+    using CppType = StringValue;
+    using AnyValType = StringVal;
+
+    static void shallow_set(void* item, const AnyVal* value) {
+        const auto* src = static_cast<const AnyValType*>(value);
+        auto* dst = static_cast<CppType*>(item);
+        dst->ptr = convert_to<char*>(src->ptr);
+        dst->len = src->len;
+    }
+    static void shallow_get(AnyVal* value, const void* item) {
+        const auto* src = static_cast<const CppType*>(item);
+        auto* dst = static_cast<AnyValType*>(value);
+        dst->ptr = convert_to<uint8_t*>(src->ptr);
+        dst->len = src->len;
+    }
+    static void self_deep_copy(void* item, const TypeDescriptor&,
+                               const GenMemFootprintFunc& gen_mem_footprint, bool convert_ptrs) {
+        auto* string = static_cast<CppType*>(item);
+        if (!string->len) {
+            return;
+        }
+        MemFootprint footprint = gen_mem_footprint(string->len);
+        int64_t offset = footprint.first;
+        auto* copied_string = reinterpret_cast<char*>(footprint.second);
+        memory_copy(copied_string, string->ptr, string->len);
+        string->ptr = (convert_ptrs ? convert_to<char*>(offset) : copied_string);
+    }
+    static void deserialize(void* item, const char* tuple_data, const TypeDescriptor& type_desc) {
+        auto* string_value = static_cast<CppType*>(item);
+        if (string_value->len) {
+            int offset = convert_to<int>(string_value->ptr);
+            string_value->ptr = convert_to<char*>(tuple_data + offset);
+        }
+    }
+    static size_t get_byte_size(const void* item, const TypeDescriptor&) {
+        return static_cast<const CppType*>(item)->len;
+    }
+};
+
+template <>
+struct ArrayIteratorFunctions<TYPE_CHAR> : public ArrayIteratorFunctionsForString<TYPE_CHAR> {};
+template <>
+struct ArrayIteratorFunctions<TYPE_VARCHAR> : public ArrayIteratorFunctionsForString<TYPE_VARCHAR> {
+};
+template <>
+struct ArrayIteratorFunctions<TYPE_STRING> : public ArrayIteratorFunctionsForString<TYPE_STRING> {};
+
+template <>
+struct ArrayIteratorFunctions<TYPE_DATE> : public GenericArrayIteratorFunctions<TYPE_DATE> {
+    using GenericArrayIteratorFunctions<TYPE_DATE>::CppType;
+    using GenericArrayIteratorFunctions<TYPE_DATE>::AnyValType;
+
+    static void shallow_set(void* item, const AnyVal* value) {
+        const auto* src = static_cast<const AnyValType*>(value);
+        auto* dst = static_cast<CppType*>(item);
+        *dst = DateTimeValue::from_datetime_val(*src).to_olap_date();
+    }
+    static void shallow_get(AnyVal* value, const void* item) {
+        const auto* src = static_cast<const CppType*>(item);
+        auto* dst = static_cast<AnyValType*>(value);
+        DateTimeValue data;
+        data.from_olap_date(uint32_t(*src));
+        data.to_datetime_val(dst);
+    }
+    static void raw_value_write(void* item, const void* value, const TypeDescriptor& type_desc,
+                                MemPool* pool) {
+        DateTimeVal date_time_val;
+        shallow_get(&date_time_val, value);
+        shallow_set(item, &date_time_val);
+    }
+};
+template <>
+struct ArrayIteratorFunctions<TYPE_DATETIME> : public GenericArrayIteratorFunctions<TYPE_DATETIME> {
+    using GenericArrayIteratorFunctions<TYPE_DATETIME>::CppType;
+    using GenericArrayIteratorFunctions<TYPE_DATETIME>::AnyValType;
+
+    static void shallow_set(void* item, const AnyVal* value) {
+        const auto* src = static_cast<const AnyValType*>(value);
+        auto* dst = static_cast<CppType*>(item);
+        *dst = DateTimeValue::from_datetime_val(*src).to_olap_datetime();
+    }
+    static void shallow_get(AnyVal* value, const void* item) {
+        const auto* src = static_cast<const CppType*>(item);
+        auto* dst = static_cast<AnyValType*>(value);
+        DateTimeValue data;
+        data.from_olap_datetime(*src);
+        data.to_datetime_val(dst);
+    }
+    static void raw_value_write(void* item, const void* value, const TypeDescriptor& type_desc,
+                                MemPool* pool) {
+        DateTimeVal date_time_val;
+        shallow_get(&date_time_val, value);
+        shallow_set(item, &date_time_val);
+    }
+};
+
+template <>
+struct ArrayIteratorFunctions<TYPE_DECIMALV2>
+        : public GenericArrayIteratorFunctions<TYPE_DECIMALV2> {
+    using GenericArrayIteratorFunctions<TYPE_DECIMALV2>::CppType;
+    using GenericArrayIteratorFunctions<TYPE_DECIMALV2>::AnyValType;
+
+    static void shallow_set(void* item, const AnyVal* value) {
+        const auto* src = static_cast<const AnyValType*>(value);
+        auto* dst = static_cast<CppType*>(item);
+        auto decimal_value = DecimalV2Value::from_decimal_val(*src);
+        dst->integer = decimal_value.int_value();
+        dst->fraction = decimal_value.frac_value();
+    }
+    static void shallow_get(AnyVal* value, const void* item) {
+        const auto* src = static_cast<const CppType*>(item);
+        auto* dst = static_cast<AnyValType*>(value);
+        DecimalV2Value(src->integer, src->fraction).to_decimal_val(dst);
+    }
+    static void raw_value_write(void* item, const void* value, const TypeDescriptor& type_desc,
+                                MemPool* pool) {
+        DecimalV2Val decimal_val;
+        shallow_get(&decimal_val, value);
+        shallow_set(item, &decimal_val);
+    }
+};
+
+template <>
+struct ArrayIteratorFunctions<TYPE_ARRAY> : public GenericArrayIteratorFunctions<TYPE_ARRAY> {
+    using GenericArrayIteratorFunctions<TYPE_ARRAY>::CppType;
+    using GenericArrayIteratorFunctions<TYPE_ARRAY>::AnyValType;
+
+    static void shallow_set(void* item, const AnyVal* value) {
+        *static_cast<CppType*>(item) =
+                CppType::from_collection_val(*static_cast<const AnyValType*>(value));
+    }
+    static void shallow_get(AnyVal* value, const void* item) {
+        static_cast<const CppType*>(item)->to_collection_val(static_cast<AnyValType*>(value));
+    }
+    static void self_deep_copy(void* item, const TypeDescriptor& type_desc,
+                               const GenMemFootprintFunc& gen_mem_footprint, bool convert_ptrs) {
+        auto* collection_value = static_cast<CppType*>(item);
+        CollectionValue::deep_copy_collection(collection_value, type_desc.children[0],
+                                              gen_mem_footprint, convert_ptrs);
+    }
+    static void deserialize(void* item, const char* tuple_data, const TypeDescriptor& type_desc) {
+        CollectionValue::deserialize_collection(static_cast<CppType*>(item), tuple_data,
+                                                type_desc.children[0]);
+    }
+    static size_t get_byte_size(const void* item, const TypeDescriptor& type_desc) {
+        const auto* collection_value = static_cast<const CppType*>(item);
+        return collection_value->get_byte_size(type_desc.children[0]);
+    }
+};
+
+ArrayIterator CollectionValue::iterator(PrimitiveType child_type) {
+    return internal_iterator(child_type);
+}
+
+ArrayIterator CollectionValue::internal_iterator(PrimitiveType child_type) const {
+    switch (child_type) {
+    case TYPE_BOOLEAN:
+        return ArrayIterator(const_cast<CollectionValue*>(this),
+                             static_cast<ArrayIteratorFunctions<TYPE_BOOLEAN>*>(nullptr));
     case TYPE_TINYINT:
-        return sizeof(int8_t);
+        return ArrayIterator(const_cast<CollectionValue*>(this),
+                             static_cast<ArrayIteratorFunctions<TYPE_TINYINT>*>(nullptr));
     case TYPE_SMALLINT:
-        return sizeof(int16_t);
+        return ArrayIterator(const_cast<CollectionValue*>(this),
+                             static_cast<ArrayIteratorFunctions<TYPE_SMALLINT>*>(nullptr));
     case TYPE_INT:
-        return sizeof(int32_t);
+        return ArrayIterator(const_cast<CollectionValue*>(this),
+                             static_cast<ArrayIteratorFunctions<TYPE_INT>*>(nullptr));
+    case TYPE_BIGINT:
+        return ArrayIterator(const_cast<CollectionValue*>(this),
+                             static_cast<ArrayIteratorFunctions<TYPE_BIGINT>*>(nullptr));
+    case TYPE_LARGEINT:
+        return ArrayIterator(const_cast<CollectionValue*>(this),
+                             static_cast<ArrayIteratorFunctions<TYPE_LARGEINT>*>(nullptr));
+    case TYPE_FLOAT:
+        return ArrayIterator(const_cast<CollectionValue*>(this),
+                             static_cast<ArrayIteratorFunctions<TYPE_FLOAT>*>(nullptr));
+    case TYPE_DOUBLE:
+        return ArrayIterator(const_cast<CollectionValue*>(this),
+                             static_cast<ArrayIteratorFunctions<TYPE_DOUBLE>*>(nullptr));
     case TYPE_CHAR:
+        return ArrayIterator(const_cast<CollectionValue*>(this),
+                             static_cast<ArrayIteratorFunctions<TYPE_CHAR>*>(nullptr));
     case TYPE_VARCHAR:
-        return sizeof(StringValue);
+        return ArrayIterator(const_cast<CollectionValue*>(this),
+                             static_cast<ArrayIteratorFunctions<TYPE_VARCHAR>*>(nullptr));
+    case TYPE_STRING:
+        return ArrayIterator(const_cast<CollectionValue*>(this),
+                             static_cast<ArrayIteratorFunctions<TYPE_STRING>*>(nullptr));
+    case TYPE_DATE:
+        return ArrayIterator(const_cast<CollectionValue*>(this),
+                             static_cast<ArrayIteratorFunctions<TYPE_DATE>*>(nullptr));
+    case TYPE_DATETIME:
+        return ArrayIterator(const_cast<CollectionValue*>(this),
+                             static_cast<ArrayIteratorFunctions<TYPE_DATETIME>*>(nullptr));
     case TYPE_ARRAY:
-        return sizeof(CollectionValue);
-    case TYPE_NULL:
-        return 0;
+        return ArrayIterator(const_cast<CollectionValue*>(this),
+                             static_cast<ArrayIteratorFunctions<TYPE_ARRAY>*>(nullptr));
+    case TYPE_DECIMALV2:
+        return ArrayIterator(const_cast<CollectionValue*>(this),
+                             static_cast<ArrayIteratorFunctions<TYPE_DECIMALV2>*>(nullptr));
     default:
-        DCHECK(false) << "Type not implemented: " << type;
-        break;
+        DCHECK(false) << "Invalid child type: " << child_type;
+        __builtin_unreachable();
     }
+}
 
-    return 0;
+const ArrayIterator CollectionValue::iterator(PrimitiveType child_type) const {
+    return internal_iterator(child_type);
 }
 
 Status type_check(PrimitiveType type) {
     switch (type) {
+    case TYPE_NULL:
+
+    case TYPE_BOOLEAN:
+
     case TYPE_TINYINT:
     case TYPE_SMALLINT:
     case TYPE_INT:
+    case TYPE_BIGINT:
+    case TYPE_LARGEINT:
+
+    case TYPE_FLOAT:
+    case TYPE_DOUBLE:
+
     case TYPE_CHAR:
     case TYPE_VARCHAR:
-    case TYPE_NULL:
+    case TYPE_STRING:
+
+    case TYPE_DATE:
+    case TYPE_DATETIME:
+
+    case TYPE_DECIMALV2:
+
     case TYPE_ARRAY:
         break;
     default:
         return Status::InvalidArgument(fmt::format("Type not implemented: {}", type));
     }
-
     return Status::OK();
 }
 
+int sizeof_type(PrimitiveType type) {
+    if (type_check(type).ok()) {
+        return CollectionValue().iterator(type).type_size();
+    } else {
+        DCHECK(false) << "Type not implemented: " << type;
+        return 0;
+    }
+}
+
 void CollectionValue::to_collection_val(CollectionVal* val) const {
     val->length = _length;
     val->data = _data;
@@ -92,7 +425,7 @@ void CollectionValue::copy_null_signs(const CollectionValue* other) {
     }
 }
 
-size_t CollectionValue::get_byte_size(const TypeDescriptor& type) const {
+size_t CollectionValue::get_byte_size(const TypeDescriptor& item_type) const {
     size_t result = 0;
     if (_length == 0) {
         return result;
@@ -100,44 +433,25 @@ size_t CollectionValue::get_byte_size(const TypeDescriptor& type) const {
     if (_has_null) {
         result += _length * sizeof(bool);
     }
-    const auto& item_type = type.children[0];
-    result += _length * item_type.get_slot_size();
-    if (item_type.is_string_type()) {
-        for (int i = 0; i < _length; ++i) {
-            if (is_null_at(i)) {
-                continue;
-            }
-            int item_offset = i * item_type.get_slot_size();
-            StringValue* item = reinterpret_cast<StringValue*>(((uint8_t*)_data) + item_offset);
-            result += item->len;
-        }
-    } else if (item_type.type == TYPE_ARRAY) {
-        for (int i = 0; i < _length; ++i) {
-            if (is_null_at(i)) {
-                continue;
-            }
-            int item_offset = i * item_type.get_slot_size();
-            CollectionValue* item =
-                    reinterpret_cast<CollectionValue*>(((uint8_t*)_data) + item_offset);
-            result += item->get_byte_size(item_type);
-        }
+    auto iterator = CollectionValue::iterator(item_type.type);
+    result += _length * iterator.type_size();
+
+    while (!iterator.is_type_fixed_width() && iterator.has_next()) {
+        result += iterator.get_byte_size(item_type);
+        iterator.next();
     }
     return result;
 }
 
-ArrayIterator CollectionValue::iterator(PrimitiveType children_type) const {
-    return ArrayIterator(children_type, this);
-}
-
 Status CollectionValue::init_collection(ObjectPool* pool, uint32_t size, PrimitiveType child_type,
                                         CollectionValue* value) {
-    return doris::init_collection(
+    return init_collection(
             value, [pool](size_t size) -> uint8_t* { return pool->add_array(new uint8_t[size]); },
             size, child_type);
 }
 
-static Status init_collection(CollectionValue* value, const AllocateMemFunc& allocate,
-                              uint32_t size, PrimitiveType child_type) {
+Status CollectionValue::init_collection(CollectionValue* value, const AllocateMemFunc& allocate,
+                                        uint32_t size, PrimitiveType child_type) {
     if (value == nullptr) {
         return Status::InvalidArgument("collection value is null");
     }
@@ -160,13 +474,13 @@ static Status init_collection(CollectionValue* value, const AllocateMemFunc& all
 
 Status CollectionValue::init_collection(MemPool* pool, uint32_t size, PrimitiveType child_type,
                                         CollectionValue* value) {
-    return doris::init_collection(
+    return init_collection(
             value, [pool](size_t size) { return pool->allocate(size); }, size, child_type);
 }
 
 Status CollectionValue::init_collection(FunctionContext* context, uint32_t size,
                                         PrimitiveType child_type, CollectionValue* value) {
-    return doris::init_collection(
+    return init_collection(
             value, [context](size_t size) { return context->allocate(size); }, size, child_type);
 }
 
@@ -186,7 +500,8 @@ void CollectionValue::deep_copy_collection(CollectionValue* shallow_copied_cv,
         return;
     }
 
-    int coll_byte_size = cv->length() * item_type.get_slot_size();
+    auto iterator = cv->iterator(item_type.type);
+    int coll_byte_size = cv->length() * iterator.type_size();
     int nulls_size = cv->has_null() ? cv->length() * sizeof(bool) : 0;
 
     MemFootprint footprint = gen_mem_footprint(coll_byte_size + nulls_size);
@@ -204,7 +519,10 @@ void CollectionValue::deep_copy_collection(CollectionValue* shallow_copied_cv,
     memory_copy(coll_data + nulls_size, cv->data(), coll_byte_size);
     cv->set_data(coll_data + nulls_size);
 
-    deep_copy_items_in_collection(cv, coll_data, item_type, gen_mem_footprint, convert_ptrs);
+    while (!iterator.is_type_fixed_width() && iterator.has_next()) {
+        iterator.self_deep_copy(item_type, gen_mem_footprint, convert_ptrs);
+        iterator.next();
+    }
 
     if (convert_ptrs) {
         cv->set_data(convert_to<char*>(offset + nulls_size));
@@ -214,45 +532,8 @@ void CollectionValue::deep_copy_collection(CollectionValue* shallow_copied_cv,
     }
 }
 
-// Deep copy items in collection.
-// NOTICE: The CollectionValue* shallow_copied_cv must be initialized by calling memcpy function first (
-// copy data from origin collection value).
-void CollectionValue::deep_copy_items_in_collection(CollectionValue* shallow_copied_cv, char* base,
-                                                    const TypeDescriptor& item_type,
-                                                    const GenMemFootprintFunc& gen_mem_footprint,
-                                                    bool convert_ptrs) {
-    int nulls_size = shallow_copied_cv->has_null() ? shallow_copied_cv->length() : 0;
-    char* item_base = base + nulls_size;
-    if (item_type.is_string_type()) {
-        // when itemtype is string, copy every string item
-        for (int i = 0; i < shallow_copied_cv->length(); ++i) {
-            if (shallow_copied_cv->is_null_at(i)) {
-                continue;
-            }
-            char* item_offset = item_base + i * item_type.get_slot_size();
-            StringValue* dst_item_v = convert_to<StringValue*>(item_offset);
-            if (dst_item_v->len != 0) {
-                MemFootprint footprint = gen_mem_footprint(dst_item_v->len);
-                int64_t offset = footprint.first;
-                char* string_copy = reinterpret_cast<char*>(footprint.second);
-                memory_copy(string_copy, dst_item_v->ptr, dst_item_v->len);
-                dst_item_v->ptr = (convert_ptrs ? convert_to<char*>(offset) : string_copy);
-            }
-        }
-    } else if (item_type.type == TYPE_ARRAY) {
-        for (int i = 0; i < shallow_copied_cv->length(); ++i) {
-            if (shallow_copied_cv->is_null_at(i)) {
-                continue;
-            }
-            char* item_offset = item_base + i * item_type.get_slot_size();
-            CollectionValue* item_cv = convert_to<CollectionValue*>(item_offset);
-            deep_copy_collection(item_cv, item_type.children[0], gen_mem_footprint, convert_ptrs);
-        }
-    }
-}
-
 void CollectionValue::deserialize_collection(CollectionValue* cv, const char* tuple_data,
-                                             const TypeDescriptor& type) {
+                                             const TypeDescriptor& item_type) {
     if (cv->length() == 0) {
         new (cv) CollectionValue(cv->length());
         return;
@@ -264,171 +545,10 @@ void CollectionValue::deserialize_collection(CollectionValue* cv, const char* tu
         int null_offset = convert_to<int>(cv->null_signs());
         cv->set_null_signs(convert_to<bool*>(tuple_data + null_offset));
     }
-
-    const TypeDescriptor& item_type = type.children[0];
-    if (item_type.is_string_type()) {
-        // copy every string item
-        for (size_t i = 0; i < cv->length(); ++i) {
-            if (cv->is_null_at(i)) {
-                continue;
-            }
-
-            StringValue* dst_item_v =
-                    convert_to<StringValue*>((uint8_t*)cv->data() + i * item_type.get_slot_size());
-
-            if (dst_item_v->len != 0) {
-                int offset = convert_to<int>(dst_item_v->ptr);
-                dst_item_v->ptr = convert_to<char*>(tuple_data + offset);
-            }
-        }
-    } else if (item_type.type == TYPE_ARRAY) {
-        for (size_t i = 0; i < cv->length(); ++i) {
-            if (cv->is_null_at(i)) {
-                continue;
-            }
-
-            CollectionValue* item_cv = convert_to<CollectionValue*>((uint8_t*)cv->data() +
-                                                                    i * item_type.get_slot_size());
-            deserialize_collection(item_cv, tuple_data, item_type);
-        }
-    }
-}
-
-Status CollectionValue::set(uint32_t i, PrimitiveType type, const AnyVal* value) {
-    RETURN_IF_ERROR(type_check(type));
-
-    ArrayIterator iter(type, this);
-    if (!iter.seek(i)) {
-        return Status::InvalidArgument("over of collection size");
-    }
-
-    if (value->is_null) {
-        *(_null_signs + i) = true;
-        _has_null = true;
-        return Status::OK();
-    } else {
-        *(_null_signs + i) = false;
-    }
-
-    switch (type) {
-    case TYPE_TINYINT:
-        *reinterpret_cast<int8_t*>(iter.value()) = reinterpret_cast<const TinyIntVal*>(value)->val;
-        break;
-    case TYPE_SMALLINT:
-        *reinterpret_cast<int16_t*>(iter.value()) =
-                reinterpret_cast<const SmallIntVal*>(value)->val;
-        break;
-    case TYPE_INT:
-        *reinterpret_cast<int32_t*>(iter.value()) = reinterpret_cast<const IntVal*>(value)->val;
-        break;
-    case TYPE_CHAR:
-    case TYPE_VARCHAR: {
-        const StringVal* src = reinterpret_cast<const StringVal*>(value);
-        StringValue* dest = reinterpret_cast<StringValue*>(iter.value());
-        dest->len = src->len;
-        dest->ptr = (char*)src->ptr;
-        break;
-    }
-    case TYPE_ARRAY: {
-        const CollectionVal* src = reinterpret_cast<const CollectionVal*>(value);
-        CollectionValue* dest = reinterpret_cast<CollectionValue*>(iter.value());
-        *dest = CollectionValue::from_collection_val(*src);
-        break;
-    }
-    default:
-        DCHECK(false) << "Type not implemented: " << type;
-        return Status::InvalidArgument("Type not implemented");
-    }
-
-    return Status::OK();
-}
-
-/**
- * ----------- Array Iterator -------- 
- */
-ArrayIterator::ArrayIterator(PrimitiveType children_type, const CollectionValue* data)
-        : _offset(0), _type(children_type), _data(data) {
-    _type_size = sizeof_type(children_type);
-}
-
-void* ArrayIterator::value() {
-    if (is_null()) {
-        return nullptr;
-    }
-    return ((char*)_data->_data) + _offset * _type_size;
-}
-
-bool ArrayIterator::is_null() {
-    return _data->is_null_at(_offset);
-}
-
-void ArrayIterator::value(AnyVal* dest) {
-    if (is_null()) {
-        dest->is_null = true;
-        return;
-    }
-    dest->is_null = false;
-    switch (_type) {
-    case TYPE_BOOLEAN:
-        reinterpret_cast<BooleanVal*>(dest)->val = *reinterpret_cast<const bool*>(value());
-        break;
-
-    case TYPE_TINYINT:
-        reinterpret_cast<TinyIntVal*>(dest)->val = *reinterpret_cast<const int8_t*>(value());
-        break;
-
-    case TYPE_SMALLINT:
-        reinterpret_cast<SmallIntVal*>(dest)->val = *reinterpret_cast<const int16_t*>(value());
-        break;
-
-    case TYPE_INT:
-        reinterpret_cast<IntVal*>(dest)->val = *reinterpret_cast<const int32_t*>(value());
-        break;
-
-    case TYPE_BIGINT:
-        reinterpret_cast<BigIntVal*>(dest)->val = *reinterpret_cast<const int64_t*>(value());
-        break;
-
-    case TYPE_FLOAT:
-        reinterpret_cast<FloatVal*>(dest)->val = *reinterpret_cast<const float*>(value());
-        break;
-
-    case TYPE_DOUBLE:
-        reinterpret_cast<DoubleVal*>(dest)->val = *reinterpret_cast<const double*>(value());
-        break;
-    case TYPE_HLL:
-    case TYPE_CHAR:
-    case TYPE_VARCHAR: {
-        const StringValue* str_value = reinterpret_cast<const StringValue*>(value());
-        reinterpret_cast<StringVal*>(dest)->len = str_value->len;
-        reinterpret_cast<StringVal*>(dest)->ptr = (uint8_t*)(str_value->ptr);
-        break;
-    }
-    case TYPE_DATE:
-    case TYPE_DATETIME: {
-        const DateTimeValue* date_time_value = reinterpret_cast<const DateTimeValue*>(value());
-        reinterpret_cast<DateTimeVal*>(dest)->packed_time = date_time_value->to_int64();
-        reinterpret_cast<DateTimeVal*>(dest)->type = date_time_value->type();
-        break;
-    }
-
-    case TYPE_DECIMALV2:
-        reinterpret_cast<DecimalV2Val*>(dest)->val =
-                reinterpret_cast<const PackedInt128*>(value())->value;
-        break;
-
-    case TYPE_LARGEINT:
-        reinterpret_cast<LargeIntVal*>(dest)->val =
-                reinterpret_cast<const PackedInt128*>(value())->value;
-        break;
-
-    case TYPE_ARRAY:
-        reinterpret_cast<const CollectionValue*>(value())->to_collection_val(
-                reinterpret_cast<CollectionVal*>(dest));
-        break;
-
-    default:
-        DCHECK(false) << "bad type: " << _type;
+    auto iterator = cv->iterator(item_type.type);
+    while (!iterator.is_type_fixed_width() && iterator.has_next()) {
+        iterator.deserialize(tuple_data, item_type);
+        iterator.next();
     }
 }
 } // namespace doris
diff --git a/be/src/runtime/collection_value.h b/be/src/runtime/collection_value.h
index 3ba76de3fa..c3c71f2c49 100644
--- a/be/src/runtime/collection_value.h
+++ b/be/src/runtime/collection_value.h
@@ -17,6 +17,8 @@
 
 #pragma once
 
+#include <type_traits>
+
 #include "common/object_pool.h"
 #include "common/status.h"
 #include "runtime/mem_pool.h"
@@ -31,19 +33,29 @@ using MemFootprint = std::pair<int64_t, uint8_t*>;
 using GenMemFootprintFunc = std::function<MemFootprint(int size)>;
 
 struct TypeDescriptor;
+struct ArrayIteratorFunctionsBase;
 class ArrayIterator;
 
+template <PrimitiveType type>
+struct ArrayIteratorFunctions;
+template <typename T>
+inline constexpr std::enable_if_t<std::is_base_of_v<ArrayIteratorFunctionsBase, T>, bool>
+        IsTypeFixedWidth = true;
+
+template <>
+inline constexpr bool IsTypeFixedWidth<ArrayIteratorFunctions<TYPE_CHAR>> = false;
+template <>
+inline constexpr bool IsTypeFixedWidth<ArrayIteratorFunctions<TYPE_VARCHAR>> = false;
+template <>
+inline constexpr bool IsTypeFixedWidth<ArrayIteratorFunctions<TYPE_STRING>> = false;
+template <>
+inline constexpr bool IsTypeFixedWidth<ArrayIteratorFunctions<TYPE_ARRAY>> = false;
+
 /**
  * The format of array-typed slot.
- * The array's sub-element type just support: 
- * - INT32
- * - CHAR
- * - VARCHAR
- * - NULL
- * 
- * A new array need initialization memory before used
+ * A new array needs to be initialized before using it.
  */
-struct CollectionValue {
+class CollectionValue {
 public:
     CollectionValue() = default;
 
@@ -71,15 +83,10 @@ public:
 
     void copy_null_signs(const CollectionValue* other);
 
-    size_t get_byte_size(const TypeDescriptor& type) const;
-
-    ArrayIterator iterator(PrimitiveType children_type) const;
+    size_t get_byte_size(const TypeDescriptor& item_type) const;
 
-    /**
-     * just shallow copy sub-elment value
-     * For special type, will shared actual value's memory, like StringValue.
-     */
-    Status set(uint32_t i, PrimitiveType type, const AnyVal* value);
+    ArrayIterator iterator(PrimitiveType child_type);
+    const ArrayIterator iterator(PrimitiveType child_type) const;
 
     /**
      * init collection, will alloc (children Type's size + 1) * (children Nums) memory  
@@ -103,16 +110,8 @@ public:
                                      const GenMemFootprintFunc& gen_mem_footprint,
                                      bool convert_ptrs);
 
-    // Deep copy items in collection.
-    // NOTICE: The CollectionValue* shallow_copied_cv must be initialized by calling memcpy function first (
-    // copy data from origin collection value).
-    static void deep_copy_items_in_collection(CollectionValue* shallow_copied_cv, char* base,
-                                              const TypeDescriptor& item_type,
-                                              const GenMemFootprintFunc& gen_mem_footprint,
-                                              bool convert_ptrs);
-
     static void deserialize_collection(CollectionValue* cv, const char* tuple_data,
-                                       const TypeDescriptor& type);
+                                       const TypeDescriptor& item_type);
 
     const void* data() const { return _data; }
     bool has_null() const { return _has_null; }
@@ -124,7 +123,13 @@ public:
     void set_data(void* data) { _data = data; }
     void set_null_signs(bool* null_signs) { _null_signs = null_signs; }
 
-public:
+private:
+    using AllocateMemFunc = std::function<uint8_t*(size_t size)>;
+    static Status init_collection(CollectionValue* value, const AllocateMemFunc& allocate,
+                                  uint32_t size, PrimitiveType child_type);
+    ArrayIterator internal_iterator(PrimitiveType child_type) const;
+
+private:
     // child column data
     void* _data;
     uint32_t _length;
@@ -137,45 +142,110 @@ public:
     friend ArrayIterator;
 };
 
-/**
- * Array's Iterator, support read array by special type
- */
 class ArrayIterator {
-private:
-    ArrayIterator(PrimitiveType children_type, const CollectionValue* data);
-
 public:
-    bool seek(uint32_t n) {
-        if (n >= _data->size()) {
+    int type_size() const { return _type_size; }
+    bool is_type_fixed_width() const { return _is_type_fixed_width; }
+
+    bool has_next() const { return _offset < _collection_value->size(); }
+    bool next() const {
+        if (has_next()) {
+            ++_offset;
+            return true;
+        }
+        return false;
+    }
+    bool seek(uint32_t n) const {
+        if (n >= _collection_value->size()) {
             return false;
         }
-
         _offset = n;
         return true;
     }
-
-    bool has_next() { return _offset < _data->size(); }
-
-    bool next() {
-        if (_offset < _data->size()) {
-            _offset++;
-            return true;
+    bool is_null() const { return _collection_value->is_null_at(_offset); }
+    const void* get() const {
+        if (is_null()) {
+            return nullptr;
         }
-
-        return false;
+        return reinterpret_cast<const uint8_t*>(_collection_value->data()) + _offset * _type_size;
+    }
+    void* get() {
+        if (is_null()) {
+            return nullptr;
+        }
+        return reinterpret_cast<uint8_t*>(_collection_value->mutable_data()) + _offset * _type_size;
+    }
+    void get(AnyVal* value) const {
+        if (is_null()) {
+            value->is_null = true;
+            return;
+        }
+        value->is_null = false;
+        _shallow_get(value, get());
+    }
+    void set(const AnyVal* value) {
+        if (_collection_value->mutable_null_signs()) {
+            _collection_value->mutable_null_signs()[_offset] = value->is_null;
+        }
+        if (value->is_null) {
+            _collection_value->set_has_null(true);
+        } else {
+            _shallow_set(get(), value);
+        }
+    }
+    void self_deep_copy(const TypeDescriptor& type_desc,
+                        const GenMemFootprintFunc& gen_mem_footprint, bool convert_ptrs) {
+        if (is_null()) {
+            return;
+        }
+        _self_deep_copy(get(), type_desc, gen_mem_footprint, convert_ptrs);
+    }
+    void deserialize(const char* tuple_data, const TypeDescriptor& type_desc) {
+        if (is_null()) {
+            return;
+        }
+        _deserialize(get(), tuple_data, type_desc);
+    }
+    size_t get_byte_size(const TypeDescriptor& type) const {
+        if (is_null()) {
+            return 0;
+        }
+        return _get_byte_size(get(), type);
+    }
+    void raw_value_write(const void* value, const TypeDescriptor& type_desc, MemPool* pool) {
+        if (is_null()) {
+            return;
+        }
+        return _raw_value_write(get(), value, type_desc, pool);
     }
 
-    bool is_null();
-
-    void* value();
-
-    void value(AnyVal* dest);
+private:
+    template <typename T,
+              typename = std::enable_if_t<std::is_base_of_v<ArrayIteratorFunctionsBase, T>>>
+    ArrayIterator(CollectionValue* data, const T*)
+            : _shallow_get(T::shallow_get),
+              _shallow_set(T::shallow_set),
+              _self_deep_copy(T::self_deep_copy),
+              _deserialize(T::deserialize),
+              _get_byte_size(T::get_byte_size),
+              _raw_value_write(T::raw_value_write),
+              _collection_value(data),
+              _offset(0),
+              _type_size(T::get_type_size()),
+              _is_type_fixed_width(IsTypeFixedWidth<T>) {}
+    void (*_shallow_get)(AnyVal*, const void*);
+    void (*_shallow_set)(void*, const AnyVal*);
+    void (*_self_deep_copy)(void*, const TypeDescriptor&, const GenMemFootprintFunc&, bool);
+    void (*_deserialize)(void*, const char*, const TypeDescriptor&);
+    size_t (*_get_byte_size)(const void* item, const TypeDescriptor&);
+    void (*_raw_value_write)(void* item, const void* value, const TypeDescriptor& type_desc,
+                             MemPool* pool);
 
 private:
-    size_t _offset;
-    int _type_size;
-    const PrimitiveType _type;
-    const CollectionValue* _data;
+    CollectionValue* _collection_value;
+    mutable uint32_t _offset;
+    const int _type_size;
+    const bool _is_type_fixed_width;
 
     friend CollectionValue;
 };
diff --git a/be/src/runtime/mysql_result_writer.cpp b/be/src/runtime/mysql_result_writer.cpp
index 83e12660eb..f7c5f43930 100644
--- a/be/src/runtime/mysql_result_writer.cpp
+++ b/be/src/runtime/mysql_result_writer.cpp
@@ -161,10 +161,10 @@ int MysqlResultWriter::_add_row_value(int index, const TypeDescriptor& type, voi
     }
 
     case TYPE_ARRAY: {
-        auto children_type = type.children[0];
+        auto child_type = type.children[0];
         auto array_value = (const CollectionValue*)(item);
 
-        ArrayIterator iter = array_value->iterator(children_type.type);
+        ArrayIterator iter = array_value->iterator(child_type.type);
 
         _row_buffer->open_dynamic_mode();
 
@@ -175,15 +175,25 @@ int MysqlResultWriter::_add_row_value(int index, const TypeDescriptor& type, voi
             if (begin != 0) {
                 buf_ret = _row_buffer->push_string(", ", 2);
             }
-            if (!iter.value()) {
+            if (!iter.get()) {
                 buf_ret = _row_buffer->push_string("NULL", 4);
             } else {
-                if (children_type == TYPE_CHAR || children_type == TYPE_VARCHAR) {
+                if (child_type.is_string_type()) {
                     buf_ret = _row_buffer->push_string("'", 1);
-                    buf_ret = _add_row_value(index, children_type, iter.value());
+                    buf_ret = _add_row_value(index, child_type, iter.get());
                     buf_ret = _row_buffer->push_string("'", 1);
+                } else if (child_type.is_date_type()) {
+                    DateTimeVal data;
+                    iter.get(&data);
+                    auto datetime_value = DateTimeValue::from_datetime_val(data);
+                    buf_ret = _add_row_value(index, child_type, &datetime_value);
+                } else if (child_type.is_decimal_type()) {
+                    DecimalV2Val data;
+                    iter.get(&data);
+                    auto decimal_value = DecimalV2Value::from_decimal_val(data);
+                    buf_ret = _add_row_value(index, child_type, &decimal_value);
                 } else {
-                    buf_ret = _add_row_value(index, children_type, iter.value());
+                    buf_ret = _add_row_value(index, child_type, iter.get());
                 }
             }
 
diff --git a/be/src/runtime/raw_value.cpp b/be/src/runtime/raw_value.cpp
index eb63f1653b..3699b9ce0b 100644
--- a/be/src/runtime/raw_value.cpp
+++ b/be/src/runtime/raw_value.cpp
@@ -176,11 +176,11 @@ void RawValue::print_value(const void* value, const TypeDescriptor& type, int sc
         auto children_type = type.children.at(0);
         auto iter = src->iterator(children_type.type);
         *stream << "[";
-        print_value(iter.value(), children_type, scale, stream);
+        print_value(iter.get(), children_type, scale, stream);
         iter.next();
         for (; iter.has_next(); iter.next()) {
             *stream << ", ";
-            print_value(iter.value(), children_type, scale, stream);
+            print_value(iter.get(), children_type, scale, stream);
         }
         *stream << "]";
         break;
@@ -333,10 +333,7 @@ void RawValue::write(const void* value, void* dst, const TypeDescriptor& type, M
             val->copy_null_signs(src);
 
             while (src_iter.has_next() && val_iter.has_next()) {
-                if (!src_iter.is_null()) {
-                    // write children
-                    write(src_iter.value(), val_iter.value(), item_type, pool);
-                }
+                val_iter.raw_value_write(src_iter.get(), item_type, pool);
                 src_iter.next();
                 val_iter.next();
             }
diff --git a/be/src/runtime/row_batch.cpp b/be/src/runtime/row_batch.cpp
index 5acf7634de..232fc23acf 100644
--- a/be/src/runtime/row_batch.cpp
+++ b/be/src/runtime/row_batch.cpp
@@ -173,8 +173,8 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const PRowBatch& input_batch)
 
                 CollectionValue* array_val =
                         tuple->get_collection_slot(slot_collection->tuple_offset());
-                CollectionValue::deserialize_collection(array_val, tuple_data,
-                                                        slot_collection->type());
+                const auto& item_type_desc = slot_collection->type().children[0];
+                CollectionValue::deserialize_collection(array_val, tuple_data, item_type_desc);
             }
         }
     }
@@ -573,10 +573,10 @@ size_t RowBatch::total_byte_size() const {
                 if (tuple->is_null(slot_collection->null_indicator_offset())) {
                     continue;
                 }
-                // compute data null_signs size
                 CollectionValue* array_val =
                         tuple->get_collection_slot(slot_collection->tuple_offset());
-                result += array_val->get_byte_size(slot_collection->type());
+                const auto& item_type_desc = slot_collection->type().children[0];
+                result += array_val->get_byte_size(item_type_desc);
             }
         }
     }
diff --git a/be/src/runtime/tuple.h b/be/src/runtime/tuple.h
index 15c4c31963..821f9b3eac 100644
--- a/be/src/runtime/tuple.h
+++ b/be/src/runtime/tuple.h
@@ -29,7 +29,7 @@
 namespace doris {
 
 struct StringValue;
-struct CollectionValue;
+class CollectionValue;
 class TupleDescriptor;
 class DateTimeValue;
 class TupleRow;
diff --git a/be/src/udf/udf.h b/be/src/udf/udf.h
index eead93f59e..56e447f375 100644
--- a/be/src/udf/udf.h
+++ b/be/src/udf/udf.h
@@ -36,7 +36,7 @@ struct StringValue;
 class BitmapValue;
 class DecimalV2Value;
 class DateTimeValue;
-struct CollectionValue;
+class CollectionValue;
 } // namespace doris
 
 namespace doris_udf {
diff --git a/be/src/util/array_parser.h b/be/src/util/array_parser.h
index 249b190810..bfb214a51e 100644
--- a/be/src/util/array_parser.h
+++ b/be/src/util/array_parser.h
@@ -22,8 +22,8 @@
 #include <unordered_map>
 
 #include "common/status.h"
-#include "exprs/anyval_util.h"
 #include "runtime/collection_value.h"
+#include "runtime/large_int_value.h"
 #include "runtime/primitive_type.h"
 #include "runtime/types.h"
 #include "util/mem_util.hpp"
@@ -83,11 +83,11 @@ private:
         auto item_type = child_type_desc.type;
         CollectionValue collection_value;
         CollectionValue::init_collection(context, array.Size(), item_type, &collection_value);
-        int index = 0;
-        for (auto it = array.Begin(); it != array.End(); ++it) {
+        auto iterator = collection_value.iterator(item_type);
+        for (auto it = array.Begin(); it != array.End(); ++it, iterator.next()) {
             if (it->IsNull()) {
                 auto null = AnyVal(true);
-                collection_value.set(index++, item_type, &null);
+                iterator.set(&null);
                 continue;
             } else if (!_is_type_valid<Encoding>(it, item_type)) {
                 return Status::RuntimeError("Failed to parse the json to array.");
@@ -97,7 +97,7 @@ private:
             if (!status.ok()) {
                 return status;
             }
-            collection_value.set(index++, item_type, val);
+            iterator.set(val);
         }
         collection_value.to_collection_val(&array_val);
         return Status::OK();
@@ -115,10 +115,11 @@ private:
         case TYPE_SMALLINT:
         case TYPE_INT:
         case TYPE_BIGINT:
-        case TYPE_LARGEINT:
         case TYPE_FLOAT:
         case TYPE_DOUBLE:
             return iterator->IsNumber();
+        case TYPE_LARGEINT:
+            return iterator->IsNumber() || iterator->IsString();
         case TYPE_DATE:
         case TYPE_DATETIME:
         case TYPE_CHAR:
@@ -130,6 +131,8 @@ private:
             return iterator->IsObject();
         case TYPE_ARRAY:
             return iterator->IsArray();
+        case TYPE_DECIMALV2:
+            return iterator->IsNumber() || iterator->IsString();
         default:
             return false;
         }
@@ -165,6 +168,28 @@ private:
             *val = reinterpret_cast<AnyVal*>(context->allocate(sizeof(BigIntVal)));
             new (*val) BigIntVal(iterator->GetInt64());
             break;
+        case TYPE_LARGEINT: {
+            __int128 value = 0;
+            if (iterator->IsNumber()) {
+                value = iterator->GetUint64();
+            } else {
+                std::string_view view(iterator->GetString(), iterator->GetStringLength());
+                std::stringstream stream;
+                stream << view;
+                stream >> value;
+            }
+            *val = reinterpret_cast<AnyVal*>(context->allocate(sizeof(LargeIntVal)));
+            new (*val) LargeIntVal(value);
+            break;
+        }
+        case TYPE_FLOAT:
+            *val = reinterpret_cast<AnyVal*>(context->allocate(sizeof(FloatVal)));
+            new (*val) FloatVal(iterator->GetFloat());
+            break;
+        case TYPE_DOUBLE:
+            *val = reinterpret_cast<AnyVal*>(context->allocate(sizeof(DoubleVal)));
+            new (*val) DoubleVal(iterator->GetDouble());
+            break;
         case TYPE_CHAR:
         case TYPE_VARCHAR:
         case TYPE_STRING: {
@@ -175,6 +200,34 @@ private:
             memory_copy(string_val->ptr, iterator->GetString(), iterator->GetStringLength());
             break;
         }
+        case TYPE_DATE:
+        case TYPE_DATETIME: {
+            DateTimeValue value;
+            value.from_date_str(iterator->GetString(), iterator->GetStringLength());
+            *val = reinterpret_cast<AnyVal*>(context->allocate(sizeof(DateTimeVal)));
+            new (*val) DateTimeVal();
+            value.to_datetime_val(static_cast<DateTimeVal*>(*val));
+            break;
+        }
+        case TYPE_DECIMALV2: {
+            *val = reinterpret_cast<AnyVal*>(context->allocate(sizeof(DecimalV2Val)));
+            new (*val) DecimalV2Val();
+
+            if (iterator->IsNumber()) {
+                if (iterator->IsUint64()) {
+                    DecimalV2Value(iterator->GetUint64(), 0)
+                            .to_decimal_val(static_cast<DecimalV2Val*>(*val));
+                } else {
+                    DecimalV2Value value;
+                    value.assign_from_double(iterator->GetDouble());
+                    value.to_decimal_val(static_cast<DecimalV2Val*>(*val));
+                }
+            } else {
+                std::string_view view(iterator->GetString(), iterator->GetStringLength());
+                DecimalV2Value(view).to_decimal_val(static_cast<DecimalV2Val*>(*val));
+            }
+            break;
+        }
         default:
             return Status::RuntimeError("Failed to parse json to type (" +
                                         std::to_string(type_desc.type) + ").");
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 821231684f..9f743f5ca6 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -803,20 +803,38 @@ void Block::deep_copy_slot(void* dst, MemPool* pool, const doris::TypeDescriptor
         }
         auto item_column = array_column->get_data_ptr().get();
         auto offset = array_column->get_offsets()[row - 1];
+        auto iterator = collection_value->iterator(item_type_desc.type);
         for (int i = 0; i < collection_value->length(); ++i) {
-            char* item_dst = reinterpret_cast<char*>(collection_value->mutable_data()) +
-                             i * item_type_desc.get_slot_size();
             if (array[i].is_null()) {
                 const auto& null_value = doris_udf::AnyVal(true);
-                collection_value->set(i, item_type_desc.type, &null_value);
+                iterator.set(&null_value);
             } else {
                 auto item_offset = offset + i;
                 const auto& data_ref = item_type_desc.type != TYPE_ARRAY
                                                ? item_column->get_data_at(item_offset)
                                                : StringRef();
-                deep_copy_slot(item_dst, pool, item_type_desc, data_ref, item_column, item_offset,
-                               padding_char);
+                if (item_type_desc.is_date_type()) {
+                    // In CollectionValue, date type data is stored as either uint24_t or uint64_t.
+                    DateTimeValue datetime_value;
+                    deep_copy_slot(&datetime_value, pool, item_type_desc, data_ref, item_column,
+                                   item_offset, padding_char);
+                    DateTimeVal datetime_val;
+                    datetime_value.to_datetime_val(&datetime_val);
+                    iterator.set(&datetime_val);
+                } else if (item_type_desc.is_decimal_type()) {
+                    // In CollectionValue, decimal type data is stored as decimal12_t.
+                    DecimalV2Value decimal_value;
+                    deep_copy_slot(&decimal_value, pool, item_type_desc, data_ref, item_column,
+                                   item_offset, padding_char);
+                    DecimalV2Val decimal_val;
+                    decimal_value.to_decimal_val(&decimal_val);
+                    iterator.set(&decimal_val);
+                } else {
+                    deep_copy_slot(iterator.get(), pool, item_type_desc, data_ref, item_column,
+                                   item_offset, padding_char);
+                }
             }
+            iterator.next();
         }
     } else if (type_desc.is_date_type()) {
         VecDateTimeValue ts =
diff --git a/be/src/vec/olap/olap_data_convertor.cpp b/be/src/vec/olap/olap_data_convertor.cpp
index 10fe57174a..3c03cf5111 100644
--- a/be/src/vec/olap/olap_data_convertor.cpp
+++ b/be/src/vec/olap/olap_data_convertor.cpp
@@ -22,89 +22,75 @@
 
 namespace doris::vectorized {
 
-// class OlapBlockDataConvertor
 OlapBlockDataConvertor::OlapBlockDataConvertor(const TabletSchema* tablet_schema) {
     assert(tablet_schema);
     const auto& columns = tablet_schema->columns();
     for (const auto& col : columns) {
-        switch (col.type()) {
-        case FieldType::OLAP_FIELD_TYPE_OBJECT: {
-            _convertors.emplace_back(std::make_shared<OlapColumnDataConvertorBitMap>());
-            break;
-        }
-        case FieldType::OLAP_FIELD_TYPE_HLL: {
-            _convertors.emplace_back(std::make_shared<OlapColumnDataConvertorHLL>());
-            break;
-        }
-        case FieldType::OLAP_FIELD_TYPE_CHAR: {
-            _convertors.emplace_back(std::make_shared<OlapColumnDataConvertorChar>(col.length()));
-            break;
-        }
-        case FieldType::OLAP_FIELD_TYPE_MAP:
-        case FieldType::OLAP_FIELD_TYPE_VARCHAR: {
-            _convertors.emplace_back(std::make_shared<OlapColumnDataConvertorVarChar>(false));
-            break;
-        }
-        case FieldType::OLAP_FIELD_TYPE_STRING: {
-            _convertors.emplace_back(std::make_shared<OlapColumnDataConvertorVarChar>(true));
-            break;
-        }
-        case FieldType::OLAP_FIELD_TYPE_DATE: {
-            _convertors.emplace_back(std::make_shared<OlapColumnDataConvertorDate>());
-            break;
-        }
-        case FieldType::OLAP_FIELD_TYPE_DATETIME: {
-            _convertors.emplace_back(std::make_shared<OlapColumnDataConvertorDateTime>());
-            break;
-        }
-        case FieldType::OLAP_FIELD_TYPE_DECIMAL: {
-            _convertors.emplace_back(std::make_shared<OlapColumnDataConvertorDecimal>());
-            break;
-        }
-        case FieldType::OLAP_FIELD_TYPE_BOOL: {
-            _convertors.emplace_back(
-                    std::make_shared<OlapColumnDataConvertorSimple<vectorized::UInt8> >());
-            break;
-        }
-        case FieldType::OLAP_FIELD_TYPE_TINYINT: {
-            _convertors.emplace_back(
-                    std::make_shared<OlapColumnDataConvertorSimple<vectorized::Int8> >());
-            break;
-        }
-        case FieldType::OLAP_FIELD_TYPE_SMALLINT: {
-            _convertors.emplace_back(
-                    std::make_shared<OlapColumnDataConvertorSimple<vectorized::Int16> >());
-            break;
-        }
-        case FieldType::OLAP_FIELD_TYPE_INT: {
-            _convertors.emplace_back(
-                    std::make_shared<OlapColumnDataConvertorSimple<vectorized::Int32> >());
-            break;
-        }
-        case FieldType::OLAP_FIELD_TYPE_BIGINT: {
-            _convertors.emplace_back(
-                    std::make_shared<OlapColumnDataConvertorSimple<vectorized::Int64> >());
-            break;
-        }
-        case FieldType::OLAP_FIELD_TYPE_LARGEINT: {
-            _convertors.emplace_back(
-                    std::make_shared<OlapColumnDataConvertorSimple<vectorized::Int128> >());
-            break;
-        }
-        case FieldType::OLAP_FIELD_TYPE_FLOAT: {
-            _convertors.emplace_back(
-                    std::make_shared<OlapColumnDataConvertorSimple<vectorized::Float32> >());
-            break;
-        }
-        case FieldType::OLAP_FIELD_TYPE_DOUBLE: {
-            _convertors.emplace_back(
-                    std::make_shared<OlapColumnDataConvertorSimple<vectorized::Float64> >());
-            break;
-        }
-        default: {
-            DCHECK(false) << "Invalid type in RowBlockV2:" << col.type();
-        }
-        }
+        _convertors.emplace_back(create_olap_column_data_convertor(col));
+    }
+}
+
+OlapBlockDataConvertor::OlapColumnDataConvertorBaseUPtr
+OlapBlockDataConvertor::create_olap_column_data_convertor(const TabletColumn& column) {
+    switch (column.type()) {
+    case FieldType::OLAP_FIELD_TYPE_OBJECT: {
+        return std::make_unique<OlapColumnDataConvertorBitMap>();
+    }
+    case FieldType::OLAP_FIELD_TYPE_HLL: {
+        return std::make_unique<OlapColumnDataConvertorHLL>();
+    }
+    case FieldType::OLAP_FIELD_TYPE_CHAR: {
+        return std::make_unique<OlapColumnDataConvertorChar>(column.length());
+    }
+    case FieldType::OLAP_FIELD_TYPE_MAP:
+    case FieldType::OLAP_FIELD_TYPE_VARCHAR: {
+        return std::make_unique<OlapColumnDataConvertorVarChar>(false);
+    }
+    case FieldType::OLAP_FIELD_TYPE_STRING: {
+        return std::make_unique<OlapColumnDataConvertorVarChar>(true);
+    }
+    case FieldType::OLAP_FIELD_TYPE_DATE: {
+        return std::make_unique<OlapColumnDataConvertorDate>();
+    }
+    case FieldType::OLAP_FIELD_TYPE_DATETIME: {
+        return std::make_unique<OlapColumnDataConvertorDateTime>();
+    }
+    case FieldType::OLAP_FIELD_TYPE_DECIMAL: {
+        return std::make_unique<OlapColumnDataConvertorDecimal>();
+    }
+    case FieldType::OLAP_FIELD_TYPE_BOOL: {
+        return std::make_unique<OlapColumnDataConvertorSimple<vectorized::UInt8>>();
+    }
+    case FieldType::OLAP_FIELD_TYPE_TINYINT: {
+        return std::make_unique<OlapColumnDataConvertorSimple<vectorized::Int8>>();
+    }
+    case FieldType::OLAP_FIELD_TYPE_SMALLINT: {
+        return std::make_unique<OlapColumnDataConvertorSimple<vectorized::Int16>>();
+    }
+    case FieldType::OLAP_FIELD_TYPE_INT: {
+        return std::make_unique<OlapColumnDataConvertorSimple<vectorized::Int32>>();
+    }
+    case FieldType::OLAP_FIELD_TYPE_BIGINT: {
+        return std::make_unique<OlapColumnDataConvertorSimple<vectorized::Int64>>();
+    }
+    case FieldType::OLAP_FIELD_TYPE_LARGEINT: {
+        return std::make_unique<OlapColumnDataConvertorSimple<vectorized::Int128>>();
+    }
+    case FieldType::OLAP_FIELD_TYPE_FLOAT: {
+        return std::make_unique<OlapColumnDataConvertorSimple<vectorized::Float32>>();
+    }
+    case FieldType::OLAP_FIELD_TYPE_DOUBLE: {
+        return std::make_unique<OlapColumnDataConvertorSimple<vectorized::Float64>>();
+    }
+    case FieldType::OLAP_FIELD_TYPE_ARRAY: {
+        const auto& sub_column = column.get_sub_column(0);
+        return std::make_unique<OlapColumnDataConvertorArray>(
+                create_olap_column_data_convertor(sub_column));
+    }
+    default: {
+        DCHECK(false) << "Invalid type in RowBlockV2:" << column.type();
+        return nullptr;
+    }
     }
 }
 
@@ -125,11 +111,11 @@ void OlapBlockDataConvertor::clear_source_content() {
     }
 }
 
-std::pair<Status, IOlapColumnDataAccessorSPtr> OlapBlockDataConvertor::convert_column_data(
+std::pair<Status, IOlapColumnDataAccessor*> OlapBlockDataConvertor::convert_column_data(
         size_t cid) {
     assert(cid < _convertors.size());
     auto status = _convertors[cid]->convert_to_olap();
-    return {status, _convertors[cid]};
+    return {status, _convertors[cid].get()};
 }
 
 // class OlapBlockDataConvertor::OlapColumnDataConvertorBase
@@ -504,27 +490,6 @@ Status OlapBlockDataConvertor::OlapColumnDataConvertorVarChar::convert_to_olap()
     return Status::OK();
 }
 
-// class OlapBlockDataConvertor::OlapColumnDataConvertorDate
-void OlapBlockDataConvertor::OlapColumnDataConvertorDate::set_source_column(
-        const ColumnWithTypeAndName& typed_column, size_t row_pos, size_t num_rows) {
-    OlapBlockDataConvertor::OlapColumnDataConvertorBase::set_source_column(typed_column, row_pos,
-                                                                           num_rows);
-    _values.resize(num_rows);
-}
-
-const void* OlapBlockDataConvertor::OlapColumnDataConvertorDate::get_data() const {
-    return _values.data();
-}
-
-const void* OlapBlockDataConvertor::OlapColumnDataConvertorDate::get_data_at(size_t offset) const {
-    assert(offset < _num_rows && _num_rows == _values.size());
-    UInt8 null_flag = 0;
-    if (_nullmap) {
-        null_flag = _nullmap[offset];
-    }
-    return null_flag ? nullptr : _values.data() + offset;
-}
-
 Status OlapBlockDataConvertor::OlapColumnDataConvertorDate::convert_to_olap() {
     assert(_typed_column.column);
     const vectorized::ColumnVector<vectorized::Int64>* column_datetime = nullptr;
@@ -568,28 +533,6 @@ Status OlapBlockDataConvertor::OlapColumnDataConvertorDate::convert_to_olap() {
     return Status::OK();
 }
 
-// class OlapBlockDataConvertor::OlapColumnDataConvertorDateTime
-void OlapBlockDataConvertor::OlapColumnDataConvertorDateTime::set_source_column(
-        const ColumnWithTypeAndName& typed_column, size_t row_pos, size_t num_rows) {
-    OlapBlockDataConvertor::OlapColumnDataConvertorBase::set_source_column(typed_column, row_pos,
-                                                                           num_rows);
-    _values.resize(num_rows);
-}
-
-const void* OlapBlockDataConvertor::OlapColumnDataConvertorDateTime::get_data() const {
-    return _values.data();
-}
-
-const void* OlapBlockDataConvertor::OlapColumnDataConvertorDateTime::get_data_at(
-        size_t offset) const {
-    assert(offset < _num_rows && _num_rows == _values.size());
-    UInt8 null_flag = 0;
-    if (_nullmap) {
-        null_flag = _nullmap[offset];
-    }
-    return null_flag ? nullptr : _values.data() + offset;
-}
-
 Status OlapBlockDataConvertor::OlapColumnDataConvertorDateTime::convert_to_olap() {
     assert(_typed_column.column);
     const vectorized::ColumnVector<vectorized::Int64>* column_datetime = nullptr;
@@ -633,28 +576,6 @@ Status OlapBlockDataConvertor::OlapColumnDataConvertorDateTime::convert_to_olap(
     return Status::OK();
 }
 
-// class OlapBlockDataConvertor::OlapColumnDataConvertorDecimal
-void OlapBlockDataConvertor::OlapColumnDataConvertorDecimal::set_source_column(
-        const ColumnWithTypeAndName& typed_column, size_t row_pos, size_t num_rows) {
-    OlapBlockDataConvertor::OlapColumnDataConvertorBase::set_source_column(typed_column, row_pos,
-                                                                           num_rows);
-    _values.resize(num_rows);
-}
-
-const void* OlapBlockDataConvertor::OlapColumnDataConvertorDecimal::get_data() const {
-    return _values.data();
-}
-
-const void* OlapBlockDataConvertor::OlapColumnDataConvertorDecimal::get_data_at(
-        size_t offset) const {
-    assert(offset < _num_rows && _num_rows == _values.size());
-    UInt8 null_flag = 0;
-    if (_nullmap) {
-        null_flag = _nullmap[offset];
-    }
-    return null_flag ? nullptr : _values.data() + offset;
-}
-
 Status OlapBlockDataConvertor::OlapColumnDataConvertorDecimal::convert_to_olap() {
     assert(_typed_column.column);
     const vectorized::ColumnDecimal<vectorized::Decimal128>* column_decimal = nullptr;
@@ -700,4 +621,55 @@ Status OlapBlockDataConvertor::OlapColumnDataConvertorDecimal::convert_to_olap()
     return Status::OK();
 }
 
-} // namespace doris::vectorized
\ No newline at end of file
+Status OlapBlockDataConvertor::OlapColumnDataConvertorArray::convert_to_olap() {
+    const ColumnArray* column_array = nullptr;
+    const DataTypeArray* data_type_ptr_array = nullptr;
+    if (_nullmap) {
+        const auto* nullable_column =
+                assert_cast<const ColumnNullable*>(_typed_column.column.get());
+        column_array =
+                assert_cast<const ColumnArray*>(nullable_column->get_nested_column_ptr().get());
+        data_type_ptr_array = assert_cast<const DataTypeArray*>(
+                (assert_cast<const DataTypeNullable*>(_typed_column.type.get())->get_nested_type())
+                        .get());
+    } else {
+        column_array = assert_cast<const ColumnArray*>(_typed_column.column.get());
+        data_type_ptr_array = assert_cast<const DataTypeArray*>(_typed_column.type.get());
+    }
+    assert(column_array);
+    assert(data_type_ptr_array);
+
+    const auto& offsets = column_array->get_offsets();
+    CollectionValue* collection_value = _values.data();
+    for (int i = 0; i < _num_rows; ++i, ++collection_value) {
+        int64_t cur_pos = _row_pos + i;
+        int64_t prev_pos = cur_pos - 1;
+        if (_nullmap && _nullmap[cur_pos]) {
+            continue;
+        }
+        auto offset = offsets[prev_pos];
+        auto size = offsets[cur_pos] - offsets[prev_pos];
+        new (collection_value) CollectionValue(size);
+
+        if (size == 0) {
+            continue;
+        }
+
+        if (column_array->get_data().is_nullable()) {
+            const auto& data_nullable_column =
+                    assert_cast<const ColumnNullable&>(column_array->get_data());
+            const auto* data_null_map = data_nullable_column.get_null_map_data().data();
+            collection_value->set_has_null(true);
+            collection_value->set_null_signs(
+                    const_cast<bool*>(reinterpret_cast<const bool*>(data_null_map + offset)));
+        }
+        ColumnWithTypeAndName item_typed_column = {column_array->get_data_ptr(),
+                                                   data_type_ptr_array->get_nested_type(), ""};
+        _item_convertor->set_source_column(item_typed_column, offset, size);
+        _item_convertor->convert_to_olap();
+        collection_value->set_data(const_cast<void*>(_item_convertor->get_data()));
+    }
+    return Status::OK();
+}
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/olap/olap_data_convertor.h b/be/src/vec/olap/olap_data_convertor.h
index 71d08b4ae9..eb104b1414 100644
--- a/be/src/vec/olap/olap_data_convertor.h
+++ b/be/src/vec/olap/olap_data_convertor.h
@@ -26,23 +26,27 @@ public:
     virtual const UInt8* get_nullmap() const = 0;
     virtual const void* get_data() const = 0;
     virtual const void* get_data_at(size_t offset) const = 0;
-    virtual ~IOlapColumnDataAccessor() {}
+    virtual ~IOlapColumnDataAccessor() = default;
 };
-using IOlapColumnDataAccessorSPtr = std::shared_ptr<IOlapColumnDataAccessor>;
 
 class OlapBlockDataConvertor {
 public:
     OlapBlockDataConvertor(const TabletSchema* tablet_schema);
     void set_source_content(const vectorized::Block* block, size_t row_pos, size_t num_rows);
     void clear_source_content();
-    std::pair<Status, IOlapColumnDataAccessorSPtr> convert_column_data(size_t cid);
+    std::pair<Status, IOlapColumnDataAccessor*> convert_column_data(size_t cid);
 
 private:
+    class OlapColumnDataConvertorBase;
+    using OlapColumnDataConvertorBaseUPtr = std::unique_ptr<OlapColumnDataConvertorBase>;
+    using OlapColumnDataConvertorBaseSPtr = std::shared_ptr<OlapColumnDataConvertorBase>;
+
+    OlapColumnDataConvertorBaseUPtr create_olap_column_data_convertor(const TabletColumn& column);
+
     // accessors for different data types;
     class OlapColumnDataConvertorBase : public IOlapColumnDataAccessor {
     public:
         OlapColumnDataConvertorBase() = default;
-        virtual ~OlapColumnDataConvertorBase() = default;
         OlapColumnDataConvertorBase(const OlapColumnDataConvertorBase&) = delete;
         OlapColumnDataConvertorBase& operator=(const OlapColumnDataConvertorBase&) = delete;
         OlapColumnDataConvertorBase(OlapColumnDataConvertorBase&&) = delete;
@@ -60,13 +64,9 @@ private:
         size_t _num_rows = 0;
         const UInt8* _nullmap = nullptr;
     };
-    using OlapColumnDataConvertorBaseSPtr = std::shared_ptr<OlapColumnDataConvertorBase>;
 
     class OlapColumnDataConvertorObject : public OlapColumnDataConvertorBase {
     public:
-        OlapColumnDataConvertorObject() = default;
-        ~OlapColumnDataConvertorObject() override = default;
-
         void set_source_column(const ColumnWithTypeAndName& typed_column, size_t row_pos,
                                size_t num_rows) override;
         const void* get_data() const override;
@@ -119,49 +119,42 @@ private:
         PaddedPODArray<Slice> _slice;
     };
 
-    class OlapColumnDataConvertorDate : public OlapColumnDataConvertorBase {
+    template <typename T>
+    class OlapColumnDataConvertorPaddedPODArray : public OlapColumnDataConvertorBase {
     public:
-        OlapColumnDataConvertorDate() = default;
-        ~OlapColumnDataConvertorDate() override = default;
-
         void set_source_column(const ColumnWithTypeAndName& typed_column, size_t row_pos,
-                               size_t num_rows) override;
-        const void* get_data() const override;
-        const void* get_data_at(size_t offset) const override;
-        Status convert_to_olap() override;
+                               size_t num_rows) override {
+            OlapColumnDataConvertorBase::set_source_column(typed_column, row_pos, num_rows);
+            _values.resize(num_rows);
+        }
+        const void* get_data() const override { return _values.data(); }
+        const void* get_data_at(size_t offset) const override {
+            assert(offset < _num_rows && _num_rows == _values.size());
+            UInt8 null_flag = 0;
+            if (_nullmap) {
+                null_flag = _nullmap[offset];
+            }
+            return null_flag ? nullptr : _values.data() + offset;
+        }
 
-    private:
-        PaddedPODArray<uint24_t> _values;
+    protected:
+        PaddedPODArray<T> _values;
     };
 
-    class OlapColumnDataConvertorDateTime : public OlapColumnDataConvertorBase {
+    class OlapColumnDataConvertorDate : public OlapColumnDataConvertorPaddedPODArray<uint24_t> {
     public:
-        OlapColumnDataConvertorDateTime() = default;
-        ~OlapColumnDataConvertorDateTime() override = default;
-
-        void set_source_column(const ColumnWithTypeAndName& typed_column, size_t row_pos,
-                               size_t num_rows) override;
-        const void* get_data() const override;
-        const void* get_data_at(size_t offset) const override;
         Status convert_to_olap() override;
-
-    private:
-        PaddedPODArray<uint64_t> _values;
     };
 
-    class OlapColumnDataConvertorDecimal : public OlapColumnDataConvertorBase {
+    class OlapColumnDataConvertorDateTime : public OlapColumnDataConvertorPaddedPODArray<uint64_t> {
     public:
-        OlapColumnDataConvertorDecimal() = default;
-        ~OlapColumnDataConvertorDecimal() override = default;
-
-        void set_source_column(const ColumnWithTypeAndName& typed_column, size_t row_pos,
-                               size_t num_rows) override;
-        const void* get_data() const override;
-        const void* get_data_at(size_t offset) const override;
         Status convert_to_olap() override;
+    };
 
-    private:
-        PaddedPODArray<decimal12_t> _values;
+    class OlapColumnDataConvertorDecimal
+            : public OlapColumnDataConvertorPaddedPODArray<decimal12_t> {
+    public:
+        Status convert_to_olap() override;
     };
 
     // class OlapColumnDataConvertorSimple for simple types, which don't need to do any convert, like int, float, double, etc...
@@ -203,8 +196,20 @@ private:
         const T* _values = nullptr;
     };
 
+    class OlapColumnDataConvertorArray
+            : public OlapColumnDataConvertorPaddedPODArray<CollectionValue> {
+    public:
+        OlapColumnDataConvertorArray(OlapColumnDataConvertorBaseUPtr item_convertor)
+                : _item_convertor(std::move(item_convertor)) {}
+
+        Status convert_to_olap() override;
+
+    private:
+        OlapColumnDataConvertorBaseUPtr _item_convertor;
+    };
+
 private:
-    std::vector<OlapColumnDataConvertorBaseSPtr> _convertors;
+    std::vector<OlapColumnDataConvertorBaseUPtr> _convertors;
 };
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/sink/mysql_result_writer.cpp b/be/src/vec/sink/mysql_result_writer.cpp
index d7230ec519..7b45c713a2 100644
--- a/be/src/vec/sink/mysql_result_writer.cpp
+++ b/be/src/vec/sink/mysql_result_writer.cpp
@@ -22,6 +22,7 @@
 #include "vec/columns/column_nullable.h"
 #include "vec/columns/column_vector.h"
 #include "vec/common/assert_cast.h"
+#include "vec/core/types.h"
 #include "vec/exprs/vexpr.h"
 #include "vec/exprs/vexpr_context.h"
 #include "vec/runtime/vdatetime_value.h"
@@ -270,6 +271,17 @@ int VMysqlResultWriter::_add_one_cell(const ColumnPtr& column_ptr, size_t row_id
             buf_ret = buffer.push_string(string_val.data, string_val.size);
         }
         return buf_ret;
+    } else if (which.is_date_or_datetime()) {
+        auto& column_vector = assert_cast<const ColumnVector<Int64>&>(*column);
+        auto value = column_vector[row_idx].get<Int64>();
+        VecDateTimeValue datetime;
+        memcpy(static_cast<void*>(&datetime), static_cast<void*>(&value), sizeof(value));
+        if (which.is_date()) {
+            datetime.cast_to_date();
+        }
+        char buf[64];
+        char* pos = datetime.to_string(buf);
+        return buffer.push_string(buf, pos - buf - 1);
     } else if (which.is_array()) {
         auto& column_array = assert_cast<const ColumnArray&>(*column);
         auto& offsets = column_array.get_offsets();
diff --git a/be/test/exprs/array_functions_test.cpp b/be/test/exprs/array_functions_test.cpp
index 2c07dd9b62..a0fe04a0ef 100644
--- a/be/test/exprs/array_functions_test.cpp
+++ b/be/test/exprs/array_functions_test.cpp
@@ -68,7 +68,7 @@ TEST_F(ArrayFunctionsTest, array) {
         for (auto&& iter = value.iterator(TYPE_INT); iter.has_next(); iter.next()) {
             i++;
             IntVal a;
-            iter.value(&a);
+            iter.get(&a);
             EXPECT_EQ(i, a.val);
         }
     }
diff --git a/be/test/runtime/array_test.cpp b/be/test/runtime/array_test.cpp
index f1e335febc..2582a975f5 100644
--- a/be/test/runtime/array_test.cpp
+++ b/be/test/runtime/array_test.cpp
@@ -25,7 +25,6 @@
 #include <string>
 #include <vector>
 
-#include "exprs/anyval_util.h"
 #include "gen_cpp/olap_file.pb.h"
 #include "gen_cpp/segment_v2.pb.h"
 #include "olap/field.h"
@@ -36,9 +35,11 @@
 #include "olap/rowset/segment_v2/column_writer.h"
 #include "olap/tablet_schema.h"
 #include "olap/types.h"
+#include "runtime/collection_value.h"
 #include "runtime/descriptors.h"
 #include "runtime/mem_pool.h"
 #include "runtime/mem_tracker.h"
+#include "runtime/primitive_type.h"
 #include "runtime/raw_value.h"
 #include "testutil/array_utils.h"
 #include "testutil/desc_tbl_builder.h"
@@ -78,16 +79,40 @@ std::unique_ptr<Field> create_field(const ColumnPB& column_pb) {
 
 TypeDescriptor get_scalar_type_desc(const TypeInfo* type_info) {
     switch (type_info->type()) {
+    case OLAP_FIELD_TYPE_BOOL:
+        return TypeDescriptor(TYPE_BOOLEAN);
+    case OLAP_FIELD_TYPE_TINYINT:
+        return TypeDescriptor(TYPE_TINYINT);
+    case OLAP_FIELD_TYPE_SMALLINT:
+        return TypeDescriptor(TYPE_SMALLINT);
     case OLAP_FIELD_TYPE_INT:
         return TypeDescriptor(TYPE_INT);
+    case OLAP_FIELD_TYPE_BIGINT:
+        return TypeDescriptor(TYPE_BIGINT);
+    case OLAP_FIELD_TYPE_LARGEINT:
+        return TypeDescriptor(TYPE_LARGEINT);
+    case OLAP_FIELD_TYPE_FLOAT:
+        return TypeDescriptor(TYPE_FLOAT);
+    case OLAP_FIELD_TYPE_DOUBLE:
+        return TypeDescriptor(TYPE_DOUBLE);
+    case OLAP_FIELD_TYPE_CHAR:
+        return TypeDescriptor::create_char_type(TypeDescriptor::MAX_CHAR_LENGTH);
     case OLAP_FIELD_TYPE_VARCHAR:
         return TypeDescriptor::create_varchar_type(TypeDescriptor::MAX_VARCHAR_LENGTH);
+    case OLAP_FIELD_TYPE_STRING:
+        return TypeDescriptor::create_string_type();
+    case OLAP_FIELD_TYPE_DATE:
+        return TypeDescriptor(TYPE_DATE);
+    case OLAP_FIELD_TYPE_DATETIME:
+        return TypeDescriptor(TYPE_DATETIME);
+    case OLAP_FIELD_TYPE_DECIMAL:
+        return TypeDescriptor(TYPE_DECIMALV2);
     default:
-        return TypeDescriptor();
+        DCHECK(false) << "Failed to get the scalar type descriptor.";
     }
 }
 
-TupleDescriptor* get_tuple_descriptor(ObjectPool& object_pool, const TypeInfo* type_info) {
+const TupleDescriptor* get_tuple_descriptor(ObjectPool& object_pool, const TypeInfo* type_info) {
     DescriptorTblBuilder builder(&object_pool);
     auto& tuple_desc_builder = builder.declare_tuple();
     if (type_info->type() == OLAP_FIELD_TYPE_ARRAY) {
@@ -127,6 +152,28 @@ public:
             : _mem_tracker(new MemTracker(MAX_MEMORY_BYTES, "ArrayTest")),
               _mem_pool(new MemPool(_mem_tracker.get())) {}
 
+    template <segment_v2::EncodingTypePB array_encoding, segment_v2::EncodingTypePB item_encoding>
+    void test(const ColumnPB& column_pb, const std::vector<std::string>& literal_arrays) {
+        auto field = create_field(column_pb);
+        const auto* type_info = field->type_info();
+        const auto* tuple_desc = get_tuple_descriptor(_object_pool, type_info);
+        EXPECT_EQ(tuple_desc->slots().size(), 1);
+
+        FunctionContext context;
+        ArrayUtils::prepare_context(context, *_mem_pool, column_pb);
+
+        std::vector<const CollectionValue*> arrays;
+        for (const auto& literal_array : literal_arrays) {
+            arrays.push_back(parse(*_mem_pool, context, literal_array, column_pb));
+        }
+
+        for (auto array : arrays) {
+            test_array<array_encoding, item_encoding>(column_pb, field.get(), tuple_desc, array);
+        }
+        test_direct_copy_array(field.get(), arrays);
+        test_write_and_read_column<array_encoding, item_encoding>(column_pb, field.get(), arrays);
+    }
+
 protected:
     void SetUp() override {
         if (FileUtils::check_exist(TEST_DIR)) {
@@ -145,8 +192,8 @@ private:
     void test_copy_array(const TupleDescriptor* tuple_desc, const Field* field,
                          const CollectionValue* array) {
         auto slot_desc = tuple_desc->slots().front();
-        auto type_desc = slot_desc->type();
-        auto total_size = tuple_desc->byte_size() + array->get_byte_size(type_desc);
+        const auto& item_type_desc = slot_desc->type().children[0];
+        auto total_size = tuple_desc->byte_size() + array->get_byte_size(item_type_desc);
 
         auto src = allocate_tuple(total_size);
         EXPECT_NE(src, nullptr);
@@ -169,7 +216,8 @@ private:
         EXPECT_EQ(total_size, offset);
         EXPECT_EQ(total_size, serialized_data - reinterpret_cast<char*>(dst));
         dst_cv = reinterpret_cast<CollectionValue*>(dst->get_slot(slot_desc->tuple_offset()));
-        CollectionValue::deserialize_collection(dst_cv, reinterpret_cast<char*>(dst), type_desc);
+        CollectionValue::deserialize_collection(dst_cv, reinterpret_cast<char*>(dst),
+                                                item_type_desc);
         validate(field, src_cv, dst_cv);
     }
 
@@ -381,138 +429,261 @@ private:
 
 const std::string ArrayTest::TEST_DIR = "./ut_dir/array_test";
 
-TEST_F(ArrayTest, TestSimpleIntArrays) {
-    auto column_pb = create_column_pb("ARRAY", "INT");
-    auto type_info = get_type_info(column_pb);
-    auto field = create_field(column_pb);
-    auto tuple_desc = get_tuple_descriptor(_object_pool, type_info.get());
-    EXPECT_EQ(tuple_desc->slots().size(), 1);
-    FunctionContext context;
-    ArrayUtils::prepare_context(context, *_mem_pool, column_pb);
-
-    std::vector<const CollectionValue*> arrays = {
-            parse(*_mem_pool, context, "[]", column_pb),
-            parse(*_mem_pool, context, "[null]", column_pb),
-            parse(*_mem_pool, context, "[1, 2, 3]", column_pb),
-            parse(*_mem_pool, context, "[1, null, 3]", column_pb),
-            parse(*_mem_pool, context, "[1, null, null]", column_pb),
-            parse(*_mem_pool, context, "[null, null, 3]", column_pb),
-            parse(*_mem_pool, context, "[null, null, null]", column_pb),
+TEST_F(ArrayTest, TestBoolean) {
+    // depth 1
+    auto column_pb = create_column_pb("ARRAY", "BOOLEAN");
+    std::vector<std::string> literal_arrays = {
+            "[]",
+            "[null]",
+            "[true, false, false]",
+            "[true, null, false]",
+            "[false, null, null]",
+            "[null, null, true]",
+            "[null, null, null]",
     };
-    for (auto array : arrays) {
-        test_array<segment_v2::DEFAULT_ENCODING, segment_v2::BIT_SHUFFLE>(column_pb, field.get(),
-                                                                          tuple_desc, array);
-    }
-    test_direct_copy_array(field.get(), arrays);
-    test_write_and_read_column<segment_v2::DEFAULT_ENCODING, segment_v2::BIT_SHUFFLE>(
-            column_pb, field.get(), arrays);
+    test<segment_v2::DEFAULT_ENCODING, segment_v2::BIT_SHUFFLE>(column_pb, literal_arrays);
+
+    // depth 2
+    column_pb = create_column_pb("ARRAY", "ARRAY", "BOOLEAN");
+    literal_arrays = {
+            "[]",
+            "[[]]",
+            "[[false, true, false], [true, false, true]]",
+            "[[false, true, false], null, [true, false, true]]",
+            "[[false, true, null], null, [true, null, false], null, [null, false, false]]",
+    };
+    test<segment_v2::DEFAULT_ENCODING, segment_v2::BIT_SHUFFLE>(column_pb, literal_arrays);
+
+    // depth 3
+    column_pb = create_column_pb("ARRAY", "ARRAY", "ARRAY", "BOOLEAN");
+    literal_arrays = {
+            "[]",
+            "[[]]",
+            "[[[]]]",
+            "[[[null]], [[false], [true, false]], [[false, true, false], null, null]]",
+    };
+    test<segment_v2::DEFAULT_ENCODING, segment_v2::BIT_SHUFFLE>(column_pb, literal_arrays);
 }
 
-TEST_F(ArrayTest, TestNestedIntArrays) {
+void test_integer(const std::string& type, ArrayTest& test_suite) {
+    // depth 1
+    auto column_pb = create_column_pb("ARRAY", type);
+    std::vector<std::string> literal_arrays = {
+            "[]",
+            "[null]",
+            "[1, 2, 3]",
+            "[1, null, 3]",
+            "[1, null, null]",
+            "[null, null, 3]",
+            "[null, null, null]",
+    };
+    test_suite.test<segment_v2::DEFAULT_ENCODING, segment_v2::BIT_SHUFFLE>(column_pb,
+                                                                           literal_arrays);
+
     // depth 2
-    auto column_pb = create_column_pb("ARRAY", "ARRAY", "INT");
-    auto type_info = get_type_info(column_pb);
-    auto field = create_field(column_pb);
-    auto tuple_desc = get_tuple_descriptor(_object_pool, type_info.get());
-    EXPECT_EQ(tuple_desc->slots().size(), 1);
-    auto context = std::make_unique<FunctionContext>();
-    ArrayUtils::prepare_context(*context, *_mem_pool, column_pb);
-
-    std::vector<const CollectionValue*> arrays = {
-            parse(*_mem_pool, *context, "[]", column_pb),
-            parse(*_mem_pool, *context, "[[]]", column_pb),
-            parse(*_mem_pool, *context, "[[1, 2, 3], [4, 5, 6]]", column_pb),
-            parse(*_mem_pool, *context, "[[1, 2, 3], null, [4, 5, 6]]", column_pb),
-            parse(*_mem_pool, *context, "[[1, 2, null], null, [4, null, 6], null, [null, 8, 9]]",
-                  column_pb),
+    column_pb = create_column_pb("ARRAY", "ARRAY", type);
+    literal_arrays = {
+            "[]",
+            "[[]]",
+            "[[1, 2, 3], [4, 5, 6]]",
+            "[[1, 2, 3], null, [4, 5, 6]]",
+            "[[1, 2, null], null, [4, null, 6], null, [null, 8, 9]]",
     };
-    for (auto array : arrays) {
-        test_array<segment_v2::DEFAULT_ENCODING, segment_v2::BIT_SHUFFLE>(column_pb, field.get(),
-                                                                          tuple_desc, array);
-    }
-    test_direct_copy_array(field.get(), arrays);
-    test_write_and_read_column<segment_v2::DEFAULT_ENCODING, segment_v2::BIT_SHUFFLE>(
-            column_pb, field.get(), arrays);
+    test_suite.test<segment_v2::DEFAULT_ENCODING, segment_v2::BIT_SHUFFLE>(column_pb,
+                                                                           literal_arrays);
 
     // depth 3
-    column_pb = create_column_pb("ARRAY", "ARRAY", "ARRAY", "INT");
-    type_info = get_type_info(column_pb);
-    field = create_field(column_pb);
-    tuple_desc = get_tuple_descriptor(_object_pool, type_info.get());
-    EXPECT_EQ(tuple_desc->slots().size(), 1);
-    arrays.clear();
-    EXPECT_EQ(arrays.size(), 0);
-    context.reset(new FunctionContext);
-    ArrayUtils::prepare_context(*context, *_mem_pool, column_pb);
-
-    arrays = {
-            parse(*_mem_pool, *context, "[]", column_pb),
-            parse(*_mem_pool, *context, "[[]]", column_pb),
-            parse(*_mem_pool, *context, "[[[]]]", column_pb),
-            parse(*_mem_pool, *context, "[[[null]], [[1], [2, 3]], [[4, 5, 6], null, null]]",
-                  column_pb),
+    column_pb = create_column_pb("ARRAY", "ARRAY", "ARRAY", type);
+    literal_arrays = {
+            "[]",
+            "[[]]",
+            "[[[]]]",
+            "[[[null]], [[1], [2, 3]], [[4, 5, 6], null, null]]",
     };
-    for (auto array : arrays) {
-        test_array<segment_v2::DEFAULT_ENCODING, segment_v2::BIT_SHUFFLE>(column_pb, field.get(),
-                                                                          tuple_desc, array);
-    }
-    test_direct_copy_array(field.get(), arrays);
-    test_write_and_read_column<segment_v2::DEFAULT_ENCODING, segment_v2::BIT_SHUFFLE>(
-            column_pb, field.get(), arrays);
+    test_suite.test<segment_v2::DEFAULT_ENCODING, segment_v2::BIT_SHUFFLE>(column_pb,
+                                                                           literal_arrays);
 }
 
-TEST_F(ArrayTest, TestSimpleStringArrays) {
-    auto column_pb = create_column_pb("ARRAY", "VARCHAR");
-    auto type_info = get_type_info(column_pb);
-    auto field = create_field(column_pb);
-    auto tuple_desc = get_tuple_descriptor(_object_pool, type_info.get());
-    EXPECT_EQ(tuple_desc->slots().size(), 1);
-    FunctionContext context;
-    ArrayUtils::prepare_context(context, *_mem_pool, column_pb);
-
-    std::vector<const CollectionValue*> arrays = {
-            parse(*_mem_pool, context, "[]", column_pb),
-            parse(*_mem_pool, context, "[null]", column_pb),
-            parse(*_mem_pool, context, "[\"a\", \"b\", \"c\"]", column_pb),
-            parse(*_mem_pool, context, "[null, \"b\", \"c\"]", column_pb),
-            parse(*_mem_pool, context, "[\"a\", null, \"c\"]", column_pb),
-            parse(*_mem_pool, context, "[\"a\", \"b\", null]", column_pb),
-            parse(*_mem_pool, context, "[null, \"b\", null]", column_pb),
-            parse(*_mem_pool, context, "[null, null, null]", column_pb),
+TEST_F(ArrayTest, TestInteger) {
+    test_integer("TINYINT", *this);
+    test_integer("SMALLINT", *this);
+    test_integer("INT", *this);
+    test_integer("BIGINT", *this);
+    test_integer("LARGEINT", *this);
+}
+
+void test_float(const std::string& type, ArrayTest& test_suite) {
+    // depth 1
+    auto column_pb = create_column_pb("ARRAY", type);
+    std::vector<std::string> literal_arrays = {
+            "[]",
+            "[null]",
+            "[1.5, 2.5, 3.5]",
+            "[1.5, null, 3.5]",
+            "[1.5, null, null]",
+            "[null, null, 3.5]",
+            "[null, null, null]",
     };
-    for (auto array : arrays) {
-        test_array<segment_v2::DEFAULT_ENCODING, segment_v2::DICT_ENCODING>(column_pb, field.get(),
-                                                                            tuple_desc, array);
-    }
-    test_direct_copy_array(field.get(), arrays);
-    test_write_and_read_column<segment_v2::DEFAULT_ENCODING, segment_v2::DICT_ENCODING>(
-            column_pb, field.get(), arrays);
+    test_suite.test<segment_v2::DEFAULT_ENCODING, segment_v2::BIT_SHUFFLE>(column_pb,
+                                                                           literal_arrays);
+    // depth 2
+    column_pb = create_column_pb("ARRAY", "ARRAY", type);
+    literal_arrays = {
+            "[]",
+            "[[]]",
+            "[[1.5, 2.5, 3.5], [4.5, 5.5, 6.5]]",
+            "[[1.5, 2.5, 3.5], null, [4.5, 5.5, 6.5]]",
+            "[[1.5, 2.5, null], null, [4.5, null, 6.5], null, [null, 8.5, 9.5]]",
+    };
+    test_suite.test<segment_v2::DEFAULT_ENCODING, segment_v2::BIT_SHUFFLE>(column_pb,
+                                                                           literal_arrays);
+
+    // depth 3
+    column_pb = create_column_pb("ARRAY", "ARRAY", "ARRAY", type);
+    literal_arrays = {
+            "[]",
+            "[[]]",
+            "[[[]]]",
+            "[[[null]], [[1.5], [2.5, 3.5]], [[4.5, 5.5, 6.5], null, null]]",
+    };
+    test_suite.test<segment_v2::DEFAULT_ENCODING, segment_v2::BIT_SHUFFLE>(column_pb,
+                                                                           literal_arrays);
 }
 
-TEST_F(ArrayTest, TestNestedStringArrays) {
-    auto column_pb = create_column_pb("ARRAY", "ARRAY", "ARRAY", "VARCHAR");
-    auto type_info = get_type_info(column_pb);
-    auto field = create_field(column_pb);
-    auto tuple_desc = get_tuple_descriptor(_object_pool, type_info.get());
-    EXPECT_EQ(tuple_desc->slots().size(), 1);
-    FunctionContext context;
-    ArrayUtils::prepare_context(context, *_mem_pool, column_pb);
-
-    std::vector<const CollectionValue*> arrays = {
-            parse(*_mem_pool, context, "[]", column_pb),
-            parse(*_mem_pool, context, "[[]]", column_pb),
-            parse(*_mem_pool, context, "[[[]]]", column_pb),
-            parse(*_mem_pool, context, "[null, [null], [[null]]]", column_pb),
-            parse(*_mem_pool, context,
-                  "[[[\"a\", null, \"c\"], [\"d\", \"e\", \"f\"]], null, [[\"g\"]]]", column_pb),
+TEST_F(ArrayTest, TestFloat) {
+    test_float("FLOAT", *this);
+    test_float("DOUBLE", *this);
+}
+
+void test_string(const std::string& type, ArrayTest& test_suite) {
+    // depth 1
+    auto column_pb = create_column_pb("ARRAY", type);
+    std::vector<std::string> literal_arrays = {
+            "[]",
+            "[null]",
+            "[\"a\", \"b\", \"c\"]",
+            "[null, \"b\", \"c\"]",
+            "[\"a\", null, \"c\"]",
+            "[\"a\", \"b\", null]",
+            "[null, \"b\", null]",
+            "[null, null, null]",
+    };
+    test_suite.test<segment_v2::DEFAULT_ENCODING, segment_v2::DICT_ENCODING>(column_pb,
+                                                                             literal_arrays);
+
+    // more depths
+    column_pb = create_column_pb("ARRAY", "ARRAY", "ARRAY", type);
+    literal_arrays = {
+            "[]",
+            "[[]]",
+            "[[[]]]",
+            "[null, [null], [[null]]]",
+            "[[[\"a\", null, \"c\"], [\"d\", \"e\", \"f\"]], null, [[\"g\"]]]",
     };
-    for (auto array : arrays) {
-        test_array<segment_v2::DEFAULT_ENCODING, segment_v2::DICT_ENCODING>(column_pb, field.get(),
-                                                                            tuple_desc, array);
+    test_suite.test<segment_v2::DEFAULT_ENCODING, segment_v2::DICT_ENCODING>(column_pb,
+                                                                             literal_arrays);
+}
+
+TEST_F(ArrayTest, TestString) {
+    test_string("CHAR", *this);
+    test_string("VARCHAR", *this);
+    test_string("STRING", *this);
+}
+
+void test_datetime(const std::string& type, ArrayTest& test_suite) {
+    auto column_pb = create_column_pb("ARRAY", type);
+    std::vector<std::string> literal_arrays;
+    if (type == "DATE") {
+        literal_arrays = {
+                "[]",
+                "[null]",
+                "[\"2022-04-01\", \"2022-04-02\", \"2022-04-03\"]",
+                "[\"2022-04-01\", null, \"2022-04-03\"]",
+                "[\"2022-04-01\", null, null]",
+                "[null, null, \"2022-04-03\"]",
+                "[null, null, null]",
+        };
+    } else {
+        literal_arrays = {
+                "[]",
+                "[null]",
+                "[\"2022-04-01 19:30:40\", \"2022-04-02 19:30:40 \", \"2022-04-03 19:30:40\"]",
+                "[\"2022-04-01 19:30:40\", null, \"2022-04-03 19:30:40\"]",
+                "[\"2022-04-01 19:30:40\", null, null]",
+                "[null, null, \"2022-04-03 19:30:40\"]",
+                "[null, null, null]",
+        };
     }
-    test_direct_copy_array(field.get(), arrays);
-    test_write_and_read_column<segment_v2::DEFAULT_ENCODING, segment_v2::DICT_ENCODING>(
-            column_pb, field.get(), arrays);
+    test_suite.test<segment_v2::DEFAULT_ENCODING, segment_v2::BIT_SHUFFLE>(column_pb,
+                                                                           literal_arrays);
+    // depth 2
+    column_pb = create_column_pb("ARRAY", "ARRAY", type);
+    if (type == "DATE") {
+        literal_arrays = {
+                "[]",
+                "[[]]",
+                "[[\"2022-04-01\", \"2022-04-02\", \"2022-04-03\"], [\"2022-04-04\", "
+                "\"2022-04-05\", "
+                "\"2022-04-06\"]]",
+                "[[\"2022-04-01\", \"2022-04-02\", \"2022-04-03\"], null, [\"2022-04-04\", "
+                "\"2022-04-05\", \"2022-04-06\"]]",
+                "[[\"2022-04-01\", \"2022-04-02\", null], null, [\"2022-04-04\", null, "
+                "\"2022-04-06\"], null, [null, \"2022-04-08\", \"2022-04-09\"]]",
+        };
+    } else {
+        literal_arrays = {
+                "[]",
+                "[[]]",
+                "[[\"2022-04-01 19:30:40\", \"2022-04-02 19:30:40\", \"2022-04-03 19:30:40\"], "
+                "[\"2022-04-04 19:30:40\", "
+                "\"2022-04-05\", "
+                "\"2022-04-06\"]]",
+                "[[\"2022-04-01 19:30:40\", \"2022-04-02 19:30:40\", \"2022-04-03 19:30:40\"], "
+                "null, [\"2022-04-04 19:30:40\", "
+                "\"2022-04-05\", \"2022-04-06\"]]",
+                "[[\"2022-04-01 19:30:40\", \"2022-04-02 19:30:40\", null], null, [\"2022-04-04 "
+                "19:30:40\", null, "
+                "\"2022-04-06 19:30:40\"], null, [null, \"2022-04-08 19:30:40\", \"2022-04-09 "
+                "19:30:40\"]]",
+        };
+    }
+    test_suite.test<segment_v2::DEFAULT_ENCODING, segment_v2::BIT_SHUFFLE>(column_pb,
+                                                                           literal_arrays);
+
+    // depth 3
+    column_pb = create_column_pb("ARRAY", "ARRAY", "ARRAY", type);
+    if (type == "DATE") {
+        literal_arrays = {
+                "[]",
+                "[[]]",
+                "[[[]]]",
+                "[[[null]], [[\"2022-04-01\"], [\"2022-04-02\", \"2022-04-03\"]], "
+                "[[\"2022-04-04\", "
+                "\"2022-04-05\", \"2022-04-06\"], null, null]]",
+        };
+    } else {
+        literal_arrays = {
+                "[]",
+                "[[]]",
+                "[[[]]]",
+                "[[[null]], [[\"2022-04-01 19:30:40\"], [\"2022-04-02 19:30:40\", \"2022-04-03 "
+                "19:30:40\"]], "
+                "[[\"2022-04-04 19:30:40\", "
+                "\"2022-04-05 19:30:40\", \"2022-04-06 19:30:40\"], null, null]]",
+        };
+    }
+    test_suite.test<segment_v2::DEFAULT_ENCODING, segment_v2::BIT_SHUFFLE>(column_pb,
+                                                                           literal_arrays);
+}
+
+TEST_F(ArrayTest, TestDateTime) {
+    test_datetime("DATE", *this);
+    test_datetime("DATETIME", *this);
+}
+
+TEST_F(ArrayTest, TestDecimal) {
+    test_integer("DECIMAL", *this);
+    test_float("DECIMAL", *this);
 }
 
 } // namespace doris
diff --git a/be/test/runtime/collection_value_test.cpp b/be/test/runtime/collection_value_test.cpp
index 60d199428a..399902e986 100644
--- a/be/test/runtime/collection_value_test.cpp
+++ b/be/test/runtime/collection_value_test.cpp
@@ -61,24 +61,26 @@ TEST(CollectionValueTest, set) {
 
     // normal
     {
+        auto iterator = cv.iterator(TYPE_INT);
         IntVal v0 = IntVal::null();
-        cv.set(0, TYPE_INT, &v0);
+        iterator.set(&v0);
         for (int j = 1; j < cv.size(); ++j) {
             IntVal i(j + 10);
-            EXPECT_TRUE(cv.set(j, TYPE_INT, &i).ok());
+            iterator.seek(j);
+            iterator.set(&i);
         }
     }
 
     {
         auto iter = cv.iterator(TYPE_INT);
         IntVal v0;
-        iter.value(&v0);
+        iter.get(&v0);
         EXPECT_TRUE(v0.is_null);
         EXPECT_TRUE(iter.is_null());
         iter.next();
         for (int k = 1; k < cv.size(); ++k, iter.next()) {
             IntVal v;
-            iter.value(&v);
+            iter.get(&v);
             EXPECT_EQ(k + 10, v.val);
         }
     }
@@ -86,7 +88,8 @@ TEST(CollectionValueTest, set) {
     // over size
     {
         IntVal intv(20);
-        EXPECT_FALSE(cv.set(10, TYPE_INT, &intv).ok());
+        auto iterator = cv.iterator(TYPE_INT);
+        EXPECT_FALSE(iterator.seek(10));
     }
 }
 } // namespace doris
diff --git a/be/test/testutil/array_utils.cpp b/be/test/testutil/array_utils.cpp
index 1cb2d88d38..d1946a8559 100644
--- a/be/test/testutil/array_utils.cpp
+++ b/be/test/testutil/array_utils.cpp
@@ -58,10 +58,36 @@ TypeDesc ArrayUtils::create_function_type_desc(const ColumnPB& column_pb) {
     type_desc.scale = column_pb.frac();
     if (column_pb.type() == "ARRAY") {
         type_desc.type = FunctionContext::TYPE_ARRAY;
+    } else if (column_pb.type() == "BOOLEAN") {
+        type_desc.type = FunctionContext::TYPE_BOOLEAN;
+    } else if (column_pb.type() == "TINYINT") {
+        type_desc.type = FunctionContext::TYPE_TINYINT;
+    } else if (column_pb.type() == "SMALLINT") {
+        type_desc.type = FunctionContext::TYPE_SMALLINT;
     } else if (column_pb.type() == "INT") {
         type_desc.type = FunctionContext::TYPE_INT;
+    } else if (column_pb.type() == "BIGINT") {
+        type_desc.type = FunctionContext::TYPE_BIGINT;
+    } else if (column_pb.type() == "LARGEINT") {
+        type_desc.type = FunctionContext::TYPE_LARGEINT;
+    } else if (column_pb.type() == "FLOAT") {
+        type_desc.type = FunctionContext::TYPE_FLOAT;
+    } else if (column_pb.type() == "DOUBLE") {
+        type_desc.type = FunctionContext::TYPE_DOUBLE;
+    } else if (column_pb.type() == "CHAR") {
+        type_desc.type = FunctionContext::TYPE_CHAR;
     } else if (column_pb.type() == "VARCHAR") {
         type_desc.type = FunctionContext::TYPE_VARCHAR;
+    } else if (column_pb.type() == "STRING") {
+        type_desc.type = FunctionContext::TYPE_STRING;
+    } else if (column_pb.type() == "DATE") {
+        type_desc.type = FunctionContext::TYPE_DATE;
+    } else if (column_pb.type() == "DATETIME") {
+        type_desc.type = FunctionContext::TYPE_DATETIME;
+    } else if (column_pb.type().compare(0, 7, "DECIMAL") == 0) {
+        type_desc.type = FunctionContext::TYPE_DECIMALV2;
+    } else {
+        DCHECK(false) << "Failed to create function type descriptor.";
     }
     for (const auto& sub_column_pb : column_pb.children_columns()) {
         type_desc.children.push_back(create_function_type_desc(sub_column_pb));
diff --git a/be/test/testutil/array_utils.h b/be/test/testutil/array_utils.h
index 41503dcf99..85cc0434d5 100644
--- a/be/test/testutil/array_utils.h
+++ b/be/test/testutil/array_utils.h
@@ -26,7 +26,7 @@ namespace doris {
 class ColumnPB;
 class MemPool;
 class Status;
-struct CollectionValue;
+class CollectionValue;
 
 class ArrayUtils {
 public:
diff --git a/be/test/util/array_parser_test.cpp b/be/test/util/array_parser_test.cpp
index 0905313417..ba92b05020 100644
--- a/be/test/util/array_parser_test.cpp
+++ b/be/test/util/array_parser_test.cpp
@@ -118,4 +118,58 @@ TEST(ArrayParserTest, TestNestedArray) {
                       {array_data, num_arrays, true, array_null_signs});
 }
 
+TEST(ArrayParserTest, TestLargeIntArray) {
+    auto column_pb = create_column_pb("ARRAY", "LARGEINT");
+    test_array_parser(column_pb, "[]", CollectionValue(0));
+
+    __int128_t data[] = {(1L << 31) - 1, (1LU << 63) - 1, (1LU << 63) | ((1LU << 63) - 1)};
+    int num_items = sizeof(data) / sizeof(data[0]);
+    CollectionValue value(data, num_items, false, nullptr);
+    test_array_parser(column_pb, "[2147483647, 9223372036854775807, 18446744073709551615]", value);
+
+    bool null_signs[] = {false, true, false};
+    value.set_has_null(true);
+    value.set_null_signs(null_signs);
+    test_array_parser(column_pb, "[2147483647, null, 18446744073709551615]", value);
+
+    data[1] = static_cast<__int128_t>(1) << 66;
+    null_signs[1] = false;
+    test_array_parser(column_pb,
+                      "[\"2147483647\", \"73786976294838206464\", \"18446744073709551615\"]",
+                      value);
+}
+
+TEST(ArrayParserTest, TestDecimalArray) {
+    auto column_pb = create_column_pb("ARRAY", "DECIMAL");
+    test_array_parser(column_pb, "[]", CollectionValue(0));
+
+    std::string literals[] = {"2147483647", "9223372036854775807"};
+    uint32_t num_items = sizeof(literals) / sizeof(literals[0]);
+    decimal12_t data[num_items];
+    for (int i = 0; i < num_items; ++i) {
+        auto decimal_value = DecimalV2Value(literals[i]);
+        data[i].integer = decimal_value.int_value();
+        data[i].fraction = decimal_value.frac_value();
+    }
+    CollectionValue value(data, num_items, false, nullptr);
+    test_array_parser(column_pb, "[2147483647, 9223372036854775807]", value);
+
+    bool null_signs[] = {false, true};
+    value.set_has_null(true);
+    value.set_null_signs(null_signs);
+    test_array_parser(column_pb, "[2147483647, null]", value);
+
+    null_signs[1] = false;
+    test_array_parser(column_pb, "[\"2147483647\", \"9223372036854775807\"]", value);
+
+    literals[0] = "2147483647.5";
+    literals[1] = "34359738368.5";
+    for (int i = 0; i < num_items; ++i) {
+        auto decimal_value = DecimalV2Value(literals[i]);
+        data[i].integer = decimal_value.int_value();
+        data[i].fraction = decimal_value.frac_value();
+    }
+    value = {data, num_items, false, nullptr};
+    test_array_parser(column_pb, "[2147483647.5, \"34359738368.5\"]", value);
+}
 } // namespace doris
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TypeDef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/TypeDef.java
index 024693437f..0f25cafd4f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/TypeDef.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TypeDef.java
@@ -115,10 +115,6 @@ public class TypeDef implements ParseNode {
     if (type.isNull()) {
       throw new AnalysisException("Unsupported data type: " + type.toSql());
     }
-    if (!type.getPrimitiveType().isIntegerType() &&
-            !type.getPrimitiveType().isCharFamily()) {
-      throw new AnalysisException("Array column just support INT/VARCHAR sub-type");
-    }
     if (type.getPrimitiveType().isStringType()
             && !type.isAssignedStrLenInColDefinition()) {
       type.setLength(1);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/ArrayType.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/ArrayType.java
index bbc517a81a..b86da0a275 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ArrayType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ArrayType.java
@@ -107,6 +107,9 @@ public class ArrayType extends Type {
     }
 
     public static boolean canCastTo(ArrayType type, ArrayType targetType) {
+        if (targetType.getItemType().isStringType() && type.getItemType().isStringType()) {
+            return true;
+        }
         return Type.canCastTo(type.getItemType(), targetType.getItemType());
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org