You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2023/01/11 01:37:42 UTC
[doris] branch master updated: [refactor](remove row batch) remove impala rowbatch structure (#15767)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d857b4af1b [refactor](remove row batch) remove impala rowbatch structure (#15767)
d857b4af1b is described below
commit d857b4af1b42209c2715f3db33c88a6ee12ade1c
Author: yiguolei <67...@qq.com>
AuthorDate: Wed Jan 11 09:37:35 2023 +0800
[refactor](remove row batch) remove impala rowbatch structure (#15767)
* [refactor](remove row batch) remove impala rowbatch structure
Co-authored-by: yiguolei <yi...@gmail.com>
---
be/src/exec/exec_node.cpp | 1 -
be/src/exec/table_connector.h | 1 -
be/src/olap/delta_writer.cpp | 1 -
be/src/runtime/CMakeLists.txt | 3 +-
be/src/runtime/cache/result_cache.h | 1 -
be/src/runtime/cache/result_node.h | 1 -
be/src/runtime/plan_fragment_executor.cpp | 1 -
be/src/runtime/row_batch.cpp | 507 ---------------------
be/src/runtime/row_batch.h | 478 -------------------
be/src/runtime/tablets_channel.cpp | 1 -
be/src/util/arrow/block_convertor.h | 5 +-
be/src/util/arrow/row_batch.cpp | 344 --------------
be/src/util/arrow/row_batch.h | 17 -
be/src/vec/core/block.cpp | 1 -
.../vec/exec/data_gen_functions/vnumbers_tvf.cpp | 1 -
be/src/vec/exec/join/vnested_loop_join_node.cpp | 1 -
be/src/vec/exec/vaggregation_node.cpp | 1 -
be/src/vec/exec/vanalytic_eval_node.cpp | 1 -
be/src/vec/exec/vdata_gen_scan_node.cpp | 1 -
be/src/vec/exec/vmysql_scan_node.cpp | 1 -
be/src/vec/exec/vschema_scan_node.cpp | 1 -
be/src/vec/exec/vset_operation_node.h | 1 -
be/src/vec/exec/vsort_node.cpp | 1 -
be/src/vec/runtime/vsorted_run_merger.cpp | 1 -
be/src/vec/sink/vresult_file_sink.cpp | 1 -
be/src/vec/sink/vtablet_sink.cpp | 1 -
be/src/vec/sink/vtablet_sink.h | 1 -
be/test/exprs/binary_predicate_test.cpp | 1 -
be/test/exprs/in_op_test.cpp | 1 -
be/test/runtime/data_spliter_test.cpp | 1 -
be/test/runtime/fragment_mgr_test.cpp | 1 -
be/test/vec/core/block_test.cpp | 1 -
be/test/vec/exec/vjson_scanner_test.cpp | 1 -
be/test/vec/exec/vorc_scanner_test.cpp | 1 -
be/test/vec/exec/vparquet_scanner_test.cpp | 1 -
be/test/vec/exprs/vexpr_test.cpp | 27 --
36 files changed, 3 insertions(+), 1407 deletions(-)
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index e29f5535f9..3a2d713403 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -31,7 +31,6 @@
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker.h"
-#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
#include "util/debug_util.h"
#include "util/runtime_profile.h"
diff --git a/be/src/exec/table_connector.h b/be/src/exec/table_connector.h
index 9f5330a486..6ba0c26b8b 100644
--- a/be/src/exec/table_connector.h
+++ b/be/src/exec/table_connector.h
@@ -28,7 +28,6 @@
#include "common/status.h"
#include "exprs/expr_context.h"
#include "runtime/descriptors.h"
-#include "runtime/row_batch.h"
#include "vec/exprs/vexpr_context.h"
namespace doris {
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 22b7f568d1..ee17286b6b 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -24,7 +24,6 @@
#include "olap/schema.h"
#include "olap/storage_engine.h"
#include "runtime/load_channel_mgr.h"
-#include "runtime/row_batch.h"
#include "runtime/tuple_row.h"
#include "service/backend_options.h"
#include "util/brpc_client_cache.h"
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 864dad3099..b2930ffb8a 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -37,8 +37,7 @@ set(RUNTIME_FILES
primitive_type.cpp
raw_value.cpp
result_buffer_mgr.cpp
- result_writer.cpp
- row_batch.cpp
+ result_writer.cpp
runtime_state.cpp
runtime_filter_mgr.cpp
runtime_predicate.cpp
diff --git a/be/src/runtime/cache/result_cache.h b/be/src/runtime/cache/result_cache.h
index de1a3711a6..0148bad786 100644
--- a/be/src/runtime/cache/result_cache.h
+++ b/be/src/runtime/cache/result_cache.h
@@ -32,7 +32,6 @@
#include "runtime/cache/cache_utils.h"
#include "runtime/cache/result_node.h"
#include "runtime/mem_pool.h"
-#include "runtime/row_batch.h"
#include "runtime/tuple_row.h"
namespace doris {
diff --git a/be/src/runtime/cache/result_node.h b/be/src/runtime/cache/result_node.h
index 9b7b8a17a3..ca1b344d9f 100644
--- a/be/src/runtime/cache/result_node.h
+++ b/be/src/runtime/cache/result_node.h
@@ -34,7 +34,6 @@
#include "olap/olap_define.h"
#include "runtime/cache/cache_utils.h"
#include "runtime/mem_pool.h"
-#include "runtime/row_batch.h"
#include "runtime/tuple_row.h"
#include "util/uid_util.h"
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index 4ff1e99c32..f36cefaf51 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -32,7 +32,6 @@
#include "runtime/memory/mem_tracker.h"
#include "runtime/result_buffer_mgr.h"
#include "runtime/result_queue_mgr.h"
-#include "runtime/row_batch.h"
#include "runtime/runtime_filter_mgr.h"
#include "runtime/thread_context.h"
#include "util/container_util.hpp"
diff --git a/be/src/runtime/row_batch.cpp b/be/src/runtime/row_batch.cpp
deleted file mode 100644
index 7a86f922e9..0000000000
--- a/be/src/runtime/row_batch.cpp
+++ /dev/null
@@ -1,507 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-// This file is copied from
-// https://github.com/apache/impala/blob/branch-2.9.0/be/src/runtime/row-batch.cc
-// and modified by Doris
-
-#include "runtime/row_batch.h"
-
-#include <snappy/snappy.h>
-#include <stdint.h> // for intptr_t
-
-#include "common/utils.h"
-#include "gen_cpp/Data_types.h"
-#include "gen_cpp/data.pb.h"
-#include "runtime/collection_value.h"
-#include "runtime/exec_env.h"
-#include "runtime/runtime_state.h"
-#include "runtime/string_value.h"
-#include "runtime/thread_context.h"
-#include "runtime/tuple_row.h"
-#include "util/exception.h"
-#include "vec/columns/column_vector.h"
-#include "vec/core/block.h"
-
-using std::vector;
-
-namespace doris {
-
-const int RowBatch::AT_CAPACITY_MEM_USAGE = 8 * 1024 * 1024;
-const int RowBatch::FIXED_LEN_BUFFER_LIMIT = AT_CAPACITY_MEM_USAGE / 2;
-
-RowBatch::RowBatch(const RowDescriptor& row_desc, int capacity)
- : _has_in_flight_row(false),
- _num_rows(0),
- _num_uncommitted_rows(0),
- _capacity(capacity),
- _flush(FlushMode::NO_FLUSH_RESOURCES),
- _needs_deep_copy(false),
- _num_tuples_per_row(row_desc.tuple_descriptors().size()),
- _row_desc(row_desc),
- _auxiliary_mem_usage(0),
- _need_to_return(false),
- _tuple_data_pool() {
- DCHECK_GT(capacity, 0);
- _tuple_ptrs_size = _capacity * _num_tuples_per_row * sizeof(Tuple*);
- DCHECK_GT(_tuple_ptrs_size, 0);
- _tuple_ptrs = (Tuple**)(malloc(_tuple_ptrs_size));
- DCHECK(_tuple_ptrs != nullptr);
-}
-
-// TODO: we want our input_batch's tuple_data to come from our (not yet implemented)
-// global runtime memory segment; how do we get thrift to allocate it from there?
-// maybe change line (in Data_types.cc generated from Data.thrift)
-// xfer += iprot->readString(this->tuple_data[_i9]);
-// to allocate string data in a special mempool
-// (change via python script that runs over Data_types.cc)
-RowBatch::RowBatch(const RowDescriptor& row_desc, const PRowBatch& input_batch)
- : _has_in_flight_row(false),
- _num_rows(input_batch.num_rows()),
- _num_uncommitted_rows(0),
- _capacity(_num_rows),
- _flush(FlushMode::NO_FLUSH_RESOURCES),
- _needs_deep_copy(false),
- _num_tuples_per_row(input_batch.row_tuples_size()),
- _row_desc(row_desc),
- _auxiliary_mem_usage(0),
- _need_to_return(false) {
- _tuple_ptrs_size = _num_rows * _num_tuples_per_row * sizeof(Tuple*);
- DCHECK_GT(_tuple_ptrs_size, 0);
- _tuple_ptrs = (Tuple**)(malloc(_tuple_ptrs_size));
- DCHECK(_tuple_ptrs != nullptr);
-
- char* tuple_data = nullptr;
- if (input_batch.is_compressed()) {
- // Decompress tuple data into data pool
- const char* compressed_data = input_batch.tuple_data().c_str();
- size_t compressed_size = input_batch.tuple_data().size();
- size_t uncompressed_size = 0;
- bool success =
- snappy::GetUncompressedLength(compressed_data, compressed_size, &uncompressed_size);
- DCHECK(success) << "snappy::GetUncompressedLength failed";
- tuple_data = (char*)_tuple_data_pool.allocate(uncompressed_size);
- success = snappy::RawUncompress(compressed_data, compressed_size, tuple_data);
- DCHECK(success) << "snappy::RawUncompress failed";
- } else {
- // Tuple data uncompressed, copy directly into data pool
- tuple_data = (char*)_tuple_data_pool.allocate(input_batch.tuple_data().size());
- memcpy(tuple_data, input_batch.tuple_data().c_str(), input_batch.tuple_data().size());
- }
-
- // convert input_batch.tuple_offsets into pointers
- int tuple_idx = 0;
- // For historical reasons, the original offset was stored using int32,
- // so that if a rowbatch is larger than 2GB, the passed offset may generate an error due to value overflow.
- // So in the new version, a new_tuple_offsets structure is added to store offsets using int64.
- // Here, to maintain compatibility, both versions of offsets are used, with preference given to new_tuple_offsets.
- // TODO(cmy): in the next version, the original tuple_offsets should be removed.
- if (input_batch.new_tuple_offsets_size() > 0) {
- for (int64_t offset : input_batch.new_tuple_offsets()) {
- if (offset == -1) {
- _tuple_ptrs[tuple_idx++] = nullptr;
- } else {
- _tuple_ptrs[tuple_idx++] = convert_to<Tuple*>(tuple_data + offset);
- }
- }
- } else {
- for (int32_t offset : input_batch.tuple_offsets()) {
- if (offset == -1) {
- _tuple_ptrs[tuple_idx++] = nullptr;
- } else {
- _tuple_ptrs[tuple_idx++] = convert_to<Tuple*>(tuple_data + offset);
- }
- }
- }
-
- // Check whether we have slots that require offset-to-pointer conversion.
- if (!_row_desc.has_varlen_slots()) {
- return;
- }
-
- const auto& tuple_descs = _row_desc.tuple_descriptors();
-
- // For every unique tuple, convert string offsets contained in tuple data into
- // pointers. Tuples were serialized in the order we are deserializing them in,
- // so the first occurrence of a tuple will always have a higher offset than any tuple
- // we already converted.
- for (int i = 0; i < _num_rows; ++i) {
- TupleRow* row = get_row(i);
- for (size_t j = 0; j < tuple_descs.size(); ++j) {
- auto desc = tuple_descs[j];
- if (desc->string_slots().empty() && desc->collection_slots().empty()) {
- continue;
- }
-
- Tuple* tuple = row->get_tuple(j);
- if (tuple == nullptr) {
- continue;
- }
-
- for (auto slot : desc->string_slots()) {
- DCHECK(slot->type().is_string_type());
- if (tuple->is_null(slot->null_indicator_offset())) {
- continue;
- }
-
- StringValue* string_val = tuple->get_string_slot(slot->tuple_offset());
- int64_t offset = convert_to<int64_t>(string_val->ptr);
- string_val->ptr = tuple_data + offset;
-
- // Why we do this mask? Field len of StringValue is changed from int to size_t in
- // Doris 0.11. When upgrading, some bits of len sent from 0.10 is random value,
- // this works fine in version 0.10, however in 0.11 this will lead to an invalid
- // length. So we make the high bits zero here.
- string_val->len &= 0x7FFFFFFFL;
- }
-
- // copy collection slots
- for (auto slot_collection : desc->collection_slots()) {
- DCHECK(slot_collection->type().is_collection_type());
- if (tuple->is_null(slot_collection->null_indicator_offset())) {
- continue;
- }
-
- CollectionValue* array_val =
- tuple->get_collection_slot(slot_collection->tuple_offset());
- const auto& item_type_desc = slot_collection->type().children[0];
- CollectionValue::deserialize_collection(array_val, tuple_data, item_type_desc);
- }
- }
- }
-}
-
-void RowBatch::clear() {
- if (_cleared) {
- return;
- }
-
- _tuple_data_pool.free_all();
- _agg_object_pool.clear();
- for (int i = 0; i < _io_buffers.size(); ++i) {
- _io_buffers[i]->return_buffer();
- }
-
- for (BufferInfo& buffer_info : _buffers) {
- ExecEnv::GetInstance()->buffer_pool()->FreeBuffer(buffer_info.client, &buffer_info.buffer);
- }
-
- DCHECK(_tuple_ptrs != nullptr);
- free(_tuple_ptrs);
- _tuple_ptrs = nullptr;
- _cleared = true;
-}
-
-RowBatch::~RowBatch() {
- clear();
-}
-
-static inline size_t align_tuple_offset(size_t offset) {
- if (config::rowbatch_align_tuple_offset) {
- return (offset + alignof(std::max_align_t) - 1) & (~(alignof(std::max_align_t) - 1));
- }
-
- return offset;
-}
-
-Status RowBatch::serialize(PRowBatch* output_batch, size_t* uncompressed_size,
- size_t* compressed_size, bool allow_transfer_large_data) {
- // num_rows
- output_batch->set_num_rows(_num_rows);
- // row_tuples
- _row_desc.to_protobuf(output_batch->mutable_row_tuples());
- // tuple_offsets: must clear before reserve
- // TODO(cmy): the tuple_offsets should be removed after v1.1.0, use new_tuple_offsets instead.
- // keep tuple_offsets here is just for compatibility.
- output_batch->clear_tuple_offsets();
- output_batch->mutable_tuple_offsets()->Reserve(_num_rows * _num_tuples_per_row);
- output_batch->clear_new_tuple_offsets();
- output_batch->mutable_new_tuple_offsets()->Reserve(_num_rows * _num_tuples_per_row);
- // is_compressed
- output_batch->set_is_compressed(false);
- // tuple data
- size_t tuple_byte_size = total_byte_size();
- std::string* mutable_tuple_data = output_batch->mutable_tuple_data();
- mutable_tuple_data->resize(tuple_byte_size);
-
- // Copy tuple data, including strings, into output_batch (converting string
- // pointers into offsets in the process)
- int64_t offset = 0; // current offset into output_batch->tuple_data
- char* tuple_data = mutable_tuple_data->data();
- const auto& tuple_descs = _row_desc.tuple_descriptors();
- const auto& mutable_tuple_offsets = output_batch->mutable_tuple_offsets();
- const auto& mutable_new_tuple_offsets = output_batch->mutable_new_tuple_offsets();
-
- for (int i = 0; i < _num_rows; ++i) {
- TupleRow* row = get_row(i);
- for (size_t j = 0; j < tuple_descs.size(); ++j) {
- auto desc = tuple_descs[j];
- if (row->get_tuple(j) == nullptr) {
- // NULLs are encoded as -1
- mutable_tuple_offsets->Add(-1);
- mutable_new_tuple_offsets->Add(-1);
- continue;
- }
-
- int64_t old_offset = offset;
- offset = align_tuple_offset(offset);
- tuple_data += offset - old_offset;
-
- // Record offset before creating copy (which increments offset and tuple_data)
- mutable_tuple_offsets->Add((int32_t)offset);
- mutable_new_tuple_offsets->Add(offset);
- row->get_tuple(j)->deep_copy(*desc, &tuple_data, &offset, /* convert_ptrs */ true);
- CHECK_GE(offset, 0);
- }
- }
- CHECK_EQ(offset, tuple_byte_size)
- << "offset: " << offset << " vs. tuple_byte_size: " << tuple_byte_size;
-
- size_t max_compressed_size = snappy::MaxCompressedLength(tuple_byte_size);
- bool can_compress = config::compress_rowbatches && tuple_byte_size > 0;
- if (can_compress) {
- try {
- // Allocation of extra-long contiguous memory may fail, and data compression cannot be used if it fails
- _compression_scratch.resize(max_compressed_size);
- } catch (...) {
- can_compress = false;
- std::exception_ptr p = std::current_exception();
- LOG(WARNING) << "Try to alloc " << max_compressed_size
- << " bytes for compression scratch failed. "
- << get_current_exception_type_name(p);
- }
- }
- if (can_compress) {
- // Try compressing tuple_data to _compression_scratch, swap if compressed data is
- // smaller
- size_t compressed_size = 0;
- char* compressed_output = _compression_scratch.data();
- snappy::RawCompress(mutable_tuple_data->data(), tuple_byte_size, compressed_output,
- &compressed_size);
- if (LIKELY(compressed_size < tuple_byte_size)) {
- _compression_scratch.resize(compressed_size);
- mutable_tuple_data->swap(_compression_scratch);
- output_batch->set_is_compressed(true);
- }
-
- VLOG_ROW << "uncompressed tuple_byte_size: " << tuple_byte_size
- << ", compressed size: " << compressed_size;
- }
-
- // return compressed and uncompressed size
- size_t pb_size = get_batch_size(*output_batch);
- *uncompressed_size = pb_size - mutable_tuple_data->size() + tuple_byte_size;
- *compressed_size = pb_size;
- if (!allow_transfer_large_data && pb_size > std::numeric_limits<int32_t>::max()) {
- // the protobuf has a hard limit of 2GB for serialized data.
- return Status::InternalError(
- "The rowbatch is large than 2GB({}), can not send by Protobuf.", pb_size);
- }
- return Status::OK();
-}
-
-// When a row read from a file cannot be filled into a tuple because of schema limitations, increment _num_uncommitted_rows in the row batch.
-void RowBatch::increase_uncommitted_rows() {
- _num_uncommitted_rows++;
-}
-
-void RowBatch::add_io_buffer(DiskIoMgr::BufferDescriptor* buffer) {
- DCHECK(buffer != nullptr);
- _io_buffers.push_back(buffer);
- _auxiliary_mem_usage += buffer->buffer_len();
-}
-
-Status RowBatch::resize_and_allocate_tuple_buffer(RuntimeState* state, int64_t* tuple_buffer_size,
- uint8_t** buffer) {
- int64_t row_size = _row_desc.get_row_size();
- // Avoid divide-by-zero. Don't need to modify capacity for empty rows anyway.
- if (row_size != 0) {
- _capacity = std::max(1, std::min<int>(_capacity, FIXED_LEN_BUFFER_LIMIT / row_size));
- }
- *tuple_buffer_size = row_size * _capacity;
- // TODO(dhc): change allocate to try_allocate?
- *buffer = _tuple_data_pool.allocate(*tuple_buffer_size);
- if (*buffer == nullptr) {
- std::stringstream ss;
- ss << "Failed to allocate tuple buffer" << *tuple_buffer_size;
- LOG(WARNING) << ss.str();
- return state->set_mem_limit_exceeded(ss.str());
- }
- return Status::OK();
-}
-
-void RowBatch::reset() {
- _num_rows = 0;
- _capacity = _tuple_ptrs_size / (_num_tuples_per_row * sizeof(Tuple*));
- _has_in_flight_row = false;
-
- // TODO: Change this to Clear() and investigate the repercussions.
- _tuple_data_pool.free_all();
- _agg_object_pool.clear();
- for (int i = 0; i < _io_buffers.size(); ++i) {
- _io_buffers[i]->return_buffer();
- }
- _io_buffers.clear();
-
- for (BufferInfo& buffer_info : _buffers) {
- ExecEnv::GetInstance()->buffer_pool()->FreeBuffer(buffer_info.client, &buffer_info.buffer);
- }
- _buffers.clear();
-
- _auxiliary_mem_usage = 0;
- _need_to_return = false;
- _flush = FlushMode::NO_FLUSH_RESOURCES;
- _needs_deep_copy = false;
-}
-
-void RowBatch::transfer_resource_ownership(RowBatch* dest) {
- dest->_auxiliary_mem_usage += _tuple_data_pool.total_allocated_bytes();
- dest->_tuple_data_pool.acquire_data(&_tuple_data_pool, false);
- dest->_agg_object_pool.acquire_data(&_agg_object_pool);
- for (int i = 0; i < _io_buffers.size(); ++i) {
- DiskIoMgr::BufferDescriptor* buffer = _io_buffers[i];
- dest->_io_buffers.push_back(buffer);
- dest->_auxiliary_mem_usage += buffer->buffer_len();
- }
- _io_buffers.clear();
-
- for (BufferInfo& buffer_info : _buffers) {
- dest->add_buffer(buffer_info.client, std::move(buffer_info.buffer),
- FlushMode::NO_FLUSH_RESOURCES);
- }
- _buffers.clear();
-
- dest->_need_to_return |= _need_to_return;
-
- if (_needs_deep_copy) {
- dest->mark_needs_deep_copy();
- } else if (_flush == FlushMode::FLUSH_RESOURCES) {
- dest->mark_flush_resources();
- }
- reset();
-}
-
-size_t RowBatch::get_batch_size(const PRowBatch& batch) {
- size_t result = batch.tuple_data().size();
- result += batch.row_tuples().size() * sizeof(int32_t);
- // TODO(cmy): remove batch.tuple_offsets
- result += batch.tuple_offsets().size() * sizeof(int32_t);
- result += batch.new_tuple_offsets().size() * sizeof(int64_t);
- return result;
-}
-
-void RowBatch::acquire_state(RowBatch* src) {
- // DCHECK(_row_desc.equals(src->_row_desc));
- DCHECK_EQ(_num_tuples_per_row, src->_num_tuples_per_row);
- // DCHECK_EQ(_tuple_ptrs_size, src->_tuple_ptrs_size);
- DCHECK_EQ(_auxiliary_mem_usage, 0);
-
- // The destination row batch should be empty.
- DCHECK(!_has_in_flight_row);
- DCHECK_EQ(_num_rows, 0);
-
- for (int i = 0; i < src->_io_buffers.size(); ++i) {
- DiskIoMgr::BufferDescriptor* buffer = src->_io_buffers[i];
- _io_buffers.push_back(buffer);
- _auxiliary_mem_usage += buffer->buffer_len();
- }
- src->_io_buffers.clear();
- src->_auxiliary_mem_usage = 0;
-
- _has_in_flight_row = src->_has_in_flight_row;
- _num_rows = src->_num_rows;
- _capacity = src->_capacity;
- _need_to_return = src->_need_to_return;
- // tuple_ptrs_ were allocated with malloc so can be swapped between batches.
- std::swap(_tuple_ptrs, src->_tuple_ptrs);
- src->transfer_resource_ownership(this);
-}
-
-void RowBatch::deep_copy_to(RowBatch* dst) {
- DCHECK(dst->_row_desc.equals(_row_desc));
- DCHECK_EQ(dst->_num_rows, 0);
- DCHECK_GE(dst->_capacity, _num_rows);
- dst->add_rows(_num_rows);
- for (int i = 0; i < _num_rows; ++i) {
- TupleRow* src_row = get_row(i);
- TupleRow* dst_row = convert_to<TupleRow*>(dst->_tuple_ptrs + i * _num_tuples_per_row);
- src_row->deep_copy(dst_row, _row_desc.tuple_descriptors(), &dst->_tuple_data_pool, false);
- }
- dst->commit_rows(_num_rows);
-}
-
-// TODO: consider computing size of batches as they are built up
-size_t RowBatch::total_byte_size() const {
- size_t result = 0;
-
- // Sum total variable length byte sizes.
- for (int i = 0; i < _num_rows; ++i) {
- TupleRow* row = get_row(i);
- const auto& tuple_descs = _row_desc.tuple_descriptors();
- for (size_t j = 0; j < tuple_descs.size(); ++j) {
- auto desc = tuple_descs[j];
- Tuple* tuple = row->get_tuple(j);
- if (tuple == nullptr) {
- continue;
- }
- result = align_tuple_offset(result);
- result += desc->byte_size();
-
- for (auto slot : desc->string_slots()) {
- DCHECK(slot->type().is_string_type());
- if (tuple->is_null(slot->null_indicator_offset())) {
- continue;
- }
- StringValue* string_val = tuple->get_string_slot(slot->tuple_offset());
- result += string_val->len;
- }
-
- // compute slot collection size
- for (auto slot_collection : desc->collection_slots()) {
- DCHECK(slot_collection->type().is_collection_type());
- if (tuple->is_null(slot_collection->null_indicator_offset())) {
- continue;
- }
- CollectionValue* array_val =
- tuple->get_collection_slot(slot_collection->tuple_offset());
- const auto& item_type_desc = slot_collection->type().children[0];
- result += array_val->get_byte_size(item_type_desc);
- }
- }
- }
-
- return result;
-}
-
-void RowBatch::add_buffer(BufferPool::ClientHandle* client, BufferPool::BufferHandle&& buffer,
- FlushMode flush) {
- _auxiliary_mem_usage += buffer.len();
- BufferInfo buffer_info;
- buffer_info.client = client;
- buffer_info.buffer = std::move(buffer);
- _buffers.push_back(std::move(buffer_info));
- if (flush == FlushMode::FLUSH_RESOURCES) mark_flush_resources();
-}
-
-std::string RowBatch::to_string() {
- std::stringstream out;
- for (int i = 0; i < _num_rows; ++i) {
- out << get_row(i)->to_string(_row_desc) << "\n";
- }
- return out.str();
-}
-
-} // end namespace doris
diff --git a/be/src/runtime/row_batch.h b/be/src/runtime/row_batch.h
deleted file mode 100644
index bf920e1905..0000000000
--- a/be/src/runtime/row_batch.h
+++ /dev/null
@@ -1,478 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-// This file is copied from
-// https://github.com/apache/impala/blob/branch-2.9.0/be/src/runtime/row-batch.h
-// and modified by Doris
-
-#pragma once
-
-#include <cstring>
-#include <vector>
-
-#include "common/logging.h"
-#include "runtime/bufferpool/buffer_pool.h"
-#include "runtime/descriptors.h"
-#include "runtime/disk_io_mgr.h"
-#include "runtime/mem_pool.h"
-
-namespace doris::vectorized {
-class Block;
-}
-
-namespace doris {
-
-class BufferedTupleStream2;
-class Tuple;
-class TupleRow;
-class TupleDescriptor;
-class PRowBatch;
-
-// A RowBatch encapsulates a batch of rows, each composed of a number of tuples.
-// The maximum number of rows is fixed at the time of construction, and the caller
-// can add rows up to that capacity.
-// The row batch references a few different sources of memory.
-// 1. TupleRow ptrs - this is always owned and managed by the row batch.
-// 2. Tuple memory - this is allocated from (or transferred to) the row batch's tuple pool.
-// 3. Auxiliary tuple memory (e.g. string data) - this can either be stored externally
-// (don't copy strings) or from the tuple pool (strings are copied). If external,
-// the data is in an io buffer that may not be attached to this row batch. The
-// creator of that row batch has to make sure that the io buffer is not recycled
-// until all batches that reference the memory have been consumed.
-// In order to minimize memory allocations, RowBatches and PRowBatches that have been
-// serialized and sent over the wire should be reused (this prevents _compression_scratch
-// from being needlessly reallocated).
-//
-// Row batches and memory usage: We attempt to stream row batches through the plan
-// tree without copying the data. This means that row batches are often not-compact
-// and reference memory outside of the row batch. This results in most row batches
-// having a very small memory footprint and in some row batches having a very large
-// one (it contains all the memory that other row batches are referencing). An example
-// is IoBuffers which are only attached to one row batch. Only when the row batch reaches
-// a blocking operator or the root of the fragment is the row batch memory freed.
-// This means that in some cases (e.g. very selective queries), we still need to
-// pass the row batch through the exec nodes (even if they have no rows) to trigger
-// memory deletion. at_capacity() encapsulates the check that we are not accumulating
-// excessive memory.
-//
-// A row batch is considered at capacity if all the rows are full or it has accumulated
-// auxiliary memory up to a soft cap. (See AT_CAPACITY_MEM_USAGE.)
-// TODO: stick _tuple_ptrs into a pool?
-class RowBatch {
-public:
- /// Flag indicating whether the resources attached to a RowBatch need to be flushed.
- /// Defined here as a convenience for other modules that need to communicate flushing
- /// modes.
- enum class FlushMode {
- FLUSH_RESOURCES,
- NO_FLUSH_RESOURCES,
- };
-
- // Create RowBatch for a maximum of 'capacity' rows of tuples specified
- // by 'row_desc'.
- RowBatch(const RowDescriptor& row_desc, int capacity);
-
- // Populate a row batch from input_batch by copying input_batch's
- // tuple_data into the row batch's mempool and converting all offsets
- // in the data back into pointers.
- // TODO: figure out how to transfer the data from input_batch to this RowBatch
- // (so that we don't need to make yet another copy)
- RowBatch(const RowDescriptor& row_desc, const PRowBatch& input_batch);
-
- // Releases all resources accumulated at this row batch. This includes
- // - tuple_ptrs
- // - tuple mem pool data
- // - buffer handles from the io mgr
- virtual ~RowBatch();
-
- // Releases the resources held by this batch (tuple pointers, pools, buffers); repeated calls are no-ops.
- void clear();
-
- static const int INVALID_ROW_INDEX = -1;
-
- // Add n rows of tuple pointers after the last committed row and return its index.
- // The rows are uninitialized and each tuple of the row must be set.
- // Returns INVALID_ROW_INDEX if the row batch cannot fit n rows.
- // Two consecutive add_row() calls without a commit_last_row() between them
- // have the same effect as a single call.
- int add_rows(int n) {
- if (_num_rows + n > _capacity) {
- return INVALID_ROW_INDEX;
- }
-
- _has_in_flight_row = true;
- return _num_rows;
- }
-
- int add_row() { return add_rows(1); }
-
- void commit_rows(int n) {
- DCHECK_GE(n, 0);
- DCHECK_LE(_num_rows + n, _capacity);
- _num_rows += n;
- _has_in_flight_row = false;
- }
-
- void commit_last_row() { commit_rows(1); }
-
- bool in_flight() const { return _has_in_flight_row; }
-
- // Set function can be used to reduce the number of rows in the batch. This is only
- // used in the limit case where more rows were added than necessary.
- void set_num_rows(int num_rows) {
- DCHECK_LE(num_rows, _num_rows);
- _num_rows = num_rows;
- }
-
- // Returns true if the row batch has filled all the rows or has accumulated
- // enough memory.
- bool at_capacity() const {
- return _num_rows == _capacity || _auxiliary_mem_usage >= AT_CAPACITY_MEM_USAGE ||
- _need_to_return;
- }
-
- // Returns true if the row batch has filled all the rows or has accumulated
- // enough memory. tuple_pool is an intermediate memory pool containing tuple data
- // that will eventually be attached to this row batch. We need to make sure
- // the tuple pool does not accumulate excessive memory.
- bool at_capacity(const MemPool* tuple_pool) const {
- DCHECK(tuple_pool != nullptr);
- return at_capacity() || tuple_pool->total_allocated_bytes() > AT_CAPACITY_MEM_USAGE;
- }
-
- // Returns true if row_batch has reached capacity.
- bool is_full() const { return _num_rows == _capacity; }
-
- // Returns true if uncommitted rows has reached capacity.
- bool is_full_uncommitted() { return _num_uncommitted_rows == _capacity; }
-
- // Returns true if the row batch has accumulated enough external memory (in MemPools
- // and io buffers). This would be a trigger to compact the row batch or reclaim
- // the memory in some way.
- bool at_resource_limit() {
- return tuple_data_pool()->total_allocated_bytes() > MAX_MEM_POOL_SIZE;
- }
-
- // The total size of all data represented in this row batch (tuples and referenced
- // string data).
- size_t total_byte_size() const;
-
- TupleRow* get_row(int row_idx) const {
- DCHECK(_tuple_ptrs != nullptr);
- DCHECK_GE(row_idx, 0);
- //DCHECK_LT(row_idx, _num_rows + (_has_in_flight_row ? 1 : 0));
- return reinterpret_cast<TupleRow*>(_tuple_ptrs + row_idx * _num_tuples_per_row);
- }
-
- /// An iterator for going through a row batch, starting at 'row_idx'.
- /// If 'limit' is specified, it will iterate up to row number 'row_idx + limit'
- /// or the last row, whichever comes first. Otherwise, it will iterate till the last
- /// row in the batch. This is more efficient than using GetRow() as it avoids loading
- /// the row batch state and doing multiplication on each loop with GetRow().
- class Iterator {
- public:
- Iterator(RowBatch* parent, int row_idx, int limit = -1)
- : _num_tuples_per_row(parent->_num_tuples_per_row),
- _row(parent->_tuple_ptrs + _num_tuples_per_row * row_idx),
- _row_batch_end(parent->_tuple_ptrs +
- _num_tuples_per_row *
- (limit == -1 ? parent->_num_rows
- : std::min<int>(row_idx + limit,
- parent->_num_rows))),
- _parent(parent) {
- DCHECK_GE(row_idx, 0);
- DCHECK_GT(_num_tuples_per_row, 0);
- /// We allow empty row batches with _num_rows == capacity_ == 0.
- /// That's why we cannot call GetRow() above to initialize '_row'.
- DCHECK_LE(row_idx, parent->_capacity);
- }
-
- /// Return the current row pointed to by the row pointer.
- TupleRow* get() { return reinterpret_cast<TupleRow*>(_row); }
-
- /// Increment the row pointer and return the next row.
- TupleRow* next() {
- _row += _num_tuples_per_row;
- DCHECK_LE((_row - _parent->_tuple_ptrs) / _num_tuples_per_row, _parent->_capacity);
- return get();
- }
-
- /// Returns true if the iterator is beyond the last row for read iterators.
- /// Useful for read iterators to determine the limit. Write iterators should use
- /// RowBatch::AtCapacity() instead.
- bool at_end() const { return _row >= _row_batch_end; }
-
- /// Returns the row batch which this iterator is iterating through.
- RowBatch* parent() const { return _parent; }
-
- private:
- /// Number of tuples per row.
- const int _num_tuples_per_row;
-
- /// Pointer to the current row.
- Tuple** _row;
-
- /// Pointer to the row after the last row for read iterators.
- Tuple** const _row_batch_end;
-
- /// The row batch being iterated on.
- RowBatch* const _parent;
- };
-
- int num_tuples_per_row() const { return _num_tuples_per_row; }
- int row_byte_size() const { return _num_tuples_per_row * sizeof(Tuple*); }
- MemPool* tuple_data_pool() { return &_tuple_data_pool; }
- ObjectPool* agg_object_pool() { return &_agg_object_pool; }
- int num_io_buffers() const { return _io_buffers.size(); }
-
- // increase # of uncommitted rows
- void increase_uncommitted_rows();
-
- // Resets the row batch, returning all resources it has accumulated.
- void reset();
-
- // Add io buffer to this row batch.
- void add_io_buffer(DiskIoMgr::BufferDescriptor* buffer);
-
- // Add tuple stream to this row batch. The row batch takes ownership of the stream
- // and will call Close() on the stream and delete it when freeing resources.
- void add_tuple_stream(BufferedTupleStream2* stream);
-
- /// Adds a buffer to this row batch. The buffer is deleted when freeing resources.
- /// The buffer's memory remains accounted against the original owner, even when the
- /// ownership of batches is transferred. If the original owner wants the memory to be
- /// released, it should call this with 'mode' FLUSH_RESOURCES (see MarkFlushResources()
- /// for further explanation).
- /// TODO: IMPALA-4179: after IMPALA-3200, simplify the ownership transfer model and
- /// make it consistent between buffers and I/O buffers.
- void add_buffer(BufferPool::ClientHandle* client, BufferPool::BufferHandle&& buffer,
- FlushMode flush);
-
- // Called to indicate this row batch must be returned up the operator tree.
- // This is used to control memory management for streaming rows.
- // TODO: consider using this mechanism instead of add_io_buffer/add_tuple_stream. This is
- // the property we need rather than meticulously passing resources up so the operator
- // tree.
- void mark_need_to_return() { _need_to_return = true; }
-
- bool need_to_return() const { return _need_to_return; }
-
- /// Used by an operator to indicate that it cannot produce more rows until the
- /// resources that it has attached to the row batch are freed or acquired by an
- /// ancestor operator. After this is called, the batch is at capacity and no more rows
- /// can be added. The "flush" mark is transferred by TransferResourceOwnership(). This
- /// ensures that batches are flushed by streaming operators all the way up the operator
- /// tree. Blocking operators can still accumulate batches with this flag.
- /// TODO: IMPALA-3200: blocking operators should acquire all memory resources including
- /// attached blocks/buffers, so that MarkFlushResources() can guarantee that the
- /// resources will not be accounted against the original operator (this is currently
- /// not true for Blocks, which can't be transferred).
- void mark_flush_resources() {
- DCHECK_LE(_num_rows, _capacity);
- _capacity = _num_rows;
- _flush = FlushMode::FLUSH_RESOURCES;
- }
-
- /// Called to indicate that some resources backing this batch were not attached and
- /// will be cleaned up after the next GetNext() call. This means that the batch must
- /// be returned up the operator tree. Blocking operators must deep-copy any rows from
- /// this batch or preceding batches.
- ///
- /// This is a stronger version of MarkFlushResources(), because blocking operators
- /// are not allowed to accumulate batches with the 'needs_deep_copy' flag.
- /// TODO: IMPALA-4179: always attach backing resources and remove this flag.
- void mark_needs_deep_copy() {
- mark_flush_resources(); // No more rows should be added to the batch.
- _needs_deep_copy = true;
- }
-
- bool needs_deep_copy() const { return _needs_deep_copy; }
-
- // Transfer ownership of resources to dest. This includes tuple data in mem
- // pool and io buffers.
- // we firstly update dest resource, and then reset current resource
- void transfer_resource_ownership(RowBatch* dest);
-
- void copy_row(TupleRow* src, TupleRow* dest) {
- memcpy(dest, src, _num_tuples_per_row * sizeof(Tuple*));
- }
-
- // Copy 'num_rows' rows from 'src' to 'dest' within the batch. Useful for exec
- // nodes that skip an offset and copied more than necessary.
- void copy_rows(int dest, int src, int num_rows) {
- DCHECK_LE(dest, src);
- DCHECK_LE(src + num_rows, _capacity);
- memmove(_tuple_ptrs + _num_tuples_per_row * dest, _tuple_ptrs + _num_tuples_per_row * src,
- num_rows * _num_tuples_per_row * sizeof(Tuple*));
- }
-
- void clear_row(TupleRow* row) { memset(row, 0, _num_tuples_per_row * sizeof(Tuple*)); }
-
- // Acquires state from the 'src' row batch into this row batch. This includes all IO
- // buffers and tuple data.
- // This row batch must be empty and have the same row descriptor as the src batch.
- // This is used for scan nodes which produce RowBatches asynchronously. Typically,
- // an ExecNode is handed a row batch to populate (pull model) but ScanNodes have
- // multiple threads which push row batches.
- // TODO: this is wasteful and makes a copy that's unnecessary. Think about cleaning
- // this up.
- // TODO: rename this or unify with TransferResourceOwnership()
- void acquire_state(RowBatch* src);
-
- // Deep copy all rows this row batch into dst, using memory allocated from
- // dst's _tuple_data_pool. Only valid when dst is empty.
- // TODO: the current implementation of deep copy can produce an oversized
- // row batch if there are duplicate tuples in this row batch.
- void deep_copy_to(RowBatch* dst);
-
- // Create a serialized version of this row batch in output_batch, attaching all of the
- // data it references to output_batch.tuple_data. output_batch.tuple_data will be
- // snappy-compressed unless the compressed data is larger than the uncompressed
- // data. Use output_batch.is_compressed to determine whether tuple_data is compressed.
- // If an in-flight row is present in this row batch, it is ignored.
- // This function does not reset().
- // Returns the uncompressed serialized size (this will be the true size of output_batch
- // if tuple_data is actually uncompressed).
- Status serialize(PRowBatch* output_batch, size_t* uncompressed_size, size_t* compressed_size,
- bool allow_transfer_large_data = false);
-
- // Utility function: returns total size of batch.
- static size_t get_batch_size(const PRowBatch& batch);
-
- int num_rows() const { return _num_rows; }
- int capacity() const { return _capacity; }
-
- int num_buffers() const { return _buffers.size(); }
-
- const RowDescriptor& row_desc() const { return _row_desc; }
-
- // Max memory that this row batch can accumulate in _tuple_data_pool before it
- // is considered at capacity.
- /// This is a soft capacity: row batches may exceed the capacity, preferably only by a
- /// row's worth of data.
- static const int AT_CAPACITY_MEM_USAGE;
-
- // Max memory out of AT_CAPACITY_MEM_USAGE that should be used for fixed-length data,
- // in order to leave room for variable-length data.
- static const int FIXED_LEN_BUFFER_LIMIT;
-
- /// Allocates a buffer large enough for the fixed-length portion of 'capacity_' rows in
- /// this batch from 'tuple_data_pool_'. 'capacity_' is reduced if the allocation would
- /// exceed FIXED_LEN_BUFFER_LIMIT. Always returns enough space for at least one row.
- /// Returns Status::MemoryLimitExceeded("Memory limit exceeded") and sets 'buffer' to nullptr if a memory limit would
- /// have been exceeded. 'state' is used to log the error.
- /// On success, sets 'buffer_size' to the size in bytes and 'buffer' to the buffer.
- Status resize_and_allocate_tuple_buffer(RuntimeState* state, int64_t* buffer_size,
- uint8_t** buffer);
-
- void set_scanner_id(int id) { _scanner_id = id; }
- int scanner_id() const { return _scanner_id; }
-
- static const int MAX_MEM_POOL_SIZE = 32 * 1024 * 1024;
- std::string to_string();
-
-private:
- // All members need to be handled in RowBatch::swap()
-
- bool _has_in_flight_row; // if true, last row hasn't been committed yet
- int _num_rows; // # of committed rows
- int _num_uncommitted_rows; // # of uncommited rows in row batch mem pool
- int _capacity; // maximum # of rows
-
- /// If FLUSH_RESOURCES, the resources attached to this batch should be freed or
- /// acquired by a new owner as soon as possible. See MarkFlushResources(). If
- /// FLUSH_RESOURCES, AtCapacity() is also true.
- FlushMode _flush;
-
- /// If true, this batch references unowned memory that will be cleaned up soon.
- /// See MarkNeedsDeepCopy(). If true, 'flush_' is FLUSH_RESOURCES and
- /// AtCapacity() is true.
- bool _needs_deep_copy;
-
- int _num_tuples_per_row;
- RowDescriptor _row_desc;
-
- // Array of pointers with _capacity * _num_tuples_per_row elements.
- // The memory ownership depends on whether legacy joins and aggs are enabled.
- //
- // Memory is malloc'd and owned by RowBatch:
- // If enable_partitioned_hash_join=true and enable_partitioned_aggregation=true
- // then the memory is owned by this RowBatch and is freed upon its destruction.
- // This mode is more performant especially with SubplanNodes in the ExecNode tree
- // because the tuple pointers are not transferred and do not have to be re-created
- // in every Reset().
- //
- // Memory is allocated from MemPool:
- // Otherwise, the memory is allocated from _tuple_data_pool. As a result, the
- // pointer memory is transferred just like tuple data, and must be re-created
- // in Reset(). This mode is required for the legacy join and agg which rely on
- // the tuple pointers being allocated from the _tuple_data_pool, so they can
- // acquire ownership of the tuple pointers.
- Tuple** _tuple_ptrs;
- int _tuple_ptrs_size;
-
- // Sum of all auxiliary bytes. This includes IoBuffers and memory from
- // TransferResourceOwnership().
- int64_t _auxiliary_mem_usage;
-
- // If true, this batch is considered at capacity. This is explicitly set by streaming
- // components that return rows via row batches.
- bool _need_to_return;
-
- // holding (some of the) data referenced by rows
- MemPool _tuple_data_pool;
-
- // holding some complex agg object data (bitmap, hll)
- ObjectPool _agg_object_pool;
-
- // IO buffers current owned by this row batch. Ownership of IO buffers transfer
- // between row batches. Any IO buffer will be owned by at most one row batch
- // (i.e. they are not ref counted) so most row batches don't own any.
- std::vector<DiskIoMgr::BufferDescriptor*> _io_buffers;
-
- struct BufferInfo {
- BufferPool::ClientHandle* client;
- BufferPool::BufferHandle buffer;
- };
- /// Pages attached to this row batch. See AddBuffer() for ownership semantics.
- std::vector<BufferInfo> _buffers;
-
- // String to write compressed tuple data to in serialize().
- // This is a string so we can swap() with the string in the PRowBatch we're serializing
- // to (we don't compress directly into the PRowBatch in case the compressed data is
- // longer than the uncompressed data). Swapping avoids copying data to the PRowBatch and
- // avoids excess memory allocations: since we reuse RowBatches and PRowBatchs, and
- // assuming all row batches are roughly the same size, all strings will eventually be
- // allocated to the right size.
- std::string _compression_scratch;
-
- int _scanner_id;
- bool _cleared = false;
-};
-
-/// Macros for iterating through '_row_batch', starting at '_start_row_idx'.
-/// '_row_batch' is the row batch to iterate through.
-/// '_start_row_idx' is the starting row index.
-/// '_iter' is the iterator.
-/// '_limit' is the max number of rows to iterate over.
-#define FOREACH_ROW(_row_batch, _start_row_idx, _iter) \
- for (RowBatch::Iterator _iter(_row_batch, _start_row_idx); !_iter.at_end(); _iter.next())
-
-#define FOREACH_ROW_LIMIT(_row_batch, _start_row_idx, _limit, _iter) \
- for (RowBatch::Iterator _iter(_row_batch, _start_row_idx, _limit); !_iter.at_end(); \
- _iter.next())
-
-} // namespace doris
diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp
index 089b5ba4be..23045b997a 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -22,7 +22,6 @@
#include "olap/memtable.h"
#include "olap/storage_engine.h"
#include "runtime/load_channel.h"
-#include "runtime/row_batch.h"
#include "util/doris_metrics.h"
namespace doris {
diff --git a/be/src/util/arrow/block_convertor.h b/be/src/util/arrow/block_convertor.h
index bc4aae037e..c76ae73c9f 100644
--- a/be/src/util/arrow/block_convertor.h
+++ b/be/src/util/arrow/block_convertor.h
@@ -22,8 +22,8 @@
#include "common/status.h"
#include "vec/core/block.h"
-// This file will convert Doris RowBatch to/from Arrow's RecordBatch
-// RowBatch is used by Doris query engine to exchange data between
+// This file will convert Doris Block to/from Arrow's RecordBatch
+// Block is used by Doris query engine to exchange data between
// each execute node.
namespace arrow {
@@ -37,7 +37,6 @@ class Schema;
namespace doris {
class ObjectPool;
-class RowBatch;
class RowDescriptor;
Status convert_to_arrow_batch(const vectorized::Block& block,
diff --git a/be/src/util/arrow/row_batch.cpp b/be/src/util/arrow/row_batch.cpp
index 46eca71b6a..01b930b52e 100644
--- a/be/src/util/arrow/row_batch.cpp
+++ b/be/src/util/arrow/row_batch.cpp
@@ -40,7 +40,6 @@
#include "runtime/descriptor_helper.h"
#include "runtime/descriptors.h"
#include "runtime/large_int_value.h"
-#include "runtime/row_batch.h"
#include "util/arrow/utils.h"
#include "util/types.h"
@@ -120,349 +119,6 @@ Status convert_to_arrow_schema(const RowDescriptor& row_desc,
return Status::OK();
}
-Status convert_to_doris_type(const arrow::DataType& type, TSlotDescriptorBuilder* builder) {
- switch (type.id()) {
- case arrow::Type::INT8:
- builder->type(TYPE_TINYINT);
- break;
- case arrow::Type::INT16:
- builder->type(TYPE_SMALLINT);
- break;
- case arrow::Type::INT32:
- builder->type(TYPE_INT);
- break;
- case arrow::Type::INT64:
- builder->type(TYPE_BIGINT);
- break;
- case arrow::Type::FLOAT:
- builder->type(TYPE_FLOAT);
- break;
- case arrow::Type::DOUBLE:
- builder->type(TYPE_DOUBLE);
- break;
- case arrow::Type::BOOL:
- builder->type(TYPE_BOOLEAN);
- break;
- default:
- return Status::InvalidArgument("Unknown arrow type id({})", type.id());
- }
- return Status::OK();
-}
-
-Status convert_to_slot_desc(const arrow::Field& field, int column_pos,
- TSlotDescriptorBuilder* builder) {
- RETURN_IF_ERROR(convert_to_doris_type(*field.type(), builder));
- builder->column_name(field.name()).nullable(field.nullable()).column_pos(column_pos);
- return Status::OK();
-}
-
-Status convert_to_row_desc(ObjectPool* pool, const arrow::Schema& schema,
- RowDescriptor** row_desc) {
- TDescriptorTableBuilder builder;
- TTupleDescriptorBuilder tuple_builder;
- for (int i = 0; i < schema.num_fields(); ++i) {
- auto field = schema.field(i);
- TSlotDescriptorBuilder slot_builder;
- RETURN_IF_ERROR(convert_to_slot_desc(*field, i, &slot_builder));
- tuple_builder.add_slot(slot_builder.build());
- }
- tuple_builder.build(&builder);
- DescriptorTbl* tbl = nullptr;
- RETURN_IF_ERROR(DescriptorTbl::create(pool, builder.desc_tbl(), &tbl));
- auto tuple_desc = tbl->get_tuple_descriptor(0);
- *row_desc = pool->add(new RowDescriptor(tuple_desc, false));
- return Status::OK();
-}
-
-// Convert RowBatch to an Arrow::Array
-// We should keep this function to keep compatible with arrow's type visitor
-// Now we inherit TypeVisitor to use default Visit implementation
-class FromRowBatchConverter : public arrow::TypeVisitor {
-public:
- FromRowBatchConverter(const RowBatch& batch, const std::shared_ptr<arrow::Schema>& schema,
- arrow::MemoryPool* pool)
- : _batch(batch), _schema(schema), _pool(pool), _cur_field_idx(-1) {
- // obtain local time zone
- time_t ts = 0;
- struct tm t;
- char buf[16];
- localtime_r(&ts, &t);
- strftime(buf, sizeof(buf), "%Z", &t);
- _time_zone = buf;
- }
-
- ~FromRowBatchConverter() override {}
-
- // Use base class function
- using arrow::TypeVisitor::Visit;
-
-#define PRIMITIVE_VISIT(TYPE) \
- arrow::Status Visit(const arrow::TYPE& type) override { return _visit(type); }
-
- PRIMITIVE_VISIT(Int8Type);
- PRIMITIVE_VISIT(Int16Type);
- PRIMITIVE_VISIT(Int32Type);
- PRIMITIVE_VISIT(Int64Type);
- PRIMITIVE_VISIT(FloatType);
- PRIMITIVE_VISIT(DoubleType);
-
-#undef PRIMITIVE_VISIT
-
- // process string-transformable field
- arrow::Status Visit(const arrow::StringType& type) override {
- arrow::StringBuilder builder(_pool);
- size_t num_rows = _batch.num_rows();
- ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
- for (size_t i = 0; i < num_rows; ++i) {
- bool is_null = _cur_slot_ref->is_null_bit_set(_batch.get_row(i));
- if (is_null) {
- ARROW_RETURN_NOT_OK(builder.AppendNull());
- continue;
- }
- auto cell_ptr = _cur_slot_ref->get_slot(_batch.get_row(i));
- PrimitiveType primitive_type = _cur_slot_ref->type().type;
- switch (primitive_type) {
- case TYPE_VARCHAR:
- case TYPE_CHAR:
- case TYPE_HLL:
- case TYPE_STRING: {
- const StringValue* string_val = (const StringValue*)(cell_ptr);
- if (string_val->len == 0) {
- // 0x01 is a magic num, not useful actually, just for present ""
- //char* tmp_val = reinterpret_cast<char*>(0x01);
- ARROW_RETURN_NOT_OK(builder.Append(""));
- } else {
- ARROW_RETURN_NOT_OK(builder.Append(string_val->ptr, string_val->len));
- }
- break;
- }
- case TYPE_DATE:
- case TYPE_DATETIME: {
- char buf[64];
- const DateTimeValue* time_val = (const DateTimeValue*)(cell_ptr);
- int len = time_val->to_buffer(buf);
- ARROW_RETURN_NOT_OK(builder.Append(buf, len));
- break;
- }
- case TYPE_LARGEINT: {
- auto string_temp = LargeIntValue::to_string(
- reinterpret_cast<const PackedInt128*>(cell_ptr)->value);
- ARROW_RETURN_NOT_OK(builder.Append(string_temp.data(), string_temp.size()));
- break;
- }
- default: {
- LOG(WARNING) << "can't convert this type = " << primitive_type << "to arrow type";
- return arrow::Status::TypeError("unsupported column type");
- }
- }
- }
- return builder.Finish(&_arrays[_cur_field_idx]);
- }
-
- // process doris DecimalV2
- arrow::Status Visit(const arrow::Decimal128Type& type) override {
- std::shared_ptr<arrow::DataType> s_decimal_ptr =
- std::make_shared<arrow::Decimal128Type>(27, 9);
- arrow::Decimal128Builder builder(s_decimal_ptr, _pool);
- size_t num_rows = _batch.num_rows();
- ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
- for (size_t i = 0; i < num_rows; ++i) {
- bool is_null = _cur_slot_ref->is_null_bit_set(_batch.get_row(i));
- if (is_null) {
- ARROW_RETURN_NOT_OK(builder.AppendNull());
- continue;
- }
- auto cell_ptr = _cur_slot_ref->get_slot(_batch.get_row(i));
- PackedInt128* p_value = reinterpret_cast<PackedInt128*>(cell_ptr);
- int64_t high = (p_value->value) >> 64;
- uint64 low = p_value->value;
- arrow::Decimal128 value(high, low);
- ARROW_RETURN_NOT_OK(builder.Append(value));
- }
- return builder.Finish(&_arrays[_cur_field_idx]);
- }
- // process boolean
- arrow::Status Visit(const arrow::BooleanType& type) override {
- arrow::BooleanBuilder builder(_pool);
- size_t num_rows = _batch.num_rows();
- ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
- for (size_t i = 0; i < num_rows; ++i) {
- bool is_null = _cur_slot_ref->is_null_bit_set(_batch.get_row(i));
- if (is_null) {
- ARROW_RETURN_NOT_OK(builder.AppendNull());
- continue;
- }
- auto cell_ptr = _cur_slot_ref->get_slot(_batch.get_row(i));
- ARROW_RETURN_NOT_OK(builder.Append(*(bool*)cell_ptr));
- }
- return builder.Finish(&_arrays[_cur_field_idx]);
- }
-
- Status convert(std::shared_ptr<arrow::RecordBatch>* out);
-
-private:
- template <typename T>
- typename std::enable_if<std::is_base_of<arrow::PrimitiveCType, T>::value, arrow::Status>::type
- _visit(const T& type) {
- arrow::NumericBuilder<T> builder(_pool);
-
- size_t num_rows = _batch.num_rows();
- ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
- for (size_t i = 0; i < num_rows; ++i) {
- bool is_null = _cur_slot_ref->is_null_bit_set(_batch.get_row(i));
- if (is_null) {
- ARROW_RETURN_NOT_OK(builder.AppendNull());
- continue;
- }
- auto cell_ptr = _cur_slot_ref->get_slot(_batch.get_row(i));
- ARROW_RETURN_NOT_OK(builder.Append(*(typename T::c_type*)cell_ptr));
- }
- return builder.Finish(&_arrays[_cur_field_idx]);
- }
-
-private:
- const RowBatch& _batch;
- const std::shared_ptr<arrow::Schema>& _schema;
- arrow::MemoryPool* _pool;
-
- size_t _cur_field_idx;
- std::unique_ptr<SlotRef> _cur_slot_ref;
-
- std::string _time_zone;
-
- std::vector<std::shared_ptr<arrow::Array>> _arrays;
-};
-
-Status FromRowBatchConverter::convert(std::shared_ptr<arrow::RecordBatch>* out) {
- std::vector<SlotDescriptor*> slot_descs;
- for (auto tuple_desc : _batch.row_desc().tuple_descriptors()) {
- for (auto desc : tuple_desc->slots()) {
- slot_descs.push_back(desc);
- }
- }
- size_t num_fields = _schema->num_fields();
- if (slot_descs.size() != num_fields) {
- return Status::InvalidArgument("number fields not match");
- }
-
- _arrays.resize(num_fields);
-
- for (size_t idx = 0; idx < num_fields; ++idx) {
- _cur_field_idx = idx;
- _cur_slot_ref.reset(new SlotRef(slot_descs[idx]));
- RETURN_IF_ERROR(_cur_slot_ref->prepare(slot_descs[idx], _batch.row_desc()));
- auto arrow_st = arrow::VisitTypeInline(*_schema->field(idx)->type(), this);
- if (!arrow_st.ok()) {
- return to_status(arrow_st);
- }
- }
- *out = arrow::RecordBatch::Make(_schema, _batch.num_rows(), std::move(_arrays));
- return Status::OK();
-}
-
-Status convert_to_arrow_batch(const RowBatch& batch, const std::shared_ptr<arrow::Schema>& schema,
- arrow::MemoryPool* pool,
- std::shared_ptr<arrow::RecordBatch>* result) {
- FromRowBatchConverter converter(batch, schema, pool);
- return converter.convert(result);
-}
-
-// Convert Arrow Array to RowBatch
-class ToRowBatchConverter : public arrow::ArrayVisitor {
-public:
- using arrow::ArrayVisitor::Visit;
-
- ToRowBatchConverter(const arrow::RecordBatch& batch, const RowDescriptor& row_desc)
- : _batch(batch), _row_desc(row_desc) {}
-
-#define PRIMITIVE_VISIT(TYPE) \
- arrow::Status Visit(const arrow::TYPE& array) override { return _visit(array); }
-
- PRIMITIVE_VISIT(Int8Array);
- PRIMITIVE_VISIT(Int16Array);
- PRIMITIVE_VISIT(Int32Array);
- PRIMITIVE_VISIT(Int64Array);
- PRIMITIVE_VISIT(FloatArray);
- PRIMITIVE_VISIT(DoubleArray);
-
-#undef PRIMITIVE_VISIT
-
- // Convert to a RowBatch
- Status convert(std::shared_ptr<RowBatch>* result);
-
-private:
- template <typename T>
- typename std::enable_if<std::is_base_of<arrow::PrimitiveCType, typename T::TypeClass>::value,
- arrow::Status>::type
- _visit(const T& array) {
- auto raw_values = array.raw_values();
- for (size_t i = 0; i < array.length(); ++i) {
- auto row = _output->get_row(i);
- auto tuple = _cur_slot_ref->get_tuple(row);
- if (array.IsValid(i)) {
- tuple->set_not_null(_cur_slot_ref->null_indicator_offset());
- auto slot = _cur_slot_ref->get_slot(row);
- *(typename T::TypeClass::c_type*)slot = raw_values[i];
- } else {
- tuple->set_null(_cur_slot_ref->null_indicator_offset());
- }
- }
- return arrow::Status::OK();
- }
-
-private:
- const arrow::RecordBatch& _batch;
- const RowDescriptor& _row_desc;
-
- std::unique_ptr<SlotRef> _cur_slot_ref;
- std::shared_ptr<RowBatch> _output;
-};
-
-Status ToRowBatchConverter::convert(std::shared_ptr<RowBatch>* result) {
- std::vector<SlotDescriptor*> slot_descs;
- for (auto tuple_desc : _row_desc.tuple_descriptors()) {
- for (auto desc : tuple_desc->slots()) {
- slot_descs.push_back(desc);
- }
- }
- size_t num_fields = slot_descs.size();
- if (num_fields != _batch.schema()->num_fields()) {
- return Status::InvalidArgument("Schema not match");
- }
- // TODO(zc): check if field type match
-
- size_t num_rows = _batch.num_rows();
- _output.reset(new RowBatch(_row_desc, num_rows));
- _output->commit_rows(num_rows);
- auto pool = _output->tuple_data_pool();
- for (size_t row_id = 0; row_id < num_rows; ++row_id) {
- auto row = _output->get_row(row_id);
- for (int tuple_id = 0; tuple_id < _row_desc.tuple_descriptors().size(); ++tuple_id) {
- auto tuple_desc = _row_desc.tuple_descriptors()[tuple_id];
- auto tuple = pool->allocate(tuple_desc->byte_size());
- row->set_tuple(tuple_id, (Tuple*)tuple);
- }
- }
- for (size_t idx = 0; idx < num_fields; ++idx) {
- _cur_slot_ref.reset(new SlotRef(slot_descs[idx]));
- RETURN_IF_ERROR(_cur_slot_ref->prepare(slot_descs[idx], _row_desc));
- auto arrow_st = arrow::VisitArrayInline(*_batch.column(idx), this);
- if (!arrow_st.ok()) {
- return to_status(arrow_st);
- }
- }
-
- *result = std::move(_output);
-
- return Status::OK();
-}
-
-Status convert_to_row_batch(const arrow::RecordBatch& batch, const RowDescriptor& row_desc,
- std::shared_ptr<RowBatch>* result) {
- ToRowBatchConverter converter(batch, row_desc);
- return converter.convert(result);
-}
-
Status serialize_record_batch(const arrow::RecordBatch& record_batch, std::string* result) {
// create sink memory buffer outputstream with the computed capacity
int64_t capacity;
diff --git a/be/src/util/arrow/row_batch.h b/be/src/util/arrow/row_batch.h
index f75b060502..25203ec170 100644
--- a/be/src/util/arrow/row_batch.h
+++ b/be/src/util/arrow/row_batch.h
@@ -36,29 +36,12 @@ class Schema;
namespace doris {
class ObjectPool;
-class RowBatch;
class RowDescriptor;
// Convert Doris RowDescriptor to Arrow Schema.
Status convert_to_arrow_schema(const RowDescriptor& row_desc,
std::shared_ptr<arrow::Schema>* result);
-// Convert an Arrow Schema to a Doris RowDescriptor which will be add to
-// input pool.
-// Why we should
-Status convert_to_row_desc(ObjectPool* pool, const arrow::Schema& schema, RowDescriptor** row_desc);
-
-// Convert a Doris RowBatch to an Arrow RecordBatch. A valid Arrow Schema
-// who should match RowBatch's schema is given. Memory used by result RecordBatch
-// will be allocated from input pool.
-Status convert_to_arrow_batch(const RowBatch& batch, const std::shared_ptr<arrow::Schema>& schema,
- arrow::MemoryPool* pool, std::shared_ptr<arrow::RecordBatch>* result);
-
-// Convert an Arrow RecordBatch to a Doris RowBatch. A valid RowDescriptor
-// whose schema is the same with RecordBatch's should be given.
-Status convert_to_row_batch(const arrow::RecordBatch& batch, const RowDescriptor& row_desc,
- std::shared_ptr<RowBatch>* result);
-
Status serialize_record_batch(const arrow::RecordBatch& record_batch, std::string* result);
} // namespace doris
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 817a2ac643..ebe46fdd0c 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -26,7 +26,6 @@
#include "agent/be_exec_version_manager.h"
#include "common/status.h"
#include "runtime/descriptors.h"
-#include "runtime/row_batch.h"
#include "runtime/tuple.h"
#include "runtime/tuple_row.h"
#include "udf/udf.h"
diff --git a/be/src/vec/exec/data_gen_functions/vnumbers_tvf.cpp b/be/src/vec/exec/data_gen_functions/vnumbers_tvf.cpp
index 33a179c3ae..d8107ba925 100644
--- a/be/src/vec/exec/data_gen_functions/vnumbers_tvf.cpp
+++ b/be/src/vec/exec/data_gen_functions/vnumbers_tvf.cpp
@@ -21,7 +21,6 @@
#include "exec/exec_node.h"
#include "gen_cpp/PlanNodes_types.h"
-#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
#include "runtime/string_value.h"
#include "runtime/tuple_row.h"
diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp b/be/src/vec/exec/join/vnested_loop_join_node.cpp
index 7125149494..39ae99a36e 100644
--- a/be/src/vec/exec/join/vnested_loop_join_node.cpp
+++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp
@@ -25,7 +25,6 @@
#include "exprs/expr.h"
#include "exprs/runtime_filter_slots_cross.h"
#include "gen_cpp/PlanNodes_types.h"
-#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
#include "util/runtime_profile.h"
#include "util/simd/bits.h"
diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp
index 2e3330f0a5..ecf4d87081 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -21,7 +21,6 @@
#include "exec/exec_node.h"
#include "runtime/mem_pool.h"
-#include "runtime/row_batch.h"
#include "vec/core/block.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/data_types/data_type_string.h"
diff --git a/be/src/vec/exec/vanalytic_eval_node.cpp b/be/src/vec/exec/vanalytic_eval_node.cpp
index acd8fafe53..b2235de9ab 100644
--- a/be/src/vec/exec/vanalytic_eval_node.cpp
+++ b/be/src/vec/exec/vanalytic_eval_node.cpp
@@ -18,7 +18,6 @@
#include "vec/exec/vanalytic_eval_node.h"
#include "runtime/descriptors.h"
-#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
#include "vec/exprs/vexpr.h"
diff --git a/be/src/vec/exec/vdata_gen_scan_node.cpp b/be/src/vec/exec/vdata_gen_scan_node.cpp
index ae3d7d0986..e91626d49c 100644
--- a/be/src/vec/exec/vdata_gen_scan_node.cpp
+++ b/be/src/vec/exec/vdata_gen_scan_node.cpp
@@ -21,7 +21,6 @@
#include "common/status.h"
#include "gen_cpp/PlanNodes_types.h"
-#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
#include "runtime/string_value.h"
#include "runtime/tuple_row.h"
diff --git a/be/src/vec/exec/vmysql_scan_node.cpp b/be/src/vec/exec/vmysql_scan_node.cpp
index e0f520f6ae..3fb03bd19c 100644
--- a/be/src/vec/exec/vmysql_scan_node.cpp
+++ b/be/src/vec/exec/vmysql_scan_node.cpp
@@ -20,7 +20,6 @@
#include "exec/text_converter.h"
#include "exec/text_converter.hpp"
#include "gen_cpp/PlanNodes_types.h"
-#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
#include "runtime/string_value.h"
#include "runtime/tuple_row.h"
diff --git a/be/src/vec/exec/vschema_scan_node.cpp b/be/src/vec/exec/vschema_scan_node.cpp
index 1bdd5c8563..9e3e6a1c42 100644
--- a/be/src/vec/exec/vschema_scan_node.cpp
+++ b/be/src/vec/exec/vschema_scan_node.cpp
@@ -20,7 +20,6 @@
#include "exec/text_converter.h"
#include "exec/text_converter.hpp"
#include "gen_cpp/PlanNodes_types.h"
-#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
#include "runtime/string_value.h"
#include "runtime/tuple_row.h"
diff --git a/be/src/vec/exec/vset_operation_node.h b/be/src/vec/exec/vset_operation_node.h
index 1e339e3a80..ad9cbab7e7 100644
--- a/be/src/vec/exec/vset_operation_node.h
+++ b/be/src/vec/exec/vset_operation_node.h
@@ -18,7 +18,6 @@
#pragma once
#include "exec/exec_node.h"
-#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
#include "vec/core/materialize_block.h"
#include "vec/exec/join/join_op.h"
diff --git a/be/src/vec/exec/vsort_node.cpp b/be/src/vec/exec/vsort_node.cpp
index 97916632a9..4ebdd05506 100644
--- a/be/src/vec/exec/vsort_node.cpp
+++ b/be/src/vec/exec/vsort_node.cpp
@@ -19,7 +19,6 @@
#include "common/config.h"
#include "pipeline/pipeline.h"
-#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
#include "util/debug_util.h"
#include "vec/common/sort/heap_sorter.h"
diff --git a/be/src/vec/runtime/vsorted_run_merger.cpp b/be/src/vec/runtime/vsorted_run_merger.cpp
index 21b7a0f521..17f87aac2e 100644
--- a/be/src/vec/runtime/vsorted_run_merger.cpp
+++ b/be/src/vec/runtime/vsorted_run_merger.cpp
@@ -20,7 +20,6 @@
#include <vector>
#include "runtime/descriptors.h"
-#include "runtime/row_batch.h"
#include "util/debug_util.h"
#include "util/defer_op.h"
#include "util/runtime_profile.h"
diff --git a/be/src/vec/sink/vresult_file_sink.cpp b/be/src/vec/sink/vresult_file_sink.cpp
index a25683e6da..2c8f79de48 100644
--- a/be/src/vec/sink/vresult_file_sink.cpp
+++ b/be/src/vec/sink/vresult_file_sink.cpp
@@ -21,7 +21,6 @@
#include "runtime/buffer_control_block.h"
#include "runtime/exec_env.h"
#include "runtime/result_buffer_mgr.h"
-#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
#include "util/uid_util.h"
#include "vec/exprs/vexpr.h"
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index 03a0181e55..7f756aff83 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -27,7 +27,6 @@
#include "exprs/expr_context.h"
#include "olap/hll.h"
#include "runtime/exec_env.h"
-#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
#include "runtime/tuple_row.h"
diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h
index c299fdca55..b62544cb7c 100644
--- a/be/src/vec/sink/vtablet_sink.h
+++ b/be/src/vec/sink/vtablet_sink.h
@@ -33,7 +33,6 @@
#include "exprs/expr_context.h"
#include "gen_cpp/Types_types.h"
#include "gen_cpp/internal_service.pb.h"
-#include "runtime/row_batch.h"
#include "runtime/thread_context.h"
#include "util/bitmap.h"
#include "util/countdown_latch.h"
diff --git a/be/test/exprs/binary_predicate_test.cpp b/be/test/exprs/binary_predicate_test.cpp
index a53d3754fc..af96e332cb 100644
--- a/be/test/exprs/binary_predicate_test.cpp
+++ b/be/test/exprs/binary_predicate_test.cpp
@@ -24,7 +24,6 @@
#include "exprs/expr.h"
#include "exprs/int_literal.h"
#include "gen_cpp/Exprs_types.h"
-#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
#include "util/debug_util.h"
diff --git a/be/test/exprs/in_op_test.cpp b/be/test/exprs/in_op_test.cpp
index c1de71f4e9..fc49b4dd5b 100644
--- a/be/test/exprs/in_op_test.cpp
+++ b/be/test/exprs/in_op_test.cpp
@@ -23,7 +23,6 @@
#include "exprs/in_predicate.h"
#include "exprs/int_literal.h"
#include "gen_cpp/Exprs_types.h"
-#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
#include "util/debug_util.h"
diff --git a/be/test/runtime/data_spliter_test.cpp b/be/test/runtime/data_spliter_test.cpp
index 1f0ab50c5c..b497978e3b 100644
--- a/be/test/runtime/data_spliter_test.cpp
+++ b/be/test/runtime/data_spliter_test.cpp
@@ -26,7 +26,6 @@
#include "olap/olap_main.cpp"
#include "runtime/descriptors.h"
#include "runtime/dpp_sink_internal.h"
-#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
#include "runtime/tuple.h"
#include "runtime/tuple_row.h"
diff --git a/be/test/runtime/fragment_mgr_test.cpp b/be/test/runtime/fragment_mgr_test.cpp
index 55939cc04f..3dfc8b0a37 100644
--- a/be/test/runtime/fragment_mgr_test.cpp
+++ b/be/test/runtime/fragment_mgr_test.cpp
@@ -22,7 +22,6 @@
#include "common/config.h"
#include "exec/data_sink.h"
#include "runtime/plan_fragment_executor.h"
-#include "runtime/row_batch.h"
namespace doris {
diff --git a/be/test/vec/core/block_test.cpp b/be/test/vec/core/block_test.cpp
index 51a743e738..a4de5e9129 100644
--- a/be/test/vec/core/block_test.cpp
+++ b/be/test/vec/core/block_test.cpp
@@ -26,7 +26,6 @@
#include "agent/be_exec_version_manager.h"
#include "exec/schema_scanner.h"
#include "gen_cpp/data.pb.h"
-#include "runtime/row_batch.h"
#include "runtime/string_value.h"
#include "runtime/tuple_row.h"
#include "vec/columns/column_array.h"
diff --git a/be/test/vec/exec/vjson_scanner_test.cpp b/be/test/vec/exec/vjson_scanner_test.cpp
index 9149fb3dd7..a4d7a6c5d5 100644
--- a/be/test/vec/exec/vjson_scanner_test.cpp
+++ b/be/test/vec/exec/vjson_scanner_test.cpp
@@ -32,7 +32,6 @@
#include "io/local_file_reader.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
-#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
#include "runtime/tuple.h"
#include "runtime/user_function_cache.h"
diff --git a/be/test/vec/exec/vorc_scanner_test.cpp b/be/test/vec/exec/vorc_scanner_test.cpp
index 39e3bb56ed..d47052efce 100644
--- a/be/test/vec/exec/vorc_scanner_test.cpp
+++ b/be/test/vec/exec/vorc_scanner_test.cpp
@@ -32,7 +32,6 @@
#include "gen_cpp/PlanNodes_types.h"
#include "io/local_file_reader.h"
#include "runtime/descriptors.h"
-#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
#include "runtime/tuple.h"
#include "runtime/user_function_cache.h"
diff --git a/be/test/vec/exec/vparquet_scanner_test.cpp b/be/test/vec/exec/vparquet_scanner_test.cpp
index c08a69a005..49b3ba3c56 100644
--- a/be/test/vec/exec/vparquet_scanner_test.cpp
+++ b/be/test/vec/exec/vparquet_scanner_test.cpp
@@ -28,7 +28,6 @@
#include "gen_cpp/PlanNodes_types.h"
#include "io/local_file_reader.h"
#include "runtime/descriptors.h"
-#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
#include "runtime/tuple.h"
#include "runtime/user_function_cache.h"
diff --git a/be/test/vec/exprs/vexpr_test.cpp b/be/test/vec/exprs/vexpr_test.cpp
index edea60aa12..8eb195535e 100644
--- a/be/test/vec/exprs/vexpr_test.cpp
+++ b/be/test/vec/exprs/vexpr_test.cpp
@@ -30,7 +30,6 @@
#include "runtime/large_int_value.h"
#include "runtime/memory/chunk_allocator.h"
#include "runtime/primitive_type.h"
-#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
#include "runtime/tuple.h"
#include "runtime/tuple_row.h"
@@ -48,25 +47,12 @@ TEST(TEST_VEXPR, ABSTEST) {
auto tuple_desc = const_cast<doris::TupleDescriptor*>(desc_tbl->get_tuple_descriptor(0));
doris::RowDescriptor row_desc(tuple_desc, false);
- doris::RowBatch row_batch(row_desc, 1024);
std::string expr_json =
R"|({"1":{"lst":["rec",2,{"1":{"i32":20},"2":{"rec":{"1":{"lst":["rec",1,{"1":{"i32":0},"2":{"rec":{"1":{"i32":6}}}}]}}},"4":{"i32":1},"20":{"i32":-1},"26":{"rec":{"1":{"rec":{"2":{"str":"abs"}}},"2":{"i32":0},"3":{"lst":["rec",1,{"1":{"lst":["rec",1,{"1":{"i32":0},"2":{"rec":{"1":{"i32":5}}}}]}}]},"4":{"rec":{"1":{"lst":["rec",1,{"1":{"i32":0},"2":{"rec":{"1":{"i32":6}}}}]}}},"5":{"tf":0},"7":{"str":"abs(INT)"},"9":{"rec":{"1":{"str":"_ZN5doris13MathFunctions3absEPN9doris_ud [...]
doris::TExpr exprx = apache::thrift::from_json_string<doris::TExpr>(expr_json);
doris::vectorized::VExprContext* context = nullptr;
doris::vectorized::VExpr::create_expr_tree(&object_pool, exprx, &context);
- int32_t k1 = -100;
- for (int i = 0; i < 1024; ++i, k1++) {
- auto idx = row_batch.add_row();
- doris::TupleRow* tuple_row = row_batch.get_row(idx);
- auto tuple =
- (doris::Tuple*)(row_batch.tuple_data_pool()->allocate(tuple_desc->byte_size()));
- auto slot_desc = tuple_desc->slots()[0];
- memcpy(tuple->get_slot(slot_desc->tuple_offset()), &k1, slot_desc->slot_size());
- tuple_row->set_tuple(0, tuple);
- row_batch.commit_last_row();
- }
-
doris::RuntimeState runtime_stat(doris::TUniqueId(), doris::TQueryOptions(),
doris::TQueryGlobals(), nullptr);
runtime_stat.init_mem_trackers();
@@ -85,7 +71,6 @@ TEST(TEST_VEXPR, ABSTEST2) {
schema_scanner.init(&param, &object_pool);
auto tuple_desc = const_cast<TupleDescriptor*>(schema_scanner.tuple_desc());
RowDescriptor row_desc(tuple_desc, false);
- RowBatch row_batch(row_desc, 1024);
std::string expr_json =
R"|({"1":{"lst":["rec",2,{"1":{"i32":20},"2":{"rec":{"1":{"lst":["rec",1,{"1":{"i32":0},"2":{"rec":{"1":{"i32":6}}}}]}}},"4":{"i32":1},"20":{"i32":-1},"26":{"rec":{"1":{"rec":{"2":{"str":"abs"}}},"2":{"i32":0},"3":{"lst":["rec",1,{"1":{"lst":["rec",1,{"1":{"i32":0},"2":{"rec":{"1":{"i32":5}}}}]}}]},"4":{"rec":{"1":{"lst":["rec",1,{"1":{"i32":0},"2":{"rec":{"1":{"i32":6}}}}]}}},"5":{"tf":0},"7":{"str":"abs(INT)"},"9":{"rec":{"1":{"str":"_ZN5doris13MathFunctions3absEPN9doris_ud [...]
TExpr exprx = apache::thrift::from_json_string<TExpr>(expr_json);
@@ -93,18 +78,6 @@ TEST(TEST_VEXPR, ABSTEST2) {
doris::vectorized::VExprContext* context = nullptr;
doris::vectorized::VExpr::create_expr_tree(&object_pool, exprx, &context);
- int32_t k1 = -100;
- for (int i = 0; i < 1024; ++i, k1++) {
- auto idx = row_batch.add_row();
- doris::TupleRow* tuple_row = row_batch.get_row(idx);
- auto tuple =
- (doris::Tuple*)(row_batch.tuple_data_pool()->allocate(tuple_desc->byte_size()));
- auto slot_desc = tuple_desc->slots()[0];
- memcpy(tuple->get_slot(slot_desc->tuple_offset()), &k1, slot_desc->slot_size());
- tuple_row->set_tuple(0, tuple);
- row_batch.commit_last_row();
- }
-
doris::RuntimeState runtime_stat(doris::TUniqueId(), doris::TQueryOptions(),
doris::TQueryGlobals(), nullptr);
runtime_stat.init_mem_trackers();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org