You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/12/09 14:36:51 UTC
[incubator-doris] branch master updated: [refactor] modify the implements of Tuple & RowBatch (#7319)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 80c11da [refactor] modify the implements of Tuple & RowBatch (#7319)
80c11da is described below
commit 80c11da3df0d4b599256c23c1965f94ee5747ca6
Author: thinker <zc...@qq.com>
AuthorDate: Thu Dec 9 22:36:37 2021 +0800
[refactor] modify the implements of Tuple & RowBatch (#7319)
code refactor: improve code's readability, avoid const_cast
1. Make loops simpler and clearer by using range-based for loops, which are safer than the old iterator-style loops.
2. Iterate over _row_desc.tuple_descriptors() by index only, instead of mixing an index with an iterator.
3. Add a new function `To convert_to(From from)` that performs union-based casting between two types, and use it in place of reinterpret_cast; the new cast is more readable.
4. Avoid reusing the same variable name in nested loops, which is dangerous because the inner variable shadows the outer one.
5. Add the const keyword to member functions that do not modify the object, following the C++ Core Guidelines.
---
be/src/common/utils.h | 10 ++
be/src/runtime/row_batch.cpp | 235 ++++++++++++++++++++-----------------------
be/src/runtime/row_batch.h | 11 +-
be/src/runtime/tuple.cpp | 64 ++++++------
4 files changed, 153 insertions(+), 167 deletions(-)
diff --git a/be/src/common/utils.h b/be/src/common/utils.h
index cb6647a..61fed1d 100644
--- a/be/src/common/utils.h
+++ b/be/src/common/utils.h
@@ -61,4 +61,14 @@ static constexpr int RELEASE_CONTEXT_COUNTER = 1 << 7;
static_assert((RELEASE_CONTEXT_COUNTER & (RELEASE_CONTEXT_COUNTER - 1)) == 0,
"should be power of 2");
+template <typename To, typename From>
+static inline To convert_to(From from) {
+ union {
+ From _from;
+ To _to;
+ };
+ _from = from;
+ return _to;
+}
+
} // namespace doris
diff --git a/be/src/runtime/row_batch.cpp b/be/src/runtime/row_batch.cpp
index 4652df4..19c91f6 100644
--- a/be/src/runtime/row_batch.cpp
+++ b/be/src/runtime/row_batch.cpp
@@ -20,6 +20,7 @@
#include <snappy/snappy.h>
#include <stdint.h> // for intptr_t
+#include "common/utils.h"
#include "gen_cpp/Data_types.h"
#include "gen_cpp/data.pb.h"
#include "runtime/buffered_tuple_stream2.inline.h"
@@ -58,10 +59,10 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, int capacity, MemTracker* mem_
// TODO: switch to Init() pattern so we can check memory limit and return Status.
if (config::enable_partitioned_aggregation) {
_mem_tracker->Consume(_tuple_ptrs_size);
- _tuple_ptrs = reinterpret_cast<Tuple**>(malloc(_tuple_ptrs_size));
+ _tuple_ptrs = (Tuple**)(malloc(_tuple_ptrs_size));
DCHECK(_tuple_ptrs != nullptr);
} else {
- _tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool.allocate(_tuple_ptrs_size));
+ _tuple_ptrs = (Tuple**)(_tuple_data_pool.allocate(_tuple_ptrs_size));
}
}
@@ -89,13 +90,13 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const PRowBatch& input_batch,
// TODO: switch to Init() pattern so we can check memory limit and return Status.
if (config::enable_partitioned_aggregation) {
_mem_tracker->Consume(_tuple_ptrs_size);
- _tuple_ptrs = reinterpret_cast<Tuple**>(malloc(_tuple_ptrs_size));
+ _tuple_ptrs = (Tuple**)(malloc(_tuple_ptrs_size));
DCHECK(_tuple_ptrs != nullptr);
} else {
- _tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool.allocate(_tuple_ptrs_size));
+ _tuple_ptrs = (Tuple**)_tuple_data_pool.allocate(_tuple_ptrs_size);
}
- uint8_t* tuple_data = nullptr;
+ char* tuple_data = nullptr;
if (input_batch.is_compressed()) {
// Decompress tuple data into data pool
const char* compressed_data = input_batch.tuple_data().c_str();
@@ -104,13 +105,12 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const PRowBatch& input_batch,
bool success =
snappy::GetUncompressedLength(compressed_data, compressed_size, &uncompressed_size);
DCHECK(success) << "snappy::GetUncompressedLength failed";
- tuple_data = reinterpret_cast<uint8_t*>(_tuple_data_pool.allocate(uncompressed_size));
- success = snappy::RawUncompress(compressed_data, compressed_size,
- reinterpret_cast<char*>(tuple_data));
+ tuple_data = (char*)_tuple_data_pool.allocate(uncompressed_size);
+ success = snappy::RawUncompress(compressed_data, compressed_size, tuple_data);
DCHECK(success) << "snappy::RawUncompress failed";
} else {
// Tuple data uncompressed, copy directly into data pool
- tuple_data = _tuple_data_pool.allocate(input_batch.tuple_data().size());
+ tuple_data = (char*)_tuple_data_pool.allocate(input_batch.tuple_data().size());
memcpy(tuple_data, input_batch.tuple_data().c_str(), input_batch.tuple_data().size());
}
@@ -120,7 +120,7 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const PRowBatch& input_batch,
if (offset == -1) {
_tuple_ptrs[tuple_idx++] = nullptr;
} else {
- _tuple_ptrs[tuple_idx++] = reinterpret_cast<Tuple*>(tuple_data + offset);
+ _tuple_ptrs[tuple_idx++] = convert_to<Tuple*>(tuple_data + offset);
}
}
@@ -128,7 +128,8 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const PRowBatch& input_batch,
if (!_row_desc.has_varlen_slots()) {
return;
}
- const std::vector<TupleDescriptor*>& tuple_descs = _row_desc.tuple_descriptors();
+
+ const auto& tuple_descs = _row_desc.tuple_descriptors();
// For every unique tuple, convert string offsets contained in tuple data into
// pointers. Tuples were serialized in the order we are deserializing them in,
@@ -136,21 +137,22 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const PRowBatch& input_batch,
// we already converted.
for (int i = 0; i < _num_rows; ++i) {
TupleRow* row = get_row(i);
- std::vector<TupleDescriptor*>::const_iterator desc = tuple_descs.begin();
- for (int j = 0; desc != tuple_descs.end(); ++desc, ++j) {
- if ((*desc)->string_slots().empty() && (*desc)->collection_slots().empty()) {
+ for (size_t j = 0; j < tuple_descs.size(); ++j) {
+ auto desc = tuple_descs[j];
+ if (desc->string_slots().empty() && desc->collection_slots().empty()) {
continue;
}
+
Tuple* tuple = row->get_tuple(j);
if (tuple == nullptr) {
continue;
}
- for (auto slot : (*desc)->string_slots()) {
+ for (auto slot : desc->string_slots()) {
DCHECK(slot->type().is_string_type());
StringValue* string_val = tuple->get_string_slot(slot->tuple_offset());
- int offset = reinterpret_cast<intptr_t>(string_val->ptr);
- string_val->ptr = reinterpret_cast<char*>(tuple_data + offset);
+ int offset = convert_to<int>(string_val->ptr);
+ string_val->ptr = tuple_data + offset;
// Why we do this mask? Field len of StringValue is changed from int to size_t in
// Doris 0.11. When upgrading, some bits of len sent from 0.10 is random value,
@@ -160,37 +162,35 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const PRowBatch& input_batch,
}
// copy collection slots
- vector<SlotDescriptor*>::const_iterator slot_collection =
- (*desc)->collection_slots().begin();
- for (; slot_collection != (*desc)->collection_slots().end(); ++slot_collection) {
- DCHECK((*slot_collection)->type().is_collection_type());
+ for (auto slot_collection : desc->collection_slots()) {
+ DCHECK(slot_collection->type().is_collection_type());
CollectionValue* array_val =
- tuple->get_collection_slot((*slot_collection)->tuple_offset());
+ tuple->get_collection_slot(slot_collection->tuple_offset());
// assgin data and null_sign pointer position in tuple_data
- int data_offset = reinterpret_cast<intptr_t>(array_val->data());
- array_val->set_data(reinterpret_cast<char*>(tuple_data + data_offset));
- int null_offset = reinterpret_cast<intptr_t>(array_val->null_signs());
- array_val->set_null_signs(reinterpret_cast<bool*>(tuple_data + null_offset));
+ int data_offset = convert_to<int>(array_val->data());
+ array_val->set_data(tuple_data + data_offset);
+ int null_offset = convert_to<int>(array_val->null_signs());
+ array_val->set_null_signs(convert_to<bool*>(tuple_data + null_offset));
- const TypeDescriptor& item_type = (*slot_collection)->type().children.at(0);
+ const TypeDescriptor& item_type = slot_collection->type().children.at(0);
if (!item_type.is_string_type()) {
continue;
}
// copy every string item
- for (int i = 0; i < array_val->length(); ++i) {
- if (array_val->is_null_at(i)) {
+ for (size_t k = 0; k < array_val->length(); ++k) {
+ if (array_val->is_null_at(k)) {
continue;
}
- StringValue* dst_item_v = reinterpret_cast<StringValue*>(
- (uint8_t*)array_val->data() + i * item_type.get_slot_size());
+ StringValue* dst_item_v = convert_to<StringValue*>(
+ (uint8_t*)array_val->data() + k * item_type.get_slot_size());
if (dst_item_v->len != 0) {
- int offset = reinterpret_cast<intptr_t>(dst_item_v->ptr);
- dst_item_v->ptr = reinterpret_cast<char*>(tuple_data + offset);
+ int offset = convert_to<int>(dst_item_v->ptr);
+ dst_item_v->ptr = tuple_data + offset;
}
}
}
@@ -222,13 +222,13 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const TRowBatch& input_batch,
// TODO: switch to Init() pattern so we can check memory limit and return Status.
if (config::enable_partitioned_aggregation) {
_mem_tracker->Consume(_tuple_ptrs_size);
- _tuple_ptrs = reinterpret_cast<Tuple**>(malloc(_tuple_ptrs_size));
+ _tuple_ptrs = (Tuple**)malloc(_tuple_ptrs_size);
DCHECK(_tuple_ptrs != nullptr);
} else {
- _tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool.allocate(_tuple_ptrs_size));
+ _tuple_ptrs = (Tuple**)_tuple_data_pool.allocate(_tuple_ptrs_size);
}
- uint8_t* tuple_data = nullptr;
+ char* tuple_data = nullptr;
if (input_batch.is_compressed) {
// Decompress tuple data into data pool
const char* compressed_data = input_batch.tuple_data.c_str();
@@ -237,24 +237,22 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const TRowBatch& input_batch,
bool success =
snappy::GetUncompressedLength(compressed_data, compressed_size, &uncompressed_size);
DCHECK(success) << "snappy::GetUncompressedLength failed";
- tuple_data = reinterpret_cast<uint8_t*>(_tuple_data_pool.allocate(uncompressed_size));
- success = snappy::RawUncompress(compressed_data, compressed_size,
- reinterpret_cast<char*>(tuple_data));
+ tuple_data = (char*)_tuple_data_pool.allocate(uncompressed_size);
+ success = snappy::RawUncompress(compressed_data, compressed_size, tuple_data);
DCHECK(success) << "snappy::RawUncompress failed";
} else {
// Tuple data uncompressed, copy directly into data pool
- tuple_data = _tuple_data_pool.allocate(input_batch.tuple_data.size());
+ tuple_data = (char*)_tuple_data_pool.allocate(input_batch.tuple_data.size());
memcpy(tuple_data, input_batch.tuple_data.c_str(), input_batch.tuple_data.size());
}
// convert input_batch.tuple_offsets into pointers
int tuple_idx = 0;
- for (vector<int32_t>::const_iterator offset = input_batch.tuple_offsets.begin();
- offset != input_batch.tuple_offsets.end(); ++offset) {
- if (*offset == -1) {
+ for (auto offset : input_batch.tuple_offsets) {
+ if (offset == -1) {
_tuple_ptrs[tuple_idx++] = nullptr;
} else {
- _tuple_ptrs[tuple_idx++] = reinterpret_cast<Tuple*>(tuple_data + *offset);
+ _tuple_ptrs[tuple_idx++] = convert_to<Tuple*>(tuple_data + offset);
}
}
@@ -262,7 +260,8 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const TRowBatch& input_batch,
if (!_row_desc.has_varlen_slots()) {
return;
}
- const std::vector<TupleDescriptor*>& tuple_descs = _row_desc.tuple_descriptors();
+
+ const auto& tuple_descs = _row_desc.tuple_descriptors();
// For every unique tuple, convert string offsets contained in tuple data into
// pointers. Tuples were serialized in the order we are deserializing them in,
@@ -270,9 +269,9 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const TRowBatch& input_batch,
// we already converted.
for (int i = 0; i < _num_rows; ++i) {
TupleRow* row = get_row(i);
- std::vector<TupleDescriptor*>::const_iterator desc = tuple_descs.begin();
- for (int j = 0; desc != tuple_descs.end(); ++desc, ++j) {
- if ((*desc)->string_slots().empty() && (*desc)->collection_slots().empty()) {
+ for (size_t j = 0; j < tuple_descs.size(); ++j) {
+ auto desc = tuple_descs[j];
+ if (desc->string_slots().empty() && desc->collection_slots().empty()) {
continue;
}
@@ -281,13 +280,12 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const TRowBatch& input_batch,
continue;
}
- std::vector<SlotDescriptor*>::const_iterator slot = (*desc)->string_slots().begin();
- for (; slot != (*desc)->string_slots().end(); ++slot) {
- DCHECK((*slot)->type().is_string_type());
- StringValue* string_val = tuple->get_string_slot((*slot)->tuple_offset());
+ for (auto slot : desc->string_slots()) {
+ DCHECK(slot->type().is_string_type());
+ StringValue* string_val = tuple->get_string_slot(slot->tuple_offset());
- int offset = reinterpret_cast<intptr_t>(string_val->ptr);
- string_val->ptr = reinterpret_cast<char*>(tuple_data + offset);
+ int offset = convert_to<int>(string_val->ptr);
+ string_val->ptr = tuple_data + offset;
// Why we do this mask? Field len of StringValue is changed from int to size_t in
// Doris 0.11. When upgrading, some bits of len sent from 0.10 is random value,
@@ -297,35 +295,33 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const TRowBatch& input_batch,
}
// copy collection slot
- vector<SlotDescriptor*>::const_iterator slot_collection =
- (*desc)->collection_slots().begin();
- for (; slot_collection != (*desc)->collection_slots().end(); ++slot_collection) {
- DCHECK((*slot_collection)->type().is_collection_type());
+ for (auto slot_collection : desc->collection_slots()) {
+ DCHECK(slot_collection->type().is_collection_type());
CollectionValue* array_val =
- tuple->get_collection_slot((*slot_collection)->tuple_offset());
+ tuple->get_collection_slot(slot_collection->tuple_offset());
- int offset = reinterpret_cast<intptr_t>(array_val->data());
- array_val->set_data(reinterpret_cast<char*>(tuple_data + offset));
- int null_offset = reinterpret_cast<intptr_t>(array_val->null_signs());
- array_val->set_null_signs(reinterpret_cast<bool*>(tuple_data + null_offset));
+ int offset = convert_to<int>(array_val->data());
+ array_val->set_data(tuple_data + offset);
+ int null_offset = convert_to<int>(array_val->null_signs());
+ array_val->set_null_signs(convert_to<bool*>(tuple_data + null_offset));
- const TypeDescriptor& item_type = (*slot_collection)->type().children.at(0);
+ const TypeDescriptor& item_type = slot_collection->type().children.at(0);
if (!item_type.is_string_type()) {
continue;
}
// copy string item
- for (int i = 0; i < array_val->length(); ++i) {
- if (array_val->is_null_at(i)) {
+ for (size_t k = 0; k < array_val->length(); ++k) {
+ if (array_val->is_null_at(k)) {
continue;
}
- StringValue* dst_item_v = reinterpret_cast<StringValue*>(
- (uint8_t*)array_val->data() + i * item_type.get_slot_size());
+ StringValue* dst_item_v = convert_to<StringValue*>(
+ (uint8_t*)array_val->data() + k * item_type.get_slot_size());
if (dst_item_v->len != 0) {
- int offset = reinterpret_cast<intptr_t>(dst_item_v->ptr);
- dst_item_v->ptr = reinterpret_cast<char*>(tuple_data + offset);
+ int offset = convert_to<int>(dst_item_v->ptr);
+ dst_item_v->ptr = tuple_data + offset;
}
}
}
@@ -381,14 +377,13 @@ size_t RowBatch::serialize(TRowBatch* output_batch) {
// Copy tuple data, including strings, into output_batch (converting string
// pointers into offsets in the process)
int offset = 0; // current offset into output_batch->tuple_data
- char* tuple_data = const_cast<char*>(output_batch->tuple_data.c_str());
+ char* tuple_data = output_batch->tuple_data.data();
+ const auto& tuple_descs = _row_desc.tuple_descriptors();
for (int i = 0; i < _num_rows; ++i) {
TupleRow* row = get_row(i);
- const std::vector<TupleDescriptor*>& tuple_descs = _row_desc.tuple_descriptors();
- std::vector<TupleDescriptor*>::const_iterator desc = tuple_descs.begin();
-
- for (int j = 0; desc != tuple_descs.end(); ++desc, ++j) {
+ for (size_t j = 0; j < tuple_descs.size(); ++j) {
+ auto desc = tuple_descs[j];
if (row->get_tuple(j) == nullptr) {
// NULLs are encoded as -1
output_batch->tuple_offsets.push_back(-1);
@@ -397,7 +392,7 @@ size_t RowBatch::serialize(TRowBatch* output_batch) {
// Record offset before creating copy (which increments offset and tuple_data)
output_batch->tuple_offsets.push_back(offset);
- row->get_tuple(j)->deep_copy(**desc, &tuple_data, &offset, /* convert_ptrs */ true);
+ row->get_tuple(j)->deep_copy(*desc, &tuple_data, &offset, /* convert_ptrs */ true);
DCHECK_LE(offset, size);
}
}
@@ -414,7 +409,7 @@ size_t RowBatch::serialize(TRowBatch* output_batch) {
}
size_t compressed_size = 0;
- char* compressed_output = const_cast<char*>(_compression_scratch.c_str());
+ char* compressed_output = _compression_scratch.data();
snappy::RawCompress(output_batch->tuple_data.c_str(), size, compressed_output,
&compressed_size);
@@ -450,20 +445,22 @@ size_t RowBatch::serialize(PRowBatch* output_batch) {
// Copy tuple data, including strings, into output_batch (converting string
// pointers into offsets in the process)
int offset = 0; // current offset into output_batch->tuple_data
- char* tuple_data = const_cast<char*>(mutable_tuple_data->data());
+ char* tuple_data = mutable_tuple_data->data();
+ const auto& tuple_descs = _row_desc.tuple_descriptors();
+ const auto& mutable_tuple_offsets = output_batch->mutable_tuple_offsets();
+
for (int i = 0; i < _num_rows; ++i) {
TupleRow* row = get_row(i);
- const std::vector<TupleDescriptor*>& tuple_descs = _row_desc.tuple_descriptors();
- std::vector<TupleDescriptor*>::const_iterator desc = tuple_descs.begin();
- for (int j = 0; desc != tuple_descs.end(); ++desc, ++j) {
+ for (size_t j = 0; j < tuple_descs.size(); ++j) {
+ auto desc = tuple_descs[j];
if (row->get_tuple(j) == nullptr) {
// NULLs are encoded as -1
- output_batch->mutable_tuple_offsets()->Add(-1);
+ mutable_tuple_offsets->Add(-1);
continue;
}
// Record offset before creating copy (which increments offset and tuple_data)
- output_batch->mutable_tuple_offsets()->Add(offset);
- row->get_tuple(j)->deep_copy(**desc, &tuple_data, &offset, /* convert_ptrs */ true);
+ mutable_tuple_offsets->Add(offset);
+ row->get_tuple(j)->deep_copy(*desc, &tuple_data, &offset, /* convert_ptrs */ true);
DCHECK_LE(offset, size);
}
}
@@ -480,7 +477,7 @@ size_t RowBatch::serialize(PRowBatch* output_batch) {
}
size_t compressed_size = 0;
- char* compressed_output = const_cast<char*>(_compression_scratch.c_str());
+ char* compressed_output = _compression_scratch.data();
snappy::RawCompress(mutable_tuple_data->data(), size, compressed_output, &compressed_size);
if (LIKELY(compressed_size < size)) {
@@ -506,12 +503,12 @@ void RowBatch::add_io_buffer(DiskIoMgr::BufferDescriptor* buffer) {
Status RowBatch::resize_and_allocate_tuple_buffer(RuntimeState* state, int64_t* tuple_buffer_size,
uint8_t** buffer) {
- const int row_size = _row_desc.get_row_size();
+ int64_t row_size = _row_desc.get_row_size();
// Avoid divide-by-zero. Don't need to modify capacity for empty rows anyway.
if (row_size != 0) {
- _capacity = std::max(1, std::min(_capacity, FIXED_LEN_BUFFER_LIMIT / row_size));
+ _capacity = std::max(1, std::min<int>(_capacity, FIXED_LEN_BUFFER_LIMIT / row_size));
}
- *tuple_buffer_size = static_cast<int64_t>(row_size) * _capacity;
+ *tuple_buffer_size = row_size * _capacity;
// TODO(dhc): change allocate to try_allocate?
*buffer = _tuple_data_pool.allocate(*tuple_buffer_size);
if (*buffer == nullptr) {
@@ -560,7 +557,7 @@ void RowBatch::reset() {
_blocks.clear();
_auxiliary_mem_usage = 0;
if (!config::enable_partitioned_aggregation) {
- _tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool.allocate(_tuple_ptrs_size));
+ _tuple_ptrs = (Tuple**)(_tuple_data_pool.allocate(_tuple_ptrs_size));
}
_need_to_return = false;
_flush = FlushMode::NO_FLUSH_RESOURCES;
@@ -677,52 +674,48 @@ void RowBatch::deep_copy_to(RowBatch* dst) {
dst->add_rows(_num_rows);
for (int i = 0; i < _num_rows; ++i) {
TupleRow* src_row = get_row(i);
- TupleRow* dst_row = reinterpret_cast<TupleRow*>(dst->_tuple_ptrs + i * _num_tuples_per_row);
- src_row->deep_copy(dst_row, _row_desc.tuple_descriptors(), &dst->_tuple_data_pool,
- false);
+ TupleRow* dst_row = convert_to<TupleRow*>(dst->_tuple_ptrs + i * _num_tuples_per_row);
+ src_row->deep_copy(dst_row, _row_desc.tuple_descriptors(), &dst->_tuple_data_pool, false);
}
dst->commit_rows(_num_rows);
}
// TODO: consider computing size of batches as they are built up
-size_t RowBatch::total_byte_size() {
+size_t RowBatch::total_byte_size() const {
size_t result = 0;
// Sum total variable length byte sizes.
for (int i = 0; i < _num_rows; ++i) {
TupleRow* row = get_row(i);
- const std::vector<TupleDescriptor*>& tuple_descs = _row_desc.tuple_descriptors();
- std::vector<TupleDescriptor*>::const_iterator desc = tuple_descs.begin();
-
- for (int j = 0; desc != tuple_descs.end(); ++desc, ++j) {
+ const auto& tuple_descs = _row_desc.tuple_descriptors();
+ for (size_t j = 0; j < tuple_descs.size(); ++j) {
+ auto desc = tuple_descs[j];
Tuple* tuple = row->get_tuple(j);
if (tuple == nullptr) {
continue;
}
- result += (*desc)->byte_size();
- std::vector<SlotDescriptor*>::const_iterator slot = (*desc)->string_slots().begin();
- for (; slot != (*desc)->string_slots().end(); ++slot) {
- DCHECK((*slot)->type().is_string_type());
- if (tuple->is_null((*slot)->null_indicator_offset())) {
+ result += desc->byte_size();
+
+ for (auto slot : desc->string_slots()) {
+ DCHECK(slot->type().is_string_type());
+ if (tuple->is_null(slot->null_indicator_offset())) {
continue;
}
- StringValue* string_val = tuple->get_string_slot((*slot)->tuple_offset());
+ StringValue* string_val = tuple->get_string_slot(slot->tuple_offset());
result += string_val->len;
}
// compute slot collection size
- vector<SlotDescriptor*>::const_iterator slot_collection =
- (*desc)->collection_slots().begin();
- for (; slot_collection != (*desc)->collection_slots().end(); ++slot_collection) {
- DCHECK((*slot_collection)->type().is_collection_type());
- if (tuple->is_null((*slot_collection)->null_indicator_offset())) {
+ for (auto slot_collection : desc->collection_slots()) {
+ DCHECK(slot_collection->type().is_collection_type());
+ if (tuple->is_null(slot_collection->null_indicator_offset())) {
continue;
}
// compute data null_signs size
CollectionValue* array_val =
- tuple->get_collection_slot((*slot_collection)->tuple_offset());
+ tuple->get_collection_slot(slot_collection->tuple_offset());
result += array_val->length() * sizeof(bool);
- const TypeDescriptor& item_type = (*slot_collection)->type().children.at(0);
+ const TypeDescriptor& item_type = slot_collection->type().children.at(0);
result += array_val->length() * item_type.get_slot_size();
if (!item_type.is_string_type()) {
@@ -730,12 +723,12 @@ size_t RowBatch::total_byte_size() {
}
// compute string type item size
- for (int i = 0; i < array_val->length(); ++i) {
- if (array_val->is_null_at(i)) {
+ for (int k = 0; k < array_val->length(); ++k) {
+ if (array_val->is_null_at(k)) {
continue;
}
- StringValue* dst_item_v = reinterpret_cast<StringValue*>(
- (uint8_t*)array_val->data() + i * item_type.get_slot_size());
+ StringValue* dst_item_v = convert_to<StringValue*>(
+ (uint8_t*)array_val->data() + k * item_type.get_slot_size());
result += dst_item_v->len;
}
}
@@ -745,20 +738,6 @@ size_t RowBatch::total_byte_size() {
return result;
}
-int RowBatch::max_tuple_buffer_size() const {
- int row_size = _row_desc.get_row_size();
- if (row_size > AT_CAPACITY_MEM_USAGE) {
- return row_size;
- }
- int num_rows = 0;
- if (row_size != 0) {
- num_rows = std::min(_capacity, AT_CAPACITY_MEM_USAGE / row_size);
- }
- int tuple_buffer_size = num_rows * row_size;
- DCHECK_LE(tuple_buffer_size, AT_CAPACITY_MEM_USAGE);
- return tuple_buffer_size;
-}
-
void RowBatch::add_buffer(BufferPool::ClientHandle* client, BufferPool::BufferHandle&& buffer,
FlushMode flush) {
_auxiliary_mem_usage += buffer.len();
diff --git a/be/src/runtime/row_batch.h b/be/src/runtime/row_batch.h
index 637c4b9..5fcae81 100644
--- a/be/src/runtime/row_batch.h
+++ b/be/src/runtime/row_batch.h
@@ -169,7 +169,7 @@ public:
// The total size of all data represented in this row batch (tuples and referenced
// string data).
- size_t total_byte_size();
+ size_t total_byte_size() const;
TupleRow* get_row(int row_idx) const {
DCHECK(_tuple_ptrs != nullptr);
@@ -214,10 +214,10 @@ public:
/// Returns true if the iterator is beyond the last row for read iterators.
/// Useful for read iterators to determine the limit. Write iterators should use
/// RowBatch::AtCapacity() instead.
- bool IR_ALWAYS_INLINE at_end() { return _row >= _row_batch_end; }
+ bool IR_ALWAYS_INLINE at_end() const { return _row >= _row_batch_end; }
/// Returns the row batch which this iterator is iterating through.
- RowBatch* parent() { return _parent; }
+ RowBatch* parent() const { return _parent; }
private:
/// Number of tuples per row.
@@ -309,7 +309,7 @@ public:
// we firstly update dest resource, and then reset current resource
void transfer_resource_ownership(RowBatch* dest);
- void copy_row(TupleRow* src, TupleRow* dest) {
+ void copy_row(const TupleRow* src, TupleRow* dest) const {
memcpy(dest, src, _num_tuples_per_row * sizeof(Tuple*));
}
@@ -385,9 +385,6 @@ public:
void set_scanner_id(int id) { _scanner_id = id; }
int scanner_id() const { return _scanner_id; }
- // Computes the maximum size needed to store tuple data for this row batch.
- int max_tuple_buffer_size() const;
-
static const int MAX_MEM_POOL_SIZE = 32 * 1024 * 1024;
std::string to_string();
diff --git a/be/src/runtime/tuple.cpp b/be/src/runtime/tuple.cpp
index 398a26c..9816f79 100644
--- a/be/src/runtime/tuple.cpp
+++ b/be/src/runtime/tuple.cpp
@@ -23,6 +23,7 @@
#include <string>
#include <vector>
+#include "common/utils.h"
#include "exprs/expr.h"
#include "exprs/expr_context.h"
#include "runtime/collection_value.h"
@@ -60,7 +61,7 @@ int64_t Tuple::varlen_byte_size(const TupleDescriptor& desc) const {
}
Tuple* Tuple::deep_copy(const TupleDescriptor& desc, MemPool* pool, bool convert_ptrs) {
- Tuple* result = reinterpret_cast<Tuple*>(pool->allocate(desc.byte_size()));
+ Tuple* result = (Tuple*)(pool->allocate(desc.byte_size()));
deep_copy(result, desc, pool, convert_ptrs);
return result;
}
@@ -69,17 +70,15 @@ void Tuple::deep_copy(Tuple* dst, const TupleDescriptor& desc, MemPool* pool, bo
memory_copy(dst, this, desc.byte_size());
// allocate in the same pool and then copy all non-null string slots
- for (std::vector<SlotDescriptor*>::const_iterator i = desc.string_slots().begin();
- i != desc.string_slots().end(); ++i) {
- DCHECK((*i)->type().is_string_type());
-
- StringValue* string_v = dst->get_string_slot((*i)->tuple_offset());
- if (!dst->is_null((*i)->null_indicator_offset())) {
+ for (auto string_slot : desc.string_slots()) {
+ DCHECK(string_slot->type().is_string_type());
+ StringValue* string_v = dst->get_string_slot(string_slot->tuple_offset());
+ if (!dst->is_null(string_slot->null_indicator_offset())) {
if (string_v->len != 0) {
int offset = pool->total_allocated_bytes();
- char* string_copy = reinterpret_cast<char*>(pool->allocate(string_v->len));
+ char* string_copy = (char*)(pool->allocate(string_v->len));
memory_copy(string_copy, string_v->ptr, string_v->len);
- string_v->ptr = (convert_ptrs ? reinterpret_cast<char*>(offset) : string_copy);
+ string_v->ptr = (convert_ptrs ? convert_to<char*>(offset) : string_copy);
}
} else {
string_v->ptr = nullptr;
@@ -103,12 +102,12 @@ void Tuple::deep_copy(Tuple* dst, const TupleDescriptor& desc, MemPool* pool, bo
int nulls_size = cv->length() * sizeof(bool);
int offset = pool->total_allocated_bytes();
- char* coll_data = reinterpret_cast<char*>(pool->allocate(coll_byte_size + nulls_size));
+ char* coll_data = (char*)(pool->allocate(coll_byte_size + nulls_size));
// copy data and null_signs
if (nulls_size > 0) {
cv->set_has_null(true);
- cv->set_null_signs(reinterpret_cast<bool*>(coll_data) + coll_byte_size);
+ cv->set_null_signs(convert_to<bool*>(coll_data) + coll_byte_size);
memory_copy(coll_data, cv->null_signs(), nulls_size);
} else {
cv->set_has_null(false);
@@ -116,9 +115,8 @@ void Tuple::deep_copy(Tuple* dst, const TupleDescriptor& desc, MemPool* pool, bo
memory_copy(coll_data + nulls_size, cv->data(), coll_byte_size);
// assgin new null_sign and data location
- cv->set_null_signs(convert_ptrs ? reinterpret_cast<bool*>(offset)
- : reinterpret_cast<bool*>(coll_data));
- cv->set_data(convert_ptrs ? reinterpret_cast<char*>(offset + nulls_size)
+ cv->set_null_signs(convert_ptrs ? convert_to<bool*>(offset) : convert_to<bool*>(coll_data));
+ cv->set_data(convert_ptrs ? convert_to<char*>(offset + nulls_size)
: coll_data + nulls_size);
if (!item_type.is_string_type()) {
@@ -130,19 +128,19 @@ void Tuple::deep_copy(Tuple* dst, const TupleDescriptor& desc, MemPool* pool, bo
if (cv->is_null_at(i)) {
continue;
}
- StringValue* dst_item_v = reinterpret_cast<StringValue*>(coll_data + item_offset);
+ StringValue* dst_item_v = convert_to<StringValue*>(coll_data + item_offset);
if (dst_item_v->len != 0) {
int offset = pool->total_allocated_bytes();
- char* string_copy = reinterpret_cast<char*>(pool->allocate(dst_item_v->len));
+ char* string_copy = (char*)(pool->allocate(dst_item_v->len));
memory_copy(string_copy, dst_item_v->ptr, dst_item_v->len);
- dst_item_v->ptr = (convert_ptrs ? reinterpret_cast<char*>(offset) : string_copy);
+ dst_item_v->ptr = (convert_ptrs ? convert_to<char*>(offset) : string_copy);
}
}
}
}
Tuple* Tuple::dcopy_with_new(const TupleDescriptor& desc, MemPool* pool, int64_t* bytes) {
- Tuple* result = reinterpret_cast<Tuple*>(pool->allocate(desc.byte_size()));
+ Tuple* result = (Tuple*)(pool->allocate(desc.byte_size()));
*bytes = dcopy_with_new(result, desc);
return result;
}
@@ -176,6 +174,7 @@ int64_t Tuple::release_string(const TupleDescriptor& desc) {
if (!is_null(slot->null_indicator_offset())) {
StringValue* string_v = get_string_slot(slot->tuple_offset());
delete[] string_v->ptr;
+ string_v->ptr = nullptr;
bytes += string_v->len;
}
}
@@ -183,7 +182,7 @@ int64_t Tuple::release_string(const TupleDescriptor& desc) {
}
void Tuple::deep_copy(const TupleDescriptor& desc, char** data, int* offset, bool convert_ptrs) {
- Tuple* dst = reinterpret_cast<Tuple*>(*data);
+ Tuple* dst = (Tuple*)(*data);
memory_copy(dst, this, desc.byte_size());
*data += desc.byte_size();
*offset += desc.byte_size();
@@ -193,11 +192,11 @@ void Tuple::deep_copy(const TupleDescriptor& desc, char** data, int* offset, boo
StringValue* string_v = dst->get_string_slot(slot_desc->tuple_offset());
if (!dst->is_null(slot_desc->null_indicator_offset())) {
memory_copy(*data, string_v->ptr, string_v->len);
- string_v->ptr = (convert_ptrs ? reinterpret_cast<char*>(*offset) : *data);
+ string_v->ptr = (convert_ptrs ? convert_to<char*>(*offset) : *data);
*data += string_v->len;
*offset += string_v->len;
} else {
- string_v->ptr = (convert_ptrs ? reinterpret_cast<char*>(*offset) : *data);
+ string_v->ptr = (convert_ptrs ? convert_to<char*>(*offset) : *data);
string_v->len = 0;
}
}
@@ -221,9 +220,9 @@ void Tuple::deep_copy(const TupleDescriptor& desc, char** data, int* offset, boo
memory_copy(*data + nulls_size, cv->data(), coll_byte_size);
if (!item_type.is_string_type()) {
- cv->set_null_signs(convert_ptrs ? reinterpret_cast<bool*>(*offset)
- : reinterpret_cast<bool*>(*data));
- cv->set_data(convert_ptrs ? reinterpret_cast<char*>(*offset + nulls_size)
+ cv->set_null_signs(convert_ptrs ? convert_to<bool*>(*offset)
+ : convert_to<bool*>(*data));
+ cv->set_data(convert_ptrs ? convert_to<char*>(*offset + nulls_size)
: *data + nulls_size);
*data += coll_byte_size + nulls_size;
*offset += coll_byte_size + nulls_size;
@@ -242,18 +241,18 @@ void Tuple::deep_copy(const TupleDescriptor& desc, char** data, int* offset, boo
if (cv->is_null_at(i)) {
continue;
}
- StringValue* dst_item_v = reinterpret_cast<StringValue*>(base_data + item_offset);
+ StringValue* dst_item_v = convert_to<StringValue*>(base_data + item_offset);
if (dst_item_v->len != 0) {
memory_copy(*data, dst_item_v->ptr, dst_item_v->len);
- dst_item_v->ptr = (convert_ptrs ? reinterpret_cast<char*>(*offset) : *data);
+ dst_item_v->ptr = (convert_ptrs ? convert_to<char*>(*offset) : *data);
*data += dst_item_v->len;
*offset += dst_item_v->len;
}
}
// assgin new null_sign and data location
- cv->set_null_signs(convert_ptrs ? reinterpret_cast<bool*>(base_offset)
- : reinterpret_cast<bool*>(base_data));
- cv->set_data(convert_ptrs ? reinterpret_cast<char*>(base_offset + nulls_size)
+ cv->set_null_signs(convert_ptrs ? convert_to<bool*>(base_offset)
+ : convert_to<bool*>(base_data));
+ cv->set_data(convert_ptrs ? convert_to<char*>(base_offset + nulls_size)
: base_data + nulls_size);
}
}
@@ -270,8 +269,9 @@ void Tuple::materialize_exprs(TupleRow* row, const TupleDescriptor& desc,
memset(this, 0, desc.num_null_bytes());
// Evaluate the output_slot_exprs and place the results in the tuples.
int mat_expr_index = 0;
- for (int i = 0; i < desc.slots().size(); ++i) {
- SlotDescriptor* slot_desc = desc.slots()[i];
+ auto& slots = desc.slots();
+ for (int i = 0; i < slots.size(); ++i) {
+ SlotDescriptor* slot_desc = slots[i];
if (!slot_desc->is_materialized()) {
continue;
}
@@ -297,7 +297,7 @@ void Tuple::materialize_exprs(TupleRow* row, const TupleDescriptor& desc,
RawValue::write(src, dst, slot_desc->type(), pool);
if (collect_string_vals) {
if (slot_desc->type().is_string_type()) {
- StringValue* string_val = reinterpret_cast<StringValue*>(dst);
+ StringValue* string_val = convert_to<StringValue*>(dst);
non_null_var_len_values->push_back(string_val);
*total_var_len += string_val->len;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org