You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by kx...@apache.org on 2023/06/06 15:15:10 UTC

[doris] 08/36: [Enhancement](Agg State) store function name and result_is_nullable in agg state type (#20298)

This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0-beta
in repository https://gitbox.apache.org/repos/asf/doris.git

commit a01fb1197256724e03934d3f230d333d0afff0f6
Author: Pxl <px...@qq.com>
AuthorDate: Sun Jun 4 22:44:48 2023 +0800

    [Enhancement](Agg State) store function name and result_is_nullable in agg state type (#20298)
    
    Store the aggregate function name and the result_is_nullable flag in the agg_state type.
---
 be/CMakeLists.txt                                  |   2 +-
 be/src/olap/tablet_meta.cpp                        |   5 +
 be/src/olap/tablet_schema.cpp                      |  19 +--
 be/src/olap/tablet_schema.h                        |   4 +
 be/src/runtime/types.h                             |   8 ++
 be/src/vec/data_types/data_type_agg_state.h        |  42 ++++++-
 be/src/vec/data_types/data_type_factory.cpp        |  28 +++--
 be/src/vec/exec/vaggregation_node.cpp              |  13 +-
 be/src/vec/exprs/vectorized_agg_fn.cpp             |  27 +++--
 be/src/vec/exprs/vectorized_fn_call.cpp            |   4 +-
 be/src/vec/functions/function_agg_state.h          |  23 +++-
 .../org/apache/doris/catalog/AggStateType.java     | 134 +++++++++++++++++++++
 .../java/org/apache/doris/catalog/ScalarType.java  |  62 +---------
 .../main/java/org/apache/doris/catalog/Type.java   |   2 +-
 .../java/org/apache/doris/analysis/ColumnDef.java  |  13 +-
 .../main/java/org/apache/doris/analysis/Expr.java  |  76 ++++++++++--
 .../main/java/org/apache/doris/catalog/Column.java |   6 +-
 .../java/org/apache/doris/catalog/Function.java    |  41 +++++--
 .../apache/doris/nereids/types/AggStateType.java   |  14 ++-
 .../org/apache/doris/nereids/types/DataType.java   |   5 +-
 .../org/apache/doris/persist/gson/GsonUtils.java   |   4 +-
 gensrc/proto/data.proto                            |   2 +
 gensrc/proto/olap_file.proto                       |   1 +
 gensrc/thrift/Descriptors.thrift                   |   1 +
 gensrc/thrift/Types.thrift                         |   4 +-
 25 files changed, 386 insertions(+), 154 deletions(-)

diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index 5f5058e483..712f11f672 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -454,7 +454,7 @@ endif()
 
 add_subdirectory(${SRC_DIR}/clucene EXCLUDE_FROM_ALL)
 
-set(clucene_options -w -Wall -Wno-non-virtual-dtor)
+set(clucene_options -w -Wall)
 if (COMPILER_CLANG)
     set(clucene_options ${clucene_options} -Wno-c++11-narrowing)
 else ()
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index 432fd34ff3..3ee508afb9 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -315,6 +315,11 @@ void TabletMeta::init_column_from_tcolumn(uint32_t unique_id, const TColumn& tco
     column->set_index_length(length);
     column->set_precision(tcolumn.column_type.precision);
     column->set_frac(tcolumn.column_type.scale);
+
+    if (tcolumn.__isset.result_is_nullable) {
+        column->set_result_is_nullable(tcolumn.result_is_nullable);
+    }
+
     if (tcolumn.column_type.type == TPrimitiveType::VARCHAR ||
         tcolumn.column_type.type == TPrimitiveType::STRING) {
         if (!tcolumn.column_type.__isset.index_len) {
diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp
index 20dd40aed1..f5f62e6857 100644
--- a/be/src/olap/tablet_schema.cpp
+++ b/be/src/olap/tablet_schema.cpp
@@ -427,6 +427,9 @@ void TabletColumn::init_from_pb(const ColumnPB& column) {
         _aggregation = get_aggregation_type_by_string(column.aggregation());
         _aggregation_name = column.aggregation();
     }
+    if (column.has_result_is_nullable()) {
+        _result_is_nullable = column.result_is_nullable();
+    }
     if (column.has_visible()) {
         _visible = column.visible();
     }
@@ -464,6 +467,7 @@ void TabletColumn::to_schema_pb(ColumnPB* column) const {
     if (!_aggregation_name.empty()) {
         column->set_aggregation(_aggregation_name);
     }
+    column->set_result_is_nullable(_result_is_nullable);
     if (_has_bitmap_index) {
         column->set_has_bitmap_index(_has_bitmap_index);
     }
@@ -494,19 +498,8 @@ bool TabletColumn::is_row_store_column() const {
 
 vectorized::AggregateFunctionPtr TabletColumn::get_aggregate_function_union(
         vectorized::DataTypePtr type) const {
-    auto state_type = dynamic_cast<const vectorized::DataTypeAggState*>(type.get());
-    if (!state_type) {
-        return nullptr;
-    }
-    vectorized::DataTypes argument_types;
-    for (auto col : _sub_columns) {
-        auto sub_type = vectorized::DataTypeFactory::instance().create_data_type(col);
-        state_type->add_sub_type(sub_type);
-    }
-    auto agg_function = vectorized::AggregateFunctionSimpleFactory::instance().get(
-            _aggregation_name, state_type->get_sub_types(), false);
-
-    return vectorized::AggregateStateUnion::create(agg_function, {type}, type);
+    auto state_type = assert_cast<const vectorized::DataTypeAggState*>(type.get());
+    return vectorized::AggregateStateUnion::create(state_type->get_nested_function(), {type}, type);
 }
 
 vectorized::AggregateFunctionPtr TabletColumn::get_aggregate_function(std::string suffix) const {
diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h
index 00ec5d6aa9..c82e1e8f2f 100644
--- a/be/src/olap/tablet_schema.h
+++ b/be/src/olap/tablet_schema.h
@@ -115,6 +115,8 @@ public:
     static FieldAggregationMethod get_aggregation_type_by_string(const std::string& str);
     static uint32_t get_field_length_by_type(TPrimitiveType::type type, uint32_t string_length);
     bool is_row_store_column() const;
+    std::string get_aggregation_name() const { return _aggregation_name; }
+    bool get_result_is_nullable() const { return _result_is_nullable; }
 
 private:
     int32_t _unique_id;
@@ -143,6 +145,8 @@ private:
     TabletColumn* _parent = nullptr;
     std::vector<TabletColumn> _sub_columns;
     uint32_t _sub_column_count = 0;
+
+    bool _result_is_nullable = false;
 };
 
 bool operator==(const TabletColumn& a, const TabletColumn& b);
diff --git a/be/src/runtime/types.h b/be/src/runtime/types.h
index 1d261caeb2..e4ae6f9461 100644
--- a/be/src/runtime/types.h
+++ b/be/src/runtime/types.h
@@ -61,6 +61,10 @@ struct TypeDescriptor {
 
     std::vector<TypeDescriptor> children;
 
+    bool result_is_nullable = false;
+
+    std::string function_name;
+
     // Only set if type == TYPE_STRUCT. The field name of each child.
     std::vector<std::string> field_names;
 
@@ -153,6 +157,10 @@ struct TypeDescriptor {
                 result.children.push_back(from_thrift(sub));
                 result.contains_nulls.push_back(sub.is_nullable);
             }
+            DCHECK(t.__isset.result_is_nullable);
+            result.result_is_nullable = t.result_is_nullable;
+            DCHECK(t.__isset.function_name);
+            result.function_name = t.function_name;
         }
         return result;
     }
diff --git a/be/src/vec/data_types/data_type_agg_state.h b/be/src/vec/data_types/data_type_agg_state.h
index 1bb5dc4e00..eaf2c122d2 100644
--- a/be/src/vec/data_types/data_type_agg_state.h
+++ b/be/src/vec/data_types/data_type_agg_state.h
@@ -28,6 +28,8 @@
 #include <memory>
 #include <string>
 
+#include "vec/aggregate_functions/aggregate_function.h"
+#include "vec/aggregate_functions/aggregate_function_simple_factory.h"
 #include "vec/data_types/data_type.h"
 #include "vec/data_types/data_type_string.h"
 
@@ -43,8 +45,24 @@ namespace doris::vectorized {
 
 class DataTypeAggState : public DataTypeString {
 public:
+    DataTypeAggState(DataTypes sub_types, bool result_is_nullable, std::string function_name)
+            : _sub_types(sub_types),
+              _result_is_nullable(result_is_nullable),
+              _function_name(function_name) {}
+
     const char* get_family_name() const override { return "AggState"; }
 
+    std::string do_get_name() const override {
+        std::string types;
+        for (auto type : _sub_types) {
+            if (!types.empty()) {
+                types += ", ";
+            }
+            types += type->get_name();
+        }
+        return "AggState(" + types + ")";
+    }
+
     TypeIndex get_type_id() const override { return TypeIndex::AggState; }
 
     PrimitiveType get_type_as_primitive_type() const override { return TYPE_AGG_STATE; }
@@ -52,19 +70,35 @@ public:
         return TPrimitiveType::AGG_STATE;
     }
 
-    const DataTypes& get_sub_types() const { return sub_types; }
+    std::string to_string(const IColumn& column, size_t row_num) const override {
+        std::string res = "binary(";
+        StringRef str = column.get_data_at(row_num);
+        for (auto c : str.to_string()) {
+            res += std::to_string(int(c));
+            res += ' ';
+        }
+        res += ")";
+        return res;
+    }
 
-    void add_sub_type(DataTypePtr type) const { sub_types.push_back(type); }
+    const DataTypes& get_sub_types() const { return _sub_types; }
 
     void to_pb_column_meta(PColumnMeta* col_meta) const override {
         IDataType::to_pb_column_meta(col_meta);
-        for (auto type : sub_types) {
+        for (auto type : _sub_types) {
             type->to_pb_column_meta(col_meta->add_children());
         }
     }
 
+    AggregateFunctionPtr get_nested_function() const {
+        return AggregateFunctionSimpleFactory::instance().get(_function_name, _sub_types,
+                                                              _result_is_nullable);
+    }
+
 private:
-    mutable DataTypes sub_types;
+    DataTypes _sub_types;
+    bool _result_is_nullable;
+    std::string _function_name;
 };
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/data_types/data_type_factory.cpp b/be/src/vec/data_types/data_type_factory.cpp
index 438e266658..3a2bf13ea7 100644
--- a/be/src/vec/data_types/data_type_factory.cpp
+++ b/be/src/vec/data_types/data_type_factory.cpp
@@ -40,6 +40,7 @@
 #include "runtime/define_primitive_type.h"
 #include "vec/common/uint128.h"
 #include "vec/core/types.h"
+#include "vec/data_types/data_type.h"
 #include "vec/data_types/data_type_agg_state.h"
 #include "vec/data_types/data_type_array.h"
 #include "vec/data_types/data_type_bitmap.h"
@@ -115,6 +116,13 @@ DataTypePtr DataTypeFactory::create_data_type(const TabletColumn& col_desc, bool
             names.push_back(col_desc.get_sub_column(i).name());
         }
         nested = std::make_shared<DataTypeStruct>(dataTypes, names);
+    } else if (col_desc.type() == FieldType::OLAP_FIELD_TYPE_AGG_STATE) {
+        DataTypes dataTypes;
+        for (size_t i = 0; i < col_desc.get_subtype_count(); i++) {
+            dataTypes.push_back(create_data_type(col_desc.get_sub_column(i)));
+        }
+        nested = std::make_shared<vectorized::DataTypeAggState>(
+                dataTypes, col_desc.get_result_is_nullable(), col_desc.get_aggregation_name());
     } else {
         nested =
                 _create_primitive_data_type(col_desc.type(), col_desc.precision(), col_desc.frac());
@@ -128,6 +136,7 @@ DataTypePtr DataTypeFactory::create_data_type(const TabletColumn& col_desc, bool
 
 DataTypePtr DataTypeFactory::create_data_type(const TypeDescriptor& col_desc, bool is_nullable) {
     DataTypePtr nested = nullptr;
+    DataTypes subTypes;
     switch (col_desc.type) {
     case TYPE_BOOLEAN:
         nested = std::make_shared<vectorized::DataTypeUInt8>();
@@ -177,12 +186,11 @@ DataTypePtr DataTypeFactory::create_data_type(const TypeDescriptor& col_desc, bo
         nested = std::make_shared<vectorized::DataTypeString>();
         break;
     case TYPE_AGG_STATE:
-        nested = std::make_shared<vectorized::DataTypeAggState>();
         for (size_t i = 0; i < col_desc.children.size(); i++) {
-            ((DataTypeAggState*)nested.get())
-                    ->add_sub_type(
-                            create_data_type(col_desc.children[i], col_desc.contains_nulls[i]));
+            subTypes.push_back(create_data_type(col_desc.children[i], col_desc.contains_nulls[i]));
         }
+        nested = std::make_shared<vectorized::DataTypeAggState>(
+                subTypes, col_desc.result_is_nullable, col_desc.function_name);
         break;
     case TYPE_JSONB:
         nested = std::make_shared<vectorized::DataTypeJsonb>();
@@ -336,9 +344,6 @@ DataTypePtr DataTypeFactory::create_data_type(const TypeIndex& type_index, bool
     case TypeIndex::TimeV2:
         nested = std::make_shared<vectorized::DataTypeTime>();
         break;
-    case TypeIndex::AggState:
-        nested = std::make_shared<vectorized::DataTypeAggState>();
-        break;
     default:
         DCHECK(false) << "invalid typeindex:" << getTypeName(type_index);
         break;
@@ -395,9 +400,6 @@ DataTypePtr DataTypeFactory::_create_primitive_data_type(const FieldType& type,
     case FieldType::OLAP_FIELD_TYPE_STRING:
         result = std::make_shared<vectorized::DataTypeString>();
         break;
-    case FieldType::OLAP_FIELD_TYPE_AGG_STATE:
-        result = std::make_shared<vectorized::DataTypeAggState>();
-        break;
     case FieldType::OLAP_FIELD_TYPE_JSONB:
         result = std::make_shared<vectorized::DataTypeJsonb>();
         break;
@@ -545,10 +547,12 @@ DataTypePtr DataTypeFactory::create_data_type(const PColumnMeta& pcolumn) {
         break;
     }
     case PGenericType::AGG_STATE: {
-        nested = std::make_shared<DataTypeAggState>();
+        DataTypes sub_types;
         for (auto child : pcolumn.children()) {
-            ((DataTypeAggState*)nested.get())->add_sub_type(create_data_type(child));
+            sub_types.push_back(create_data_type(child));
         }
+        nested = std::make_shared<DataTypeAggState>(sub_types, pcolumn.result_is_nullable(),
+                                                    pcolumn.function_name());
         break;
     }
     default: {
diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp
index f4bde7569a..bb2ce8377e 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -668,13 +668,12 @@ Status AggregationNode::_get_without_key_result(RuntimeState* state, Block* bloc
         const auto column_type = block_schema[i].type;
         if (!column_type->equals(*data_types[i])) {
             if (!is_array(remove_nullable(column_type))) {
-                DCHECK(column_type->is_nullable());
-                DCHECK(!data_types[i]->is_nullable())
-                        << " column type: " << column_type->get_name()
-                        << ", data type: " << data_types[i]->get_name();
-                DCHECK(remove_nullable(column_type)->equals(*data_types[i]))
-                        << " column type: " << remove_nullable(column_type)->get_name()
-                        << ", data type: " << data_types[i]->get_name();
+                if (!column_type->is_nullable() || data_types[i]->is_nullable() ||
+                    !remove_nullable(column_type)->equals(*data_types[i])) {
+                    return Status::InternalError(
+                            "column_type not match data_types, column_type={}, data_types={}",
+                            column_type->get_name(), data_types[i]->get_name());
+                }
             }
 
             ColumnPtr ptr = std::move(columns[i]);
diff --git a/be/src/vec/exprs/vectorized_agg_fn.cpp b/be/src/vec/exprs/vectorized_agg_fn.cpp
index 513eb8fe98..cb2f4cadab 100644
--- a/be/src/vec/exprs/vectorized_agg_fn.cpp
+++ b/be/src/vec/exprs/vectorized_agg_fn.cpp
@@ -56,13 +56,10 @@ class IColumn;
 namespace doris::vectorized {
 
 template <class FunctionType>
-AggregateFunctionPtr get_agg_state_function(const std::string& name,
-                                            const DataTypes& argument_types,
+AggregateFunctionPtr get_agg_state_function(const DataTypes& argument_types,
                                             DataTypePtr return_type) {
     return FunctionType::create(
-            AggregateFunctionSimpleFactory::instance().get(
-                    name, ((DataTypeAggState*)argument_types[0].get())->get_sub_types(),
-                    return_type->is_nullable()),
+            assert_cast<const DataTypeAggState*>(argument_types[0].get())->get_nested_function(),
             argument_types, return_type);
 }
 
@@ -159,14 +156,22 @@ Status AggFnEvaluator::prepare(RuntimeState* state, const RowDescriptor& desc,
     } else if (_fn.binary_type == TFunctionBinaryType::RPC) {
         _function = AggregateRpcUdaf::create(_fn, argument_types, _data_type);
     } else if (_fn.binary_type == TFunctionBinaryType::AGG_STATE) {
+        if (argument_types.size() != 1) {
+            return Status::InternalError("Agg state Function must input 1 argument but get {}",
+                                         argument_types.size());
+        }
+        if (argument_types[0]->is_nullable()) {
+            return Status::InternalError("Agg state function input type must be not nullable");
+        }
+        if (argument_types[0]->get_type_as_primitive_type() != PrimitiveType::TYPE_AGG_STATE) {
+            return Status::InternalError(
+                    "Agg state function input type must be agg_state but get {}",
+                    argument_types[0]->get_family_name());
+        }
         if (match_suffix(_fn.name.function_name, AGG_UNION_SUFFIX)) {
-            _function = get_agg_state_function<AggregateStateUnion>(
-                    remove_suffix(_fn.name.function_name, AGG_UNION_SUFFIX), argument_types,
-                    _data_type);
+            _function = get_agg_state_function<AggregateStateUnion>(argument_types, _data_type);
         } else if (match_suffix(_fn.name.function_name, AGG_MERGE_SUFFIX)) {
-            _function = get_agg_state_function<AggregateStateMerge>(
-                    remove_suffix(_fn.name.function_name, AGG_MERGE_SUFFIX), argument_types,
-                    _data_type);
+            _function = get_agg_state_function<AggregateStateMerge>(argument_types, _data_type);
         } else {
             return Status::InternalError(
                     "Aggregate Function {} is not endwith '_merge' or '_union'", _fn.signature);
diff --git a/be/src/vec/exprs/vectorized_fn_call.cpp b/be/src/vec/exprs/vectorized_fn_call.cpp
index 53cd19ed0d..582acb9da9 100644
--- a/be/src/vec/exprs/vectorized_fn_call.cpp
+++ b/be/src/vec/exprs/vectorized_fn_call.cpp
@@ -96,9 +96,7 @@ Status VectorizedFnCall::prepare(RuntimeState* state, const RowDescriptor& desc,
             }
             _function = FunctionAggState::create(
                     argument_types, _data_type,
-                    AggregateFunctionSimpleFactory::instance().get(
-                            remove_suffix(_fn.name.function_name, AGG_STATE_SUFFIX), argument_types,
-                            _data_type->is_nullable()));
+                    assert_cast<const DataTypeAggState*>(_data_type.get())->get_nested_function());
         } else {
             return Status::InternalError("Function {} is not endwith '_state'", _fn.signature);
         }
diff --git a/be/src/vec/functions/function_agg_state.h b/be/src/vec/functions/function_agg_state.h
index e17fc06c60..820eebaff9 100644
--- a/be/src/vec/functions/function_agg_state.h
+++ b/be/src/vec/functions/function_agg_state.h
@@ -21,10 +21,13 @@
 
 #include "common/status.h"
 #include "vec/aggregate_functions/aggregate_function.h"
+#include "vec/columns/column.h"
+#include "vec/columns/column_nullable.h"
 #include "vec/common/arena.h"
 #include "vec/core/block.h"
 #include "vec/core/types.h"
 #include "vec/data_types/data_type.h"
+#include "vec/data_types/data_type_agg_state.h"
 #include "vec/functions/function.h"
 
 namespace doris::vectorized {
@@ -62,9 +65,23 @@ public:
                         size_t result, size_t input_rows_count) override {
         auto col = _return_type->create_column();
         std::vector<const IColumn*> agg_columns;
-
-        for (size_t index : arguments) {
-            agg_columns.push_back(block.get_by_position(index).column);
+        std::vector<ColumnPtr> save_columns;
+
+        for (size_t i = 0; i < arguments.size(); i++) {
+            DataTypePtr signature =
+                    assert_cast<const DataTypeAggState*>(_return_type.get())->get_sub_types()[i];
+            ColumnPtr column = block.get_by_position(arguments[i]).column;
+
+            if (!signature->is_nullable() && column->is_nullable()) {
+                return Status::InternalError(
+                        "State function meet input nullable column, but signature is not nullable");
+            }
+            if (!column->is_nullable() && signature->is_nullable()) {
+                column = make_nullable(column);
+                save_columns.push_back(column);
+            }
+
+            agg_columns.push_back(column);
         }
 
         VectorBufferWriter writter(assert_cast<ColumnString&>(*col));
diff --git a/fe/fe-common/src/main/java/org/apache/doris/catalog/AggStateType.java b/fe/fe-common/src/main/java/org/apache/doris/catalog/AggStateType.java
new file mode 100644
index 0000000000..47477a08f8
--- /dev/null
+++ b/fe/fe-common/src/main/java/org/apache/doris/catalog/AggStateType.java
@@ -0,0 +1,134 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog;
+
+import org.apache.doris.thrift.TTypeDesc;
+import org.apache.doris.thrift.TTypeNode;
+
+import com.google.common.base.Preconditions;
+import com.google.gson.annotations.SerializedName;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class AggStateType extends ScalarType {
+
+    @SerializedName(value = "subTypes")
+    private List<Type> subTypes;
+
+    @SerializedName(value = "subTypeNullables")
+    private List<Boolean> subTypeNullables;
+
+    @SerializedName(value = "resultIsNullable")
+    private Boolean resultIsNullable;
+
+    @SerializedName(value = "functionName")
+    private String functionName;
+
+    public AggStateType(String functionName, Boolean resultIsNullable, List<Type> subTypes,
+            List<Boolean> subTypeNullables) {
+        super(PrimitiveType.AGG_STATE);
+        Preconditions.checkState((subTypes == null) == (subTypeNullables == null));
+        if (subTypes != null && subTypeNullables != null) {
+            Preconditions.checkState(subTypes.size() == subTypeNullables.size(),
+                    "AggStateType' subTypes.size()!=subTypeNullables.size()");
+        }
+        this.functionName = functionName;
+        this.subTypes = subTypes;
+        this.subTypeNullables = subTypeNullables;
+        this.resultIsNullable = resultIsNullable;
+    }
+
+    public List<Type> getSubTypes() {
+        return subTypes;
+    }
+
+    public List<Boolean> getSubTypeNullables() {
+        return subTypeNullables;
+    }
+
+    public String getFunctionName() {
+        return functionName;
+    }
+
+    public boolean getResultIsNullable() {
+        return resultIsNullable;
+    }
+
+    @Override
+    public String toSql(int depth) {
+        StringBuilder stringBuilder = new StringBuilder();
+        stringBuilder.append("agg_state(");
+        for (int i = 0; i < subTypes.size(); i++) {
+            if (i > 0) {
+                stringBuilder.append(", ");
+            }
+            stringBuilder.append(subTypes.get(i).toSql());
+            if (subTypeNullables.get(i)) {
+                stringBuilder.append(" NULL");
+            }
+        }
+        stringBuilder.append(")");
+        return stringBuilder.toString();
+    }
+
+    @Override
+    public void toThrift(TTypeDesc container) {
+        super.toThrift(container);
+        if (subTypes != null) {
+            List<TTypeDesc> types = new ArrayList<TTypeDesc>();
+            for (int i = 0; i < subTypes.size(); i++) {
+                TTypeDesc desc = new TTypeDesc();
+                desc.setTypes(new ArrayList<TTypeNode>());
+                subTypes.get(i).toThrift(desc);
+                desc.setIsNullable(subTypeNullables.get(i));
+                types.add(desc);
+            }
+            container.setSubTypes(types);
+        }
+        container.setResultIsNullable(resultIsNullable);
+        container.setFunctionName(functionName);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof AggStateType)) {
+            return false;
+        }
+        AggStateType other = (AggStateType) o;
+        if ((subTypes == null) != (other.getSubTypes() == null)) {
+            return false;
+        }
+        if (subTypes == null) {
+            return true;
+        }
+        int subTypeNumber = subTypeNullables.size();
+        if (subTypeNumber != other.subTypeNullables.size()) {
+            return false;
+        }
+        for (int i = 0; i < subTypeNumber; i++) {
+            if (!subTypeNullables.get(i).equals(other.subTypeNullables.get(i))) {
+                return false;
+            }
+            if (!subTypes.get(i).equals(other.subTypes.get(i))) {
+                return false;
+            }
+        }
+        return true;
+    }
+}
diff --git a/fe/fe-common/src/main/java/org/apache/doris/catalog/ScalarType.java b/fe/fe-common/src/main/java/org/apache/doris/catalog/ScalarType.java
index 5e4a54f948..9493b6d6ae 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/catalog/ScalarType.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/catalog/ScalarType.java
@@ -31,8 +31,6 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.math.BigDecimal;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Objects;
 
 /**
@@ -125,30 +123,10 @@ public class ScalarType extends Type {
     @SerializedName(value = "lenStr")
     private String lenStr;
 
-    @SerializedName(value = "subTypes")
-    private List<Type> subTypes;
-
-    @SerializedName(value = "subTypeNullables")
-    private List<Boolean> subTypeNullables;
-
-    public List<Type> getSubTypes() {
-        return subTypes;
-    }
-
-    public List<Boolean> getSubTypeNullables() {
-        return subTypeNullables;
-    }
-
     public ScalarType(PrimitiveType type) {
         this.type = type;
     }
 
-    public ScalarType(List<Type> subTypes, List<Boolean> subTypeNullables) {
-        this.type = PrimitiveType.AGG_STATE;
-        this.subTypes = subTypes;
-        this.subTypeNullables = subTypeNullables;
-    }
-
     public static ScalarType createType(PrimitiveType type, int len, int precision, int scale) {
         switch (type) {
             case CHAR:
@@ -667,17 +645,7 @@ public class ScalarType extends Type {
                 stringBuilder.append("json");
                 break;
             case AGG_STATE:
-                stringBuilder.append("agg_state(");
-                for (int i = 0; i < subTypes.size(); i++) {
-                    if (i > 0) {
-                        stringBuilder.append(", ");
-                    }
-                    stringBuilder.append(subTypes.get(i).toSql());
-                    if (subTypeNullables.get(i)) {
-                        stringBuilder.append(" NULL");
-                    }
-                }
-                stringBuilder.append(")");
+                stringBuilder.append("agg_state(unknown)");
                 break;
             default:
                 stringBuilder.append("unknown type: " + type.toString());
@@ -723,18 +691,6 @@ public class ScalarType extends Type {
                 break;
         }
         node.setScalarType(scalarType);
-
-        if (subTypes != null) {
-            List<TTypeDesc> types = new ArrayList<TTypeDesc>();
-            for (int i = 0; i < subTypes.size(); i++) {
-                TTypeDesc desc = new TTypeDesc();
-                desc.setTypes(new ArrayList<TTypeNode>());
-                subTypes.get(i).toThrift(desc);
-                desc.setIsNullable(subTypeNullables.get(i));
-                types.add(desc);
-            }
-            container.setSubTypes(types);
-        }
     }
 
     public int decimalPrecision() {
@@ -908,22 +864,6 @@ public class ScalarType extends Type {
             return false;
         }
         ScalarType other = (ScalarType) o;
-        if (this.isAggStateType() && other.isAggStateType()) {
-            int subTypeNumber = subTypeNullables.size();
-            if (subTypeNumber != other.subTypeNullables.size()) {
-                return false;
-            }
-            for (int i = 0; i < subTypeNumber; i++) {
-                if (!subTypeNullables.get(i).equals(other.subTypeNullables.get(i))) {
-                    return false;
-                }
-                if (!subTypes.get(i).equals(other.subTypes.get(i))) {
-                    return false;
-                }
-            }
-            return true;
-        }
-
         if ((this.isDatetimeV2() && other.isDatetimeV2()) || (this.isTimeV2() && other.isTimeV2())) {
             return this.decimalScale() == other.decimalScale();
         }
diff --git a/fe/fe-common/src/main/java/org/apache/doris/catalog/Type.java b/fe/fe-common/src/main/java/org/apache/doris/catalog/Type.java
index c801921dc6..22d0bcd65c 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/catalog/Type.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/catalog/Type.java
@@ -102,7 +102,7 @@ public abstract class Type {
     public static final ScalarType CHAR = ScalarType.createCharType(-1);
     public static final ScalarType BITMAP = new ScalarType(PrimitiveType.BITMAP);
     public static final ScalarType QUANTILE_STATE = new ScalarType(PrimitiveType.QUANTILE_STATE);
-    public static final ScalarType AGG_STATE = new ScalarType(PrimitiveType.AGG_STATE);
+    public static final AggStateType AGG_STATE = new AggStateType(null, null, null, null);
     public static final ScalarType LAMBDA_FUNCTION = new ScalarType(PrimitiveType.LAMBDA_FUNCTION);
     // Only used for alias function, to represent any type in function args
     public static final ScalarType ALL = new ScalarType(PrimitiveType.ALL);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java
index cbe308d1cd..7cd5ca9aa3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java
@@ -497,19 +497,18 @@ public class ColumnDef {
     public Column toColumn() {
         List<Type> typeList = null;
         List<Boolean> nullableList = null;
+
+        Type type = typeDef.getType();
         if (genericAggregationArguments != null) {
             typeList = genericAggregationArguments.stream().map(TypeDef::getType).collect(Collectors.toList());
             nullableList = genericAggregationArguments.stream().map(TypeDef::getNullable).collect(Collectors.toList());
-        }
 
-        Type type = typeDef.getType();
-        if (type.isAggStateType()) {
-            type = new ScalarType(typeList, nullableList);
+            type = Expr.createAggStateType(genericAggregationName, typeList, nullableList);
         }
 
-        return new Column(name, type, isKey, aggregateType, isAllowNull, defaultValue.value, comment,
-                visible, defaultValue.defaultValueExprDef, Column.COLUMN_UNIQUE_ID_INIT_VALUE,
-                defaultValue.getValue(), genericAggregationName, typeList, nullableList);
+        return new Column(name, type, isKey, aggregateType, isAllowNull, defaultValue.value, comment, visible,
+                defaultValue.defaultValueExprDef, Column.COLUMN_UNIQUE_ID_INIT_VALUE, defaultValue.getValue(),
+                genericAggregationName, typeList, nullableList);
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java
index 93d5437564..a6cc659794 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java
@@ -21,6 +21,7 @@
 package org.apache.doris.analysis;
 
 import org.apache.doris.analysis.ArithmeticExpr.Operator;
+import org.apache.doris.catalog.AggStateType;
 import org.apache.doris.catalog.AggregateFunction;
 import org.apache.doris.catalog.ArrayType;
 import org.apache.doris.catalog.Env;
@@ -444,8 +445,8 @@ public abstract class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
             setSelectivity();
         }
         analysisDone();
-        if (type.isAggStateType() && !(this instanceof SlotRef) && ((ScalarType) type).getSubTypes() == null) {
-            type = new ScalarType(Arrays.asList(collectChildReturnTypes()),
+        if (type.isAggStateType() && !(this instanceof SlotRef) && ((AggStateType) type).getSubTypes() == null) {
+            type = createAggStateType(((AggStateType) type), Arrays.asList(collectChildReturnTypes()),
                     Arrays.asList(collectChildReturnNullables()));
         }
     }
@@ -1004,8 +1005,8 @@ public abstract class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
     // Append a flattened version of this expr, including all children, to 'container'.
     protected void treeToThriftHelper(TExpr container) {
         TExprNode msg = new TExprNode();
-        if (type.isAggStateType() && ((ScalarType) type).getSubTypes() == null) {
-            type = new ScalarType(Arrays.asList(collectChildReturnTypes()),
+        if (type.isAggStateType() && ((AggStateType) type).getSubTypes() == null) {
+            type = createAggStateType(((AggStateType) type), Arrays.asList(collectChildReturnTypes()),
                     Arrays.asList(collectChildReturnNullables()));
         }
         msg.type = type.toThrift();
@@ -1482,7 +1483,7 @@ public abstract class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
         }
 
         if (this.type.isAggStateType()) {
-            List<Type> subTypes = ((ScalarType) targetType).getSubTypes();
+            List<Type> subTypes = ((AggStateType) targetType).getSubTypes();
 
             if (this instanceof FunctionCallExpr) {
                 if (subTypes.size() != getChildren().size()) {
@@ -1493,7 +1494,7 @@ public abstract class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
                 }
                 type = targetType;
             } else {
-                List<Type> selfSubTypes = ((ScalarType) type).getSubTypes();
+                List<Type> selfSubTypes = ((AggStateType) type).getSubTypes();
                 if (subTypes.size() != selfSubTypes.size()) {
                     throw new AnalysisException("AggState's subTypes size did not match");
                 }
@@ -1879,7 +1880,7 @@ public abstract class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
                 if (argList.size() != 1 || !argList.get(0).isAggStateType()) {
                     throw new AnalysisException("merge/union function must input one agg_state");
                 }
-                ScalarType aggState = (ScalarType) argList.get(0);
+                AggStateType aggState = (AggStateType) argList.get(0);
                 if (aggState.getSubTypes() == null) {
                     throw new AnalysisException("agg_state's subTypes is null");
                 }
@@ -1895,9 +1896,10 @@ public abstract class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
 
             if (isState) {
                 f = new ScalarFunction(new FunctionName(name + AGG_STATE_SUFFIX), Arrays.asList(f.getArgs()),
-                        Type.AGG_STATE, f.hasVarArgs(), f.isUserVisible());
+                        Expr.createAggStateType(name, null, null), f.hasVarArgs(), f.isUserVisible());
                 f.setNullableMode(NullableMode.ALWAYS_NOT_NULLABLE);
             } else {
+                Function original = f;
                 f = ((AggregateFunction) f).clone();
                 f.setArgs(argList);
                 if (isUnion) {
@@ -1907,6 +1909,8 @@ public abstract class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
                 }
                 if (isMerge) {
                     f.setName(new FunctionName(name + AGG_MERGE_SUFFIX));
+                    f.setNullableMode(NullableMode.CUSTOM);
+                    f.setNestedFunction(original);
                 }
             }
             f.setBinaryType(TFunctionBinaryType.AGG_STATE);
@@ -2221,6 +2225,10 @@ public abstract class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
     }
 
     protected boolean hasNullableChild() {
+        return hasNullableChild(children);
+    }
+
+    protected static boolean hasNullableChild(List<Expr> children) {
         for (Expr expr : children) {
             if (expr.isNullable()) {
                 return true;
@@ -2244,24 +2252,34 @@ public abstract class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
      * overwrite this method to plan correct
      */
     public boolean isNullable() {
+        return isNullable(fn, children);
+    }
+
+    public static boolean isNullable(Function fn, List<Expr> children) {
         if (fn == null) {
             return true;
         }
         switch (fn.getNullableMode()) {
             case DEPEND_ON_ARGUMENT:
-                return hasNullableChild();
+                return hasNullableChild(children);
             case ALWAYS_NOT_NULLABLE:
                 return false;
             case CUSTOM:
-                return customNullableAlgorithm();
+                return customNullableAlgorithm(fn, children);
             case ALWAYS_NULLABLE:
             default:
                 return true;
         }
     }
 
-    private boolean customNullableAlgorithm() {
+    private static boolean customNullableAlgorithm(Function fn, List<Expr> children) {
         Preconditions.checkState(fn.getNullableMode() == Function.NullableMode.CUSTOM);
+
+        if (fn.functionName().endsWith(AGG_MERGE_SUFFIX)) {
+            AggStateType type = (AggStateType) fn.getArgs()[0];
+            return isNullable(fn.getNestedFunction(), getMockedExprs(type));
+        }
+
         if (fn.functionName().equalsIgnoreCase("if")) {
             Preconditions.checkState(children.size() == 3);
             for (int i = 1; i < children.size(); i++) {
@@ -2295,7 +2313,7 @@ public abstract class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
                     && ConnectContext.get().getSessionVariable().checkOverflowForDecimal()) {
                 return true;
             } else {
-                return hasNullableChild();
+                return hasNullableChild(children);
             }
         }
         if ((fn.functionName().equalsIgnoreCase(Operator.ADD.getName())
@@ -2305,7 +2323,7 @@ public abstract class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
                     && ConnectContext.get().getSessionVariable().checkOverflowForDecimal()) {
                 return true;
             } else {
-                return hasNullableChild();
+                return hasNullableChild(children);
             }
         }
         if (fn.functionName().equalsIgnoreCase("group_concat")) {
@@ -2323,6 +2341,38 @@ public abstract class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
         return true;
     }
 
+    public static Boolean getResultIsNullable(String name, List<Type> typeList, List<Boolean> nullableList) {
+        if (name == null || typeList == null || nullableList == null) {
+            return false;
+        }
+        FunctionName fnName = new FunctionName(name);
+        Function searchDesc = new Function(fnName, typeList, Type.INVALID, false, true);
+        List<Expr> mockedExprs = getMockedExprs(typeList, nullableList);
+        Function f = Env.getCurrentEnv().getFunction(searchDesc, Function.CompareMode.IS_IDENTICAL);
+        return isNullable(f, mockedExprs);
+    }
+
+    public static AggStateType createAggStateType(String name, List<Type> typeList, List<Boolean> nullableList) {
+        return new AggStateType(name, Expr.getResultIsNullable(name, typeList, nullableList), typeList, nullableList);
+    }
+
+    public static AggStateType createAggStateType(AggStateType type, List<Type> typeList, List<Boolean> nullableList) {
+        return new AggStateType(type.getFunctionName(),
+                Expr.getResultIsNullable(type.getFunctionName(), typeList, nullableList), typeList, nullableList);
+    }
+
+    public static List<Expr> getMockedExprs(List<Type> typeList, List<Boolean> nullableList) {
+        List<Expr> mockedExprs = Lists.newArrayList();
+        for (int i = 0; i < typeList.size(); i++) {
+            mockedExprs.add(new SlotRef(typeList.get(i), nullableList.get(i)));
+        }
+        return mockedExprs;
+    }
+
+    public static List<Expr> getMockedExprs(AggStateType type) {
+        return getMockedExprs(type.getSubTypes(), type.getSubTypeNullables());
+    }
+
     public void materializeSrcExpr() {
         if (this instanceof SlotRef) {
             SlotRef thisRef = (SlotRef) this;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
index f9722b47c0..936db90a35 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
@@ -496,9 +496,7 @@ public class Column implements Writable, GsonPostProcessable {
         if (null != this.aggregationType) {
             tColumn.setAggregationType(this.aggregationType.toThrift());
         }
-        if (genericAggregationName != null) {
-            tColumn.setAggregation(genericAggregationName);
-        }
+
         tColumn.setIsKey(this.isKey);
         tColumn.setIsAllowNull(this.isAllowNull);
         // keep compatibility
@@ -509,6 +507,8 @@ public class Column implements Writable, GsonPostProcessable {
         tColumn.setColUniqueId(uniqueId);
 
         if (type.isAggStateType()) {
+            tColumn.setAggregation(genericAggregationName);
+            tColumn.setResultIsNullable(((AggStateType) type).getResultIsNullable());
             for (Column column : children) {
                 tColumn.addToChildrenColumn(column.toThrift());
             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Function.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Function.java
index 22b36f604c..b59f974eda 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Function.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Function.java
@@ -123,6 +123,8 @@ public class Function implements Writable {
     private URI location;
     private TFunctionBinaryType binaryType;
 
+    private Function nestedFunction = null;
+
     protected NullableMode nullableMode = NullableMode.DEPEND_ON_ARGUMENT;
 
     protected boolean vectorized = true;
@@ -192,6 +194,14 @@ public class Function implements Writable {
         this.checksum = other.checksum;
     }
 
+    public void setNestedFunction(Function nestedFunction) {
+        this.nestedFunction = nestedFunction;
+    }
+
+    public Function getNestedFunction() {
+        return nestedFunction;
+    }
+
     public Function clone() {
         return new Function(this);
     }
@@ -532,7 +542,8 @@ public class Function implements Writable {
         }
 
         if (realReturnType.isAggStateType()) {
-            realReturnType = new ScalarType(Arrays.asList(realArgTypes), Arrays.asList(realArgTypeNullables));
+            realReturnType = Expr.createAggStateType(((AggStateType) realReturnType), Arrays.asList(realArgTypes),
+                    Arrays.asList(realArgTypeNullables));
         }
 
         // For types with different precisions and scales, return type only indicates a
@@ -835,14 +846,30 @@ public class Function implements Writable {
     public static FunctionCallExpr convertToStateCombinator(FunctionCallExpr fnCall) {
         Function aggFunction = fnCall.getFn();
         List<Type> arguments = Arrays.asList(aggFunction.getArgs());
-        ScalarFunction fn = new org.apache.doris.catalog.ScalarFunction(
-                new FunctionName(aggFunction.getFunctionName().getFunction() + Expr.AGG_STATE_SUFFIX),
-                arguments,
-                new ScalarType(arguments, fnCall.getChildren().stream().map(expr -> {
-                    return expr.isNullable();
-                }).collect(Collectors.toList())), aggFunction.hasVarArgs(), aggFunction.isUserVisible());
+        ScalarFunction fn = new ScalarFunction(
+                new FunctionName(aggFunction.getFunctionName().getFunction() + Expr.AGG_STATE_SUFFIX), arguments,
+                Expr.createAggStateType(aggFunction.getFunctionName().getFunction(),
+                        fnCall.getChildren().stream().map(expr -> {
+                            return expr.getType();
+                        }).collect(Collectors.toList()), fnCall.getChildren().stream().map(expr -> {
+                            return expr.isNullable();
+                        }).collect(Collectors.toList())),
+                aggFunction.hasVarArgs(), aggFunction.isUserVisible());
         fn.setNullableMode(NullableMode.ALWAYS_NOT_NULLABLE);
         fn.setBinaryType(TFunctionBinaryType.AGG_STATE);
         return new FunctionCallExpr(fn, fnCall.getParams());
     }
+
+    public static FunctionCallExpr convertToMergeCombinator(FunctionCallExpr fnCall) {
+        Function aggFunction = fnCall.getFn();
+        aggFunction.setName(new FunctionName(aggFunction.getFunctionName().getFunction() + Expr.AGG_MERGE_SUFFIX));
+        aggFunction.setArgs(Arrays.asList(Expr.createAggStateType(aggFunction.getFunctionName().getFunction(),
+                fnCall.getChildren().stream().map(expr -> {
+                    return expr.getType();
+                }).collect(Collectors.toList()), fnCall.getChildren().stream().map(expr -> {
+                    return expr.isNullable();
+                }).collect(Collectors.toList()))));
+        aggFunction.setBinaryType(TFunctionBinaryType.AGG_STATE);
+        return fnCall;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/types/AggStateType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/types/AggStateType.java
index 84d0d7ce06..ac245c3066 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/types/AggStateType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/types/AggStateType.java
@@ -17,7 +17,7 @@
 
 package org.apache.doris.nereids.types;
 
-import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.analysis.Expr;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.nereids.types.coercion.AbstractDataType;
 import org.apache.doris.nereids.types.coercion.PrimitiveType;
@@ -30,22 +30,28 @@ import java.util.stream.Collectors;
  */
 public class AggStateType extends PrimitiveType {
 
-    public static final AggStateType SYSTEM_DEFAULT = new AggStateType(null, null);
+    public static final AggStateType SYSTEM_DEFAULT = new AggStateType(null, null, null);
 
     public static final int WIDTH = 16;
 
     private final List<DataType> subTypes;
     private final List<Boolean> subTypeNullables;
+    private final String functionName;
 
-    public AggStateType(List<DataType> subTypes, List<Boolean> subTypeNullables) {
+    public AggStateType(String functionName, List<DataType> subTypes, List<Boolean> subTypeNullables) {
         this.subTypes = subTypes;
         this.subTypeNullables = subTypeNullables;
+        this.functionName = functionName;
+    }
+
+    public List<DataType> getSubTypes() {
+        return subTypes;
     }
 
     @Override
     public Type toCatalogDataType() {
         List<Type> types = subTypes.stream().map(t -> t.toCatalogDataType()).collect(Collectors.toList());
-        return new ScalarType(types, subTypeNullables);
+        return Expr.createAggStateType(functionName, types, subTypeNullables);
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/types/DataType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/types/DataType.java
index 17cb0ed9a5..0941a4a1e9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/types/DataType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/types/DataType.java
@@ -325,9 +325,10 @@ public abstract class DataType implements AbstractDataType {
             org.apache.doris.catalog.ArrayType arrayType = (org.apache.doris.catalog.ArrayType) type;
             return ArrayType.of(fromCatalogType(arrayType.getItemType()), arrayType.getContainsNull());
         } else if (type.isAggStateType()) {
-            List<DataType> types = ((ScalarType) type).getSubTypes().stream().map(t -> fromCatalogType(t))
+            org.apache.doris.catalog.AggStateType catalogType = ((org.apache.doris.catalog.AggStateType) type);
+            List<DataType> types = catalogType.getSubTypes().stream().map(t -> fromCatalogType(t))
                     .collect(Collectors.toList());
-            return new AggStateType(types, ((ScalarType) type).getSubTypeNullables());
+            return new AggStateType(catalogType.getFunctionName(), types, catalogType.getSubTypeNullables());
         }
         throw new AnalysisException("Nereids do not support type: " + type);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
index 12623cfd31..bf7c0a5484 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
@@ -20,6 +20,7 @@ package org.apache.doris.persist.gson;
 import org.apache.doris.alter.AlterJobV2;
 import org.apache.doris.alter.RollupJobV2;
 import org.apache.doris.alter.SchemaChangeJobV2;
+import org.apache.doris.catalog.AggStateType;
 import org.apache.doris.catalog.ArrayType;
 import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.DistributionInfo;
@@ -140,7 +141,8 @@ public class GsonUtils {
             .registerSubtype(ScalarType.class, ScalarType.class.getSimpleName())
             .registerSubtype(ArrayType.class, ArrayType.class.getSimpleName())
             .registerSubtype(MapType.class, MapType.class.getSimpleName())
-            .registerSubtype(StructType.class, StructType.class.getSimpleName());
+            .registerSubtype(StructType.class, StructType.class.getSimpleName())
+            .registerSubtype(AggStateType.class, AggStateType.class.getSimpleName());
 
     // runtime adapter for class "DistributionInfo"
     private static RuntimeTypeAdapterFactory<DistributionInfo> distributionInfoTypeAdapterFactory
diff --git a/gensrc/proto/data.proto b/gensrc/proto/data.proto
index d57ed2e24f..e9ced52391 100644
--- a/gensrc/proto/data.proto
+++ b/gensrc/proto/data.proto
@@ -58,6 +58,8 @@ message PColumnMeta {
     optional bool is_nullable = 3 [default = false];
     optional Decimal decimal_param = 4;
     repeated PColumnMeta children = 5;
+    optional bool result_is_nullable = 6;
+    optional string function_name = 7;
 }
 
 message PBlock {
diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto
index 2d6a331c37..9bf2abe5c8 100644
--- a/gensrc/proto/olap_file.proto
+++ b/gensrc/proto/olap_file.proto
@@ -189,6 +189,7 @@ message ColumnPB {
     optional bool visible = 16 [default=true];
     repeated ColumnPB children_columns = 17;
     repeated string children_column_names = 18;
+    optional bool result_is_nullable = 19;
 }
 
 enum IndexType {
diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift
index 284c728b8d..631132f934 100644
--- a/gensrc/thrift/Descriptors.thrift
+++ b/gensrc/thrift/Descriptors.thrift
@@ -38,6 +38,7 @@ struct TColumn {
     14: optional i32 gram_size
     15: optional i32 gram_bf_size
     16: optional string aggregation
+    17: optional bool result_is_nullable
 }
 
 struct TSlotDescriptor {
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index b0b5da4550..ee62f59aa5 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -161,7 +161,9 @@ struct TTypeDesc {
     1: list<TTypeNode> types
     2: optional bool is_nullable
     3: optional i64  byte_size
-    4: optional list<TTypeDesc> sub_types;
+    4: optional list<TTypeDesc> sub_types
+    5: optional bool result_is_nullable
+    6: optional string function_name
 }
 
 enum TAggregationType {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org