You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/12/09 14:36:51 UTC

[incubator-doris] branch master updated: [refactor] modify the implements of Tuple & RowBatch (#7319)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 80c11da  [refactor] modify the implements of Tuple & RowBatch (#7319)
80c11da is described below

commit 80c11da3df0d4b599256c23c1965f94ee5747ca6
Author: thinker <zc...@qq.com>
AuthorDate: Thu Dec 9 22:36:37 2021 +0800

    [refactor] modify the implements of Tuple & RowBatch (#7319)
    
    Code refactor: improve the code's readability and avoid const_cast.
    
    1. Make loops simpler and clearer by using range-based for loops, which are safer than the old iterator-style loops.
    2. Iterate over _row_desc.tuple_descriptors() by index only, instead of mixing an index with an iterator.
    3. Add a new function To convert_to(From from) that performs a union-based cast between two types, and use it to replace reinterpret_cast; the new cast is more readable.
    4. Avoid reusing the same variable name in nested loops, which is dangerous.
    5. Add the const keyword to member functions, following the C++ Core Guidelines.
---
 be/src/common/utils.h        |  10 ++
 be/src/runtime/row_batch.cpp | 235 ++++++++++++++++++++-----------------------
 be/src/runtime/row_batch.h   |  11 +-
 be/src/runtime/tuple.cpp     |  64 ++++++------
 4 files changed, 153 insertions(+), 167 deletions(-)

diff --git a/be/src/common/utils.h b/be/src/common/utils.h
index cb6647a..61fed1d 100644
--- a/be/src/common/utils.h
+++ b/be/src/common/utils.h
@@ -61,4 +61,14 @@ static constexpr int RELEASE_CONTEXT_COUNTER = 1 << 7;
 static_assert((RELEASE_CONTEXT_COUNTER & (RELEASE_CONTEXT_COUNTER - 1)) == 0,
               "should be power of 2");
 
+template <typename To, typename From> 
+static inline To convert_to(From from) {
+    union {
+        From _from;
+        To _to;
+    };
+    _from = from;
+    return _to;
+}
+
 } // namespace doris
diff --git a/be/src/runtime/row_batch.cpp b/be/src/runtime/row_batch.cpp
index 4652df4..19c91f6 100644
--- a/be/src/runtime/row_batch.cpp
+++ b/be/src/runtime/row_batch.cpp
@@ -20,6 +20,7 @@
 #include <snappy/snappy.h>
 #include <stdint.h> // for intptr_t
 
+#include "common/utils.h"
 #include "gen_cpp/Data_types.h"
 #include "gen_cpp/data.pb.h"
 #include "runtime/buffered_tuple_stream2.inline.h"
@@ -58,10 +59,10 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, int capacity, MemTracker* mem_
     // TODO: switch to Init() pattern so we can check memory limit and return Status.
     if (config::enable_partitioned_aggregation) {
         _mem_tracker->Consume(_tuple_ptrs_size);
-        _tuple_ptrs = reinterpret_cast<Tuple**>(malloc(_tuple_ptrs_size));
+        _tuple_ptrs = (Tuple**)(malloc(_tuple_ptrs_size));
         DCHECK(_tuple_ptrs != nullptr);
     } else {
-        _tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool.allocate(_tuple_ptrs_size));
+        _tuple_ptrs = (Tuple**)(_tuple_data_pool.allocate(_tuple_ptrs_size));
     }
 }
 
@@ -89,13 +90,13 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const PRowBatch& input_batch,
     // TODO: switch to Init() pattern so we can check memory limit and return Status.
     if (config::enable_partitioned_aggregation) {
         _mem_tracker->Consume(_tuple_ptrs_size);
-        _tuple_ptrs = reinterpret_cast<Tuple**>(malloc(_tuple_ptrs_size));
+        _tuple_ptrs = (Tuple**)(malloc(_tuple_ptrs_size));
         DCHECK(_tuple_ptrs != nullptr);
     } else {
-        _tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool.allocate(_tuple_ptrs_size));
+        _tuple_ptrs = (Tuple**)_tuple_data_pool.allocate(_tuple_ptrs_size);
     }
 
-    uint8_t* tuple_data = nullptr;
+    char* tuple_data = nullptr;
     if (input_batch.is_compressed()) {
         // Decompress tuple data into data pool
         const char* compressed_data = input_batch.tuple_data().c_str();
@@ -104,13 +105,12 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const PRowBatch& input_batch,
         bool success =
                 snappy::GetUncompressedLength(compressed_data, compressed_size, &uncompressed_size);
         DCHECK(success) << "snappy::GetUncompressedLength failed";
-        tuple_data = reinterpret_cast<uint8_t*>(_tuple_data_pool.allocate(uncompressed_size));
-        success = snappy::RawUncompress(compressed_data, compressed_size,
-                                        reinterpret_cast<char*>(tuple_data));
+        tuple_data = (char*)_tuple_data_pool.allocate(uncompressed_size);
+        success = snappy::RawUncompress(compressed_data, compressed_size, tuple_data);
         DCHECK(success) << "snappy::RawUncompress failed";
     } else {
         // Tuple data uncompressed, copy directly into data pool
-        tuple_data = _tuple_data_pool.allocate(input_batch.tuple_data().size());
+        tuple_data = (char*)_tuple_data_pool.allocate(input_batch.tuple_data().size());
         memcpy(tuple_data, input_batch.tuple_data().c_str(), input_batch.tuple_data().size());
     }
 
