You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by pa...@apache.org on 2022/07/08 04:48:50 UTC

[doris] branch master updated: [feature-wip](array-type) add agg function collect_list and collect_set (#10606)

This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new fe8acdb268 [feature-wip](array-type) add agg function collect_list and collect_set (#10606)
fe8acdb268 is described below

commit fe8acdb268942aeda7a12b6d1078830b0fa7eb3d
Author: camby <10...@qq.com>
AuthorDate: Fri Jul 8 12:48:46 2022 +0800

    [feature-wip](array-type) add agg function collect_list and collect_set (#10606)
    
    Add the code for collect_list and collect_set, and update the regression output, because the output format for ARRAY(string) had already been changed.
    
    Co-authored-by: cambyzju <zh...@baidu.com>
---
 be/src/vec/CMakeLists.txt                          |   1 +
 .../aggregate_function_collect.cpp                 |  86 +++++++
 .../aggregate_function_collect.h                   | 257 +++++++++++++++++++++
 .../aggregate_function_simple_factory.cpp          |   3 +
 be/test/CMakeLists.txt                             |   1 +
 .../vec/aggregate_functions/agg_collect_test.cpp   | 161 +++++++++++++
 .../aggregate-functions/collect_list.md            |  70 ++++++
 .../aggregate-functions/collect_set.md             |  70 ++++++
 .../aggregate-functions/collect_list.md            |  71 ++++++
 .../aggregate-functions/collect_set.md             |  70 ++++++
 .../java/org/apache/doris/catalog/FunctionSet.java |  12 +
 .../aggregate_functions/test_aggregate_collect.out |   9 +
 .../test_aggregate_collect.groovy                  |  41 ++++
 13 files changed, 852 insertions(+)

diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index 549e931672..3eb9ae6c5a 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -22,6 +22,7 @@ set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/vec")
 set(VEC_FILES
   aggregate_functions/aggregate_function_window_funnel.cpp
   aggregate_functions/aggregate_function_avg.cpp
+  aggregate_functions/aggregate_function_collect.cpp
   aggregate_functions/aggregate_function_count.cpp
   aggregate_functions/aggregate_function_distinct.cpp
   aggregate_functions/aggregate_function_sum.cpp
diff --git a/be/src/vec/aggregate_functions/aggregate_function_collect.cpp b/be/src/vec/aggregate_functions/aggregate_function_collect.cpp
new file mode 100644
index 0000000000..9f58b8ee17
--- /dev/null
+++ b/be/src/vec/aggregate_functions/aggregate_function_collect.cpp
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/aggregate_functions/aggregate_function_collect.h"
+
+#include "vec/aggregate_functions/aggregate_function_simple_factory.h"
+
+namespace doris::vectorized {
+
+// Instantiates the collect aggregate for element type T. When `distinct` is
+// true the state is set-backed (collect_set); otherwise it is list-backed
+// (collect_list).
+template <typename T>
+AggregateFunctionPtr create_agg_function_collect(bool distinct, const DataTypes& argument_types) {
+    using SetFunction = AggregateFunctionCollect<AggregateFunctionCollectSetData<T>>;
+    using ListFunction = AggregateFunctionCollect<AggregateFunctionCollectListData<T>>;
+
+    if (!distinct) {
+        return AggregateFunctionPtr(new ListFunction(argument_types));
+    }
+    return AggregateFunctionPtr(new SetFunction(argument_types));
+}
+
+// Creator callback shared by "collect_list" and "collect_set".
+//
+// Exactly one argument is required. `name` decides whether the distinct (set)
+// flavour is built, and the argument's data type decides the element type of
+// the aggregate state. `parameters` and `result_is_nullable` are accepted but
+// not used in this function. Returns nullptr, after logging a warning, when
+// the argument count or the argument type is not supported.
+AggregateFunctionPtr create_aggregate_function_collect(const std::string& name,
+                                                       const DataTypes& argument_types,
+                                                       const Array& parameters,
+                                                       const bool result_is_nullable) {
+    if (argument_types.size() != 1) {
+        LOG(WARNING) << fmt::format("Illegal number {} of argument for aggregate function {}",
+                                    argument_types.size(), name);
+        return nullptr;
+    }
+
+    const bool distinct = (name == "collect_set");
+    WhichDataType which(argument_types[0]);
+
+    if (which.is_uint8()) {
+        return create_agg_function_collect<UInt8>(distinct, argument_types);
+    }
+    if (which.is_int8()) {
+        return create_agg_function_collect<Int8>(distinct, argument_types);
+    }
+    if (which.is_int16()) {
+        return create_agg_function_collect<Int16>(distinct, argument_types);
+    }
+    if (which.is_int32()) {
+        return create_agg_function_collect<Int32>(distinct, argument_types);
+    }
+    if (which.is_int64()) {
+        return create_agg_function_collect<Int64>(distinct, argument_types);
+    }
+    if (which.is_int128()) {
+        return create_agg_function_collect<Int128>(distinct, argument_types);
+    }
+    if (which.is_float32()) {
+        return create_agg_function_collect<Float32>(distinct, argument_types);
+    }
+    if (which.is_float64()) {
+        return create_agg_function_collect<Float64>(distinct, argument_types);
+    }
+    if (which.is_decimal128()) {
+        return create_agg_function_collect<Decimal128>(distinct, argument_types);
+    }
+    // Date and datetime arguments are both collected with Int64 elements.
+    if (which.is_date() || which.is_date_time()) {
+        return create_agg_function_collect<Int64>(distinct, argument_types);
+    }
+    if (which.is_string()) {
+        return create_agg_function_collect<StringRef>(distinct, argument_types);
+    }
+
+    LOG(WARNING) << fmt::format("unsupported input type {} for aggregate function {}",
+                                argument_types[0]->get_name(), name);
+    return nullptr;
+}
+
+// Registers both collect aggregates; they share one creator, which tells them
+// apart by the function name it is invoked with.
+void register_aggregate_function_collect_list(AggregateFunctionSimpleFactory& factory) {
+    for (const char* fn_name : {"collect_list", "collect_set"}) {
+        factory.register_function(fn_name, create_aggregate_function_collect);
+    }
+}
+} // namespace doris::vectorized
diff --git a/be/src/vec/aggregate_functions/aggregate_function_collect.h b/be/src/vec/aggregate_functions/aggregate_function_collect.h
new file mode 100644
index 0000000000..5df33ab6f2
--- /dev/null
+++ b/be/src/vec/aggregate_functions/aggregate_function_collect.h
@@ -0,0 +1,257 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "common/status.h"
+#include "vec/aggregate_functions/aggregate_function.h"
+#include "vec/aggregate_functions/key_holder_helpers.h"
+#include "vec/common/aggregation_common.h"
+#include "vec/common/hash_table/hash_set.h"
+#include "vec/common/pod_array_fwd.h"
+#include "vec/common/string_ref.h"
+#include "vec/data_types/data_type_array.h"
+#include "vec/data_types/data_type_string.h"
+#include "vec/io/io_helper.h"
+
+namespace doris::vectorized {
+
+// Aggregation state for collect_set over element types stored in a
+// ColumnVectorOrDecimal column: the distinct values are kept in a hash set
+// keyed by the element's native type.
+template <typename T>
+struct AggregateFunctionCollectSetData {
+    using ElementType = T;
+    using ColVecType = ColumnVectorOrDecimal<ElementType>;
+    using ElementNativeType = typename NativeType<T>::Type;
+    using Set = HashSetWithStackMemory<ElementNativeType, DefaultHash<ElementNativeType>, 4>;
+    Set set;
+
+    // Inserts the value found at `row_num` of `column` into the set.
+    void add(const IColumn& column, size_t row_num) {
+        const auto& vec = assert_cast<const ColVecType&>(column).get_data();
+        set.insert(vec[row_num]);
+    }
+    // Folds every element of `rhs` into this state.
+    void merge(const AggregateFunctionCollectSetData& rhs) { set.merge(rhs.set); }
+    // Serialization is delegated to the hash set's own write/read.
+    void write(BufferWritable& buf) const { set.write(buf); }
+    void read(BufferReadable& buf) { set.read(buf); }
+    void reset() { set.clear(); }
+    // Appends all collected values to `to`, which must be a ColVecType column.
+    void insert_result_into(IColumn& to) const {
+        auto& vec = assert_cast<ColVecType&>(to).get_data();
+        // The destination may already contain elements appended for earlier
+        // results, so reserve on top of its current size instead of just
+        // set.size().
+        vec.reserve(vec.size() + set.size());
+        // Iterate by const reference to avoid copying each cell.
+        for (const auto& item : set) {
+            vec.push_back(item.key);
+        }
+    }
+};
+
+// Aggregation state for collect_set over strings. The set stores StringRef
+// keys; add() and merge() go through key holders bound to the supplied arena.
+template <>
+struct AggregateFunctionCollectSetData<StringRef> {
+    using ElementType = StringRef;
+    using ColVecType = ColumnString;
+    using Set = HashSetWithSavedHashWithStackMemory<ElementType, DefaultHash<ElementType>, 4>;
+    Set set;
+
+    // Inserts the string at `row_num` of `column`, using a key holder built on `arena`.
+    void add(const IColumn& column, size_t row_num, Arena* arena) {
+        Set::LookupResult it;
+        bool inserted = false;
+        auto key_holder = get_key_holder<true>(column, row_num, *arena);
+        set.emplace(key_holder, it, inserted);
+    }
+
+    // Inserts every key of `rhs` into this set through an ArenaKeyHolder on `arena`.
+    void merge(const AggregateFunctionCollectSetData& rhs, Arena* arena) {
+        Set::LookupResult it;
+        bool inserted = false;
+        for (const auto& elem : rhs.set) {
+            set.emplace(ArenaKeyHolder {elem.get_value(), *arena}, it, inserted);
+        }
+    }
+    // Wire format: var-uint element count followed by each string in binary form.
+    void write(BufferWritable& buf) const {
+        write_var_uint(set.size(), buf);
+        for (const auto& elem : set) {
+            write_string_binary(elem.get_value(), buf);
+        }
+    }
+    // Reads the format produced by write() and inserts each string into the set.
+    void read(BufferReadable& buf) {
+        // Zero-initialised like the sibling list state, so a failed read cannot
+        // leave the loop bound indeterminate.
+        size_t rows = 0;
+        read_var_uint(rows, buf);
+
+        // NOTE(review): the StringRef is inserted as-is, without an arena copy;
+        // presumably it refers to memory owned by `buf` — confirm the buffer
+        // outlives this state or copy the key into an arena.
+        StringRef ref;
+        for (size_t i = 0; i < rows; ++i) {
+            read_string_binary(ref, buf);
+            set.insert(ref);
+        }
+    }
+    void reset() { set.clear(); }
+    // Appends all collected strings to `to`, which must be a ColumnString.
+    void insert_result_into(IColumn& to) const {
+        auto& vec = assert_cast<ColVecType&>(to);
+        // The destination may already contain rows appended for earlier
+        // results, so reserve relative to its current size.
+        vec.reserve(vec.size() + set.size());
+        for (const auto& item : set) {
+            vec.insert_data(item.key.data, item.key.size);
+        }
+    }
+};
+
+// Aggregation state for collect_list over element types stored in a
+// ColumnVectorOrDecimal column: values are kept, in arrival order and with
+// duplicates, in a flat POD array.
+template <typename T>
+struct AggregateFunctionCollectListData {
+    using ElementType = T;
+    using ColVecType = ColumnVectorOrDecimal<ElementType>;
+    PaddedPODArray<ElementType> data;
+
+    // Appends the value at `row_num` of `column`.
+    void add(const IColumn& column, size_t row_num) {
+        const auto& source = assert_cast<const ColVecType&>(column).get_data();
+        data.push_back(source[row_num]);
+    }
+
+    // Appends all values of `rhs` after the ones already collected.
+    void merge(const AggregateFunctionCollectListData& rhs) {
+        data.insert(rhs.data.begin(), rhs.data.end());
+    }
+
+    // Wire format: var-uint element count followed by the raw element bytes.
+    void write(BufferWritable& buf) const {
+        const size_t count = data.size();
+        write_var_uint(count, buf);
+        buf.write(data.raw_data(), count * sizeof(ElementType));
+    }
+
+    // Reads the format produced by write(); the state is resized to the
+    // serialized element count and filled from the buffer.
+    void read(BufferReadable& buf) {
+        size_t count = 0;
+        read_var_uint(count, buf);
+        data.resize(count);
+        buf.read(reinterpret_cast<char*>(data.data()), count * sizeof(ElementType));
+    }
+
+    void reset() { data.clear(); }
+
+    // Appends all collected values to `to`, which must be a ColVecType column.
+    void insert_result_into(IColumn& to) const {
+        auto& dest = assert_cast<ColVecType&>(to).get_data();
+        dest.insert(data.begin(), data.end());
+    }
+};
+
+// Aggregation state for collect_list over strings: the collected values live
+// in an owned ColumnString.
+template <>
+struct AggregateFunctionCollectListData<StringRef> {
+    using ElementType = StringRef;
+    using ColVecType = ColumnString;
+    MutableColumnPtr data;
+
+    // Declared with the injected class name: naming a constructor with a
+    // template-id is no longer valid as of C++20.
+    AggregateFunctionCollectListData() { data = ColVecType::create(); }
+
+    // Appends the string at `row_num` of `column`.
+    void add(const IColumn& column, size_t row_num) { data->insert_from(column, row_num); }
+
+    // Appends all strings of `rhs` after the ones already collected.
+    void merge(const AggregateFunctionCollectListData& rhs) {
+        data->insert_range_from(*rhs.data, 0, rhs.data->size());
+    }
+
+    // Wire format: var-uint row count, raw offsets, var-uint byte count, raw chars.
+    void write(BufferWritable& buf) const {
+        const auto& col = assert_cast<const ColVecType&>(*data);
+
+        write_var_uint(col.size(), buf);
+        buf.write(col.get_offsets().raw_data(), col.size() * sizeof(IColumn::Offset));
+
+        write_var_uint(col.get_chars().size(), buf);
+        buf.write(col.get_chars().raw_data(), col.get_chars().size());
+    }
+
+    // Reads the format produced by write(), resizing the offsets and chars
+    // arrays to the serialized sizes before filling them.
+    void read(BufferReadable& buf) {
+        auto& col = assert_cast<ColVecType&>(*data);
+        size_t offs_size = 0;
+        read_var_uint(offs_size, buf);
+        col.get_offsets().resize(offs_size);
+        buf.read(reinterpret_cast<char*>(col.get_offsets().data()),
+                 offs_size * sizeof(IColumn::Offset));
+
+        size_t chars_size = 0;
+        read_var_uint(chars_size, buf);
+        col.get_chars().resize(chars_size);
+        buf.read(reinterpret_cast<char*>(col.get_chars().data()), chars_size);
+    }
+
+    void reset() { data->clear(); }
+
+    // Appends all collected strings to `to`, which must be a ColumnString.
+    void insert_result_into(IColumn& to) const {
+        auto& to_str = assert_cast<ColVecType&>(to);
+        to_str.insert_range_from(*data, 0, data->size());
+    }
+};
+
+// Aggregate function behind collect_list / collect_set. `Data` is one of the
+// AggregateFunctionCollect{List,Set}Data states and supplies the storage; this
+// class adapts it to the IAggregateFunction interface and produces an
+// ARRAY(Nullable(argument type)) result.
+template <typename Data>
+class AggregateFunctionCollect final
+        : public IAggregateFunctionDataHelper<Data, AggregateFunctionCollect<Data>> {
+public:
+    // Only the string set state takes an Arena in add()/merge().
+    static constexpr bool alloc_memory_in_arena =
+            std::is_same_v<Data, AggregateFunctionCollectSetData<StringRef>>;
+
+    AggregateFunctionCollect(const DataTypes& argument_types_)
+            : IAggregateFunctionDataHelper<Data, AggregateFunctionCollect<Data>>(argument_types_,
+                                                                                 {}),
+              _argument_type(argument_types_[0]) {}
+
+    // "collect_list" for list-backed states, "collect_set" otherwise.
+    std::string get_name() const override {
+        if constexpr (std::is_same_v<AggregateFunctionCollectListData<typename Data::ElementType>,
+                                     Data>) {
+            return "collect_list";
+        } else {
+            return "collect_set";
+        }
+    }
+
+    DataTypePtr get_return_type() const override {
+        return std::make_shared<DataTypeArray>(make_nullable(_argument_type));
+    }
+
+    // Adds row `row_num` of the single argument column; the row is asserted to be non-null.
+    void add(AggregateDataPtr __restrict place, const IColumn** columns, size_t row_num,
+             Arena* arena) const override {
+        assert(!columns[0]->is_null_at(row_num));
+        if constexpr (alloc_memory_in_arena) {
+            this->data(place).add(*columns[0], row_num, arena);
+        } else {
+            this->data(place).add(*columns[0], row_num);
+        }
+    }
+
+    void reset(AggregateDataPtr place) const override { this->data(place).reset(); }
+
+    void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs,
+               Arena* arena) const override {
+        if constexpr (alloc_memory_in_arena) {
+            this->data(place).merge(this->data(rhs), arena);
+        } else {
+            this->data(place).merge(this->data(rhs));
+        }
+    }
+
+    void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& buf) const override {
+        this->data(place).write(buf);
+    }
+
+    void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf,
+                     Arena*) const override {
+        this->data(place).read(buf);
+    }
+
+    // Appends one array row to `to` (a ColumnArray): the collected elements go
+    // into the nested column and the new end offset is pushed afterwards.
+    void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& to) const override {
+        auto& to_arr = assert_cast<ColumnArray&>(to);
+        auto& to_nested_col = to_arr.get_data();
+        if (to_nested_col.is_nullable()) {
+            // Checked downcast, consistent with the other casts in this file,
+            // instead of reinterpret_cast on a base-class reference.
+            auto& col_null = assert_cast<ColumnNullable&>(to_nested_col);
+            this->data(place).insert_result_into(col_null.get_nested_column());
+            // Grow the null map to the nested column's size, marking the new
+            // entries with 0.
+            col_null.get_null_map_data().resize_fill(col_null.get_nested_column().size(), 0);
+        } else {
+            this->data(place).insert_result_into(to_nested_col);
+        }
+        to_arr.get_offsets().push_back(to_nested_col.size());
+    }
+
+    bool allocates_memory_in_arena() const override { return alloc_memory_in_arena; }
+
+private:
+    DataTypePtr _argument_type;
+};
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/aggregate_functions/aggregate_function_simple_factory.cpp b/be/src/vec/aggregate_functions/aggregate_function_simple_factory.cpp
index badf756f8b..58dc0a4c9b 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_simple_factory.cpp
+++ b/be/src/vec/aggregate_functions/aggregate_function_simple_factory.cpp
@@ -47,6 +47,8 @@ void register_aggregate_function_percentile(AggregateFunctionSimpleFactory& fact
 void register_aggregate_function_window_funnel(AggregateFunctionSimpleFactory& factory);
 void register_aggregate_function_percentile_approx(AggregateFunctionSimpleFactory& factory);
 void register_aggregate_function_orthogonal_bitmap(AggregateFunctionSimpleFactory& factory);
+void register_aggregate_function_collect_list(AggregateFunctionSimpleFactory& factory);
+
 AggregateFunctionSimpleFactory& AggregateFunctionSimpleFactory::instance() {
     static std::once_flag oc;
     static AggregateFunctionSimpleFactory instance;
@@ -70,6 +72,7 @@ AggregateFunctionSimpleFactory& AggregateFunctionSimpleFactory::instance() {
         register_aggregate_function_percentile_approx(instance);
         register_aggregate_function_window_funnel(instance);
         register_aggregate_function_orthogonal_bitmap(instance);
+        register_aggregate_function_collect_list(instance);
 
         // if you only register function with no nullable, and wants to add nullable automatically, you should place function above this line
         register_aggregate_function_combinator_null(instance);
diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt
index ed6693b992..1e2ffc8ade 100644
--- a/be/test/CMakeLists.txt
+++ b/be/test/CMakeLists.txt
@@ -317,6 +317,7 @@ set(UTIL_TEST_FILES
     util/interval_tree_test.cpp
 )
 set(VEC_TEST_FILES
+    vec/aggregate_functions/agg_collect_test.cpp
     vec/aggregate_functions/agg_test.cpp
     vec/aggregate_functions/agg_min_max_test.cpp
     vec/aggregate_functions/vec_window_funnel_test.cpp
diff --git a/be/test/vec/aggregate_functions/agg_collect_test.cpp b/be/test/vec/aggregate_functions/agg_collect_test.cpp
new file mode 100644
index 0000000000..28e31ca58c
--- /dev/null
+++ b/be/test/vec/aggregate_functions/agg_collect_test.cpp
@@ -0,0 +1,161 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include "common/logging.h"
+#include "gtest/gtest.h"
+#include "vec/aggregate_functions/aggregate_function.h"
+#include "vec/aggregate_functions/aggregate_function_collect.h"
+#include "vec/aggregate_functions/aggregate_function_simple_factory.h"
+#include "vec/columns/column_vector.h"
+#include "vec/data_types/data_type.h"
+#include "vec/data_types/data_type_date.h"
+#include "vec/data_types/data_type_date_time.h"
+#include "vec/data_types/data_type_decimal.h"
+#include "vec/data_types/data_type_number.h"
+#include "vec/data_types/data_type_string.h"
+
+namespace doris::vectorized {
+
+void register_aggregate_function_collect_list(AggregateFunctionSimpleFactory& factory);
+
+// Fixture driving collect_list / collect_set through the aggregate function
+// factory: add -> serialize -> deserialize -> merge -> insert_result_into.
+class VAggCollectTest : public testing::Test {
+public:
+    void SetUp() {
+        // NOTE(review): `factory` is declared by value, so the registration
+        // below applies to a local copy of the singleton — confirm whether
+        // that is intended.
+        AggregateFunctionSimpleFactory factory = AggregateFunctionSimpleFactory::instance();
+        register_aggregate_function_collect_list(factory);
+    }
+
+    void TearDown() {}
+
+    bool is_distinct(const std::string& fn_name) { return fn_name == "collect_set"; }
+
+    // Feeds `input_nums` distinct values, each repeated `_repeated_times`
+    // times, into the aggregate state at `place`.
+    template <typename DataType>
+    void agg_collect_add_elements(AggregateFunctionPtr agg_function, AggregateDataPtr place,
+                                  size_t input_nums) {
+        using FieldType = typename DataType::FieldType;
+        auto type = std::make_shared<DataType>();
+        auto input_col = type->create_column();
+        for (size_t i = 0; i < input_nums; ++i) {
+            for (size_t j = 0; j < _repeated_times; ++j) {
+                if constexpr (std::is_same_v<DataType, DataTypeString>) {
+                    auto item = std::string("item") + std::to_string(i);
+                    input_col->insert_data(item.c_str(), item.size());
+                } else {
+                    auto item = FieldType(i);
+                    input_col->insert_data(reinterpret_cast<const char*>(&item), 0);
+                }
+            }
+        }
+        EXPECT_EQ(input_col->size(), input_nums * _repeated_times);
+
+        const IColumn* column[1] = {input_col.get()};
+        // size_t index: avoids a signed/unsigned comparison with size().
+        for (size_t i = 0; i < input_col->size(); i++) {
+            agg_function->add(place, column, i, &_agg_arena_pool);
+        }
+    }
+
+    // Runs the full life cycle for `fn_name` over `DataType` and checks the
+    // resulting array lengths for the merged state and for the second state.
+    template <typename DataType>
+    void test_agg_collect(const std::string& fn_name, size_t input_nums = 0) {
+        DataTypes data_types = {(DataTypePtr)std::make_shared<DataType>()};
+        LOG(INFO) << "test_agg_collect for " << fn_name << "(" << data_types[0]->get_name() << ")";
+        Array array;
+        // Bind by reference: a lookup does not need its own copy of the registry.
+        AggregateFunctionSimpleFactory& factory = AggregateFunctionSimpleFactory::instance();
+        auto agg_function = factory.get(fn_name, data_types, array);
+        // Fatal assertion: everything below dereferences agg_function.
+        ASSERT_NE(agg_function, nullptr);
+
+        std::unique_ptr<char[]> memory(new char[agg_function->size_of_data()]);
+        AggregateDataPtr place = memory.get();
+        agg_function->create(place);
+
+        agg_collect_add_elements<DataType>(agg_function, place, input_nums);
+
+        // Round-trip the first state through serialize/deserialize.
+        ColumnString buf;
+        VectorBufferWriter buf_writer(buf);
+        agg_function->serialize(place, buf_writer);
+        buf_writer.commit();
+        VectorBufferReader buf_reader(buf.get_data_at(0));
+        agg_function->deserialize(place, buf_reader, &_agg_arena_pool);
+
+        std::unique_ptr<char[]> memory2(new char[agg_function->size_of_data()]);
+        AggregateDataPtr place2 = memory2.get();
+        agg_function->create(place2);
+
+        agg_collect_add_elements<DataType>(agg_function, place2, input_nums);
+
+        // After the merge the list holds both inputs; the set still holds
+        // only the distinct values.
+        agg_function->merge(place, place2, &_agg_arena_pool);
+        auto column_result = ColumnArray::create(data_types[0]->create_column());
+        agg_function->insert_result_into(place, *column_result);
+        EXPECT_EQ(column_result->size(), 1);
+        EXPECT_EQ(column_result->get_offsets()[0],
+                  is_distinct(fn_name) ? input_nums : 2 * input_nums * _repeated_times);
+
+        auto column_result2 = ColumnArray::create(data_types[0]->create_column());
+        agg_function->insert_result_into(place2, *column_result2);
+        EXPECT_EQ(column_result2->size(), 1);
+        EXPECT_EQ(column_result2->get_offsets()[0],
+                  is_distinct(fn_name) ? input_nums : input_nums * _repeated_times);
+
+        agg_function->destroy(place);
+        agg_function->destroy(place2);
+    }
+
+private:
+    const size_t _repeated_times = 2;
+    Arena _agg_arena_pool;
+};
+
+// Runs both functions with no input rows (the default input_nums) over the
+// integer, decimal, date and string types.
+TEST_F(VAggCollectTest, test_empty) {
+    const std::string list_fn = "collect_list";
+    const std::string set_fn = "collect_set";
+
+    test_agg_collect<DataTypeInt8>(list_fn);
+    test_agg_collect<DataTypeInt8>(set_fn);
+    test_agg_collect<DataTypeInt16>(list_fn);
+    test_agg_collect<DataTypeInt16>(set_fn);
+    test_agg_collect<DataTypeInt32>(list_fn);
+    test_agg_collect<DataTypeInt32>(set_fn);
+    test_agg_collect<DataTypeInt64>(list_fn);
+    test_agg_collect<DataTypeInt64>(set_fn);
+    test_agg_collect<DataTypeInt128>(list_fn);
+    test_agg_collect<DataTypeInt128>(set_fn);
+
+    test_agg_collect<DataTypeDecimal<Decimal128>>(list_fn);
+    test_agg_collect<DataTypeDecimal<Decimal128>>(set_fn);
+
+    test_agg_collect<DataTypeDate>(list_fn);
+    test_agg_collect<DataTypeDate>(set_fn);
+
+    test_agg_collect<DataTypeString>(list_fn);
+    test_agg_collect<DataTypeString>(set_fn);
+}
+
+// Runs both functions with a non-zero number of distinct inputs over a
+// selection of integer, decimal, datetime and string types.
+TEST_F(VAggCollectTest, test_with_data) {
+    const std::string list_fn = "collect_list";
+    const std::string set_fn = "collect_set";
+
+    test_agg_collect<DataTypeInt32>(list_fn, 7);
+    test_agg_collect<DataTypeInt32>(set_fn, 9);
+    test_agg_collect<DataTypeInt128>(list_fn, 20);
+    test_agg_collect<DataTypeInt128>(set_fn, 30);
+
+    test_agg_collect<DataTypeDecimal<Decimal128>>(list_fn, 10);
+    test_agg_collect<DataTypeDecimal<Decimal128>>(set_fn, 11);
+
+    test_agg_collect<DataTypeDateTime>(list_fn, 5);
+    test_agg_collect<DataTypeDateTime>(set_fn, 6);
+
+    test_agg_collect<DataTypeString>(list_fn, 10);
+    test_agg_collect<DataTypeString>(set_fn, 5);
+}
+
+} // namespace doris::vectorized
diff --git a/docs/en/docs/sql-manual/sql-functions/aggregate-functions/collect_list.md b/docs/en/docs/sql-manual/sql-functions/aggregate-functions/collect_list.md
new file mode 100644
index 0000000000..a27b540c78
--- /dev/null
+++ b/docs/en/docs/sql-manual/sql-functions/aggregate-functions/collect_list.md
@@ -0,0 +1,70 @@
+---
+{
+    "title": "COLLECT_LIST",
+    "language": "en"
+}
+---
+
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## COLLECT_LIST
+### description
+#### Syntax
+
+`ARRAY<T> collect_list(expr)`
+
+Returns an array consisting of all values of expr within the group.
+The order of elements in the array is non-deterministic. NULL values are excluded; as the example below shows, a group whose values are all NULL yields NULL.
+
+### notice
+
+```
+Only supported in vectorized engine
+```
+
+### example
+
+```
+mysql> set enable_vectorized_engine=true;
+mysql> set enable_array_type = true;
+
+mysql> select k1,k2,k3 from collect_test order by k1;
++------+------------+-------+
+| k1   | k2         | k3    |
++------+------------+-------+
+|    1 | 2022-07-05 | hello |
+|    2 | 2022-07-04 | NULL  |
+|    2 | 2022-07-04 | hello |
+|    3 | NULL       | world |
+|    3 | NULL       | world |
++------+------------+-------+
+
+mysql> select k1,collect_list(k2),collect_list(k3) from collect_test group by k1 order by k1;
++------+--------------------------+--------------------+
+| k1   | collect_list(`k2`)       | collect_list(`k3`) |
++------+--------------------------+--------------------+
+|    1 | [2022-07-05]             | [hello]            |
+|    2 | [2022-07-04, 2022-07-04] | [hello]            |
+|    3 | NULL                     | [world, world]     |
++------+--------------------------+--------------------+
+```
+
+### keywords
+COLLECT_LIST,COLLECT_SET,ARRAY
diff --git a/docs/en/docs/sql-manual/sql-functions/aggregate-functions/collect_set.md b/docs/en/docs/sql-manual/sql-functions/aggregate-functions/collect_set.md
new file mode 100644
index 0000000000..8c3d617b3e
--- /dev/null
+++ b/docs/en/docs/sql-manual/sql-functions/aggregate-functions/collect_set.md
@@ -0,0 +1,70 @@
+---
+{
+    "title": "COLLECT_SET",
+    "language": "en"
+}
+---
+
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## COLLECT_SET
+### description
+#### Syntax
+
+`ARRAY<T> collect_set(expr)`
+
+Returns an array consisting of all unique values of expr within the group.
+The order of elements in the array is non-deterministic. NULL values are excluded; as the example below shows, a group whose values are all NULL yields NULL.
+
+### notice
+
+```
+Only supported in vectorized engine
+```
+
+### example
+
+```
+mysql> set enable_vectorized_engine=true;
+mysql> set enable_array_type = true;
+
+mysql> select k1,k2,k3 from collect_test order by k1;
++------+------------+-------+
+| k1   | k2         | k3    |
++------+------------+-------+
+|    1 | 2022-07-05 | hello |
+|    2 | 2022-07-04 | NULL  |
+|    2 | 2022-07-04 | hello |
+|    3 | NULL       | world |
+|    3 | NULL       | world |
++------+------------+-------+
+
+mysql> select k1,collect_set(k2),collect_set(k3) from collect_test group by k1 order by k1;
++------+-------------------+-------------------+
+| k1   | collect_set(`k2`) | collect_set(`k3`) |
++------+-------------------+-------------------+
+|    1 | [2022-07-05]      | [hello]           |
+|    2 | [2022-07-04]      | [hello]           |
+|    3 | NULL              | [world]           |
++------+-------------------+-------------------+
+```
+
+### keywords
+COLLECT_SET,COLLECT_LIST,ARRAY
diff --git a/docs/zh-CN/docs/sql-manual/sql-functions/aggregate-functions/collect_list.md b/docs/zh-CN/docs/sql-manual/sql-functions/aggregate-functions/collect_list.md
new file mode 100644
index 0000000000..eaf56bf959
--- /dev/null
+++ b/docs/zh-CN/docs/sql-manual/sql-functions/aggregate-functions/collect_list.md
@@ -0,0 +1,71 @@
+---
+{
+    "title": "COLLECT_LIST",
+    "language": "zh-CN"
+}
+---
+
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## COLLECT_LIST
+### description
+#### Syntax
+
+`ARRAY<T> collect_list(expr)`
+
+返回一个包含 expr 中所有元素(不包括NULL)的数组,数组中元素顺序是不确定的。如果分组内所有值均为 NULL,则返回 NULL(见下方示例)。
+
+
+### notice
+
+```
+仅支持向量化引擎中使用
+```
+
+### example
+
+```
+mysql> set enable_vectorized_engine=true;
+mysql> set enable_array_type = true;
+
+mysql> select k1,k2,k3 from collect_test order by k1;
++------+------------+-------+
+| k1   | k2         | k3    |
++------+------------+-------+
+|    1 | 2022-07-05 | hello |
+|    2 | 2022-07-04 | NULL  |
+|    2 | 2022-07-04 | hello |
+|    3 | NULL       | world |
+|    3 | NULL       | world |
++------+------------+-------+
+
+mysql> select k1,collect_list(k2),collect_list(k3) from collect_test group by k1 order by k1;
++------+--------------------------+--------------------+
+| k1   | collect_list(`k2`)       | collect_list(`k3`) |
++------+--------------------------+--------------------+
+|    1 | [2022-07-05]             | [hello]            |
+|    2 | [2022-07-04, 2022-07-04] | [hello]            |
+|    3 | NULL                     | [world, world]     |
++------+--------------------------+--------------------+
+
+```
+
+### keywords
+COLLECT_LIST,COLLECT_SET,ARRAY
diff --git a/docs/zh-CN/docs/sql-manual/sql-functions/aggregate-functions/collect_set.md b/docs/zh-CN/docs/sql-manual/sql-functions/aggregate-functions/collect_set.md
new file mode 100644
index 0000000000..ccc734cea8
--- /dev/null
+++ b/docs/zh-CN/docs/sql-manual/sql-functions/aggregate-functions/collect_set.md
@@ -0,0 +1,70 @@
+---
+{
+    "title": "COLLECT_SET",
+    "language": "zh-CN"
+}
+---
+
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## COLLECT_SET
+### description
+#### Syntax
+
+`ARRAY<T> collect_set(expr)`
+
+返回一个包含 expr 中所有去重后元素(不包括NULL)的数组,数组中元素顺序是不确定的。如果分组内所有值均为 NULL,则返回 NULL(见下方示例)。
+
+### notice
+
+```
+仅支持向量化引擎中使用
+```
+
+### example
+
+```
+mysql> set enable_vectorized_engine=true;
+mysql> set enable_array_type = true;
+
+mysql> select k1,k2,k3 from collect_test order by k1;
++------+------------+-------+
+| k1   | k2         | k3    |
++------+------------+-------+
+|    1 | 2022-07-05 | hello |
+|    2 | 2022-07-04 | NULL  |
+|    2 | 2022-07-04 | hello |
+|    3 | NULL       | world |
+|    3 | NULL       | world |
++------+------------+-------+
+
+mysql> select k1,collect_set(k2),collect_set(k3) from collect_test group by k1 order by k1;
++------+-------------------+-------------------+
+| k1   | collect_set(`k2`) | collect_set(`k3`) |
++------+-------------------+-------------------+
+|    1 | [2022-07-05]      | [hello]           |
+|    2 | [2022-07-04]      | [hello]           |
+|    3 | NULL              | [world]           |
++------+-------------------+-------------------+
+
+```
+
+### keywords
+COLLECT_SET,COLLECT_LIST,ARRAY
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java
index 884a8fa452..c03722b0fa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java
@@ -855,6 +855,8 @@ public class FunctionSet<T> {
     //TODO(weixiang): is quantile_percent can be replaced by approx_percentile?
     public static final String QUANTILE_PERCENT = "quantile_percent";
     public static final String TO_QUANTILE_STATE = "to_quantile_state";
+    public static final String COLLECT_LIST = "collect_list";
+    public static final String COLLECT_SET = "collect_set";
 
     private static final Map<Type, String> ORTHOGONAL_BITMAP_INTERSECT_INIT_SYMBOL =
             ImmutableMap.<Type, String>builder()
@@ -2215,6 +2217,16 @@ public class FunctionSet<T> {
                 prefix + "26percentile_approx_finalizeEPN9doris_udf15FunctionContextERKNS1_9StringValE",
                 false, true, false, true));
 
+        // collect_list and collect_set
+        Type[] arraySubTypes = {Type.BOOLEAN, Type.SMALLINT, Type.TINYINT, Type.INT, Type.BIGINT, Type.LARGEINT,
+                Type.FLOAT, Type.DOUBLE, Type.DATE, Type.DATETIME, Type.DECIMALV2, Type.VARCHAR, Type.STRING};
+        for (Type t : arraySubTypes) {
+            addBuiltin(AggregateFunction.createBuiltin(COLLECT_LIST, Lists.newArrayList(t), new ArrayType(t), t,
+                    "", "", "", "", "", true, false, true, true));
+            addBuiltin(AggregateFunction.createBuiltin(COLLECT_SET, Lists.newArrayList(t), new ArrayType(t), t,
+                    "", "", "", "", "", true, false, true, true));
+        }
+
         // Avg
         // TODO: switch to CHAR(sizeof(AvgIntermediateType) when that becomes available
         addBuiltin(AggregateFunction.createBuiltin("avg",
diff --git a/regression-test/data/query/sql_functions/aggregate_functions/test_aggregate_collect.out b/regression-test/data/query/sql_functions/aggregate_functions/test_aggregate_collect.out
new file mode 100644
index 0000000000..cec8df6151
--- /dev/null
+++ b/regression-test/data/query/sql_functions/aggregate_functions/test_aggregate_collect.out
@@ -0,0 +1,9 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select --
+1	['hello', 'hello']	[2022-07-04, 2022-07-04]	[1.23, 1.23]
+2	\N	\N	\N
+
+-- !select --
+1	['hello']	[2022-07-04]	[1.23]
+2	\N	\N	\N
+
diff --git a/regression-test/suites/query/sql_functions/aggregate_functions/test_aggregate_collect.groovy b/regression-test/suites/query/sql_functions/aggregate_functions/test_aggregate_collect.groovy
new file mode 100644
index 0000000000..a1c1ff260e
--- /dev/null
+++ b/regression-test/suites/query/sql_functions/aggregate_functions/test_aggregate_collect.groovy
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_aggregate_collect", "query") { // regression suite for the collect_list / collect_set aggregate functions
+    sql "set enable_vectorized_engine = true" // the accompanying docs state these functions are only supported in the vectorized engine
+    sql "set enable_array_type = true;" // collect_* return ARRAY<T> values, so the array type is switched on for the session
+
+    def tableName = "collect_test"
+    sql "DROP TABLE IF EXISTS ${tableName}" // remove any table left over from an earlier run
+    sql """
+	    CREATE TABLE IF NOT EXISTS ${tableName} (
+	        c_int INT,
+	        c_string VARCHAR(10),
+          c_date Date,
+          c_decimal DECIMAL(10, 2)
+	    )
+	    DISTRIBUTED BY HASH(c_int) BUCKETS 1
+	    PROPERTIES (
+	      "replication_num" = "1"
+	    ) 
+    """ // one string, one date and one decimal column are aggregated; c_int is the grouping key
+    sql "INSERT INTO ${tableName} values(1,'hello','2022-07-04',1.23), (2,NULL,NULL,NULL)" // key 1 carries real values, key 2 carries only NULLs
+    sql "INSERT INTO ${tableName} values(1,'hello','2022-07-04',1.23), (2,NULL,NULL,NULL)" // same rows again, so key 1 holds duplicate values
+
+    qt_select "select c_int,collect_list(c_string),collect_list(c_date),collect_list(c_decimal) from ${tableName} group by c_int order by c_int" // expected rows (duplicates kept, NULL for key 2) are in test_aggregate_collect.out
+    qt_select "select c_int,collect_set(c_string),collect_set(c_date),collect_set(c_decimal) from ${tableName} group by c_int order by c_int" // expected rows (duplicates removed, NULL for key 2) are in test_aggregate_collect.out
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org