You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2019/09/26 05:51:18 UTC

[incubator-doris] branch master updated: Avoid SerDe for aggregation query with object pool (#1854)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b246d93  Avoid SerDe for aggregation query with object pool (#1854)
b246d93 is described below

commit b246d931282a1236e10fb29e755a021774aa913b
Author: kangkaisen <ka...@apache.org>
AuthorDate: Thu Sep 26 13:51:13 2019 +0800

    Avoid SerDe for aggregation query with object pool (#1854)
---
 be/src/common/object_pool.h               |  7 ++++
 be/src/exec/exec_node.cpp                 | 19 +++++++++++
 be/src/exec/exec_node.h                   |  6 ++++
 be/src/exec/olap_scan_node.cpp            |  3 +-
 be/src/exec/olap_scan_node.h              |  6 +++-
 be/src/exec/olap_scanner.cpp              |  9 +++++-
 be/src/exec/olap_scanner.h                |  2 ++
 be/src/exprs/aggregate_functions.cpp      |  2 +-
 be/src/exprs/bitmap_function.cpp          | 19 ++++++++---
 be/src/exprs/hll_function.cpp             |  9 ++++--
 be/src/olap/aggregate_func.h              | 53 +++++++++++++++++--------------
 be/src/olap/field.h                       | 16 +++++-----
 be/src/olap/memtable.cpp                  |  2 +-
 be/src/olap/memtable.h                    |  2 ++
 be/src/olap/merger.cpp                    |  4 +--
 be/src/olap/reader.cpp                    | 17 ++++++----
 be/src/olap/reader.h                      | 15 +++++----
 be/src/olap/row.h                         |  4 +--
 be/src/olap/schema_change.cpp             |  4 ++-
 be/src/olap/task/engine_checksum_task.cpp |  4 ++-
 be/src/runtime/plan_fragment_executor.cpp |  2 ++
 be/src/runtime/row_batch.cpp              | 44 +++++++------------------
 be/src/runtime/row_batch.h                | 15 ++++-----
 be/src/udf/udf.cpp                        | 14 ++++++--
 be/src/udf/udf.h                          |  2 +-
 be/src/util/arena.h                       |  5 +++
 be/test/olap/aggregate_func_test.cpp      | 16 +++++++---
 be/test/olap/row_cursor_test.cpp          |  7 ++--
 28 files changed, 194 insertions(+), 114 deletions(-)

diff --git a/be/src/common/object_pool.h b/be/src/common/object_pool.h
index 98fff35..70d52e8 100644
--- a/be/src/common/object_pool.h
+++ b/be/src/common/object_pool.h
@@ -56,6 +56,13 @@ public:
         _objects.clear();
     }
 
+    // Absorb all objects from src pool
+    // Note: This method is not thread safe
+    void acquire_data(ObjectPool* src) {
+        _objects.insert(_objects.end(), src->_objects.begin(), src->_objects.end());
+        src->_objects.clear();
+    }
+
 private:
     struct GenericElement {
         virtual ~GenericElement() {}
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index 5facfa6..ab00596 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -530,6 +530,25 @@ void ExecNode::collect_scan_nodes(vector<ExecNode*>* nodes) {
     collect_nodes(TPlanNodeType::ES_HTTP_SCAN_NODE, nodes);
 }
 
+void ExecNode::try_do_aggregate_serde_improve() {
+    std::vector<ExecNode*> agg_node;
+    collect_nodes(TPlanNodeType::AGGREGATION_NODE, &agg_node);
+    if (agg_node.size() != 1) {
+        return;
+    }
+
+    if (agg_node[0]->_children.size() != 1) {
+        return;
+    }
+
+    if (agg_node[0]->_children[0]->type() != TPlanNodeType::OLAP_SCAN_NODE) {
+        return;
+    }
+
+    OlapScanNode* scan_node = static_cast<OlapScanNode*>(agg_node[0]->_children[0]);
+    scan_node->set_no_agg_finalize();
+}
+
 void ExecNode::init_runtime_profile(const std::string& name) {
     std::stringstream ss;
     ss << name << " (id=" << _id << ")";
diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index ace7935..e40a4d3 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -159,6 +159,12 @@ public:
     // Collect all scan node types.
     void collect_scan_nodes(std::vector<ExecNode*>* nodes);
 
+    // When an aggregation node is the direct parent of an OLAP scan node,
+    // the scan node returns agg objects to the agg node directly,
+    // without serializing them.
+    // This optimization is deliberately conservative: correctness comes first.
+    void try_do_aggregate_serde_improve();
+
     typedef bool (*EvalConjunctsFn)(ExprContext* const* ctxs, int num_ctxs, TupleRow* row);
     // Evaluate exprs over row.  Returns true if all exprs return true.
     // TODO: This doesn't use the vector<Expr*> signature because I haven't figured
diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp
index 8f51551..4acddb4 100644
--- a/be/src/exec/olap_scan_node.cpp
+++ b/be/src/exec/olap_scan_node.cpp
@@ -150,7 +150,6 @@ Status OlapScanNode::prepare(RuntimeState* state) {
     _rows_pushed_cond_filtered_counter =
         ADD_COUNTER(_runtime_profile, "RowsPushedCondFiltered", TUnit::UNIT);
     _init_counter(state);
-
     _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
     if (_tuple_desc == NULL) {
         // TODO: make sure we print all available diagnostic output to our error log
@@ -696,7 +695,7 @@ Status OlapScanNode::start_scan_thread(RuntimeState* state) {
                 scanner_ranges.push_back((*ranges)[i].get());
             }
             OlapScanner* scanner = new OlapScanner(
-                state, this, _olap_scan_node.is_preaggregation, *scan_range, scanner_ranges);
+                state, this, _olap_scan_node.is_preaggregation, _need_agg_finalize, *scan_range, scanner_ranges);
             _scanner_pool->add(scanner);
             _olap_scanners.push_back(scanner);
         }
diff --git a/be/src/exec/olap_scan_node.h b/be/src/exec/olap_scan_node.h
index 434e423..fa280b4 100644
--- a/be/src/exec/olap_scan_node.h
+++ b/be/src/exec/olap_scan_node.h
@@ -58,7 +58,9 @@ public:
     Status collect_query_statistics(QueryStatistics* statistics) override;
     virtual Status close(RuntimeState* state);
     virtual Status set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges);
-
+    inline void set_no_agg_finalize() {
+        _need_agg_finalize = false;
+    }
 protected:
     typedef struct {
         Tuple* tuple;
@@ -242,6 +244,8 @@ private:
     int64_t _running_thread;
     EvalConjunctsFn _eval_conjuncts_fn;
 
+    bool _need_agg_finalize = true;
+
     // Counters
     RuntimeProfile::Counter* _io_timer = nullptr;
     RuntimeProfile::Counter* _read_compressed_counter = nullptr;
diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp
index 008cd54..f4afb85 100644
--- a/be/src/exec/olap_scanner.cpp
+++ b/be/src/exec/olap_scanner.cpp
@@ -43,6 +43,7 @@ OlapScanner::OlapScanner(
         RuntimeState* runtime_state,
         OlapScanNode* parent,
         bool aggregation,
+        bool need_agg_finalize,
         const TPaloScanRange& scan_range,
         const std::vector<OlapScanRange*>& key_ranges)
             : _runtime_state(runtime_state),
@@ -52,6 +53,7 @@ OlapScanner::OlapScanner(
             _string_slots(parent->_string_slots),
             _is_open(false),
             _aggregation(aggregation),
+            _need_agg_finalize(need_agg_finalize),
             _tuple_idx(parent->_tuple_idx),
             _direct_conjunct_size(parent->_direct_conjunct_size) {
     _reader.reset(new Reader());
@@ -213,6 +215,11 @@ Status OlapScanner::_init_params(
     }
     _read_row_cursor.allocate_memory_for_string_type(_tablet->tablet_schema());
 
+    // If an agg node is the direct parent of this scan node,
+    // we do not call the agg object's finalize method in the scan node,
+    // which avoids unnecessary SerDe and improves query performance.
+    _params.need_agg_finalize = _need_agg_finalize;
+
     return Status::OK();
 }
 
@@ -264,7 +271,7 @@ Status OlapScanner::get_batch(
                 break;
             }
             // Read one row from reader
-            auto res = _reader->next_row_with_aggregation(&_read_row_cursor, arena.get(), eof);
+            auto res = _reader->next_row_with_aggregation(&_read_row_cursor, arena.get(), batch->agg_object_pool(), eof);
             if (res != OLAP_SUCCESS) {
                 return Status::InternalError("Internal Error: read storage fail.");
             }
diff --git a/be/src/exec/olap_scanner.h b/be/src/exec/olap_scanner.h
index 0708da6..b163518 100644
--- a/be/src/exec/olap_scanner.h
+++ b/be/src/exec/olap_scanner.h
@@ -53,6 +53,7 @@ public:
         RuntimeState* runtime_state,
         OlapScanNode* parent,
         bool aggregation,
+        bool need_agg_finalize,
         const TPaloScanRange& scan_range,
         const std::vector<OlapScanRange*>& key_ranges);
 
@@ -107,6 +108,7 @@ private:
     int _id;
     bool _is_open;
     bool _aggregation;
+    bool _need_agg_finalize = true;
     bool _has_update_counter = false;
 
     Status _ctor_status;
diff --git a/be/src/exprs/aggregate_functions.cpp b/be/src/exprs/aggregate_functions.cpp
index 0c363d6..0a513f3 100644
--- a/be/src/exprs/aggregate_functions.cpp
+++ b/be/src/exprs/aggregate_functions.cpp
@@ -1191,7 +1191,7 @@ void AggregateFunctions::hll_union_agg_update(FunctionContext* ctx,
     }
     DCHECK(!dst->is_null);
 
-    dst->agg_parse_and_cal(src);
+    dst->agg_parse_and_cal(ctx, src);
     return ;
 }
 
diff --git a/be/src/exprs/bitmap_function.cpp b/be/src/exprs/bitmap_function.cpp
index 5a42e79..6a45fc5 100644
--- a/be/src/exprs/bitmap_function.cpp
+++ b/be/src/exprs/bitmap_function.cpp
@@ -48,15 +48,24 @@ BigIntVal BitmapFunctions::bitmap_finalize(FunctionContext* ctx, const StringVal
 }
 
 void BitmapFunctions::bitmap_union(FunctionContext* ctx, const StringVal& src, StringVal* dst) {
-    RoaringBitmap src_bitmap = RoaringBitmap((char*)src.ptr);
     auto* dst_bitmap = reinterpret_cast<RoaringBitmap*>(dst->ptr);
-    dst_bitmap->merge(src_bitmap);
+    // A zero length means src.ptr points to an agg object rather than serialized data
+    if (src.len == 0) {
+        dst_bitmap->merge(*reinterpret_cast<RoaringBitmap*>(src.ptr));
+    } else {
+        dst_bitmap->merge(RoaringBitmap((char*)src.ptr));
+    }
 }
 
 BigIntVal BitmapFunctions::bitmap_count(FunctionContext* ctx, const StringVal& src) {
-    RoaringBitmap bitmap ((char*)src.ptr);
-    BigIntVal result(bitmap.cardinality());
-    return result;
+    // A zero length means src.ptr points to an agg object rather than serialized data
+    if (src.len == 0) {
+        auto bitmap = reinterpret_cast<RoaringBitmap*>(src.ptr);
+        return {bitmap->cardinality()};
+    } else {
+        RoaringBitmap bitmap ((char*)src.ptr);
+        return {bitmap.cardinality()};
+    }
 }
 
 StringVal BitmapFunctions::to_bitmap(doris_udf::FunctionContext* ctx, const doris_udf::StringVal& src) {
diff --git a/be/src/exprs/hll_function.cpp b/be/src/exprs/hll_function.cpp
index e91f947..246f609 100644
--- a/be/src/exprs/hll_function.cpp
+++ b/be/src/exprs/hll_function.cpp
@@ -64,10 +64,15 @@ void HllFunctions::hll_update(FunctionContext *, const T &src, StringVal* dst) {
         dst_hll->update(hash_value);
     }
 }
+
 void HllFunctions::hll_merge(FunctionContext*, const StringVal &src, StringVal* dst) {
-    HyperLogLog src_hll((uint8_t*)src.ptr);
     auto* dst_hll = reinterpret_cast<HyperLogLog*>(dst->ptr);
-    dst_hll->merge(src_hll);
+    // A zero length means src.ptr points to an agg object rather than serialized data
+    if (src.len == 0) {
+        dst_hll->merge(*reinterpret_cast<HyperLogLog*>(src.ptr));
+    } else {
+        dst_hll->merge(HyperLogLog(src.ptr));
+    }
 }
 
 BigIntVal HllFunctions::hll_finalize(FunctionContext*, const StringVal &src) {
diff --git a/be/src/olap/aggregate_func.h b/be/src/olap/aggregate_func.h
index 8381dca..b3155cf 100644
--- a/be/src/olap/aggregate_func.h
+++ b/be/src/olap/aggregate_func.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include "common/object_pool.h"
 #include "olap/hll.h"
 #include "olap/types.h"
 #include "olap/row_cursor_cell.h"
@@ -28,7 +29,7 @@
 
 namespace doris {
 
-using AggInitFunc = void (*)(RowCursorCell* dst, const char* src, bool src_null, Arena* arena);
+using AggInitFunc = void (*)(RowCursorCell* dst, const char* src, bool src_null, Arena* arena, ObjectPool* agg_pool);
 using AggUpdateFunc = void (*)(RowCursorCell* dst, const RowCursorCell& src, Arena* arena);
 using AggFinalizeFunc = void (*)(RowCursorCell* src, Arena* arena);
 
@@ -43,8 +44,8 @@ public:
     // Memory Note: For plain memory can be allocated from arena, whose lifetime
     // will last util finalize function is called. Memory allocated from heap should
     // be freed in finalize functioin to avoid memory leak.
-    inline void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena) const {
-        _init_fn(dst, src, src_null, arena);
+    inline void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena, ObjectPool* agg_pool) const {
+        _init_fn(dst, src, src_null, arena, agg_pool);
     }
 
     // Update aggregated intermediate data. Data stored in engine is aggregated.
@@ -73,7 +74,7 @@ public:
     FieldAggregationMethod agg_method() const { return _agg_method; }
 
 private:
-    void (*_init_fn)(RowCursorCell* dst, const char* src, bool src_null, Arena* arena);
+    void (*_init_fn)(RowCursorCell* dst, const char* src, bool src_null, Arena* arena, ObjectPool* agg_pool);
     void (*_update_fn)(RowCursorCell* dst, const RowCursorCell& src, Arena* arena);
     void (*_finalize_fn)(RowCursorCell* src, Arena* arena);
 
@@ -87,7 +88,7 @@ private:
 
 template<FieldType field_type>
 struct BaseAggregateFuncs {
-    static void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena) {
+    static void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena, ObjectPool* agg_pool) {
         dst->set_is_null(src_null);
         if (src_null) {
             return;
@@ -113,7 +114,7 @@ struct AggregateFuncTraits : public BaseAggregateFuncs<field_type> {
 template <>
 struct AggregateFuncTraits<OLAP_FIELD_AGGREGATION_NONE, OLAP_FIELD_TYPE_DECIMAL> :
         public BaseAggregateFuncs<OLAP_FIELD_TYPE_DECIMAL>  {
-    static void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena) {
+    static void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena, ObjectPool* agg_pool) {
         dst->set_is_null(src_null);
         if (src_null) {
             return;
@@ -129,7 +130,7 @@ struct AggregateFuncTraits<OLAP_FIELD_AGGREGATION_NONE, OLAP_FIELD_TYPE_DECIMAL>
 template <>
 struct AggregateFuncTraits<OLAP_FIELD_AGGREGATION_NONE, OLAP_FIELD_TYPE_DATETIME> :
         public BaseAggregateFuncs<OLAP_FIELD_TYPE_DECIMAL> {
-    static void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena) {
+    static void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena, ObjectPool* agg_pool) {
         dst->set_is_null(src_null);
         if (src_null) {
             return;
@@ -144,7 +145,7 @@ struct AggregateFuncTraits<OLAP_FIELD_AGGREGATION_NONE, OLAP_FIELD_TYPE_DATETIME
 template <>
 struct AggregateFuncTraits<OLAP_FIELD_AGGREGATION_NONE, OLAP_FIELD_TYPE_DATE> :
         public BaseAggregateFuncs<OLAP_FIELD_TYPE_DECIMAL> {
-    static void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena) {
+    static void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena, ObjectPool* agg_pool) {
         dst->set_is_null(src_null);
         if (src_null) {
             return;
@@ -398,17 +399,21 @@ struct AggregateFuncTraits<OLAP_FIELD_AGGREGATION_REPLACE, OLAP_FIELD_TYPE_CHAR>
 // so when init, update hll, the src is not null
 template <>
 struct AggregateFuncTraits<OLAP_FIELD_AGGREGATION_HLL_UNION, OLAP_FIELD_TYPE_HLL> {
-    static void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena) {
+    static void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena, ObjectPool* agg_pool) {
         DCHECK_EQ(src_null, false);
         dst->set_not_null();
 
         auto* src_slice = reinterpret_cast<const Slice*>(src);
         auto* dst_slice = reinterpret_cast<Slice*>(dst->mutable_cell_ptr());
 
-        dst_slice->size = sizeof(HyperLogLog);
-        // use 'placement new' to allocate HyperLogLog on arena, so that we can control the memory usage.
-        char* mem = arena->Allocate(dst_slice->size);
-        dst_slice->data = (char*) new (mem) HyperLogLog((const uint8_t*)src_slice->data);
+        // A size of zero marks this slice as holding a pointer to an agg object
+        dst_slice->size = 0;
+        auto* hll = new HyperLogLog((const uint8_t*) src_slice->data);
+        dst_slice->data = reinterpret_cast<char*>(hll);
+
+        arena->track_memory(sizeof(HyperLogLog));
+
+        agg_pool->add(hll);
     }
 
     static void update(RowCursorCell* dst, const RowCursorCell& src, Arena* arena) {
@@ -425,33 +430,36 @@ struct AggregateFuncTraits<OLAP_FIELD_AGGREGATION_HLL_UNION, OLAP_FIELD_TYPE_HLL
         } else {   // for stream load
             auto* src_hll = reinterpret_cast<HyperLogLog*>(src_slice->data);
             dst_hll->merge(*src_hll);
-            // NOT use 'delete src_hll' because the memory is managed by arena
-            src_hll->~HyperLogLog();
         }
     }
 
+    // The HLL object memory will be released by ObjectPool
     static void finalize(RowCursorCell* src, Arena* arena) {
         auto *slice = reinterpret_cast<Slice*>(src->mutable_cell_ptr());
         auto *hll = reinterpret_cast<HyperLogLog*>(slice->data);
 
         slice->data = arena->Allocate(HLL_COLUMN_DEFAULT_LEN);
         slice->size = hll->serialize((uint8_t*)slice->data);
-        // NOT using 'delete hll' because the memory is managed by arena
-        hll->~HyperLogLog();
     }
 };
 // when data load, after bitmap_init fucntion, bitmap_union column won't be null
 // so when init, update bitmap, the src is not null
 template <>
 struct AggregateFuncTraits<OLAP_FIELD_AGGREGATION_BITMAP_UNION, OLAP_FIELD_TYPE_VARCHAR> {
-    static void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena) {
+    static void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena, ObjectPool* agg_pool) {
         DCHECK_EQ(src_null, false);
         dst->set_not_null();
         auto* src_slice = reinterpret_cast<const Slice*>(src);
         auto* dst_slice = reinterpret_cast<Slice*>(dst->mutable_cell_ptr());
 
-        dst_slice->size = sizeof(RoaringBitmap);
-        dst_slice->data = (char*)new RoaringBitmap(src_slice->data);
+        // A size of zero marks this slice as holding a pointer to an agg object
+        dst_slice->size = 0;
+        auto* bitmap = new RoaringBitmap(src_slice->data);
+        dst_slice->data = (char*) bitmap;
+
+        arena->track_memory(sizeof(RoaringBitmap));
+
+        agg_pool->add(bitmap);
     }
 
     static void update(RowCursorCell* dst, const RowCursorCell& src, Arena* arena) {
@@ -468,11 +476,10 @@ struct AggregateFuncTraits<OLAP_FIELD_AGGREGATION_BITMAP_UNION, OLAP_FIELD_TYPE_
         } else {   // for stream load
             auto* src_bitmap = reinterpret_cast<RoaringBitmap*>(src_slice->data);
             dst_bitmap->merge(*src_bitmap);
-
-            delete src_bitmap;
         }
     }
 
+    // The RoaringBitmap object memory will be released by ObjectPool
     static void finalize(RowCursorCell* src, Arena *arena) {
         auto *slice = reinterpret_cast<Slice*>(src->mutable_cell_ptr());
         auto *bitmap = reinterpret_cast<RoaringBitmap*>(slice->data);
@@ -480,8 +487,6 @@ struct AggregateFuncTraits<OLAP_FIELD_AGGREGATION_BITMAP_UNION, OLAP_FIELD_TYPE_
         slice->size = bitmap->size();
         slice->data = arena->Allocate(slice->size);
         bitmap->serialize(slice->data);
-
-        delete bitmap;
     }
 };
 
diff --git a/be/src/olap/field.h b/be/src/olap/field.h
index 32ee098..caecdd9 100644
--- a/be/src/olap/field.h
+++ b/be/src/olap/field.h
@@ -66,8 +66,8 @@ public:
         _agg_info->finalize(dst, arena);
     }
 
-    virtual void consume(RowCursorCell* dst, const char* src, bool src_null, Arena* arena) const {
-        _agg_info->init(dst, src, src_null, arena);
+    virtual void consume(RowCursorCell* dst, const char* src, bool src_null, Arena* arena, ObjectPool* agg_pool) const {
+        _agg_info->init(dst, src, src_null, arena, agg_pool);
     }
 
     // todo(kks): Unify AggregateInfo::init method and Field::agg_init method
@@ -76,7 +76,7 @@ public:
     // This functionn differs copy functionn in that if this filed
     // contain aggregate information, this functionn will initialize
     // destination in aggregate format, and update with srouce content.
-    virtual void agg_init(RowCursorCell* dst, const RowCursorCell& src, Arena* arena) const {
+    virtual void agg_init(RowCursorCell* dst, const RowCursorCell& src, Arena* arena, ObjectPool* agg_pool) const {
         direct_copy(dst, src);
     }
 
@@ -344,7 +344,7 @@ public:
     }
 
     // the char field is especial, which need the _length info when consume raw data
-    void consume(RowCursorCell* dst, const char* src, bool src_null, Arena* arena) const override {
+    void consume(RowCursorCell* dst, const char* src, bool src_null, Arena* arena, ObjectPool* agg_pool) const override {
         dst->set_is_null(src_null);
         if (src_null) {
             return;
@@ -403,8 +403,8 @@ public:
     }
 
     // bitmap storage data always not null
-    void agg_init(RowCursorCell* dst, const RowCursorCell& src, Arena* arena) const override {
-        _agg_info->init(dst, (const char*)src.cell_ptr(), false, arena);
+    void agg_init(RowCursorCell* dst, const RowCursorCell& src, Arena* arena, ObjectPool* agg_pool) const override {
+        _agg_info->init(dst, (const char*)src.cell_ptr(), false, arena, agg_pool);
     }
 
     char* allocate_memory(char* cell_ptr, char* variable_ptr) const override {
@@ -424,8 +424,8 @@ public:
     }
 
     // Hll storage data always not null
-    void agg_init(RowCursorCell* dst, const RowCursorCell& src, Arena* arena) const override {
-        _agg_info->init(dst, (const char*)src.cell_ptr(), false, arena);
+    void agg_init(RowCursorCell* dst, const RowCursorCell& src, Arena* arena, ObjectPool* agg_pool) const override {
+        _agg_info->init(dst, (const char*)src.cell_ptr(), false, arena, agg_pool);
     }
 
     char* allocate_memory(char* cell_ptr, char* variable_ptr) const override {
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 5a1b240..ae7e82b 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -68,7 +68,7 @@ void MemTable::insert(Tuple* tuple) {
 
         bool is_null = tuple->is_null(slot->null_indicator_offset());
         void* value = tuple->get_slot(slot->tuple_offset());
-        _schema->column(i)->consume(&cell, (const char *)value, is_null, _skip_list->arena());
+        _schema->column(i)->consume(&cell, (const char *)value, is_null, _skip_list->arena(), &_agg_object_pool);
     }
 
     bool overwritten = false;
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 80be744..3856c83 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -18,6 +18,7 @@
 #ifndef DORIS_BE_SRC_OLAP_MEMTABLE_H
 #define DORIS_BE_SRC_OLAP_MEMTABLE_H
 
+#include "common/object_pool.h"
 #include "olap/schema.h"
 #include "olap/skiplist.h"
 #include "runtime/tuple.h"
@@ -57,6 +58,7 @@ private:
 
     RowCursorComparator _row_comparator;
     Arena _arena;
+    ObjectPool _agg_object_pool;
 
     typedef SkipList<char*, RowCursorComparator> Table;
     char* _tuple_buf;
diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp
index 3141738..ae1867e 100644
--- a/be/src/olap/merger.cpp
+++ b/be/src/olap/merger.cpp
@@ -44,14 +44,14 @@ OLAPStatus Merger::merge_rowsets(TabletSharedPtr tablet,
     RETURN_NOT_OK_LOG(row_cursor.init(tablet->tablet_schema()),
                  "failed to init row cursor when merging rowsets of tablet " + tablet->full_name());
     row_cursor.allocate_memory_for_string_type(tablet->tablet_schema());
-
     // The following procedure would last for long time, half of one day, etc.
     int64_t output_rows = 0;
     while (true) {
         Arena arena;
+        ObjectPool objectPool;
         bool eof = false;
         // Read one row into row_cursor
-        RETURN_NOT_OK_LOG(reader.next_row_with_aggregation(&row_cursor, &arena, &eof),
+        RETURN_NOT_OK_LOG(reader.next_row_with_aggregation(&row_cursor, &arena, &objectPool, &eof),
                           "failed to read next row when merging rowsets of tablet " + tablet->full_name());
         if (eof) {
             break;
diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp
index 2151c12..9255faa 100644
--- a/be/src/olap/reader.cpp
+++ b/be/src/olap/reader.cpp
@@ -345,7 +345,7 @@ OLAPStatus Reader::init(const ReaderParams& read_params) {
     return OLAP_SUCCESS;
 }
 
-OLAPStatus Reader::_dup_key_next_row(RowCursor* row_cursor, Arena* arena, bool* eof) {
+OLAPStatus Reader::_dup_key_next_row(RowCursor* row_cursor, Arena* arena, ObjectPool* agg_pool, bool* eof) {
     if (UNLIKELY(_next_key == nullptr)) {
         *eof = true;
         return OLAP_SUCCESS;
@@ -360,12 +360,12 @@ OLAPStatus Reader::_dup_key_next_row(RowCursor* row_cursor, Arena* arena, bool*
     return OLAP_SUCCESS;
 }
 
-OLAPStatus Reader::_agg_key_next_row(RowCursor* row_cursor, Arena* arena, bool* eof) {
+OLAPStatus Reader::_agg_key_next_row(RowCursor* row_cursor, Arena* arena, ObjectPool* agg_pool, bool* eof) {
     if (UNLIKELY(_next_key == nullptr)) {
         *eof = true;
         return OLAP_SUCCESS;
     }
-    init_row_with_others(row_cursor, *_next_key, arena);
+    init_row_with_others(row_cursor, *_next_key, arena, agg_pool);
     int64_t merged_count = 0;
     do {
         auto res = _collect_iter->next(&_next_key, &_next_delete_flag);
@@ -389,11 +389,15 @@ OLAPStatus Reader::_agg_key_next_row(RowCursor* row_cursor, Arena* arena, bool*
         ++merged_count;
     } while (true);
     _merged_rows += merged_count;
-    agg_finalize_row(_value_cids, row_cursor, arena);
+    // For an agg query, we skip finalizing the agg object and pass it to the agg node directly
+    if (_need_agg_finalize) {
+        agg_finalize_row(_value_cids, row_cursor, arena);
+    }
+
     return OLAP_SUCCESS;
 }
 
-OLAPStatus Reader::_unique_key_next_row(RowCursor* row_cursor, Arena* arena, bool* eof) {
+OLAPStatus Reader::_unique_key_next_row(RowCursor* row_cursor, Arena* arena, ObjectPool* agg_pool, bool* eof) {
     *eof = false;
     bool cur_delete_flag = false;
     do {
@@ -403,7 +407,7 @@ OLAPStatus Reader::_unique_key_next_row(RowCursor* row_cursor, Arena* arena, boo
         }
     
         cur_delete_flag = _next_delete_flag;
-        init_row_with_others(row_cursor, *_next_key, arena);
+        init_row_with_others(row_cursor, *_next_key, arena, agg_pool);
 
         int64_t merged_count = 0;
         while (NULL != _next_key) {
@@ -564,6 +568,7 @@ OLAPStatus Reader::_init_params(const ReaderParams& read_params) {
     read_params.check_validation();
     OLAPStatus res = OLAP_SUCCESS;
     _aggregation = read_params.aggregation;
+    _need_agg_finalize = read_params.need_agg_finalize;
     _reader_type = read_params.reader_type;
     _tablet = read_params.tablet;
     _version = read_params.version;
diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h
index 48f3be4..2183cae 100644
--- a/be/src/olap/reader.h
+++ b/be/src/olap/reader.h
@@ -53,6 +53,7 @@ struct ReaderParams {
     TabletSharedPtr tablet;
     ReaderType reader_type;
     bool aggregation;
+    bool need_agg_finalize = true;
     Version version;
     // possible values are "gt", "ge", "eq"
     std::string range;
@@ -125,8 +126,8 @@ public:
     // Return OLAP_SUCCESS and set `*eof` to false when next row is read into `row_cursor`.
     // Return OLAP_SUCCESS and set `*eof` to true when no more rows can be read.
     // Return others when unexpected error happens.
-    OLAPStatus next_row_with_aggregation(RowCursor *row_cursor, Arena* arena, bool *eof) {
-        return (this->*_next_row_func)(row_cursor, arena, eof);
+    OLAPStatus next_row_with_aggregation(RowCursor *row_cursor, Arena* arena, ObjectPool* agg_pool, bool *eof) {
+        return (this->*_next_row_func)(row_cursor, arena, agg_pool, eof);
     }
 
     uint64_t merged_rows() const {
@@ -200,9 +201,9 @@ private:
 
     OLAPStatus _init_load_bf_columns(const ReaderParams& read_params);
 
-    OLAPStatus _dup_key_next_row(RowCursor* row_cursor, Arena* arena, bool* eof);
-    OLAPStatus _agg_key_next_row(RowCursor* row_cursor, Arena* arena, bool* eof);
-    OLAPStatus _unique_key_next_row(RowCursor* row_cursor, Arena* arena, bool* eof);
+    OLAPStatus _dup_key_next_row(RowCursor* row_cursor, Arena* arena, ObjectPool* agg_pool, bool* eof);
+    OLAPStatus _agg_key_next_row(RowCursor* row_cursor, Arena* arena, ObjectPool* agg_pool, bool* eof);
+    OLAPStatus _unique_key_next_row(RowCursor* row_cursor, Arena* arena, ObjectPool* agg_pool, bool* eof);
 
     TabletSharedPtr tablet() { return _tablet; }
 
@@ -233,9 +234,11 @@ private:
 
     DeleteHandler _delete_handler;
 
-    OLAPStatus (Reader::*_next_row_func)(RowCursor* row_cursor, Arena* arena, bool* eof) = nullptr;
+    OLAPStatus (Reader::*_next_row_func)(RowCursor* row_cursor, Arena* arena, ObjectPool* agg_pool, bool* eof) = nullptr;
 
     bool _aggregation;
+    // For an agg query, agg objects do not need to be finalized while scanning
+    bool _need_agg_finalize = true;
     bool _version_locked;
     ReaderType _reader_type;
     bool _next_delete_flag;
diff --git a/be/src/olap/row.h b/be/src/olap/row.h
index 1dd0581..64ea139 100644
--- a/be/src/olap/row.h
+++ b/be/src/olap/row.h
@@ -107,10 +107,10 @@ int index_compare_row(const LhsRowType& lhs, const RhsRowType& rhs) {
 // function will first initialize destination column and then update with source column
 // value.
 template<typename DstRowType, typename SrcRowType>
-void init_row_with_others(DstRowType* dst, const SrcRowType& src, Arena* arena) {
+void init_row_with_others(DstRowType* dst, const SrcRowType& src, Arena* arena, ObjectPool* agg_pool) {
     for (auto cid : dst->schema()->column_ids()) {
         auto dst_cell = dst->cell(cid);
-        dst->schema()->column(cid)->agg_init(&dst_cell, src.cell(cid), arena);
+        dst->schema()->column(cid)->agg_init(&dst_cell, src.cell(cid), arena, agg_pool);
     }
 }
 
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 9bf5bb0..b7eed21 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -569,6 +569,7 @@ bool RowBlockMerger::merge(
     uint64_t tmp_merged_rows = 0;
     RowCursor row_cursor;
     std::unique_ptr<Arena> arena(new Arena());
+    std::unique_ptr<ObjectPool> agg_object_pool(new ObjectPool());
     if (row_cursor.init(_tablet->tablet_schema()) != OLAP_SUCCESS) {
         LOG(WARNING) << "fail to init row cursor.";
         goto MERGE_ERR;
@@ -578,7 +579,7 @@ bool RowBlockMerger::merge(
 
     row_cursor.allocate_memory_for_string_type(_tablet->tablet_schema());
     while (_heap.size() > 0) {
-        init_row_with_others(&row_cursor, *(_heap.top().row_cursor), arena.get());
+        init_row_with_others(&row_cursor, *(_heap.top().row_cursor), arena.get(), agg_object_pool.get());
 
         if (!_pop_heap()) {
             goto MERGE_ERR;
@@ -604,6 +605,7 @@ bool RowBlockMerger::merge(
         // the memory allocate by arena has been copied,
         // so we should release these memory immediately
         arena.reset(new Arena());
+        agg_object_pool.reset(new ObjectPool());
     }
     if (rowset_writer->flush() != OLAP_SUCCESS) {
         LOG(WARNING) << "failed to finalizing writer.";
diff --git a/be/src/olap/task/engine_checksum_task.cpp b/be/src/olap/task/engine_checksum_task.cpp
index 43b5ad3..99d2333 100644
--- a/be/src/olap/task/engine_checksum_task.cpp
+++ b/be/src/olap/task/engine_checksum_task.cpp
@@ -105,6 +105,7 @@ OLAPStatus EngineChecksumTask::_compute_checksum() {
 
     RowCursor row;
     std::unique_ptr<Arena> arena(new Arena());
+    std::unique_ptr<ObjectPool> agg_object_pool(new ObjectPool());
     res = row.init(tablet->tablet_schema(), reader_params.return_columns);
     if (res != OLAP_SUCCESS) {
         OLAP_LOG_WARNING("failed to init row cursor. [res=%d]", res);
@@ -115,7 +116,7 @@ OLAPStatus EngineChecksumTask::_compute_checksum() {
     bool eof = false;
     uint32_t row_checksum = 0;
     while (true) {
-        OLAPStatus res = reader.next_row_with_aggregation(&row, arena.get(), &eof);
+        OLAPStatus res = reader.next_row_with_aggregation(&row, arena.get(), agg_object_pool.get(), &eof);
         if (res == OLAP_SUCCESS && eof) {
             VLOG(3) << "reader reads to the end.";
             break;
@@ -128,6 +129,7 @@ OLAPStatus EngineChecksumTask::_compute_checksum() {
         // the memory allocate by arena has been copied,
         // so we should release these memory immediately
         arena.reset(new Arena());
+        agg_object_pool.reset(new ObjectPool());
     }
 
     LOG(INFO) << "success to finish compute checksum. checksum=" << row_checksum;
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index 729542e..4a9eb0e 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -173,6 +173,8 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request) {
     VLOG(1) << "scan_nodes.size()=" << scan_nodes.size();
     VLOG(1) << "params.per_node_scan_ranges.size()=" << params.per_node_scan_ranges.size();
 
+    _plan->try_do_aggregate_serde_improve();
+
     for (int i = 0; i < scan_nodes.size(); ++i) {
         ScanNode* scan_node = static_cast<ScanNode*>(scan_nodes[i]);
         const std::vector<TScanRangeParams>& scan_ranges =
diff --git a/be/src/runtime/row_batch.cpp b/be/src/runtime/row_batch.cpp
index 6e31a44..19152a2 100644
--- a/be/src/runtime/row_batch.cpp
+++ b/be/src/runtime/row_batch.cpp
@@ -48,7 +48,8 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, int capacity, MemTracker* mem_
         _row_desc(row_desc),
         _auxiliary_mem_usage(0),
         _need_to_return(false),
-        _tuple_data_pool(new MemPool(_mem_tracker)) {
+        _tuple_data_pool(new MemPool(_mem_tracker)),
+        _agg_object_pool(new ObjectPool()) {
     DCHECK(_mem_tracker != NULL);
     DCHECK_GT(capacity, 0);
     _tuple_ptrs_size = _capacity * _num_tuples_per_row * sizeof(Tuple*);
@@ -82,7 +83,8 @@ RowBatch::RowBatch(const RowDescriptor& row_desc,
             _row_desc(row_desc),
             _auxiliary_mem_usage(0),
             _need_to_return(false),
-            _tuple_data_pool(new MemPool(_mem_tracker)) {
+            _tuple_data_pool(new MemPool(_mem_tracker)),
+            _agg_object_pool(new ObjectPool()) {
     DCHECK(_mem_tracker != nullptr);
     _tuple_ptrs_size = _num_rows * _num_tuples_per_row * sizeof(Tuple*);
     DCHECK_GT(_tuple_ptrs_size, 0);
@@ -173,7 +175,8 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const TRowBatch& input_batch,
         _row_desc(row_desc),
         _auxiliary_mem_usage(0),
         _need_to_return(false),
-        _tuple_data_pool(new MemPool(_mem_tracker)) {
+        _tuple_data_pool(new MemPool(_mem_tracker)),
+        _agg_object_pool(new ObjectPool()) {
     DCHECK(_mem_tracker != NULL);
     _tuple_ptrs_size = _num_rows * input_batch.row_tuples.size() * sizeof(Tuple*);
     DCHECK_GT(_tuple_ptrs_size, 0);
@@ -259,6 +262,7 @@ void RowBatch::clear() {
     }
 
     _tuple_data_pool->free_all();
+    _agg_object_pool.reset(new ObjectPool());
     for (int i = 0; i < _io_buffers.size(); ++i) {
         _io_buffers[i]->return_buffer();
     }
@@ -464,6 +468,7 @@ void RowBatch::reset() {
     
     // TODO: Change this to Clear() and investigate the repercussions.
     _tuple_data_pool->free_all();
+    _agg_object_pool.reset(new ObjectPool());
     for (int i = 0; i < _io_buffers.size(); ++i) {
         _io_buffers[i]->return_buffer();
     }
@@ -500,35 +505,30 @@ void RowBatch::close_tuple_streams() {
 void RowBatch::transfer_resource_ownership(RowBatch* dest) {
     dest->_auxiliary_mem_usage += _tuple_data_pool->total_allocated_bytes();
     dest->_tuple_data_pool->acquire_data(_tuple_data_pool.get(), false);
+    dest->_agg_object_pool->acquire_data(_agg_object_pool.get());
     for (int i = 0; i < _io_buffers.size(); ++i) {
         DiskIoMgr::BufferDescriptor* buffer = _io_buffers[i];
         dest->_io_buffers.push_back(buffer);
         dest->_auxiliary_mem_usage += buffer->buffer_len();
         buffer->set_mem_tracker(dest->_mem_tracker);
     }
-    _io_buffers.clear();
 
     for (BufferInfo& buffer_info : _buffers) {
         dest->add_buffer(
             buffer_info.client, std::move(buffer_info.buffer), FlushMode::NO_FLUSH_RESOURCES);
     }
-    _buffers.clear();
 
     for (int i = 0; i < _tuple_streams.size(); ++i) {
         dest->_tuple_streams.push_back(_tuple_streams[i]);
         dest->_auxiliary_mem_usage += _tuple_streams[i]->byte_size();
     }
-    _tuple_streams.clear();
+
     for (int i = 0; i < _blocks.size(); ++i) {
         dest->_blocks.push_back(_blocks[i]);
         dest->_auxiliary_mem_usage += _blocks[i]->buffer_len();
     }
-    _blocks.clear();
+
     dest->_need_to_return |= _need_to_return;
-    _auxiliary_mem_usage = 0;
-    if (!config::enable_partitioned_aggregation && !config::enable_new_partitioned_aggregation) {
-        _tuple_ptrs = NULL;
-    }
 
     if (_needs_deep_copy) {
       dest->mark_needs_deep_copy();
@@ -589,28 +589,6 @@ void RowBatch::acquire_state(RowBatch* src) {
     src->transfer_resource_ownership(this);
 }
 
-void RowBatch::swap(RowBatch* other) {
-    DCHECK(_row_desc.equals(other->_row_desc));
-    DCHECK_EQ(_num_tuples_per_row, other->_num_tuples_per_row);
-    DCHECK_EQ(_tuple_ptrs_size, other->_tuple_ptrs_size);
-
-    // The destination row batch should be empty.
-    DCHECK(!_has_in_flight_row);
-    DCHECK_EQ(_tuple_data_pool->total_reserved_bytes(), 0);
-
-    std::swap(_has_in_flight_row, other->_has_in_flight_row);
-    std::swap(_num_rows, other->_num_rows);
-    std::swap(_capacity, other->_capacity);
-    if (!config::enable_partitioned_aggregation && !config::enable_new_partitioned_aggregation) {
-        // Tuple pointers are allocated from tuple_data_pool_ so are transferred.
-        _tuple_ptrs = other->_tuple_ptrs;
-        other->_tuple_ptrs = NULL;
-    } else {
-        // tuple_ptrs_ were allocated with malloc so can be swapped between batches.
-        std::swap(_tuple_ptrs, other->_tuple_ptrs);
-    }
-}
-
 // TODO: consider computing size of batches as they are built up
 int RowBatch::total_byte_size() {
     int result = 0;
diff --git a/be/src/runtime/row_batch.h b/be/src/runtime/row_batch.h
index 531c6ec..7aaf2a3 100644
--- a/be/src/runtime/row_batch.h
+++ b/be/src/runtime/row_batch.h
@@ -245,6 +245,9 @@ public:
     MemPool* tuple_data_pool() {
         return _tuple_data_pool.get();
     }
+    ObjectPool* agg_object_pool() {  // borrowed pointer; clear()/reset() replace the pool
+        return _agg_object_pool.get();
+    }
     int num_io_buffers() const {
         return _io_buffers.size();
     }
@@ -322,6 +325,7 @@ public:
 
     // Transfer ownership of resources to dest.  This includes tuple data in mem
     // pool and io buffers.
+    // Dest's resources are updated first, and then this batch's own resources are reset.
     void transfer_resource_ownership(RowBatch* dest);
 
     void copy_row(TupleRow* src, TupleRow* dest) {
@@ -384,14 +388,6 @@ public:
     int num_buffers() const { 
         return _buffers.size(); 
     }
-    // Swaps all of the row batch state with 'other'.  This is used for scan nodes
-    // which produce RowBatches asynchronously.  Typically, an ExecNode is handed
-    // a row batch to populate (pull model) but ScanNodes have multiple threads
-    // which push row batches.  This function is used to swap the pushed row batch
-    // contents with the row batch that's passed from the caller.
-    // TODO: this is wasteful and makes a copy that's unnecessary.  Think about cleaning
-    // this up.
-    void swap(RowBatch* other);
 
     const RowDescriptor& row_desc() const {
         return _row_desc;
@@ -484,6 +480,9 @@ private:
     // holding (some of the) data referenced by rows
     boost::scoped_ptr<MemPool> _tuple_data_pool;
 
+    // holding some complex agg object data (bitmap, hll)
+    std::unique_ptr<ObjectPool> _agg_object_pool;
+
     // IO buffers current owned by this row batch. Ownership of IO buffers transfer
     // between row batches. Any IO buffer will be owned by at most one row batch
     // (i.e. they are not ref counted) so most row batches don't own any.
diff --git a/be/src/udf/udf.cpp b/be/src/udf/udf.cpp
index 4b49c3c..aba14cd 100755
--- a/be/src/udf/udf.cpp
+++ b/be/src/udf/udf.cpp
@@ -465,9 +465,19 @@ void HllVal::init(FunctionContext* ctx) {
     is_null = false;
 }
 
-void HllVal::agg_parse_and_cal(const HllVal &other) {
+void HllVal::agg_parse_and_cal(FunctionContext* ctx, const HllVal& other) {
     doris::HllSetResolver resolver;
-    resolver.init((char*)other.ptr, other.len);
+
+    // A zero length means `other.ptr` points at a HyperLogLog object, not serialized bytes.
+    if (other.len == 0) {
+        auto* hll = reinterpret_cast<doris::HyperLogLog*>(other.ptr);
+        uint8_t* other_ptr = ctx->allocate(doris::HLL_COLUMN_DEFAULT_LEN);
+        int other_len = hll->serialize(other_ptr);  // fill the scratch buffer, not this->ptr
+        resolver.init((char*)other_ptr, other_len);
+    } else {
+        resolver.init((char*)other.ptr, other.len);
+    }
+
     resolver.parse();
 
     if (resolver.get_hll_data_type() == doris::HLL_DATA_EMPTY) {
diff --git a/be/src/udf/udf.h b/be/src/udf/udf.h
index 192de66..4cf5d9b 100755
--- a/be/src/udf/udf.h
+++ b/be/src/udf/udf.h
@@ -777,7 +777,7 @@ struct HllVal : public StringVal {
 
     void init(FunctionContext* ctx);
 
-    void agg_parse_and_cal(const HllVal &other);
+    void agg_parse_and_cal(FunctionContext* ctx, const HllVal& other);
 
     void agg_merge(const HllVal &other);
 };
diff --git a/be/src/util/arena.h b/be/src/util/arena.h
index 4a260e9..ef042eb 100644
--- a/be/src/util/arena.h
+++ b/be/src/util/arena.h
@@ -34,6 +34,11 @@ public:
         return memory_usage_.load(std::memory_order_relaxed);
     }
 
+    // Adds `bytes` to memory_usage_ for an object that was not allocated from
+    // this Arena but whose memory usage still needs to be collected here.
+    void track_memory(size_t bytes) {
+        memory_usage_.fetch_add(bytes, std::memory_order_relaxed);
+    }
 private:
     char* AllocateFallback(size_t bytes);
     char* AllocateNewBlock(size_t block_bytes);
diff --git a/be/test/olap/aggregate_func_test.cpp b/be/test/olap/aggregate_func_test.cpp
index 7d4ca56..21f2cc6 100644
--- a/be/test/olap/aggregate_func_test.cpp
+++ b/be/test/olap/aggregate_func_test.cpp
@@ -19,6 +19,7 @@
 
 #include <gtest/gtest.h>
 
+#include "common/object_pool.h"
 #include "olap/decimal12.h"
 #include "olap/uint24.h"
 
@@ -37,6 +38,7 @@ void test_min() {
     char buf[64];
 
     Arena arena;
+    ObjectPool agg_object_pool;
     const AggregateInfo* agg = get_aggregate_info(OLAP_FIELD_AGGREGATION_MIN, field_type);
 
     RowCursorCell dst(buf);
@@ -44,7 +46,7 @@ void test_min() {
     {
         char val_buf[16];
         *(bool*)val_buf = true;
-        agg->init(&dst, val_buf, true, &arena);
+        agg->init(&dst, val_buf, true, &arena, &agg_object_pool);
         ASSERT_TRUE(*(bool*)(buf));
     }
     // 100
@@ -110,6 +112,7 @@ void test_max() {
     char buf[64];
 
     Arena arena;
+    ObjectPool agg_object_pool;
     const AggregateInfo* agg = get_aggregate_info(OLAP_FIELD_AGGREGATION_MAX, field_type);
 
     RowCursorCell dst(buf);
@@ -117,7 +120,7 @@ void test_max() {
     {
         char val_buf[16];
         *(bool*)val_buf = true;
-        agg->init(&dst, val_buf, true, &arena);
+        agg->init(&dst, val_buf, true, &arena, &agg_object_pool);
         ASSERT_TRUE(*(bool*)(buf));
     }
     // 100
@@ -183,13 +186,14 @@ void test_sum() {
     RowCursorCell dst(buf);
 
     Arena arena;
+    ObjectPool agg_object_pool;
     const AggregateInfo* agg = get_aggregate_info(OLAP_FIELD_AGGREGATION_SUM, field_type);
 
     // null
     {
         char val_buf[16];
         *(bool*)val_buf = true;
-        agg->init(&dst, val_buf, true, &arena);
+        agg->init(&dst, val_buf, true, &arena, &agg_object_pool);
         ASSERT_TRUE(*(bool*)(buf));
     }
     // 100
@@ -255,13 +259,14 @@ void test_replace() {
     RowCursorCell dst(buf);
 
     Arena arena;
+    ObjectPool agg_object_pool;
     const AggregateInfo* agg = get_aggregate_info(OLAP_FIELD_AGGREGATION_REPLACE, field_type);
 
     // null
     {
         char val_buf[16];
         *(bool*)val_buf = true;
-        agg->init(&dst, val_buf, true, &arena);
+        agg->init(&dst, val_buf, true, &arena, &agg_object_pool);
         ASSERT_TRUE(*(bool*)(buf));
     }
     // 100
@@ -312,6 +317,7 @@ void test_replace_string() {
     dst_slice->size = 0;
 
     Arena arena;
+    ObjectPool agg_object_pool;
     const AggregateInfo* agg = get_aggregate_info(OLAP_FIELD_AGGREGATION_REPLACE, field_type);
 
     char src[string_field_size];
@@ -320,7 +326,7 @@ void test_replace_string() {
     // null
     {
         src_cell.set_null();
-        agg->init(&dst_cell, (const char*)src_slice, true, &arena);
+        agg->init(&dst_cell, (const char*)src_slice, true, &arena, &agg_object_pool);
         ASSERT_TRUE(dst_cell.is_null());
     }
     // "12345"
diff --git a/be/test/olap/row_cursor_test.cpp b/be/test/olap/row_cursor_test.cpp
index ab538b4..cd26618 100644
--- a/be/test/olap/row_cursor_test.cpp
+++ b/be/test/olap/row_cursor_test.cpp
@@ -17,6 +17,7 @@
 
 #include <gtest/gtest.h>
 
+#include "common/object_pool.h"
 #include "olap/row_cursor.h"
 #include "olap/tablet_schema.h"
 #include "olap/row.h"
@@ -470,7 +471,8 @@ TEST_F(TestRowCursor, AggregateWithoutNull) {
     left.set_field_content(4, reinterpret_cast<char*>(&l_decimal), _mem_pool.get());
     left.set_field_content(5, reinterpret_cast<char*>(&l_varchar), _mem_pool.get());
 
-    init_row_with_others(&row, left, arena.get());
+    ObjectPool agg_object_pool;
+    init_row_with_others(&row, left, arena.get(), &agg_object_pool);
 
     RowCursor right;
     res = right.init(tablet_schema);
@@ -528,7 +530,8 @@ TEST_F(TestRowCursor, AggregateWithNull) {
     left.set_null(4);
     left.set_field_content(5, reinterpret_cast<char*>(&l_varchar), _mem_pool.get());
 
-    init_row_with_others(&row, left, arena.get());
+    ObjectPool agg_object_pool;
+    init_row_with_others(&row, left, arena.get(), &agg_object_pool);
 
     RowCursor right;
     res = right.init(tablet_schema);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org