@@ -120,7 +120,7 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const PRowBatch& input_batch,
         if (offset == -1) {
             _tuple_ptrs[tuple_idx++] = nullptr;
         } else {
-            _tuple_ptrs[tuple_idx++] = reinterpret_cast<Tuple*>(tuple_data + offset);
+            _tuple_ptrs[tuple_idx++] = convert_to<Tuple*>(tuple_data + offset);
         }
     }
 
@@ -128,7 +128,8 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const PRowBatch& input_batch,
     if (!_row_desc.has_varlen_slots()) {
         return;
     }
-    const std::vector<TupleDescriptor*>& tuple_descs = _row_desc.tuple_descriptors();
+
+    const auto& tuple_descs = _row_desc.tuple_descriptors();
 
     // For every unique tuple, convert string offsets contained in tuple data into
     // pointers. Tuples were serialized in the order we are deserializing them in,
@@ -136,21 +137,22 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const PRowBatch& input_batch,
     // we already converted.
     for (int i = 0; i < _num_rows; ++i) {
         TupleRow* row = get_row(i);
-        std::vector<TupleDescriptor*>::const_iterator desc = tuple_descs.begin();
-        for (int j = 0; desc != tuple_descs.end(); ++desc, ++j) {
-            if ((*desc)->string_slots().empty() && (*desc)->collection_slots().empty()) {
+        for (size_t j = 0; j < tuple_descs.size(); ++j) {
+            auto desc = tuple_descs[j];
+            if (desc->string_slots().empty() && desc->collection_slots().empty()) {
                 continue;
             }
+
             Tuple* tuple = row->get_tuple(j);
             if (tuple == nullptr) {
                 continue;
             }
 
-            for (auto slot : (*desc)->string_slots()) {
+            for (auto slot : desc->string_slots()) {
                 DCHECK(slot->type().is_string_type());
                 StringValue* string_val = tuple->get_string_slot(slot->tuple_offset());
-                int offset = reinterpret_cast<intptr_t>(string_val->ptr);
-                string_val->ptr = reinterpret_cast<char*>(tuple_data + offset);
+                int offset = convert_to<int>(string_val->ptr);
+                string_val->ptr = tuple_data + offset;
 
                 // Why we do this mask? Field len of StringValue is changed from int to size_t in
                 // Doris 0.11. When upgrading, some bits of len sent from 0.10 is random value,
@@ -160,37 +162,35 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const PRowBatch& input_batch,
             }
 
             // copy collection slots
-            vector<SlotDescriptor*>::const_iterator slot_collection =
-                    (*desc)->collection_slots().begin();
-            for (; slot_collection != (*desc)->collection_slots().end(); ++slot_collection) {
-                DCHECK((*slot_collection)->type().is_collection_type());
+            for (auto slot_collection : desc->collection_slots()) {
+                DCHECK(slot_collection->type().is_collection_type());
 
                 CollectionValue* array_val =
-                        tuple->get_collection_slot((*slot_collection)->tuple_offset());
+                        tuple->get_collection_slot(slot_collection->tuple_offset());
 
                 // assgin data and null_sign pointer position in tuple_data
-                int data_offset = reinterpret_cast<intptr_t>(array_val->data());
-                array_val->set_data(reinterpret_cast<char*>(tuple_data + data_offset));
-                int null_offset = reinterpret_cast<intptr_t>(array_val->null_signs());
-                array_val->set_null_signs(reinterpret_cast<bool*>(tuple_data + null_offset));
+                int data_offset = convert_to<int>(array_val->data());
+                array_val->set_data(tuple_data + data_offset);
+                int null_offset = convert_to<int>(array_val->null_signs());
+                array_val->set_null_signs(convert_to<bool*>(tuple_data + null_offset));
 
-                const TypeDescriptor& item_type = (*slot_collection)->type().children.at(0);
+                const TypeDescriptor& item_type = slot_collection->type().children.at(0);
                 if (!item_type.is_string_type()) {
                     continue;
                 }
 
                 // copy every string item
-                for (int i = 0; i < array_val->length(); ++i) {
-                    if (array_val->is_null_at(i)) {
+                for (size_t k = 0; k < array_val->length(); ++k) {
+                    if (array_val->is_null_at(k)) {
                         continue;
                     }
 
-                    StringValue* dst_item_v = reinterpret_cast<StringValue*>(
-                            (uint8_t*)array_val->data() + i * item_type.get_slot_size());
+                    StringValue* dst_item_v = convert_to<StringValue*>(
+                            (uint8_t*)array_val->data() + k * item_type.get_slot_size());
 
                     if (dst_item_v->len != 0) {
-                        int offset = reinterpret_cast<intptr_t>(dst_item_v->ptr);
-                        dst_item_v->ptr = reinterpret_cast<char*>(tuple_data + offset);
+                        int offset = convert_to<int>(dst_item_v->ptr);
+                        dst_item_v->ptr = tuple_data + offset;
                     }
                 }
             }
@@ -222,13 +222,13 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const TRowBatch& input_batch,
     // TODO: switch to Init() pattern so we can check memory limit and return Status.
     if (config::enable_partitioned_aggregation) {
         _mem_tracker->Consume(_tuple_ptrs_size);
-        _tuple_ptrs = reinterpret_cast<Tuple**>(malloc(_tuple_ptrs_size));
+        _tuple_ptrs = (Tuple**)malloc(_tuple_ptrs_size);
         DCHECK(_tuple_ptrs != nullptr);
     } else {
-        _tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool.allocate(_tuple_ptrs_size));
+        _tuple_ptrs = (Tuple**)_tuple_data_pool.allocate(_tuple_ptrs_size);
     }
 
-    uint8_t* tuple_data = nullptr;
+    char* tuple_data = nullptr;
     if (input_batch.is_compressed) {
         // Decompress tuple data into data pool
         const char* compressed_data = input_batch.tuple_data.c_str();
@@ -237,24 +237,22 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const TRowBatch& input_batch,
         bool success =
                 snappy::GetUncompressedLength(compressed_data, compressed_size, &uncompressed_size);
         DCHECK(success) << "snappy::GetUncompressedLength failed";
-        tuple_data = reinterpret_cast<uint8_t*>(_tuple_data_pool.allocate(uncompressed_size));
-        success = snappy::RawUncompress(compressed_data, compressed_size,
-                                        reinterpret_cast<char*>(tuple_data));
+        tuple_data = (char*)_tuple_data_pool.allocate(uncompressed_size);
+        success = snappy::RawUncompress(compressed_data, compressed_size, tuple_data);
         DCHECK(success) << "snappy::RawUncompress failed";
     } else {
         // Tuple data uncompressed, copy directly into data pool
-        tuple_data = _tuple_data_pool.allocate(input_batch.tuple_data.size());
+        tuple_data = (char*)_tuple_data_pool.allocate(input_batch.tuple_data.size());
         memcpy(tuple_data, input_batch.tuple_data.c_str(), input_batch.tuple_data.size());
     }
 
     // convert input_batch.tuple_offsets into pointers
     int tuple_idx = 0;
-    for (vector<int32_t>::const_iterator offset = input_batch.tuple_offsets.begin();
-         offset != input_batch.tuple_offsets.end(); ++offset) {
-        if (*offset == -1) {
+    for (auto offset : input_batch.tuple_offsets) {
+        if (offset == -1) {
             _tuple_ptrs[tuple_idx++] = nullptr;
         } else {
-            _tuple_ptrs[tuple_idx++] = reinterpret_cast<Tuple*>(tuple_data + *offset);
+            _tuple_ptrs[tuple_idx++] = convert_to<Tuple*>(tuple_data + offset);
         }
     }
 
@@ -262,7 +260,8 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const TRowBatch& input_batch,
     if (!_row_desc.has_varlen_slots()) {
         return;
     }
-    const std::vector<TupleDescriptor*>& tuple_descs = _row_desc.tuple_descriptors();
+
+    const auto& tuple_descs = _row_desc.tuple_descriptors();
 
     // For every unique tuple, convert string offsets contained in tuple data into
     // pointers. Tuples were serialized in the order we are deserializing them in,
@@ -270,9 +269,9 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const TRowBatch& input_batch,
     // we already converted.
     for (int i = 0; i < _num_rows; ++i) {
         TupleRow* row = get_row(i);
-        std::vector<TupleDescriptor*>::const_iterator desc = tuple_descs.begin();
-        for (int j = 0; desc != tuple_descs.end(); ++desc, ++j) {
-            if ((*desc)->string_slots().empty() && (*desc)->collection_slots().empty()) {
+        for (size_t j = 0; j < tuple_descs.size(); ++j) {
+            auto desc = tuple_descs[j];
+            if (desc->string_slots().empty() && desc->collection_slots().empty()) {
                 continue;
             }
 
@@ -281,13 +280,12 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const TRowBatch& input_batch,
                 continue;
             }
 
-            std::vector<SlotDescriptor*>::const_iterator slot = (*desc)->string_slots().begin();
-            for (; slot != (*desc)->string_slots().end(); ++slot) {
-                DCHECK((*slot)->type().is_string_type());
-                StringValue* string_val = tuple->get_string_slot((*slot)->tuple_offset());
+            for (auto slot : desc->string_slots()) {
+                DCHECK(slot->type().is_string_type());
+                StringValue* string_val = tuple->get_string_slot(slot->tuple_offset());
 
-                int offset = reinterpret_cast<intptr_t>(string_val->ptr);
-                string_val->ptr = reinterpret_cast<char*>(tuple_data + offset);
+                int offset = convert_to<int>(string_val->ptr);
+                string_val->ptr = tuple_data + offset;
 
                 // Why we do this mask? Field len of StringValue is changed from int to size_t in
                 // Doris 0.11. When upgrading, some bits of len sent from 0.10 is random value,
@@ -297,35 +295,33 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const TRowBatch& input_batch,
             }
 
             // copy collection slot
-            vector<SlotDescriptor*>::const_iterator slot_collection =
-                    (*desc)->collection_slots().begin();
-            for (; slot_collection != (*desc)->collection_slots().end(); ++slot_collection) {
-                DCHECK((*slot_collection)->type().is_collection_type());
+            for (auto slot_collection : desc->collection_slots()) {
+                DCHECK(slot_collection->type().is_collection_type());
                 CollectionValue* array_val =
-                        tuple->get_collection_slot((*slot_collection)->tuple_offset());
+                        tuple->get_collection_slot(slot_collection->tuple_offset());
 
-                int offset = reinterpret_cast<intptr_t>(array_val->data());
-                array_val->set_data(reinterpret_cast<char*>(tuple_data + offset));
-                int null_offset = reinterpret_cast<intptr_t>(array_val->null_signs());
-                array_val->set_null_signs(reinterpret_cast<bool*>(tuple_data + null_offset));
+                int offset = convert_to<int>(array_val->data());
+                array_val->set_data(tuple_data + offset);
+                int null_offset = convert_to<int>(array_val->null_signs());
+                array_val->set_null_signs(convert_to<bool*>(tuple_data + null_offset));
 
-                const TypeDescriptor& item_type = (*slot_collection)->type().children.at(0);
+                const TypeDescriptor& item_type = slot_collection->type().children.at(0);
                 if (!item_type.is_string_type()) {
                     continue;
                 }
 
                 // copy string item
-                for (int i = 0; i < array_val->length(); ++i) {
-                    if (array_val->is_null_at(i)) {
+                for (size_t k = 0; k < array_val->length(); ++k) {
+                    if (array_val->is_null_at(k)) {
                         continue;
                     }
 
-                    StringValue* dst_item_v = reinterpret_cast<StringValue*>(
-                            (uint8_t*)array_val->data() + i * item_type.get_slot_size());
+                    StringValue* dst_item_v = convert_to<StringValue*>(
+                            (uint8_t*)array_val->data() + k * item_type.get_slot_size());
 
                     if (dst_item_v->len != 0) {
-                        int offset = reinterpret_cast<intptr_t>(dst_item_v->ptr);
-                        dst_item_v->ptr = reinterpret_cast<char*>(tuple_data + offset);
+                        int offset = convert_to<int>(dst_item_v->ptr);
+                        dst_item_v->ptr = tuple_data + offset;
                     }
                 }
             }
@@ -381,14 +377,13 @@ size_t RowBatch::serialize(TRowBatch* output_batch) {
     // Copy tuple data, including strings, into output_batch (converting string
     // pointers into offsets in the process)
     int offset = 0; // current offset into output_batch->tuple_data
-    char* tuple_data = const_cast<char*>(output_batch->tuple_data.c_str());
+    char* tuple_data = output_batch->tuple_data.data();
+    const auto& tuple_descs = _row_desc.tuple_descriptors();
 
     for (int i = 0; i < _num_rows; ++i) {
         TupleRow* row = get_row(i);
-        const std::vector<TupleDescriptor*>& tuple_descs = _row_desc.tuple_descriptors();
-        std::vector<TupleDescriptor*>::const_iterator desc = tuple_descs.begin();
-
-        for (int j = 0; desc != tuple_descs.end(); ++desc, ++j) {
+        for (size_t j = 0; j < tuple_descs.size(); ++j) {
+            auto desc = tuple_descs[j];
             if (row->get_tuple(j) == nullptr) {
                 // NULLs are encoded as -1
                 output_batch->tuple_offsets.push_back(-1);
@@ -397,7 +392,7 @@ size_t RowBatch::serialize(TRowBatch* output_batch) {
 
             // Record offset before creating copy (which increments offset and tuple_data)
             output_batch->tuple_offsets.push_back(offset);
-            row->get_tuple(j)->deep_copy(**desc, &tuple_data, &offset, /* convert_ptrs */ true);
+            row->get_tuple(j)->deep_copy(*desc, &tuple_data, &offset, /* convert_ptrs */ true);
             DCHECK_LE(offset, size);
         }
     }
@@ -414,7 +409,7 @@ size_t RowBatch::serialize(TRowBatch* output_batch) {
         }
 
         size_t compressed_size = 0;
-        char* compressed_output = const_cast<char*>(_compression_scratch.c_str());
+        char* compressed_output = _compression_scratch.data();
         snappy::RawCompress(output_batch->tuple_data.c_str(), size, compressed_output,
                             &compressed_size);
 
@@ -450,20 +445,22 @@ size_t RowBatch::serialize(PRowBatch* output_batch) {
     // Copy tuple data, including strings, into output_batch (converting string
     // pointers into offsets in the process)
     int offset = 0; // current offset into output_batch->tuple_data
-    char* tuple_data = const_cast<char*>(mutable_tuple_data->data());
+    char* tuple_data = mutable_tuple_data->data();
+    const auto& tuple_descs = _row_desc.tuple_descriptors();
+    const auto& mutable_tuple_offsets = output_batch->mutable_tuple_offsets();
+
     for (int i = 0; i < _num_rows; ++i) {
         TupleRow* row = get_row(i);
-        const std::vector<TupleDescriptor*>& tuple_descs = _row_desc.tuple_descriptors();
-        std::vector<TupleDescriptor*>::const_iterator desc = tuple_descs.begin();
-        for (int j = 0; desc != tuple_descs.end(); ++desc, ++j) {
+        for (size_t j = 0; j < tuple_descs.size(); ++j) {
+            auto desc = tuple_descs[j];
             if (row->get_tuple(j) == nullptr) {
                 // NULLs are encoded as -1
-                output_batch->mutable_tuple_offsets()->Add(-1);
+                mutable_tuple_offsets->Add(-1);
                 continue;
             }
             // Record offset before creating copy (which increments offset and tuple_data)
-            output_batch->mutable_tuple_offsets()->Add(offset);
-            row->get_tuple(j)->deep_copy(**desc, &tuple_data, &offset, /* convert_ptrs */ true);
+            mutable_tuple_offsets->Add(offset);
+            row->get_tuple(j)->deep_copy(*desc, &tuple_data, &offset, /* convert_ptrs */ true);
             DCHECK_LE(offset, size);
         }
     }
@@ -480,7 +477,7 @@ size_t RowBatch::serialize(PRowBatch* output_batch) {
         }
 
         size_t compressed_size = 0;
-        char* compressed_output = const_cast<char*>(_compression_scratch.c_str());
+        char* compressed_output = _compression_scratch.data();
         snappy::RawCompress(mutable_tuple_data->data(), size, compressed_output, &compressed_size);
 
         if (LIKELY(compressed_size < size)) {
@@ -506,12 +503,12 @@ void RowBatch::add_io_buffer(DiskIoMgr::BufferDescriptor* buffer) {
 
 Status RowBatch::resize_and_allocate_tuple_buffer(RuntimeState* state, int64_t* tuple_buffer_size,
                                                   uint8_t** buffer) {
-    const int row_size = _row_desc.get_row_size();
+    int64_t row_size = _row_desc.get_row_size();
     // Avoid divide-by-zero. Don't need to modify capacity for empty rows anyway.
     if (row_size != 0) {
-        _capacity = std::max(1, std::min(_capacity, FIXED_LEN_BUFFER_LIMIT / row_size));
+        _capacity = std::max(1, std::min<int>(_capacity, FIXED_LEN_BUFFER_LIMIT / row_size));
     }
-    *tuple_buffer_size = static_cast<int64_t>(row_size) * _capacity;
+    *tuple_buffer_size = row_size * _capacity;
     // TODO(dhc): change allocate to try_allocate?
     *buffer = _tuple_data_pool.allocate(*tuple_buffer_size);
     if (*buffer == nullptr) {
@@ -560,7 +557,7 @@ void RowBatch::reset() {
     _blocks.clear();
     _auxiliary_mem_usage = 0;
     if (!config::enable_partitioned_aggregation) {
-        _tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool.allocate(_tuple_ptrs_size));
+        _tuple_ptrs = (Tuple**)(_tuple_data_pool.allocate(_tuple_ptrs_size));
     }
     _need_to_return = false;
     _flush = FlushMode::NO_FLUSH_RESOURCES;
@@ -677,52 +674,48 @@ void RowBatch::deep_copy_to(RowBatch* dst) {
     dst->add_rows(_num_rows);
     for (int i = 0; i < _num_rows; ++i) {
         TupleRow* src_row = get_row(i);
-        TupleRow* dst_row = reinterpret_cast<TupleRow*>(dst->_tuple_ptrs + i * _num_tuples_per_row);
-        src_row->deep_copy(dst_row, _row_desc.tuple_descriptors(), &dst->_tuple_data_pool,
-                           false);
+        TupleRow* dst_row = convert_to<TupleRow*>(dst->_tuple_ptrs + i * _num_tuples_per_row);
+        src_row->deep_copy(dst_row, _row_desc.tuple_descriptors(), &dst->_tuple_data_pool, false);
     }
     dst->commit_rows(_num_rows);
 }
 // TODO: consider computing size of batches as they are built up
-size_t RowBatch::total_byte_size() {
+size_t RowBatch::total_byte_size() const {
     size_t result = 0;
 
     // Sum total variable length byte sizes.
     for (int i = 0; i < _num_rows; ++i) {
         TupleRow* row = get_row(i);
-        const std::vector<TupleDescriptor*>& tuple_descs = _row_desc.tuple_descriptors();
-        std::vector<TupleDescriptor*>::const_iterator desc = tuple_descs.begin();
-
-        for (int j = 0; desc != tuple_descs.end(); ++desc, ++j) {
+        const auto& tuple_descs = _row_desc.tuple_descriptors();
+        for (size_t j = 0; j < tuple_descs.size(); ++j) {
+            auto desc = tuple_descs[j];
             Tuple* tuple = row->get_tuple(j);
             if (tuple == nullptr) {
                 continue;
             }
-            result += (*desc)->byte_size();
-            std::vector<SlotDescriptor*>::const_iterator slot = (*desc)->string_slots().begin();
-            for (; slot != (*desc)->string_slots().end(); ++slot) {
-                DCHECK((*slot)->type().is_string_type());
-                if (tuple->is_null((*slot)->null_indicator_offset())) {
+            result += desc->byte_size();
+
+            for (auto slot : desc->string_slots()) {
+                DCHECK(slot->type().is_string_type());
+                if (tuple->is_null(slot->null_indicator_offset())) {
                     continue;
                 }
-                StringValue* string_val = tuple->get_string_slot((*slot)->tuple_offset());
+                StringValue* string_val = tuple->get_string_slot(slot->tuple_offset());
                 result += string_val->len;
             }
 
             // compute slot collection size
-            vector<SlotDescriptor*>::const_iterator slot_collection =
-                    (*desc)->collection_slots().begin();
-            for (; slot_collection != (*desc)->collection_slots().end(); ++slot_collection) {
-                DCHECK((*slot_collection)->type().is_collection_type());
-                if (tuple->is_null((*slot_collection)->null_indicator_offset())) {
+            for (auto slot_collection : desc->collection_slots()) {
+                DCHECK(slot_collection->type().is_collection_type());
+                if (tuple->is_null(slot_collection->null_indicator_offset())) {
                     continue;
                 }
                 // compute data null_signs size
                 CollectionValue* array_val =
-                        tuple->get_collection_slot((*slot_collection)->tuple_offset());
+                        tuple->get_collection_slot(slot_collection->tuple_offset());
                 result += array_val->length() * sizeof(bool);
 
-                const TypeDescriptor& item_type = (*slot_collection)->type().children.at(0);
+                const TypeDescriptor& item_type = slot_collection->type().children.at(0);
                 result += array_val->length() * item_type.get_slot_size();
 
                 if (!item_type.is_string_type()) {
@@ -730,12 +723,12 @@ size_t RowBatch::total_byte_size() {
                 }
 
                 // compute string type item size
-                for (int i = 0; i < array_val->length(); ++i) {
-                    if (array_val->is_null_at(i)) {
+                for (int k = 0; k < array_val->length(); ++k) {
+                    if (array_val->is_null_at(k)) {
                         continue;
                     }
-                    StringValue* dst_item_v = reinterpret_cast<StringValue*>(
-                            (uint8_t*)array_val->data() + i * item_type.get_slot_size());
+                    StringValue* dst_item_v = convert_to<StringValue*>(
+                            (uint8_t*)array_val->data() + k * item_type.get_slot_size());
                     result += dst_item_v->len;
                 }
             }
@@ -745,20 +738,6 @@ size_t RowBatch::total_byte_size() {
     return result;
 }
 
-int RowBatch::max_tuple_buffer_size() const {
-    int row_size = _row_desc.get_row_size();
-    if (row_size > AT_CAPACITY_MEM_USAGE) {
-        return row_size;
-    }
-    int num_rows = 0;
-    if (row_size != 0) {
-        num_rows = std::min(_capacity, AT_CAPACITY_MEM_USAGE / row_size);
-    }
-    int tuple_buffer_size = num_rows * row_size;
-    DCHECK_LE(tuple_buffer_size, AT_CAPACITY_MEM_USAGE);
-    return tuple_buffer_size;
-}
-
 void RowBatch::add_buffer(BufferPool::ClientHandle* client, BufferPool::BufferHandle&& buffer,
                           FlushMode flush) {
     _auxiliary_mem_usage += buffer.len();
diff --git a/be/src/runtime/row_batch.h b/be/src/runtime/row_batch.h
index 637c4b9..5fcae81 100644
--- a/be/src/runtime/row_batch.h
+++ b/be/src/runtime/row_batch.h
@@ -169,7 +169,7 @@ public:
 
     // The total size of all data represented in this row batch (tuples and referenced
     // string data).
-    size_t total_byte_size();
+    size_t total_byte_size() const;
 
     TupleRow* get_row(int row_idx) const {
         DCHECK(_tuple_ptrs != nullptr);
@@ -214,10 +214,10 @@ public:
         /// Returns true if the iterator is beyond the last row for read iterators.
         /// Useful for read iterators to determine the limit. Write iterators should use
         /// RowBatch::AtCapacity() instead.
-        bool IR_ALWAYS_INLINE at_end() { return _row >= _row_batch_end; }
+        bool IR_ALWAYS_INLINE at_end() const { return _row >= _row_batch_end; }
 
         /// Returns the row batch which this iterator is iterating through.
-        RowBatch* parent() { return _parent; }
+        RowBatch* parent() const { return _parent; }
 
     private:
         /// Number of tuples per row.
@@ -309,7 +309,7 @@ public:
     // we firstly update dest resource, and then reset current resource
     void transfer_resource_ownership(RowBatch* dest);
 
-    void copy_row(TupleRow* src, TupleRow* dest) {
+    void copy_row(const TupleRow* src, TupleRow* dest) const {
         memcpy(dest, src, _num_tuples_per_row * sizeof(Tuple*));
     }
 
@@ -385,9 +385,6 @@ public:
     void set_scanner_id(int id) { _scanner_id = id; }
     int scanner_id() const { return _scanner_id; }
 
-    // Computes the maximum size needed to store tuple data for this row batch.
-    int max_tuple_buffer_size() const;
-
     static const int MAX_MEM_POOL_SIZE = 32 * 1024 * 1024;
     std::string to_string();
 
diff --git a/be/src/runtime/tuple.cpp b/be/src/runtime/tuple.cpp
index 398a26c..9816f79 100644
--- a/be/src/runtime/tuple.cpp
+++ b/be/src/runtime/tuple.cpp
@@ -23,6 +23,7 @@
 #include <string>
 #include <vector>
 
+#include "common/utils.h"
 #include "exprs/expr.h"
 #include "exprs/expr_context.h"
 #include "runtime/collection_value.h"
@@ -60,7 +61,7 @@ int64_t Tuple::varlen_byte_size(const TupleDescriptor& desc) const {
 }
 
 Tuple* Tuple::deep_copy(const TupleDescriptor& desc, MemPool* pool, bool convert_ptrs) {
-    Tuple* result = reinterpret_cast<Tuple*>(pool->allocate(desc.byte_size()));
+    Tuple* result = (Tuple*)(pool->allocate(desc.byte_size()));
     deep_copy(result, desc, pool, convert_ptrs);
     return result;
 }
@@ -69,17 +70,15 @@ void Tuple::deep_copy(Tuple* dst, const TupleDescriptor& desc, MemPool* pool, bo
     memory_copy(dst, this, desc.byte_size());
 
     // allocate in the same pool and then copy all non-null string slots
-    for (std::vector<SlotDescriptor*>::const_iterator i = desc.string_slots().begin();
-         i != desc.string_slots().end(); ++i) {
-        DCHECK((*i)->type().is_string_type());
-
-        StringValue* string_v = dst->get_string_slot((*i)->tuple_offset());
-        if (!dst->is_null((*i)->null_indicator_offset())) {
+    for (auto string_slot : desc.string_slots()) {
+        DCHECK(string_slot->type().is_string_type());
+        StringValue* string_v = dst->get_string_slot(string_slot->tuple_offset());
+        if (!dst->is_null(string_slot->null_indicator_offset())) {
             if (string_v->len != 0) {
                 int offset = pool->total_allocated_bytes();
-                char* string_copy = reinterpret_cast<char*>(pool->allocate(string_v->len));
+                char* string_copy = (char*)(pool->allocate(string_v->len));
                 memory_copy(string_copy, string_v->ptr, string_v->len);
-                string_v->ptr = (convert_ptrs ? reinterpret_cast<char*>(offset) : string_copy);
+                string_v->ptr = (convert_ptrs ? convert_to<char*>(offset) : string_copy);
             }
         } else {
             string_v->ptr = nullptr;
@@ -103,12 +102,12 @@ void Tuple::deep_copy(Tuple* dst, const TupleDescriptor& desc, MemPool* pool, bo
         int nulls_size = cv->length() * sizeof(bool);
 
         int offset = pool->total_allocated_bytes();
-        char* coll_data = reinterpret_cast<char*>(pool->allocate(coll_byte_size + nulls_size));
+        char* coll_data = (char*)(pool->allocate(coll_byte_size + nulls_size));
 
         // copy data and null_signs
         if (nulls_size > 0) {
             cv->set_has_null(true);
-            cv->set_null_signs(reinterpret_cast<bool*>(coll_data) + coll_byte_size);
+            cv->set_null_signs(convert_to<bool*>(coll_data) + coll_byte_size);
             memory_copy(coll_data, cv->null_signs(), nulls_size);
         } else {
             cv->set_has_null(false);
@@ -116,9 +115,8 @@ void Tuple::deep_copy(Tuple* dst, const TupleDescriptor& desc, MemPool* pool, bo
         memory_copy(coll_data + nulls_size, cv->data(), coll_byte_size);
 
         // assgin new null_sign and data location
-        cv->set_null_signs(convert_ptrs ? reinterpret_cast<bool*>(offset)
-                                        : reinterpret_cast<bool*>(coll_data));
-        cv->set_data(convert_ptrs ? reinterpret_cast<char*>(offset + nulls_size)
+        cv->set_null_signs(convert_ptrs ? convert_to<bool*>(offset) : convert_to<bool*>(coll_data));
+        cv->set_data(convert_ptrs ? convert_to<char*>(offset + nulls_size)
                                   : coll_data + nulls_size);
 
         if (!item_type.is_string_type()) {
@@ -130,19 +128,19 @@ void Tuple::deep_copy(Tuple* dst, const TupleDescriptor& desc, MemPool* pool, bo
             if (cv->is_null_at(i)) {
                 continue;
             }
-            StringValue* dst_item_v = reinterpret_cast<StringValue*>(coll_data + item_offset);
+            StringValue* dst_item_v = convert_to<StringValue*>(coll_data + item_offset);
             if (dst_item_v->len != 0) {
                 int offset = pool->total_allocated_bytes();
-                char* string_copy = reinterpret_cast<char*>(pool->allocate(dst_item_v->len));
+                char* string_copy = (char*)(pool->allocate(dst_item_v->len));
                 memory_copy(string_copy, dst_item_v->ptr, dst_item_v->len);
-                dst_item_v->ptr = (convert_ptrs ? reinterpret_cast<char*>(offset) : string_copy);
+                dst_item_v->ptr = (convert_ptrs ? convert_to<char*>(offset) : string_copy);
             }
         }
     }
 }
 
 Tuple* Tuple::dcopy_with_new(const TupleDescriptor& desc, MemPool* pool, int64_t* bytes) {
-    Tuple* result = reinterpret_cast<Tuple*>(pool->allocate(desc.byte_size()));
+    Tuple* result = (Tuple*)(pool->allocate(desc.byte_size()));
     *bytes = dcopy_with_new(result, desc);
     return result;
 }
@@ -176,6 +174,7 @@ int64_t Tuple::release_string(const TupleDescriptor& desc) {
         if (!is_null(slot->null_indicator_offset())) {
             StringValue* string_v = get_string_slot(slot->tuple_offset());
             delete[] string_v->ptr;
+            string_v->ptr = nullptr;
             bytes += string_v->len;
         }
     }
@@ -183,7 +182,7 @@ int64_t Tuple::release_string(const TupleDescriptor& desc) {
 }
 
 void Tuple::deep_copy(const TupleDescriptor& desc, char** data, int* offset, bool convert_ptrs) {
-    Tuple* dst = reinterpret_cast<Tuple*>(*data);
+    Tuple* dst = (Tuple*)(*data);
     memory_copy(dst, this, desc.byte_size());
     *data += desc.byte_size();
     *offset += desc.byte_size();
@@ -193,11 +192,11 @@ void Tuple::deep_copy(const TupleDescriptor& desc, char** data, int* offset, boo
         StringValue* string_v = dst->get_string_slot(slot_desc->tuple_offset());
         if (!dst->is_null(slot_desc->null_indicator_offset())) {
             memory_copy(*data, string_v->ptr, string_v->len);
-            string_v->ptr = (convert_ptrs ? reinterpret_cast<char*>(*offset) : *data);
+            string_v->ptr = (convert_ptrs ? convert_to<char*>(*offset) : *data);
             *data += string_v->len;
             *offset += string_v->len;
         } else {
-            string_v->ptr = (convert_ptrs ? reinterpret_cast<char*>(*offset) : *data);
+            string_v->ptr = (convert_ptrs ? convert_to<char*>(*offset) : *data);
             string_v->len = 0;
         }
     }
@@ -221,9 +220,9 @@ void Tuple::deep_copy(const TupleDescriptor& desc, char** data, int* offset, boo
         memory_copy(*data + nulls_size, cv->data(), coll_byte_size);
 
         if (!item_type.is_string_type()) {
-            cv->set_null_signs(convert_ptrs ? reinterpret_cast<bool*>(*offset)
-                                            : reinterpret_cast<bool*>(*data));
-            cv->set_data(convert_ptrs ? reinterpret_cast<char*>(*offset + nulls_size)
+            cv->set_null_signs(convert_ptrs ? convert_to<bool*>(*offset)
+                                            : convert_to<bool*>(*data));
+            cv->set_data(convert_ptrs ? convert_to<char*>(*offset + nulls_size)
                                       : *data + nulls_size);
             *data += coll_byte_size + nulls_size;
             *offset += coll_byte_size + nulls_size;
@@ -242,18 +241,18 @@ void Tuple::deep_copy(const TupleDescriptor& desc, char** data, int* offset, boo
             if (cv->is_null_at(i)) {
                 continue;
             }
-            StringValue* dst_item_v = reinterpret_cast<StringValue*>(base_data + item_offset);
+            StringValue* dst_item_v = convert_to<StringValue*>(base_data + item_offset);
             if (dst_item_v->len != 0) {
                 memory_copy(*data, dst_item_v->ptr, dst_item_v->len);
-                dst_item_v->ptr = (convert_ptrs ? reinterpret_cast<char*>(*offset) : *data);
+                dst_item_v->ptr = (convert_ptrs ? convert_to<char*>(*offset) : *data);
                 *data += dst_item_v->len;
                 *offset += dst_item_v->len;
             }
         }
         // assgin new null_sign and data location
-        cv->set_null_signs(convert_ptrs ? reinterpret_cast<bool*>(base_offset)
-                                        : reinterpret_cast<bool*>(base_data));
-        cv->set_data(convert_ptrs ? reinterpret_cast<char*>(base_offset + nulls_size)
+        cv->set_null_signs(convert_ptrs ? convert_to<bool*>(base_offset)
+                                        : convert_to<bool*>(base_data));
+        cv->set_data(convert_ptrs ? convert_to<char*>(base_offset + nulls_size)
                                   : base_data + nulls_size);
     }
 }
@@ -270,8 +269,9 @@ void Tuple::materialize_exprs(TupleRow* row, const TupleDescriptor& desc,
     memset(this, 0, desc.num_null_bytes());
     // Evaluate the output_slot_exprs and place the results in the tuples.
     int mat_expr_index = 0;
-    for (int i = 0; i < desc.slots().size(); ++i) {
-        SlotDescriptor* slot_desc = desc.slots()[i];
+    auto& slots = desc.slots();
+    for (int i = 0; i < slots.size(); ++i) {
+        SlotDescriptor* slot_desc = slots[i];
         if (!slot_desc->is_materialized()) {
             continue;
         }
@@ -297,7 +297,7 @@ void Tuple::materialize_exprs(TupleRow* row, const TupleDescriptor& desc,
             RawValue::write(src, dst, slot_desc->type(), pool);
             if (collect_string_vals) {
                 if (slot_desc->type().is_string_type()) {
-                    StringValue* string_val = reinterpret_cast<StringValue*>(dst);
+                    StringValue* string_val = convert_to<StringValue*>(dst);
                     non_null_var_len_values->push_back(string_val);
                     *total_var_len += string_val->len;
                 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org