You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2019/09/26 05:51:18 UTC
[incubator-doris] branch master updated: Avoid SerDe for
aggregation query with object pool (#1854)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new b246d93 Avoid SerDe for aggregation query with object pool (#1854)
b246d93 is described below
commit b246d931282a1236e10fb29e755a021774aa913b
Author: kangkaisen <ka...@apache.org>
AuthorDate: Thu Sep 26 13:51:13 2019 +0800
Avoid SerDe for aggregation query with object pool (#1854)
---
be/src/common/object_pool.h | 7 ++++
be/src/exec/exec_node.cpp | 19 +++++++++++
be/src/exec/exec_node.h | 6 ++++
be/src/exec/olap_scan_node.cpp | 3 +-
be/src/exec/olap_scan_node.h | 6 +++-
be/src/exec/olap_scanner.cpp | 9 +++++-
be/src/exec/olap_scanner.h | 2 ++
be/src/exprs/aggregate_functions.cpp | 2 +-
be/src/exprs/bitmap_function.cpp | 19 ++++++++---
be/src/exprs/hll_function.cpp | 9 ++++--
be/src/olap/aggregate_func.h | 53 +++++++++++++++++--------------
be/src/olap/field.h | 16 +++++-----
be/src/olap/memtable.cpp | 2 +-
be/src/olap/memtable.h | 2 ++
be/src/olap/merger.cpp | 4 +--
be/src/olap/reader.cpp | 17 ++++++----
be/src/olap/reader.h | 15 +++++----
be/src/olap/row.h | 4 +--
be/src/olap/schema_change.cpp | 4 ++-
be/src/olap/task/engine_checksum_task.cpp | 4 ++-
be/src/runtime/plan_fragment_executor.cpp | 2 ++
be/src/runtime/row_batch.cpp | 44 +++++++------------------
be/src/runtime/row_batch.h | 15 ++++-----
be/src/udf/udf.cpp | 14 ++++++--
be/src/udf/udf.h | 2 +-
be/src/util/arena.h | 5 +++
be/test/olap/aggregate_func_test.cpp | 16 +++++++---
be/test/olap/row_cursor_test.cpp | 7 ++--
28 files changed, 194 insertions(+), 114 deletions(-)
diff --git a/be/src/common/object_pool.h b/be/src/common/object_pool.h
index 98fff35..70d52e8 100644
--- a/be/src/common/object_pool.h
+++ b/be/src/common/object_pool.h
@@ -56,6 +56,13 @@ public:
_objects.clear();
}
+ // Absorb all objects from src pool
+ // Note: This method is not thread safe
+ void acquire_data(ObjectPool* src) {
+ _objects.insert(_objects.end(), src->_objects.begin(), src->_objects.end());
+ src->_objects.clear();
+ }
+
private:
struct GenericElement {
virtual ~GenericElement() {}
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index 5facfa6..ab00596 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -530,6 +530,25 @@ void ExecNode::collect_scan_nodes(vector<ExecNode*>* nodes) {
collect_nodes(TPlanNodeType::ES_HTTP_SCAN_NODE, nodes);
}
+void ExecNode::try_do_aggregate_serde_improve() {
+ std::vector<ExecNode*> agg_node;
+ collect_nodes(TPlanNodeType::AGGREGATION_NODE, &agg_node);
+ if (agg_node.size() != 1) {
+ return;
+ }
+
+ if (agg_node[0]->_children.size() != 1) {
+ return;
+ }
+
+ if (agg_node[0]->_children[0]->type() != TPlanNodeType::OLAP_SCAN_NODE) {
+ return;
+ }
+
+ OlapScanNode* scan_node = static_cast<OlapScanNode*>(agg_node[0]->_children[0]);
+ scan_node->set_no_agg_finalize();
+}
+
void ExecNode::init_runtime_profile(const std::string& name) {
std::stringstream ss;
ss << name << " (id=" << _id << ")";
diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index ace7935..e40a4d3 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -159,6 +159,12 @@ public:
// Collect all scan node types.
void collect_scan_nodes(std::vector<ExecNode*>* nodes);
+ // When the agg node is the direct parent of the scan node,
+ // we return the agg object directly from the scan node to the agg node
+ // and don't serialize the agg object.
+ // This optimization is deliberately cautious: we ensure correctness first.
+ void try_do_aggregate_serde_improve();
+
typedef bool (*EvalConjunctsFn)(ExprContext* const* ctxs, int num_ctxs, TupleRow* row);
// Evaluate exprs over row. Returns true if all exprs return true.
// TODO: This doesn't use the vector<Expr*> signature because I haven't figured
diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp
index 8f51551..4acddb4 100644
--- a/be/src/exec/olap_scan_node.cpp
+++ b/be/src/exec/olap_scan_node.cpp
@@ -150,7 +150,6 @@ Status OlapScanNode::prepare(RuntimeState* state) {
_rows_pushed_cond_filtered_counter =
ADD_COUNTER(_runtime_profile, "RowsPushedCondFiltered", TUnit::UNIT);
_init_counter(state);
-
_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
if (_tuple_desc == NULL) {
// TODO: make sure we print all available diagnostic output to our error log
@@ -696,7 +695,7 @@ Status OlapScanNode::start_scan_thread(RuntimeState* state) {
scanner_ranges.push_back((*ranges)[i].get());
}
OlapScanner* scanner = new OlapScanner(
- state, this, _olap_scan_node.is_preaggregation, *scan_range, scanner_ranges);
+ state, this, _olap_scan_node.is_preaggregation, _need_agg_finalize, *scan_range, scanner_ranges);
_scanner_pool->add(scanner);
_olap_scanners.push_back(scanner);
}
diff --git a/be/src/exec/olap_scan_node.h b/be/src/exec/olap_scan_node.h
index 434e423..fa280b4 100644
--- a/be/src/exec/olap_scan_node.h
+++ b/be/src/exec/olap_scan_node.h
@@ -58,7 +58,9 @@ public:
Status collect_query_statistics(QueryStatistics* statistics) override;
virtual Status close(RuntimeState* state);
virtual Status set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges);
-
+ inline void set_no_agg_finalize() {
+ _need_agg_finalize = false;
+ }
protected:
typedef struct {
Tuple* tuple;
@@ -242,6 +244,8 @@ private:
int64_t _running_thread;
EvalConjunctsFn _eval_conjuncts_fn;
+ bool _need_agg_finalize = true;
+
// Counters
RuntimeProfile::Counter* _io_timer = nullptr;
RuntimeProfile::Counter* _read_compressed_counter = nullptr;
diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp
index 008cd54..f4afb85 100644
--- a/be/src/exec/olap_scanner.cpp
+++ b/be/src/exec/olap_scanner.cpp
@@ -43,6 +43,7 @@ OlapScanner::OlapScanner(
RuntimeState* runtime_state,
OlapScanNode* parent,
bool aggregation,
+ bool need_agg_finalize,
const TPaloScanRange& scan_range,
const std::vector<OlapScanRange*>& key_ranges)
: _runtime_state(runtime_state),
@@ -52,6 +53,7 @@ OlapScanner::OlapScanner(
_string_slots(parent->_string_slots),
_is_open(false),
_aggregation(aggregation),
+ _need_agg_finalize(need_agg_finalize),
_tuple_idx(parent->_tuple_idx),
_direct_conjunct_size(parent->_direct_conjunct_size) {
_reader.reset(new Reader());
@@ -213,6 +215,11 @@ Status OlapScanner::_init_params(
}
_read_row_cursor.allocate_memory_for_string_type(_tablet->tablet_schema());
+ // If an agg node is the direct parent of this scan node,
+ // we do not call the agg object's finalize method in the scan node,
+ // which avoids unnecessary SerDe and improves query performance.
+ _params.need_agg_finalize = _need_agg_finalize;
+
return Status::OK();
}
@@ -264,7 +271,7 @@ Status OlapScanner::get_batch(
break;
}
// Read one row from reader
- auto res = _reader->next_row_with_aggregation(&_read_row_cursor, arena.get(), eof);
+ auto res = _reader->next_row_with_aggregation(&_read_row_cursor, arena.get(), batch->agg_object_pool(), eof);
if (res != OLAP_SUCCESS) {
return Status::InternalError("Internal Error: read storage fail.");
}
diff --git a/be/src/exec/olap_scanner.h b/be/src/exec/olap_scanner.h
index 0708da6..b163518 100644
--- a/be/src/exec/olap_scanner.h
+++ b/be/src/exec/olap_scanner.h
@@ -53,6 +53,7 @@ public:
RuntimeState* runtime_state,
OlapScanNode* parent,
bool aggregation,
+ bool need_agg_finalize,
const TPaloScanRange& scan_range,
const std::vector<OlapScanRange*>& key_ranges);
@@ -107,6 +108,7 @@ private:
int _id;
bool _is_open;
bool _aggregation;
+ bool _need_agg_finalize = true;
bool _has_update_counter = false;
Status _ctor_status;
diff --git a/be/src/exprs/aggregate_functions.cpp b/be/src/exprs/aggregate_functions.cpp
index 0c363d6..0a513f3 100644
--- a/be/src/exprs/aggregate_functions.cpp
+++ b/be/src/exprs/aggregate_functions.cpp
@@ -1191,7 +1191,7 @@ void AggregateFunctions::hll_union_agg_update(FunctionContext* ctx,
}
DCHECK(!dst->is_null);
- dst->agg_parse_and_cal(src);
+ dst->agg_parse_and_cal(ctx, src);
return ;
}
diff --git a/be/src/exprs/bitmap_function.cpp b/be/src/exprs/bitmap_function.cpp
index 5a42e79..6a45fc5 100644
--- a/be/src/exprs/bitmap_function.cpp
+++ b/be/src/exprs/bitmap_function.cpp
@@ -48,15 +48,24 @@ BigIntVal BitmapFunctions::bitmap_finalize(FunctionContext* ctx, const StringVal
}
void BitmapFunctions::bitmap_union(FunctionContext* ctx, const StringVal& src, StringVal* dst) {
- RoaringBitmap src_bitmap = RoaringBitmap((char*)src.ptr);
auto* dst_bitmap = reinterpret_cast<RoaringBitmap*>(dst->ptr);
- dst_bitmap->merge(src_bitmap);
+ // a zero size means the src input is an agg object
+ if (src.len == 0) {
+ dst_bitmap->merge(*reinterpret_cast<RoaringBitmap*>(src.ptr));
+ } else {
+ dst_bitmap->merge(RoaringBitmap((char*)src.ptr));
+ }
}
BigIntVal BitmapFunctions::bitmap_count(FunctionContext* ctx, const StringVal& src) {
- RoaringBitmap bitmap ((char*)src.ptr);
- BigIntVal result(bitmap.cardinality());
- return result;
+ // a zero size means the src input is an agg object
+ if (src.len == 0) {
+ auto bitmap = reinterpret_cast<RoaringBitmap*>(src.ptr);
+ return {bitmap->cardinality()};
+ } else {
+ RoaringBitmap bitmap ((char*)src.ptr);
+ return {bitmap.cardinality()};
+ }
}
StringVal BitmapFunctions::to_bitmap(doris_udf::FunctionContext* ctx, const doris_udf::StringVal& src) {
diff --git a/be/src/exprs/hll_function.cpp b/be/src/exprs/hll_function.cpp
index e91f947..246f609 100644
--- a/be/src/exprs/hll_function.cpp
+++ b/be/src/exprs/hll_function.cpp
@@ -64,10 +64,15 @@ void HllFunctions::hll_update(FunctionContext *, const T &src, StringVal* dst) {
dst_hll->update(hash_value);
}
}
+
void HllFunctions::hll_merge(FunctionContext*, const StringVal &src, StringVal* dst) {
- HyperLogLog src_hll((uint8_t*)src.ptr);
auto* dst_hll = reinterpret_cast<HyperLogLog*>(dst->ptr);
- dst_hll->merge(src_hll);
+ // a zero size means the src input is an agg object
+ if (src.len == 0) {
+ dst_hll->merge(*reinterpret_cast<HyperLogLog*>(src.ptr));
+ } else {
+ dst_hll->merge(HyperLogLog(src.ptr));
+ }
}
BigIntVal HllFunctions::hll_finalize(FunctionContext*, const StringVal &src) {
diff --git a/be/src/olap/aggregate_func.h b/be/src/olap/aggregate_func.h
index 8381dca..b3155cf 100644
--- a/be/src/olap/aggregate_func.h
+++ b/be/src/olap/aggregate_func.h
@@ -17,6 +17,7 @@
#pragma once
+#include "common/object_pool.h"
#include "olap/hll.h"
#include "olap/types.h"
#include "olap/row_cursor_cell.h"
@@ -28,7 +29,7 @@
namespace doris {
-using AggInitFunc = void (*)(RowCursorCell* dst, const char* src, bool src_null, Arena* arena);
+using AggInitFunc = void (*)(RowCursorCell* dst, const char* src, bool src_null, Arena* arena, ObjectPool* agg_pool);
using AggUpdateFunc = void (*)(RowCursorCell* dst, const RowCursorCell& src, Arena* arena);
using AggFinalizeFunc = void (*)(RowCursorCell* src, Arena* arena);
@@ -43,8 +44,8 @@ public:
// Memory Note: Plain memory can be allocated from the arena, whose lifetime
// lasts until the finalize function is called. Memory allocated from the heap should
// be freed in the finalize function to avoid a memory leak.
- inline void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena) const {
- _init_fn(dst, src, src_null, arena);
+ inline void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena, ObjectPool* agg_pool) const {
+ _init_fn(dst, src, src_null, arena, agg_pool);
}
// Update aggregated intermediate data. Data stored in engine is aggregated.
@@ -73,7 +74,7 @@ public:
FieldAggregationMethod agg_method() const { return _agg_method; }
private:
- void (*_init_fn)(RowCursorCell* dst, const char* src, bool src_null, Arena* arena);
+ void (*_init_fn)(RowCursorCell* dst, const char* src, bool src_null, Arena* arena, ObjectPool* agg_pool);
void (*_update_fn)(RowCursorCell* dst, const RowCursorCell& src, Arena* arena);
void (*_finalize_fn)(RowCursorCell* src, Arena* arena);
@@ -87,7 +88,7 @@ private:
template<FieldType field_type>
struct BaseAggregateFuncs {
- static void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena) {
+ static void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena, ObjectPool* agg_pool) {
dst->set_is_null(src_null);
if (src_null) {
return;
@@ -113,7 +114,7 @@ struct AggregateFuncTraits : public BaseAggregateFuncs<field_type> {
template <>
struct AggregateFuncTraits<OLAP_FIELD_AGGREGATION_NONE, OLAP_FIELD_TYPE_DECIMAL> :
public BaseAggregateFuncs<OLAP_FIELD_TYPE_DECIMAL> {
- static void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena) {
+ static void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena, ObjectPool* agg_pool) {
dst->set_is_null(src_null);
if (src_null) {
return;
@@ -129,7 +130,7 @@ struct AggregateFuncTraits<OLAP_FIELD_AGGREGATION_NONE, OLAP_FIELD_TYPE_DECIMAL>
template <>
struct AggregateFuncTraits<OLAP_FIELD_AGGREGATION_NONE, OLAP_FIELD_TYPE_DATETIME> :
public BaseAggregateFuncs<OLAP_FIELD_TYPE_DECIMAL> {
- static void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena) {
+ static void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena, ObjectPool* agg_pool) {
dst->set_is_null(src_null);
if (src_null) {
return;
@@ -144,7 +145,7 @@ struct AggregateFuncTraits<OLAP_FIELD_AGGREGATION_NONE, OLAP_FIELD_TYPE_DATETIME
template <>
struct AggregateFuncTraits<OLAP_FIELD_AGGREGATION_NONE, OLAP_FIELD_TYPE_DATE> :
public BaseAggregateFuncs<OLAP_FIELD_TYPE_DECIMAL> {
- static void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena) {
+ static void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena, ObjectPool* agg_pool) {
dst->set_is_null(src_null);
if (src_null) {
return;
@@ -398,17 +399,21 @@ struct AggregateFuncTraits<OLAP_FIELD_AGGREGATION_REPLACE, OLAP_FIELD_TYPE_CHAR>
// so when init, update hll, the src is not null
template <>
struct AggregateFuncTraits<OLAP_FIELD_AGGREGATION_HLL_UNION, OLAP_FIELD_TYPE_HLL> {
- static void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena) {
+ static void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena, ObjectPool* agg_pool) {
DCHECK_EQ(src_null, false);
dst->set_not_null();
auto* src_slice = reinterpret_cast<const Slice*>(src);
auto* dst_slice = reinterpret_cast<Slice*>(dst->mutable_cell_ptr());
- dst_slice->size = sizeof(HyperLogLog);
- // use 'placement new' to allocate HyperLogLog on arena, so that we can control the memory usage.
- char* mem = arena->Allocate(dst_slice->size);
- dst_slice->data = (char*) new (mem) HyperLogLog((const uint8_t*)src_slice->data);
+ // we use a zero size to indicate that this slice is an agg object
+ dst_slice->size = 0;
+ auto* hll = new HyperLogLog((const uint8_t*) src_slice->data);
+ dst_slice->data = reinterpret_cast<char*>(hll);
+
+ arena->track_memory(sizeof(HyperLogLog));
+
+ agg_pool->add(hll);
}
static void update(RowCursorCell* dst, const RowCursorCell& src, Arena* arena) {
@@ -425,33 +430,36 @@ struct AggregateFuncTraits<OLAP_FIELD_AGGREGATION_HLL_UNION, OLAP_FIELD_TYPE_HLL
} else { // for stream load
auto* src_hll = reinterpret_cast<HyperLogLog*>(src_slice->data);
dst_hll->merge(*src_hll);
- // NOT use 'delete src_hll' because the memory is managed by arena
- src_hll->~HyperLogLog();
}
}
+ // The HLL object memory will be released by ObjectPool
static void finalize(RowCursorCell* src, Arena* arena) {
auto *slice = reinterpret_cast<Slice*>(src->mutable_cell_ptr());
auto *hll = reinterpret_cast<HyperLogLog*>(slice->data);
slice->data = arena->Allocate(HLL_COLUMN_DEFAULT_LEN);
slice->size = hll->serialize((uint8_t*)slice->data);
- // NOT using 'delete hll' because the memory is managed by arena
- hll->~HyperLogLog();
}
};
// during data load, after the bitmap_init function, the bitmap_union column won't be null,
// so when initializing or updating the bitmap, the src is not null
template <>
struct AggregateFuncTraits<OLAP_FIELD_AGGREGATION_BITMAP_UNION, OLAP_FIELD_TYPE_VARCHAR> {
- static void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena) {
+ static void init(RowCursorCell* dst, const char* src, bool src_null, Arena* arena, ObjectPool* agg_pool) {
DCHECK_EQ(src_null, false);
dst->set_not_null();
auto* src_slice = reinterpret_cast<const Slice*>(src);
auto* dst_slice = reinterpret_cast<Slice*>(dst->mutable_cell_ptr());
- dst_slice->size = sizeof(RoaringBitmap);
- dst_slice->data = (char*)new RoaringBitmap(src_slice->data);
+ // we use a zero size to indicate that this slice is an agg object
+ dst_slice->size = 0;
+ auto* bitmap = new RoaringBitmap(src_slice->data);
+ dst_slice->data = (char*) bitmap;
+
+ arena->track_memory(sizeof(RoaringBitmap));
+
+ agg_pool->add(bitmap);
}
static void update(RowCursorCell* dst, const RowCursorCell& src, Arena* arena) {
@@ -468,11 +476,10 @@ struct AggregateFuncTraits<OLAP_FIELD_AGGREGATION_BITMAP_UNION, OLAP_FIELD_TYPE_
} else { // for stream load
auto* src_bitmap = reinterpret_cast<RoaringBitmap*>(src_slice->data);
dst_bitmap->merge(*src_bitmap);
-
- delete src_bitmap;
}
}
+ // The RoaringBitmap object memory will be released by ObjectPool
static void finalize(RowCursorCell* src, Arena *arena) {
auto *slice = reinterpret_cast<Slice*>(src->mutable_cell_ptr());
auto *bitmap = reinterpret_cast<RoaringBitmap*>(slice->data);
@@ -480,8 +487,6 @@ struct AggregateFuncTraits<OLAP_FIELD_AGGREGATION_BITMAP_UNION, OLAP_FIELD_TYPE_
slice->size = bitmap->size();
slice->data = arena->Allocate(slice->size);
bitmap->serialize(slice->data);
-
- delete bitmap;
}
};
diff --git a/be/src/olap/field.h b/be/src/olap/field.h
index 32ee098..caecdd9 100644
--- a/be/src/olap/field.h
+++ b/be/src/olap/field.h
@@ -66,8 +66,8 @@ public:
_agg_info->finalize(dst, arena);
}
- virtual void consume(RowCursorCell* dst, const char* src, bool src_null, Arena* arena) const {
- _agg_info->init(dst, src, src_null, arena);
+ virtual void consume(RowCursorCell* dst, const char* src, bool src_null, Arena* arena, ObjectPool* agg_pool) const {
+ _agg_info->init(dst, src, src_null, arena, agg_pool);
}
// todo(kks): Unify AggregateInfo::init method and Field::agg_init method
@@ -76,7 +76,7 @@ public:
// This function differs from the copy function in that if this field
// contains aggregate information, this function will initialize the
// destination in aggregate format and update it with the source content.
- virtual void agg_init(RowCursorCell* dst, const RowCursorCell& src, Arena* arena) const {
+ virtual void agg_init(RowCursorCell* dst, const RowCursorCell& src, Arena* arena, ObjectPool* agg_pool) const {
direct_copy(dst, src);
}
@@ -344,7 +344,7 @@ public:
}
// the char field is special: it needs the _length info when consuming raw data
- void consume(RowCursorCell* dst, const char* src, bool src_null, Arena* arena) const override {
+ void consume(RowCursorCell* dst, const char* src, bool src_null, Arena* arena, ObjectPool* agg_pool) const override {
dst->set_is_null(src_null);
if (src_null) {
return;
@@ -403,8 +403,8 @@ public:
}
// bitmap storage data always not null
- void agg_init(RowCursorCell* dst, const RowCursorCell& src, Arena* arena) const override {
- _agg_info->init(dst, (const char*)src.cell_ptr(), false, arena);
+ void agg_init(RowCursorCell* dst, const RowCursorCell& src, Arena* arena, ObjectPool* agg_pool) const override {
+ _agg_info->init(dst, (const char*)src.cell_ptr(), false, arena, agg_pool);
}
char* allocate_memory(char* cell_ptr, char* variable_ptr) const override {
@@ -424,8 +424,8 @@ public:
}
// Hll storage data always not null
- void agg_init(RowCursorCell* dst, const RowCursorCell& src, Arena* arena) const override {
- _agg_info->init(dst, (const char*)src.cell_ptr(), false, arena);
+ void agg_init(RowCursorCell* dst, const RowCursorCell& src, Arena* arena, ObjectPool* agg_pool) const override {
+ _agg_info->init(dst, (const char*)src.cell_ptr(), false, arena, agg_pool);
}
char* allocate_memory(char* cell_ptr, char* variable_ptr) const override {
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 5a1b240..ae7e82b 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -68,7 +68,7 @@ void MemTable::insert(Tuple* tuple) {
bool is_null = tuple->is_null(slot->null_indicator_offset());
void* value = tuple->get_slot(slot->tuple_offset());
- _schema->column(i)->consume(&cell, (const char *)value, is_null, _skip_list->arena());
+ _schema->column(i)->consume(&cell, (const char *)value, is_null, _skip_list->arena(), &_agg_object_pool);
}
bool overwritten = false;
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 80be744..3856c83 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -18,6 +18,7 @@
#ifndef DORIS_BE_SRC_OLAP_MEMTABLE_H
#define DORIS_BE_SRC_OLAP_MEMTABLE_H
+#include "common/object_pool.h"
#include "olap/schema.h"
#include "olap/skiplist.h"
#include "runtime/tuple.h"
@@ -57,6 +58,7 @@ private:
RowCursorComparator _row_comparator;
Arena _arena;
+ ObjectPool _agg_object_pool;
typedef SkipList<char*, RowCursorComparator> Table;
char* _tuple_buf;
diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp
index 3141738..ae1867e 100644
--- a/be/src/olap/merger.cpp
+++ b/be/src/olap/merger.cpp
@@ -44,14 +44,14 @@ OLAPStatus Merger::merge_rowsets(TabletSharedPtr tablet,
RETURN_NOT_OK_LOG(row_cursor.init(tablet->tablet_schema()),
"failed to init row cursor when merging rowsets of tablet " + tablet->full_name());
row_cursor.allocate_memory_for_string_type(tablet->tablet_schema());
-
// The following procedure would last for long time, half of one day, etc.
int64_t output_rows = 0;
while (true) {
Arena arena;
+ ObjectPool objectPool;
bool eof = false;
// Read one row into row_cursor
- RETURN_NOT_OK_LOG(reader.next_row_with_aggregation(&row_cursor, &arena, &eof),
+ RETURN_NOT_OK_LOG(reader.next_row_with_aggregation(&row_cursor, &arena, &objectPool, &eof),
"failed to read next row when merging rowsets of tablet " + tablet->full_name());
if (eof) {
break;
diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp
index 2151c12..9255faa 100644
--- a/be/src/olap/reader.cpp
+++ b/be/src/olap/reader.cpp
@@ -345,7 +345,7 @@ OLAPStatus Reader::init(const ReaderParams& read_params) {
return OLAP_SUCCESS;
}
-OLAPStatus Reader::_dup_key_next_row(RowCursor* row_cursor, Arena* arena, bool* eof) {
+OLAPStatus Reader::_dup_key_next_row(RowCursor* row_cursor, Arena* arena, ObjectPool* agg_pool, bool* eof) {
if (UNLIKELY(_next_key == nullptr)) {
*eof = true;
return OLAP_SUCCESS;
@@ -360,12 +360,12 @@ OLAPStatus Reader::_dup_key_next_row(RowCursor* row_cursor, Arena* arena, bool*
return OLAP_SUCCESS;
}
-OLAPStatus Reader::_agg_key_next_row(RowCursor* row_cursor, Arena* arena, bool* eof) {
+OLAPStatus Reader::_agg_key_next_row(RowCursor* row_cursor, Arena* arena, ObjectPool* agg_pool, bool* eof) {
if (UNLIKELY(_next_key == nullptr)) {
*eof = true;
return OLAP_SUCCESS;
}
- init_row_with_others(row_cursor, *_next_key, arena);
+ init_row_with_others(row_cursor, *_next_key, arena, agg_pool);
int64_t merged_count = 0;
do {
auto res = _collect_iter->next(&_next_key, &_next_delete_flag);
@@ -389,11 +389,15 @@ OLAPStatus Reader::_agg_key_next_row(RowCursor* row_cursor, Arena* arena, bool*
++merged_count;
} while (true);
_merged_rows += merged_count;
- agg_finalize_row(_value_cids, row_cursor, arena);
+ // For agg queries, we don't need to finalize the agg object; it is passed directly to the agg node
+ if (_need_agg_finalize) {
+ agg_finalize_row(_value_cids, row_cursor, arena);
+ }
+
return OLAP_SUCCESS;
}
-OLAPStatus Reader::_unique_key_next_row(RowCursor* row_cursor, Arena* arena, bool* eof) {
+OLAPStatus Reader::_unique_key_next_row(RowCursor* row_cursor, Arena* arena, ObjectPool* agg_pool, bool* eof) {
*eof = false;
bool cur_delete_flag = false;
do {
@@ -403,7 +407,7 @@ OLAPStatus Reader::_unique_key_next_row(RowCursor* row_cursor, Arena* arena, boo
}
cur_delete_flag = _next_delete_flag;
- init_row_with_others(row_cursor, *_next_key, arena);
+ init_row_with_others(row_cursor, *_next_key, arena, agg_pool);
int64_t merged_count = 0;
while (NULL != _next_key) {
@@ -564,6 +568,7 @@ OLAPStatus Reader::_init_params(const ReaderParams& read_params) {
read_params.check_validation();
OLAPStatus res = OLAP_SUCCESS;
_aggregation = read_params.aggregation;
+ _need_agg_finalize = read_params.need_agg_finalize;
_reader_type = read_params.reader_type;
_tablet = read_params.tablet;
_version = read_params.version;
diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h
index 48f3be4..2183cae 100644
--- a/be/src/olap/reader.h
+++ b/be/src/olap/reader.h
@@ -53,6 +53,7 @@ struct ReaderParams {
TabletSharedPtr tablet;
ReaderType reader_type;
bool aggregation;
+ bool need_agg_finalize = true;
Version version;
// possible values are "gt", "ge", "eq"
std::string range;
@@ -125,8 +126,8 @@ public:
// Return OLAP_SUCCESS and set `*eof` to false when next row is read into `row_cursor`.
// Return OLAP_SUCCESS and set `*eof` to true when no more rows can be read.
// Return others when unexpected error happens.
- OLAPStatus next_row_with_aggregation(RowCursor *row_cursor, Arena* arena, bool *eof) {
- return (this->*_next_row_func)(row_cursor, arena, eof);
+ OLAPStatus next_row_with_aggregation(RowCursor *row_cursor, Arena* arena, ObjectPool* agg_pool, bool *eof) {
+ return (this->*_next_row_func)(row_cursor, arena, agg_pool, eof);
}
uint64_t merged_rows() const {
@@ -200,9 +201,9 @@ private:
OLAPStatus _init_load_bf_columns(const ReaderParams& read_params);
- OLAPStatus _dup_key_next_row(RowCursor* row_cursor, Arena* arena, bool* eof);
- OLAPStatus _agg_key_next_row(RowCursor* row_cursor, Arena* arena, bool* eof);
- OLAPStatus _unique_key_next_row(RowCursor* row_cursor, Arena* arena, bool* eof);
+ OLAPStatus _dup_key_next_row(RowCursor* row_cursor, Arena* arena, ObjectPool* agg_pool, bool* eof);
+ OLAPStatus _agg_key_next_row(RowCursor* row_cursor, Arena* arena, ObjectPool* agg_pool, bool* eof);
+ OLAPStatus _unique_key_next_row(RowCursor* row_cursor, Arena* arena, ObjectPool* agg_pool, bool* eof);
TabletSharedPtr tablet() { return _tablet; }
@@ -233,9 +234,11 @@ private:
DeleteHandler _delete_handler;
- OLAPStatus (Reader::*_next_row_func)(RowCursor* row_cursor, Arena* arena, bool* eof) = nullptr;
+ OLAPStatus (Reader::*_next_row_func)(RowCursor* row_cursor, Arena* arena, ObjectPool* agg_pool, bool* eof) = nullptr;
bool _aggregation;
+ // for agg queries, we don't need to finalize when scanning agg object data
+ bool _need_agg_finalize = true;
bool _version_locked;
ReaderType _reader_type;
bool _next_delete_flag;
diff --git a/be/src/olap/row.h b/be/src/olap/row.h
index 1dd0581..64ea139 100644
--- a/be/src/olap/row.h
+++ b/be/src/olap/row.h
@@ -107,10 +107,10 @@ int index_compare_row(const LhsRowType& lhs, const RhsRowType& rhs) {
// function will first initialize destination column and then update with source column
// value.
template<typename DstRowType, typename SrcRowType>
-void init_row_with_others(DstRowType* dst, const SrcRowType& src, Arena* arena) {
+void init_row_with_others(DstRowType* dst, const SrcRowType& src, Arena* arena, ObjectPool* agg_pool) {
for (auto cid : dst->schema()->column_ids()) {
auto dst_cell = dst->cell(cid);
- dst->schema()->column(cid)->agg_init(&dst_cell, src.cell(cid), arena);
+ dst->schema()->column(cid)->agg_init(&dst_cell, src.cell(cid), arena, agg_pool);
}
}
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 9bf5bb0..b7eed21 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -569,6 +569,7 @@ bool RowBlockMerger::merge(
uint64_t tmp_merged_rows = 0;
RowCursor row_cursor;
std::unique_ptr<Arena> arena(new Arena());
+ std::unique_ptr<ObjectPool> agg_object_pool(new ObjectPool());
if (row_cursor.init(_tablet->tablet_schema()) != OLAP_SUCCESS) {
LOG(WARNING) << "fail to init row cursor.";
goto MERGE_ERR;
@@ -578,7 +579,7 @@ bool RowBlockMerger::merge(
row_cursor.allocate_memory_for_string_type(_tablet->tablet_schema());
while (_heap.size() > 0) {
- init_row_with_others(&row_cursor, *(_heap.top().row_cursor), arena.get());
+ init_row_with_others(&row_cursor, *(_heap.top().row_cursor), arena.get(), agg_object_pool.get());
if (!_pop_heap()) {
goto MERGE_ERR;
@@ -604,6 +605,7 @@ bool RowBlockMerger::merge(
// the memory allocated by the arena has been copied,
// so we should release this memory immediately
arena.reset(new Arena());
+ agg_object_pool.reset(new ObjectPool());
}
if (rowset_writer->flush() != OLAP_SUCCESS) {
LOG(WARNING) << "failed to finalizing writer.";
diff --git a/be/src/olap/task/engine_checksum_task.cpp b/be/src/olap/task/engine_checksum_task.cpp
index 43b5ad3..99d2333 100644
--- a/be/src/olap/task/engine_checksum_task.cpp
+++ b/be/src/olap/task/engine_checksum_task.cpp
@@ -105,6 +105,7 @@ OLAPStatus EngineChecksumTask::_compute_checksum() {
RowCursor row;
std::unique_ptr<Arena> arena(new Arena());
+ std::unique_ptr<ObjectPool> agg_object_pool(new ObjectPool());
res = row.init(tablet->tablet_schema(), reader_params.return_columns);
if (res != OLAP_SUCCESS) {
OLAP_LOG_WARNING("failed to init row cursor. [res=%d]", res);
@@ -115,7 +116,7 @@ OLAPStatus EngineChecksumTask::_compute_checksum() {
bool eof = false;
uint32_t row_checksum = 0;
while (true) {
- OLAPStatus res = reader.next_row_with_aggregation(&row, arena.get(), &eof);
+ OLAPStatus res = reader.next_row_with_aggregation(&row, arena.get(), agg_object_pool.get(), &eof);
if (res == OLAP_SUCCESS && eof) {
VLOG(3) << "reader reads to the end.";
break;
@@ -128,6 +129,7 @@ OLAPStatus EngineChecksumTask::_compute_checksum() {
// the memory allocated by the arena has been copied,
// so we should release this memory immediately
arena.reset(new Arena());
+ agg_object_pool.reset(new ObjectPool());
}
LOG(INFO) << "success to finish compute checksum. checksum=" << row_checksum;
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index 729542e..4a9eb0e 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -173,6 +173,8 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request) {
VLOG(1) << "scan_nodes.size()=" << scan_nodes.size();
VLOG(1) << "params.per_node_scan_ranges.size()=" << params.per_node_scan_ranges.size();
+ _plan->try_do_aggregate_serde_improve();
+
for (int i = 0; i < scan_nodes.size(); ++i) {
ScanNode* scan_node = static_cast<ScanNode*>(scan_nodes[i]);
const std::vector<TScanRangeParams>& scan_ranges =
diff --git a/be/src/runtime/row_batch.cpp b/be/src/runtime/row_batch.cpp
index 6e31a44..19152a2 100644
--- a/be/src/runtime/row_batch.cpp
+++ b/be/src/runtime/row_batch.cpp
@@ -48,7 +48,8 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, int capacity, MemTracker* mem_
_row_desc(row_desc),
_auxiliary_mem_usage(0),
_need_to_return(false),
- _tuple_data_pool(new MemPool(_mem_tracker)) {
+ _tuple_data_pool(new MemPool(_mem_tracker)),
+ _agg_object_pool(new ObjectPool()) {
DCHECK(_mem_tracker != NULL);
DCHECK_GT(capacity, 0);
_tuple_ptrs_size = _capacity * _num_tuples_per_row * sizeof(Tuple*);
@@ -82,7 +83,8 @@ RowBatch::RowBatch(const RowDescriptor& row_desc,
_row_desc(row_desc),
_auxiliary_mem_usage(0),
_need_to_return(false),
- _tuple_data_pool(new MemPool(_mem_tracker)) {
+ _tuple_data_pool(new MemPool(_mem_tracker)),
+ _agg_object_pool(new ObjectPool()) {
DCHECK(_mem_tracker != nullptr);
_tuple_ptrs_size = _num_rows * _num_tuples_per_row * sizeof(Tuple*);
DCHECK_GT(_tuple_ptrs_size, 0);
@@ -173,7 +175,8 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const TRowBatch& input_batch,
_row_desc(row_desc),
_auxiliary_mem_usage(0),
_need_to_return(false),
- _tuple_data_pool(new MemPool(_mem_tracker)) {
+ _tuple_data_pool(new MemPool(_mem_tracker)),
+ _agg_object_pool(new ObjectPool()) {
DCHECK(_mem_tracker != NULL);
_tuple_ptrs_size = _num_rows * input_batch.row_tuples.size() * sizeof(Tuple*);
DCHECK_GT(_tuple_ptrs_size, 0);
@@ -259,6 +262,7 @@ void RowBatch::clear() {
}
_tuple_data_pool->free_all();
+ _agg_object_pool.reset(new ObjectPool());
for (int i = 0; i < _io_buffers.size(); ++i) {
_io_buffers[i]->return_buffer();
}
@@ -464,6 +468,7 @@ void RowBatch::reset() {
// TODO: Change this to Clear() and investigate the repercussions.
_tuple_data_pool->free_all();
+ _agg_object_pool.reset(new ObjectPool());
for (int i = 0; i < _io_buffers.size(); ++i) {
_io_buffers[i]->return_buffer();
}
@@ -500,35 +505,30 @@ void RowBatch::close_tuple_streams() {
void RowBatch::transfer_resource_ownership(RowBatch* dest) {
dest->_auxiliary_mem_usage += _tuple_data_pool->total_allocated_bytes();
dest->_tuple_data_pool->acquire_data(_tuple_data_pool.get(), false);
+ dest->_agg_object_pool->acquire_data(_agg_object_pool.get());
for (int i = 0; i < _io_buffers.size(); ++i) {
DiskIoMgr::BufferDescriptor* buffer = _io_buffers[i];
dest->_io_buffers.push_back(buffer);
dest->_auxiliary_mem_usage += buffer->buffer_len();
buffer->set_mem_tracker(dest->_mem_tracker);
}
- _io_buffers.clear();
for (BufferInfo& buffer_info : _buffers) {
dest->add_buffer(
buffer_info.client, std::move(buffer_info.buffer), FlushMode::NO_FLUSH_RESOURCES);
}
- _buffers.clear();
for (int i = 0; i < _tuple_streams.size(); ++i) {
dest->_tuple_streams.push_back(_tuple_streams[i]);
dest->_auxiliary_mem_usage += _tuple_streams[i]->byte_size();
}
- _tuple_streams.clear();
+
for (int i = 0; i < _blocks.size(); ++i) {
dest->_blocks.push_back(_blocks[i]);
dest->_auxiliary_mem_usage += _blocks[i]->buffer_len();
}
- _blocks.clear();
+
dest->_need_to_return |= _need_to_return;
- _auxiliary_mem_usage = 0;
- if (!config::enable_partitioned_aggregation && !config::enable_new_partitioned_aggregation) {
- _tuple_ptrs = NULL;
- }
if (_needs_deep_copy) {
dest->mark_needs_deep_copy();
@@ -589,28 +589,6 @@ void RowBatch::acquire_state(RowBatch* src) {
src->transfer_resource_ownership(this);
}
-void RowBatch::swap(RowBatch* other) {
- DCHECK(_row_desc.equals(other->_row_desc));
- DCHECK_EQ(_num_tuples_per_row, other->_num_tuples_per_row);
- DCHECK_EQ(_tuple_ptrs_size, other->_tuple_ptrs_size);
-
- // The destination row batch should be empty.
- DCHECK(!_has_in_flight_row);
- DCHECK_EQ(_tuple_data_pool->total_reserved_bytes(), 0);
-
- std::swap(_has_in_flight_row, other->_has_in_flight_row);
- std::swap(_num_rows, other->_num_rows);
- std::swap(_capacity, other->_capacity);
- if (!config::enable_partitioned_aggregation && !config::enable_new_partitioned_aggregation) {
- // Tuple pointers are allocated from tuple_data_pool_ so are transferred.
- _tuple_ptrs = other->_tuple_ptrs;
- other->_tuple_ptrs = NULL;
- } else {
- // tuple_ptrs_ were allocated with malloc so can be swapped between batches.
- std::swap(_tuple_ptrs, other->_tuple_ptrs);
- }
-}
-
// TODO: consider computing size of batches as they are built up
int RowBatch::total_byte_size() {
int result = 0;
diff --git a/be/src/runtime/row_batch.h b/be/src/runtime/row_batch.h
index 531c6ec..7aaf2a3 100644
--- a/be/src/runtime/row_batch.h
+++ b/be/src/runtime/row_batch.h
@@ -245,6 +245,9 @@ public:
MemPool* tuple_data_pool() {
return _tuple_data_pool.get();
}
+ ObjectPool* agg_object_pool() {
+ return _agg_object_pool.get();
+ }
int num_io_buffers() const {
return _io_buffers.size();
}
@@ -322,6 +325,7 @@ public:
// Transfer ownership of resources to dest. This includes tuple data in mem
// pool and io buffers.
+ // We first update dest's resources, and then reset this batch's own resources.
void transfer_resource_ownership(RowBatch* dest);
void copy_row(TupleRow* src, TupleRow* dest) {
@@ -384,14 +388,6 @@ public:
int num_buffers() const {
return _buffers.size();
}
- // Swaps all of the row batch state with 'other'. This is used for scan nodes
- // which produce RowBatches asynchronously. Typically, an ExecNode is handed
- // a row batch to populate (pull model) but ScanNodes have multiple threads
- // which push row batches. This function is used to swap the pushed row batch
- // contents with the row batch that's passed from the caller.
- // TODO: this is wasteful and makes a copy that's unnecessary. Think about cleaning
- // this up.
- void swap(RowBatch* other);
const RowDescriptor& row_desc() const {
return _row_desc;
@@ -484,6 +480,9 @@ private:
// holding (some of the) data referenced by rows
boost::scoped_ptr<MemPool> _tuple_data_pool;
+ // holding some complex agg object data (bitmap, hll)
+ std::unique_ptr<ObjectPool> _agg_object_pool;
+
// IO buffers current owned by this row batch. Ownership of IO buffers transfer
// between row batches. Any IO buffer will be owned by at most one row batch
// (i.e. they are not ref counted) so most row batches don't own any.
diff --git a/be/src/udf/udf.cpp b/be/src/udf/udf.cpp
index 4b49c3c..aba14cd 100755
--- a/be/src/udf/udf.cpp
+++ b/be/src/udf/udf.cpp
@@ -465,9 +465,19 @@ void HllVal::init(FunctionContext* ctx) {
is_null = false;
}
-void HllVal::agg_parse_and_cal(const HllVal &other) {
+void HllVal::agg_parse_and_cal(FunctionContext* ctx, const HllVal& other) {
doris::HllSetResolver resolver;
- resolver.init((char*)other.ptr, other.len);
+
+ // A zero length means `other.ptr` points at a HyperLogLog object, not serialized bytes.
+ if (other.len == 0) {
+ auto* hll = reinterpret_cast<doris::HyperLogLog*>(other.ptr);
+ uint8_t* other_ptr = ctx->allocate(doris::HLL_COLUMN_DEFAULT_LEN);
+ int other_len = hll->serialize(other_ptr); // fill the scratch buffer, not this->ptr
+ resolver.init((char*)other_ptr, other_len);
+ } else {
+ resolver.init((char*)other.ptr, other.len);
+ }
+
resolver.parse();
if (resolver.get_hll_data_type() == doris::HLL_DATA_EMPTY) {
diff --git a/be/src/udf/udf.h b/be/src/udf/udf.h
index 192de66..4cf5d9b 100755
--- a/be/src/udf/udf.h
+++ b/be/src/udf/udf.h
@@ -777,7 +777,7 @@ struct HllVal : public StringVal {
void init(FunctionContext* ctx);
- void agg_parse_and_cal(const HllVal &other);
+ void agg_parse_and_cal(FunctionContext* ctx, const HllVal& other);
void agg_merge(const HllVal &other);
};
diff --git a/be/src/util/arena.h b/be/src/util/arena.h
index 4a260e9..ef042eb 100644
--- a/be/src/util/arena.h
+++ b/be/src/util/arena.h
@@ -34,6 +34,11 @@ public:
return memory_usage_.load(std::memory_order_relaxed);
}
+ // Account for `bytes` of memory that was not allocated from this Arena but
+ // must still count toward MemoryUsage(); one atomic add, so no update is lost.
+ void track_memory(size_t bytes) {
+ memory_usage_.fetch_add(bytes, std::memory_order_relaxed);
+ }
private:
char* AllocateFallback(size_t bytes);
char* AllocateNewBlock(size_t block_bytes);
diff --git a/be/test/olap/aggregate_func_test.cpp b/be/test/olap/aggregate_func_test.cpp
index 7d4ca56..21f2cc6 100644
--- a/be/test/olap/aggregate_func_test.cpp
+++ b/be/test/olap/aggregate_func_test.cpp
@@ -19,6 +19,7 @@
#include <gtest/gtest.h>
+#include "common/object_pool.h"
#include "olap/decimal12.h"
#include "olap/uint24.h"
@@ -37,6 +38,7 @@ void test_min() {
char buf[64];
Arena arena;
+ ObjectPool agg_object_pool;
const AggregateInfo* agg = get_aggregate_info(OLAP_FIELD_AGGREGATION_MIN, field_type);
RowCursorCell dst(buf);
@@ -44,7 +46,7 @@ void test_min() {
{
char val_buf[16];
*(bool*)val_buf = true;
- agg->init(&dst, val_buf, true, &arena);
+ agg->init(&dst, val_buf, true, &arena, &agg_object_pool);
ASSERT_TRUE(*(bool*)(buf));
}
// 100
@@ -110,6 +112,7 @@ void test_max() {
char buf[64];
Arena arena;
+ ObjectPool agg_object_pool;
const AggregateInfo* agg = get_aggregate_info(OLAP_FIELD_AGGREGATION_MAX, field_type);
RowCursorCell dst(buf);
@@ -117,7 +120,7 @@ void test_max() {
{
char val_buf[16];
*(bool*)val_buf = true;
- agg->init(&dst, val_buf, true, &arena);
+ agg->init(&dst, val_buf, true, &arena, &agg_object_pool);
ASSERT_TRUE(*(bool*)(buf));
}
// 100
@@ -183,13 +186,14 @@ void test_sum() {
RowCursorCell dst(buf);
Arena arena;
+ ObjectPool agg_object_pool;
const AggregateInfo* agg = get_aggregate_info(OLAP_FIELD_AGGREGATION_SUM, field_type);
// null
{
char val_buf[16];
*(bool*)val_buf = true;
- agg->init(&dst, val_buf, true, &arena);
+ agg->init(&dst, val_buf, true, &arena, &agg_object_pool);
ASSERT_TRUE(*(bool*)(buf));
}
// 100
@@ -255,13 +259,14 @@ void test_replace() {
RowCursorCell dst(buf);
Arena arena;
+ ObjectPool agg_object_pool;
const AggregateInfo* agg = get_aggregate_info(OLAP_FIELD_AGGREGATION_REPLACE, field_type);
// null
{
char val_buf[16];
*(bool*)val_buf = true;
- agg->init(&dst, val_buf, true, &arena);
+ agg->init(&dst, val_buf, true, &arena, &agg_object_pool);
ASSERT_TRUE(*(bool*)(buf));
}
// 100
@@ -312,6 +317,7 @@ void test_replace_string() {
dst_slice->size = 0;
Arena arena;
+ ObjectPool agg_object_pool;
const AggregateInfo* agg = get_aggregate_info(OLAP_FIELD_AGGREGATION_REPLACE, field_type);
char src[string_field_size];
@@ -320,7 +326,7 @@ void test_replace_string() {
// null
{
src_cell.set_null();
- agg->init(&dst_cell, (const char*)src_slice, true, &arena);
+ agg->init(&dst_cell, (const char*)src_slice, true, &arena, &agg_object_pool);
ASSERT_TRUE(dst_cell.is_null());
}
// "12345"
diff --git a/be/test/olap/row_cursor_test.cpp b/be/test/olap/row_cursor_test.cpp
index ab538b4..cd26618 100644
--- a/be/test/olap/row_cursor_test.cpp
+++ b/be/test/olap/row_cursor_test.cpp
@@ -17,6 +17,7 @@
#include <gtest/gtest.h>
+#include "common/object_pool.h"
#include "olap/row_cursor.h"
#include "olap/tablet_schema.h"
#include "olap/row.h"
@@ -470,7 +471,8 @@ TEST_F(TestRowCursor, AggregateWithoutNull) {
left.set_field_content(4, reinterpret_cast<char*>(&l_decimal), _mem_pool.get());
left.set_field_content(5, reinterpret_cast<char*>(&l_varchar), _mem_pool.get());
- init_row_with_others(&row, left, arena.get());
+ ObjectPool agg_object_pool;
+ init_row_with_others(&row, left, arena.get(), &agg_object_pool);
RowCursor right;
res = right.init(tablet_schema);
@@ -528,7 +530,8 @@ TEST_F(TestRowCursor, AggregateWithNull) {
left.set_null(4);
left.set_field_content(5, reinterpret_cast<char*>(&l_varchar), _mem_pool.get());
- init_row_with_others(&row, left, arena.get());
+ ObjectPool agg_object_pool;
+ init_row_with_others(&row, left, arena.get(), &agg_object_pool);
RowCursor right;
res = right.init(tablet_schema);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org