You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2023/01/11 01:37:42 UTC

[doris] branch master updated: [refactor](remove row batch) remove impala rowbatch structure (#15767)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d857b4af1b [refactor](remove row batch) remove impala rowbatch structure (#15767)
d857b4af1b is described below

commit d857b4af1b42209c2715f3db33c88a6ee12ade1c
Author: yiguolei <67...@qq.com>
AuthorDate: Wed Jan 11 09:37:35 2023 +0800

    [refactor](remove row batch) remove impala rowbatch structure (#15767)
    
    * [refactor](remove row batch) remove impala rowbatch structure
    
    Co-authored-by: yiguolei <yi...@gmail.com>
---
 be/src/exec/exec_node.cpp                          |   1 -
 be/src/exec/table_connector.h                      |   1 -
 be/src/olap/delta_writer.cpp                       |   1 -
 be/src/runtime/CMakeLists.txt                      |   3 +-
 be/src/runtime/cache/result_cache.h                |   1 -
 be/src/runtime/cache/result_node.h                 |   1 -
 be/src/runtime/plan_fragment_executor.cpp          |   1 -
 be/src/runtime/row_batch.cpp                       | 507 ---------------------
 be/src/runtime/row_batch.h                         | 478 -------------------
 be/src/runtime/tablets_channel.cpp                 |   1 -
 be/src/util/arrow/block_convertor.h                |   5 +-
 be/src/util/arrow/row_batch.cpp                    | 344 --------------
 be/src/util/arrow/row_batch.h                      |  17 -
 be/src/vec/core/block.cpp                          |   1 -
 .../vec/exec/data_gen_functions/vnumbers_tvf.cpp   |   1 -
 be/src/vec/exec/join/vnested_loop_join_node.cpp    |   1 -
 be/src/vec/exec/vaggregation_node.cpp              |   1 -
 be/src/vec/exec/vanalytic_eval_node.cpp            |   1 -
 be/src/vec/exec/vdata_gen_scan_node.cpp            |   1 -
 be/src/vec/exec/vmysql_scan_node.cpp               |   1 -
 be/src/vec/exec/vschema_scan_node.cpp              |   1 -
 be/src/vec/exec/vset_operation_node.h              |   1 -
 be/src/vec/exec/vsort_node.cpp                     |   1 -
 be/src/vec/runtime/vsorted_run_merger.cpp          |   1 -
 be/src/vec/sink/vresult_file_sink.cpp              |   1 -
 be/src/vec/sink/vtablet_sink.cpp                   |   1 -
 be/src/vec/sink/vtablet_sink.h                     |   1 -
 be/test/exprs/binary_predicate_test.cpp            |   1 -
 be/test/exprs/in_op_test.cpp                       |   1 -
 be/test/runtime/data_spliter_test.cpp              |   1 -
 be/test/runtime/fragment_mgr_test.cpp              |   1 -
 be/test/vec/core/block_test.cpp                    |   1 -
 be/test/vec/exec/vjson_scanner_test.cpp            |   1 -
 be/test/vec/exec/vorc_scanner_test.cpp             |   1 -
 be/test/vec/exec/vparquet_scanner_test.cpp         |   1 -
 be/test/vec/exprs/vexpr_test.cpp                   |  27 --
 36 files changed, 3 insertions(+), 1407 deletions(-)

diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index e29f5535f9..3a2d713403 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -31,7 +31,6 @@
 #include "runtime/descriptors.h"
 #include "runtime/exec_env.h"
 #include "runtime/memory/mem_tracker.h"
-#include "runtime/row_batch.h"
 #include "runtime/runtime_state.h"
 #include "util/debug_util.h"
 #include "util/runtime_profile.h"
diff --git a/be/src/exec/table_connector.h b/be/src/exec/table_connector.h
index 9f5330a486..6ba0c26b8b 100644
--- a/be/src/exec/table_connector.h
+++ b/be/src/exec/table_connector.h
@@ -28,7 +28,6 @@
 #include "common/status.h"
 #include "exprs/expr_context.h"
 #include "runtime/descriptors.h"
-#include "runtime/row_batch.h"
 #include "vec/exprs/vexpr_context.h"
 
 namespace doris {
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 22b7f568d1..ee17286b6b 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -24,7 +24,6 @@
 #include "olap/schema.h"
 #include "olap/storage_engine.h"
 #include "runtime/load_channel_mgr.h"
-#include "runtime/row_batch.h"
 #include "runtime/tuple_row.h"
 #include "service/backend_options.h"
 #include "util/brpc_client_cache.h"
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 864dad3099..b2930ffb8a 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -37,8 +37,7 @@ set(RUNTIME_FILES
     primitive_type.cpp
     raw_value.cpp
     result_buffer_mgr.cpp
-    result_writer.cpp
-    row_batch.cpp
+    result_writer.cpp    
     runtime_state.cpp
     runtime_filter_mgr.cpp
     runtime_predicate.cpp
diff --git a/be/src/runtime/cache/result_cache.h b/be/src/runtime/cache/result_cache.h
index de1a3711a6..0148bad786 100644
--- a/be/src/runtime/cache/result_cache.h
+++ b/be/src/runtime/cache/result_cache.h
@@ -32,7 +32,6 @@
 #include "runtime/cache/cache_utils.h"
 #include "runtime/cache/result_node.h"
 #include "runtime/mem_pool.h"
-#include "runtime/row_batch.h"
 #include "runtime/tuple_row.h"
 
 namespace doris {
diff --git a/be/src/runtime/cache/result_node.h b/be/src/runtime/cache/result_node.h
index 9b7b8a17a3..ca1b344d9f 100644
--- a/be/src/runtime/cache/result_node.h
+++ b/be/src/runtime/cache/result_node.h
@@ -34,7 +34,6 @@
 #include "olap/olap_define.h"
 #include "runtime/cache/cache_utils.h"
 #include "runtime/mem_pool.h"
-#include "runtime/row_batch.h"
 #include "runtime/tuple_row.h"
 #include "util/uid_util.h"
 
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index 4ff1e99c32..f36cefaf51 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -32,7 +32,6 @@
 #include "runtime/memory/mem_tracker.h"
 #include "runtime/result_buffer_mgr.h"
 #include "runtime/result_queue_mgr.h"
-#include "runtime/row_batch.h"
 #include "runtime/runtime_filter_mgr.h"
 #include "runtime/thread_context.h"
 #include "util/container_util.hpp"
diff --git a/be/src/runtime/row_batch.cpp b/be/src/runtime/row_batch.cpp
deleted file mode 100644
index 7a86f922e9..0000000000
--- a/be/src/runtime/row_batch.cpp
+++ /dev/null
@@ -1,507 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-// This file is copied from
-// https://github.com/apache/impala/blob/branch-2.9.0/be/src/runtime/row-batch.cc
-// and modified by Doris
-
-#include "runtime/row_batch.h"
-
-#include <snappy/snappy.h>
-#include <stdint.h> // for intptr_t
-
-#include "common/utils.h"
-#include "gen_cpp/Data_types.h"
-#include "gen_cpp/data.pb.h"
-#include "runtime/collection_value.h"
-#include "runtime/exec_env.h"
-#include "runtime/runtime_state.h"
-#include "runtime/string_value.h"
-#include "runtime/thread_context.h"
-#include "runtime/tuple_row.h"
-#include "util/exception.h"
-#include "vec/columns/column_vector.h"
-#include "vec/core/block.h"
-
-using std::vector;
-
-namespace doris {
-
-const int RowBatch::AT_CAPACITY_MEM_USAGE = 8 * 1024 * 1024;
-const int RowBatch::FIXED_LEN_BUFFER_LIMIT = AT_CAPACITY_MEM_USAGE / 2;
-
-RowBatch::RowBatch(const RowDescriptor& row_desc, int capacity)
-        : _has_in_flight_row(false),
-          _num_rows(0),
-          _num_uncommitted_rows(0),
-          _capacity(capacity),
-          _flush(FlushMode::NO_FLUSH_RESOURCES),
-          _needs_deep_copy(false),
-          _num_tuples_per_row(row_desc.tuple_descriptors().size()),
-          _row_desc(row_desc),
-          _auxiliary_mem_usage(0),
-          _need_to_return(false),
-          _tuple_data_pool() {
-    DCHECK_GT(capacity, 0);
-    _tuple_ptrs_size = _capacity * _num_tuples_per_row * sizeof(Tuple*);
-    DCHECK_GT(_tuple_ptrs_size, 0);
-    _tuple_ptrs = (Tuple**)(malloc(_tuple_ptrs_size));
-    DCHECK(_tuple_ptrs != nullptr);
-}
-
-// TODO: we want our input_batch's tuple_data to come from our (not yet implemented)
-// global runtime memory segment; how do we get thrift to allocate it from there?
-// maybe change line (in Data_types.cc generated from Data.thrift)
-//              xfer += iprot->readString(this->tuple_data[_i9]);
-// to allocated string data in special mempool
-// (change via python script that runs over Data_types.cc)
-RowBatch::RowBatch(const RowDescriptor& row_desc, const PRowBatch& input_batch)
-        : _has_in_flight_row(false),
-          _num_rows(input_batch.num_rows()),
-          _num_uncommitted_rows(0),
-          _capacity(_num_rows),
-          _flush(FlushMode::NO_FLUSH_RESOURCES),
-          _needs_deep_copy(false),
-          _num_tuples_per_row(input_batch.row_tuples_size()),
-          _row_desc(row_desc),
-          _auxiliary_mem_usage(0),
-          _need_to_return(false) {
-    _tuple_ptrs_size = _num_rows * _num_tuples_per_row * sizeof(Tuple*);
-    DCHECK_GT(_tuple_ptrs_size, 0);
-    _tuple_ptrs = (Tuple**)(malloc(_tuple_ptrs_size));
-    DCHECK(_tuple_ptrs != nullptr);
-
-    char* tuple_data = nullptr;
-    if (input_batch.is_compressed()) {
-        // Decompress tuple data into data pool
-        const char* compressed_data = input_batch.tuple_data().c_str();
-        size_t compressed_size = input_batch.tuple_data().size();
-        size_t uncompressed_size = 0;
-        bool success =
-                snappy::GetUncompressedLength(compressed_data, compressed_size, &uncompressed_size);
-        DCHECK(success) << "snappy::GetUncompressedLength failed";
-        tuple_data = (char*)_tuple_data_pool.allocate(uncompressed_size);
-        success = snappy::RawUncompress(compressed_data, compressed_size, tuple_data);
-        DCHECK(success) << "snappy::RawUncompress failed";
-    } else {
-        // Tuple data uncompressed, copy directly into data pool
-        tuple_data = (char*)_tuple_data_pool.allocate(input_batch.tuple_data().size());
-        memcpy(tuple_data, input_batch.tuple_data().c_str(), input_batch.tuple_data().size());
-    }
-
-    // convert input_batch.tuple_offsets into pointers
-    int tuple_idx = 0;
-    // For historical reasons, the original offset was stored using int32,
-    // so that if a rowbatch is larger than 2GB, the passed offset may generate an error due to value overflow.
-    // So in the new version, a new_tuple_offsets structure is added to store offsets using int64.
-    // Here, to maintain compatibility, both versions of offsets are used, with preference given to new_tuple_offsets.
-    // TODO(cmy): in the next version, the original tuple_offsets should be removed.
-    if (input_batch.new_tuple_offsets_size() > 0) {
-        for (int64_t offset : input_batch.new_tuple_offsets()) {
-            if (offset == -1) {
-                _tuple_ptrs[tuple_idx++] = nullptr;
-            } else {
-                _tuple_ptrs[tuple_idx++] = convert_to<Tuple*>(tuple_data + offset);
-            }
-        }
-    } else {
-        for (int32_t offset : input_batch.tuple_offsets()) {
-            if (offset == -1) {
-                _tuple_ptrs[tuple_idx++] = nullptr;
-            } else {
-                _tuple_ptrs[tuple_idx++] = convert_to<Tuple*>(tuple_data + offset);
-            }
-        }
-    }
-
-    // Check whether we have slots that require offset-to-pointer conversion.
-    if (!_row_desc.has_varlen_slots()) {
-        return;
-    }
-
-    const auto& tuple_descs = _row_desc.tuple_descriptors();
-
-    // For every unique tuple, convert string offsets contained in tuple data into
-    // pointers. Tuples were serialized in the order we are deserializing them in,
-    // so the first occurrence of a tuple will always have a higher offset than any tuple
-    // we already converted.
-    for (int i = 0; i < _num_rows; ++i) {
-        TupleRow* row = get_row(i);
-        for (size_t j = 0; j < tuple_descs.size(); ++j) {
-            auto desc = tuple_descs[j];
-            if (desc->string_slots().empty() && desc->collection_slots().empty()) {
-                continue;
-            }
-
-            Tuple* tuple = row->get_tuple(j);
-            if (tuple == nullptr) {
-                continue;
-            }
-
-            for (auto slot : desc->string_slots()) {
-                DCHECK(slot->type().is_string_type());
-                if (tuple->is_null(slot->null_indicator_offset())) {
-                    continue;
-                }
-
-                StringValue* string_val = tuple->get_string_slot(slot->tuple_offset());
-                int64_t offset = convert_to<int64_t>(string_val->ptr);
-                string_val->ptr = tuple_data + offset;
-
-                // Why we do this mask? Field len of StringValue is changed from int to size_t in
-                // Doris 0.11. When upgrading, some bits of len sent from 0.10 is random value,
-                // this works fine in version 0.10, however in 0.11 this will lead to an invalid
-                // length. So we make the high bits zero here.
-                string_val->len &= 0x7FFFFFFFL;
-            }
-
-            // copy collection slots
-            for (auto slot_collection : desc->collection_slots()) {
-                DCHECK(slot_collection->type().is_collection_type());
-                if (tuple->is_null(slot_collection->null_indicator_offset())) {
-                    continue;
-                }
-
-                CollectionValue* array_val =
-                        tuple->get_collection_slot(slot_collection->tuple_offset());
-                const auto& item_type_desc = slot_collection->type().children[0];
-                CollectionValue::deserialize_collection(array_val, tuple_data, item_type_desc);
-            }
-        }
-    }
-}
-
-void RowBatch::clear() {
-    if (_cleared) {
-        return;
-    }
-
-    _tuple_data_pool.free_all();
-    _agg_object_pool.clear();
-    for (int i = 0; i < _io_buffers.size(); ++i) {
-        _io_buffers[i]->return_buffer();
-    }
-
-    for (BufferInfo& buffer_info : _buffers) {
-        ExecEnv::GetInstance()->buffer_pool()->FreeBuffer(buffer_info.client, &buffer_info.buffer);
-    }
-
-    DCHECK(_tuple_ptrs != nullptr);
-    free(_tuple_ptrs);
-    _tuple_ptrs = nullptr;
-    _cleared = true;
-}
-
-RowBatch::~RowBatch() {
-    clear();
-}
-
-static inline size_t align_tuple_offset(size_t offset) {
-    if (config::rowbatch_align_tuple_offset) {
-        return (offset + alignof(std::max_align_t) - 1) & (~(alignof(std::max_align_t) - 1));
-    }
-
-    return offset;
-}
-
-Status RowBatch::serialize(PRowBatch* output_batch, size_t* uncompressed_size,
-                           size_t* compressed_size, bool allow_transfer_large_data) {
-    // num_rows
-    output_batch->set_num_rows(_num_rows);
-    // row_tuples
-    _row_desc.to_protobuf(output_batch->mutable_row_tuples());
-    // tuple_offsets: must clear before reserve
-    // TODO(cmy): the tuple_offsets should be removed after v1.1.0, use new_tuple_offsets instead.
-    // keep tuple_offsets here is just for compatibility.
-    output_batch->clear_tuple_offsets();
-    output_batch->mutable_tuple_offsets()->Reserve(_num_rows * _num_tuples_per_row);
-    output_batch->clear_new_tuple_offsets();
-    output_batch->mutable_new_tuple_offsets()->Reserve(_num_rows * _num_tuples_per_row);
-    // is_compressed
-    output_batch->set_is_compressed(false);
-    // tuple data
-    size_t tuple_byte_size = total_byte_size();
-    std::string* mutable_tuple_data = output_batch->mutable_tuple_data();
-    mutable_tuple_data->resize(tuple_byte_size);
-
-    // Copy tuple data, including strings, into output_batch (converting string
-    // pointers into offsets in the process)
-    int64_t offset = 0; // current offset into output_batch->tuple_data
-    char* tuple_data = mutable_tuple_data->data();
-    const auto& tuple_descs = _row_desc.tuple_descriptors();
-    const auto& mutable_tuple_offsets = output_batch->mutable_tuple_offsets();
-    const auto& mutable_new_tuple_offsets = output_batch->mutable_new_tuple_offsets();
-
-    for (int i = 0; i < _num_rows; ++i) {
-        TupleRow* row = get_row(i);
-        for (size_t j = 0; j < tuple_descs.size(); ++j) {
-            auto desc = tuple_descs[j];
-            if (row->get_tuple(j) == nullptr) {
-                // NULLs are encoded as -1
-                mutable_tuple_offsets->Add(-1);
-                mutable_new_tuple_offsets->Add(-1);
-                continue;
-            }
-
-            int64_t old_offset = offset;
-            offset = align_tuple_offset(offset);
-            tuple_data += offset - old_offset;
-
-            // Record offset before creating copy (which increments offset and tuple_data)
-            mutable_tuple_offsets->Add((int32_t)offset);
-            mutable_new_tuple_offsets->Add(offset);
-            row->get_tuple(j)->deep_copy(*desc, &tuple_data, &offset, /* convert_ptrs */ true);
-            CHECK_GE(offset, 0);
-        }
-    }
-    CHECK_EQ(offset, tuple_byte_size)
-            << "offset: " << offset << " vs. tuple_byte_size: " << tuple_byte_size;
-
-    size_t max_compressed_size = snappy::MaxCompressedLength(tuple_byte_size);
-    bool can_compress = config::compress_rowbatches && tuple_byte_size > 0;
-    if (can_compress) {
-        try {
-            // Allocation of extra-long contiguous memory may fail, and data compression cannot be used if it fails
-            _compression_scratch.resize(max_compressed_size);
-        } catch (...) {
-            can_compress = false;
-            std::exception_ptr p = std::current_exception();
-            LOG(WARNING) << "Try to alloc " << max_compressed_size
-                         << " bytes for compression scratch failed. "
-                         << get_current_exception_type_name(p);
-        }
-    }
-    if (can_compress) {
-        // Try compressing tuple_data to _compression_scratch, swap if compressed data is
-        // smaller
-        size_t compressed_size = 0;
-        char* compressed_output = _compression_scratch.data();
-        snappy::RawCompress(mutable_tuple_data->data(), tuple_byte_size, compressed_output,
-                            &compressed_size);
-        if (LIKELY(compressed_size < tuple_byte_size)) {
-            _compression_scratch.resize(compressed_size);
-            mutable_tuple_data->swap(_compression_scratch);
-            output_batch->set_is_compressed(true);
-        }
-
-        VLOG_ROW << "uncompressed tuple_byte_size: " << tuple_byte_size
-                 << ", compressed size: " << compressed_size;
-    }
-
-    // return compressed and uncompressed size
-    size_t pb_size = get_batch_size(*output_batch);
-    *uncompressed_size = pb_size - mutable_tuple_data->size() + tuple_byte_size;
-    *compressed_size = pb_size;
-    if (!allow_transfer_large_data && pb_size > std::numeric_limits<int32_t>::max()) {
-        // the protobuf has a hard limit of 2GB for serialized data.
-        return Status::InternalError(
-                "The rowbatch is large than 2GB({}), can not send by Protobuf.", pb_size);
-    }
-    return Status::OK();
-}
-
-// when row from files can't fill into tuple with schema limitation, increase the _num_uncommitted_rows in row batch,
-void RowBatch::increase_uncommitted_rows() {
-    _num_uncommitted_rows++;
-}
-
-void RowBatch::add_io_buffer(DiskIoMgr::BufferDescriptor* buffer) {
-    DCHECK(buffer != nullptr);
-    _io_buffers.push_back(buffer);
-    _auxiliary_mem_usage += buffer->buffer_len();
-}
-
-Status RowBatch::resize_and_allocate_tuple_buffer(RuntimeState* state, int64_t* tuple_buffer_size,
-                                                  uint8_t** buffer) {
-    int64_t row_size = _row_desc.get_row_size();
-    // Avoid divide-by-zero. Don't need to modify capacity for empty rows anyway.
-    if (row_size != 0) {
-        _capacity = std::max(1, std::min<int>(_capacity, FIXED_LEN_BUFFER_LIMIT / row_size));
-    }
-    *tuple_buffer_size = row_size * _capacity;
-    // TODO(dhc): change allocate to try_allocate?
-    *buffer = _tuple_data_pool.allocate(*tuple_buffer_size);
-    if (*buffer == nullptr) {
-        std::stringstream ss;
-        ss << "Failed to allocate tuple buffer" << *tuple_buffer_size;
-        LOG(WARNING) << ss.str();
-        return state->set_mem_limit_exceeded(ss.str());
-    }
-    return Status::OK();
-}
-
-void RowBatch::reset() {
-    _num_rows = 0;
-    _capacity = _tuple_ptrs_size / (_num_tuples_per_row * sizeof(Tuple*));
-    _has_in_flight_row = false;
-
-    // TODO: Change this to Clear() and investigate the repercussions.
-    _tuple_data_pool.free_all();
-    _agg_object_pool.clear();
-    for (int i = 0; i < _io_buffers.size(); ++i) {
-        _io_buffers[i]->return_buffer();
-    }
-    _io_buffers.clear();
-
-    for (BufferInfo& buffer_info : _buffers) {
-        ExecEnv::GetInstance()->buffer_pool()->FreeBuffer(buffer_info.client, &buffer_info.buffer);
-    }
-    _buffers.clear();
-
-    _auxiliary_mem_usage = 0;
-    _need_to_return = false;
-    _flush = FlushMode::NO_FLUSH_RESOURCES;
-    _needs_deep_copy = false;
-}
-
-void RowBatch::transfer_resource_ownership(RowBatch* dest) {
-    dest->_auxiliary_mem_usage += _tuple_data_pool.total_allocated_bytes();
-    dest->_tuple_data_pool.acquire_data(&_tuple_data_pool, false);
-    dest->_agg_object_pool.acquire_data(&_agg_object_pool);
-    for (int i = 0; i < _io_buffers.size(); ++i) {
-        DiskIoMgr::BufferDescriptor* buffer = _io_buffers[i];
-        dest->_io_buffers.push_back(buffer);
-        dest->_auxiliary_mem_usage += buffer->buffer_len();
-    }
-    _io_buffers.clear();
-
-    for (BufferInfo& buffer_info : _buffers) {
-        dest->add_buffer(buffer_info.client, std::move(buffer_info.buffer),
-                         FlushMode::NO_FLUSH_RESOURCES);
-    }
-    _buffers.clear();
-
-    dest->_need_to_return |= _need_to_return;
-
-    if (_needs_deep_copy) {
-        dest->mark_needs_deep_copy();
-    } else if (_flush == FlushMode::FLUSH_RESOURCES) {
-        dest->mark_flush_resources();
-    }
-    reset();
-}
-
-size_t RowBatch::get_batch_size(const PRowBatch& batch) {
-    size_t result = batch.tuple_data().size();
-    result += batch.row_tuples().size() * sizeof(int32_t);
-    // TODO(cmy): remove batch.tuple_offsets
-    result += batch.tuple_offsets().size() * sizeof(int32_t);
-    result += batch.new_tuple_offsets().size() * sizeof(int64_t);
-    return result;
-}
-
-void RowBatch::acquire_state(RowBatch* src) {
-    // DCHECK(_row_desc.equals(src->_row_desc));
-    DCHECK_EQ(_num_tuples_per_row, src->_num_tuples_per_row);
-    // DCHECK_EQ(_tuple_ptrs_size, src->_tuple_ptrs_size);
-    DCHECK_EQ(_auxiliary_mem_usage, 0);
-
-    // The destination row batch should be empty.
-    DCHECK(!_has_in_flight_row);
-    DCHECK_EQ(_num_rows, 0);
-
-    for (int i = 0; i < src->_io_buffers.size(); ++i) {
-        DiskIoMgr::BufferDescriptor* buffer = src->_io_buffers[i];
-        _io_buffers.push_back(buffer);
-        _auxiliary_mem_usage += buffer->buffer_len();
-    }
-    src->_io_buffers.clear();
-    src->_auxiliary_mem_usage = 0;
-
-    _has_in_flight_row = src->_has_in_flight_row;
-    _num_rows = src->_num_rows;
-    _capacity = src->_capacity;
-    _need_to_return = src->_need_to_return;
-    // tuple_ptrs_ were allocated with malloc so can be swapped between batches.
-    std::swap(_tuple_ptrs, src->_tuple_ptrs);
-    src->transfer_resource_ownership(this);
-}
-
-void RowBatch::deep_copy_to(RowBatch* dst) {
-    DCHECK(dst->_row_desc.equals(_row_desc));
-    DCHECK_EQ(dst->_num_rows, 0);
-    DCHECK_GE(dst->_capacity, _num_rows);
-    dst->add_rows(_num_rows);
-    for (int i = 0; i < _num_rows; ++i) {
-        TupleRow* src_row = get_row(i);
-        TupleRow* dst_row = convert_to<TupleRow*>(dst->_tuple_ptrs + i * _num_tuples_per_row);
-        src_row->deep_copy(dst_row, _row_desc.tuple_descriptors(), &dst->_tuple_data_pool, false);
-    }
-    dst->commit_rows(_num_rows);
-}
-
-// TODO: consider computing size of batches as they are built up
-size_t RowBatch::total_byte_size() const {
-    size_t result = 0;
-
-    // Sum total variable length byte sizes.
-    for (int i = 0; i < _num_rows; ++i) {
-        TupleRow* row = get_row(i);
-        const auto& tuple_descs = _row_desc.tuple_descriptors();
-        for (size_t j = 0; j < tuple_descs.size(); ++j) {
-            auto desc = tuple_descs[j];
-            Tuple* tuple = row->get_tuple(j);
-            if (tuple == nullptr) {
-                continue;
-            }
-            result = align_tuple_offset(result);
-            result += desc->byte_size();
-
-            for (auto slot : desc->string_slots()) {
-                DCHECK(slot->type().is_string_type());
-                if (tuple->is_null(slot->null_indicator_offset())) {
-                    continue;
-                }
-                StringValue* string_val = tuple->get_string_slot(slot->tuple_offset());
-                result += string_val->len;
-            }
-
-            // compute slot collection size
-            for (auto slot_collection : desc->collection_slots()) {
-                DCHECK(slot_collection->type().is_collection_type());
-                if (tuple->is_null(slot_collection->null_indicator_offset())) {
-                    continue;
-                }
-                CollectionValue* array_val =
-                        tuple->get_collection_slot(slot_collection->tuple_offset());
-                const auto& item_type_desc = slot_collection->type().children[0];
-                result += array_val->get_byte_size(item_type_desc);
-            }
-        }
-    }
-
-    return result;
-}
-
-void RowBatch::add_buffer(BufferPool::ClientHandle* client, BufferPool::BufferHandle&& buffer,
-                          FlushMode flush) {
-    _auxiliary_mem_usage += buffer.len();
-    BufferInfo buffer_info;
-    buffer_info.client = client;
-    buffer_info.buffer = std::move(buffer);
-    _buffers.push_back(std::move(buffer_info));
-    if (flush == FlushMode::FLUSH_RESOURCES) mark_flush_resources();
-}
-
-std::string RowBatch::to_string() {
-    std::stringstream out;
-    for (int i = 0; i < _num_rows; ++i) {
-        out << get_row(i)->to_string(_row_desc) << "\n";
-    }
-    return out.str();
-}
-
-} // end namespace doris
diff --git a/be/src/runtime/row_batch.h b/be/src/runtime/row_batch.h
deleted file mode 100644
index bf920e1905..0000000000
--- a/be/src/runtime/row_batch.h
+++ /dev/null
@@ -1,478 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-// This file is copied from
-// https://github.com/apache/impala/blob/branch-2.9.0/be/src/runtime/row-batch.h
-// and modified by Doris
-
-#pragma once
-
-#include <cstring>
-#include <vector>
-
-#include "common/logging.h"
-#include "runtime/bufferpool/buffer_pool.h"
-#include "runtime/descriptors.h"
-#include "runtime/disk_io_mgr.h"
-#include "runtime/mem_pool.h"
-
-namespace doris::vectorized {
-class Block;
-}
-
-namespace doris {
-
-class BufferedTupleStream2;
-class Tuple;
-class TupleRow;
-class TupleDescriptor;
-class PRowBatch;
-
-// A RowBatch encapsulates a batch of rows, each composed of a number of tuples.
-// The maximum number of rows is fixed at the time of construction, and the caller
-// can add rows up to that capacity.
-// The row batch reference a few different sources of memory.
-//   1. TupleRow ptrs - this is always owned and managed by the row batch.
-//   2. Tuple memory - this is allocated (or transferred to) the row batches tuple pool.
-//   3. Auxiliary tuple memory (e.g. string data) - this can either be stored externally
-//      (don't copy strings) or from the tuple pool (strings are copied).  If external,
-//      the data is in an io buffer that may not be attached to this row batch.  The
-//      creator of that row batch has to make sure that the io buffer is not recycled
-//      until all batches that reference the memory have been consumed.
-// In order to minimize memory allocations, RowBatches and PRowBatches that have been
-// serialized and sent over the wire should be reused (this prevents _compression_scratch
-// from being needlessly reallocated).
-//
-// Row batches and memory usage: We attempt to stream row batches through the plan
-// tree without copying the data. This means that row batches are often not-compact
-// and reference memory outside of the row batch. This results in most row batches
-// having a very small memory footprint and in some row batches having a very large
-// one (it contains all the memory that other row batches are referencing). An example
-// is IoBuffers which are only attached to one row batch. Only when the row batch reaches
-// a blocking operator or the root of the fragment is the row batch memory freed.
-// This means that in some cases (e.g. very selective queries), we still need to
-// pass the row batch through the exec nodes (even if they have no rows) to trigger
-// memory deletion. at_capacity() encapsulates the check that we are not accumulating
-// excessive memory.
-//
-// A row batch is considered at capacity if all the rows are full or it has accumulated
-// auxiliary memory up to a soft cap. (See _at_capacity_mem_usage comment).
-// TODO: stick _tuple_ptrs into a pool?
-class RowBatch {
-public:
-    /// Flag indicating whether the resources attached to a RowBatch need to be flushed.
-    /// Defined here as a convenience for other modules that need to communicate flushing
-    /// modes.
-    enum class FlushMode {
-        FLUSH_RESOURCES,
-        NO_FLUSH_RESOURCES,
-    };
-
-    // Create RowBatch for a maximum of 'capacity' rows of tuples specified
-    // by 'row_desc'.
-    RowBatch(const RowDescriptor& row_desc, int capacity);
-
-    // Populate a row batch from input_batch by copying input_batch's
-    // tuple_data into the row batch's mempool and converting all offsets
-    // in the data back into pointers.
-    // TODO: figure out how to transfer the data from input_batch to this RowBatch
-    // (so that we don't need to make yet another copy)
-    RowBatch(const RowDescriptor& row_desc, const PRowBatch& input_batch);
-
-    // Releases all resources accumulated at this row batch.  This includes
-    //  - tuple_ptrs
-    //  - tuple mem pool data
-    //  - buffer handles from the io mgr
-    virtual ~RowBatch();
-
-    // used to c
-    void clear();
-
-    static const int INVALID_ROW_INDEX = -1;
-
-    // Add n rows of tuple pointers after the last committed row and return its index.
-    // The rows are uninitialized and each tuple of the row must be set.
-    // Returns INVALID_ROW_INDEX if the row batch cannot fit n rows.
-    // Two consecutive add_row() calls without a commit_last_row() between them
-    // have the same effect as a single call.
-    int add_rows(int n) {
-        if (_num_rows + n > _capacity) {
-            return INVALID_ROW_INDEX;
-        }
-
-        _has_in_flight_row = true;
-        return _num_rows;
-    }
-
-    int add_row() { return add_rows(1); }
-
-    void commit_rows(int n) {
-        DCHECK_GE(n, 0);
-        DCHECK_LE(_num_rows + n, _capacity);
-        _num_rows += n;
-        _has_in_flight_row = false;
-    }
-
-    void commit_last_row() { commit_rows(1); }
-
-    bool in_flight() const { return _has_in_flight_row; }
-
-    // Set function can be used to reduce the number of rows in the batch.  This is only
-    // used in the limit case where more rows were added than necessary.
-    void set_num_rows(int num_rows) {
-        DCHECK_LE(num_rows, _num_rows);
-        _num_rows = num_rows;
-    }
-
-    // Returns true if the row batch has filled all the rows or has accumulated
-    // enough memory.
-    bool at_capacity() const {
-        return _num_rows == _capacity || _auxiliary_mem_usage >= AT_CAPACITY_MEM_USAGE ||
-               _need_to_return;
-    }
-
-    // Returns true if the row batch has filled all the rows or has accumulated
-    // enough memory. tuple_pool is an intermediate memory pool containing tuple data
-    // that will eventually be attached to this row batch. We need to make sure
-    // the tuple pool does not accumulate excessive memory.
-    bool at_capacity(const MemPool* tuple_pool) const {
-        DCHECK(tuple_pool != nullptr);
-        return at_capacity() || tuple_pool->total_allocated_bytes() > AT_CAPACITY_MEM_USAGE;
-    }
-
-    // Returns true if row_batch has reached capacity.
-    bool is_full() const { return _num_rows == _capacity; }
-
-    // Returns true if uncommitted rows has reached capacity.
-    bool is_full_uncommitted() { return _num_uncommitted_rows == _capacity; }
-
-    // Returns true if the row batch has accumulated enough external memory (in MemPools
-    // and io buffers).  This would be a trigger to compact the row batch or reclaim
-    // the memory in some way.
-    bool at_resource_limit() {
-        return tuple_data_pool()->total_allocated_bytes() > MAX_MEM_POOL_SIZE;
-    }
-
-    // The total size of all data represented in this row batch (tuples and referenced
-    // string data).
-    size_t total_byte_size() const;
-
-    TupleRow* get_row(int row_idx) const {
-        DCHECK(_tuple_ptrs != nullptr);
-        DCHECK_GE(row_idx, 0);
-        //DCHECK_LT(row_idx, _num_rows + (_has_in_flight_row ? 1 : 0));
-        return reinterpret_cast<TupleRow*>(_tuple_ptrs + row_idx * _num_tuples_per_row);
-    }
-
-    /// An iterator for going through a row batch, starting at 'row_idx'.
-    /// If 'limit' is specified, it will iterate up to row number 'row_idx + limit'
-    /// or the last row, whichever comes first. Otherwise, it will iterate till the last
-    /// row in the batch. This is more efficient than using GetRow() as it avoids loading
-    /// the row batch state and doing multiplication on each loop with GetRow().
-    class Iterator {
-    public:
-        Iterator(RowBatch* parent, int row_idx, int limit = -1)
-                : _num_tuples_per_row(parent->_num_tuples_per_row),
-                  _row(parent->_tuple_ptrs + _num_tuples_per_row * row_idx),
-                  _row_batch_end(parent->_tuple_ptrs +
-                                 _num_tuples_per_row *
-                                         (limit == -1 ? parent->_num_rows
-                                                      : std::min<int>(row_idx + limit,
-                                                                      parent->_num_rows))),
-                  _parent(parent) {
-            DCHECK_GE(row_idx, 0);
-            DCHECK_GT(_num_tuples_per_row, 0);
-            /// We allow empty row batches with _num_rows == capacity_ == 0.
-            /// That's why we cannot call GetRow() above to initialize '_row'.
-            DCHECK_LE(row_idx, parent->_capacity);
-        }
-
-        /// Return the current row pointed to by the row pointer.
-        TupleRow* get() { return reinterpret_cast<TupleRow*>(_row); }
-
-        /// Increment the row pointer and return the next row.
-        TupleRow* next() {
-            _row += _num_tuples_per_row;
-            DCHECK_LE((_row - _parent->_tuple_ptrs) / _num_tuples_per_row, _parent->_capacity);
-            return get();
-        }
-
-        /// Returns true if the iterator is beyond the last row for read iterators.
-        /// Useful for read iterators to determine the limit. Write iterators should use
-        /// RowBatch::AtCapacity() instead.
-        bool at_end() const { return _row >= _row_batch_end; }
-
-        /// Returns the row batch which this iterator is iterating through.
-        RowBatch* parent() const { return _parent; }
-
-    private:
-        /// Number of tuples per row.
-        const int _num_tuples_per_row;
-
-        /// Pointer to the current row.
-        Tuple** _row;
-
-        /// Pointer to the row after the last row for read iterators.
-        Tuple** const _row_batch_end;
-
-        /// The row batch being iterated on.
-        RowBatch* const _parent;
-    };
-
-    int num_tuples_per_row() const { return _num_tuples_per_row; }
-    int row_byte_size() const { return _num_tuples_per_row * sizeof(Tuple*); }
-    MemPool* tuple_data_pool() { return &_tuple_data_pool; }
-    ObjectPool* agg_object_pool() { return &_agg_object_pool; }
-    int num_io_buffers() const { return _io_buffers.size(); }
-
-    // increase # of uncommitted rows
-    void increase_uncommitted_rows();
-
-    // Resets the row batch, returning all resources it has accumulated.
-    void reset();
-
-    // Add io buffer to this row batch.
-    void add_io_buffer(DiskIoMgr::BufferDescriptor* buffer);
-
-    // Add tuple stream to this row batch. The row batch takes ownership of the stream
-    // and will call Close() on the stream and delete it when freeing resources.
-    void add_tuple_stream(BufferedTupleStream2* stream);
-
-    /// Adds a buffer to this row batch. The buffer is deleted when freeing resources.
-    /// The buffer's memory remains accounted against the original owner, even when the
-    /// ownership of batches is transferred. If the original owner wants the memory to be
-    /// released, it should call this with 'mode' FLUSH_RESOURCES (see MarkFlushResources()
-    /// for further explanation).
-    /// TODO: IMPALA-4179: after IMPALA-3200, simplify the ownership transfer model and
-    /// make it consistent between buffers and I/O buffers.
-    void add_buffer(BufferPool::ClientHandle* client, BufferPool::BufferHandle&& buffer,
-                    FlushMode flush);
-
-    // Called to indicate this row batch must be returned up the operator tree.
-    // This is used to control memory management for streaming rows.
-    // TODO: consider using this mechanism instead of add_io_buffer/add_tuple_stream. This is
-    // the property we need rather than meticulously passing resources up so the operator
-    // tree.
-    void mark_need_to_return() { _need_to_return = true; }
-
-    bool need_to_return() const { return _need_to_return; }
-
-    /// Used by an operator to indicate that it cannot produce more rows until the
-    /// resources that it has attached to the row batch are freed or acquired by an
-    /// ancestor operator. After this is called, the batch is at capacity and no more rows
-    /// can be added. The "flush" mark is transferred by TransferResourceOwnership(). This
-    /// ensures that batches are flushed by streaming operators all the way up the operator
-    /// tree. Blocking operators can still accumulate batches with this flag.
-    /// TODO: IMPALA-3200: blocking operators should acquire all memory resources including
-    /// attached blocks/buffers, so that MarkFlushResources() can guarantee that the
-    /// resources will not be accounted against the original operator (this is currently
-    /// not true for Blocks, which can't be transferred).
-    void mark_flush_resources() {
-        DCHECK_LE(_num_rows, _capacity);
-        _capacity = _num_rows;
-        _flush = FlushMode::FLUSH_RESOURCES;
-    }
-
-    /// Called to indicate that some resources backing this batch were not attached and
-    /// will be cleaned up after the next GetNext() call. This means that the batch must
-    /// be returned up the operator tree. Blocking operators must deep-copy any rows from
-    /// this batch or preceding batches.
-    ///
-    /// This is a stronger version of MarkFlushResources(), because blocking operators
-    /// are not allowed to accumulate batches with the 'needs_deep_copy' flag.
-    /// TODO: IMPALA-4179: always attach backing resources and remove this flag.
-    void mark_needs_deep_copy() {
-        mark_flush_resources(); // No more rows should be added to the batch.
-        _needs_deep_copy = true;
-    }
-
-    bool needs_deep_copy() const { return _needs_deep_copy; }
-
-    // Transfer ownership of resources to dest.  This includes tuple data in mem
-    // pool and io buffers.
-    // we firstly update dest resource, and then reset current resource
-    void transfer_resource_ownership(RowBatch* dest);
-
-    void copy_row(TupleRow* src, TupleRow* dest) {
-        memcpy(dest, src, _num_tuples_per_row * sizeof(Tuple*));
-    }
-
-    // Copy 'num_rows' rows from 'src' to 'dest' within the batch. Useful for exec
-    // nodes that skip an offset and copied more than necessary.
-    void copy_rows(int dest, int src, int num_rows) {
-        DCHECK_LE(dest, src);
-        DCHECK_LE(src + num_rows, _capacity);
-        memmove(_tuple_ptrs + _num_tuples_per_row * dest, _tuple_ptrs + _num_tuples_per_row * src,
-                num_rows * _num_tuples_per_row * sizeof(Tuple*));
-    }
-
-    void clear_row(TupleRow* row) { memset(row, 0, _num_tuples_per_row * sizeof(Tuple*)); }
-
-    // Acquires state from the 'src' row batch into this row batch. This includes all IO
-    // buffers and tuple data.
-    // This row batch must be empty and have the same row descriptor as the src batch.
-    // This is used for scan nodes which produce RowBatches asynchronously.  Typically,
-    // an ExecNode is handed a row batch to populate (pull model) but ScanNodes have
-    // multiple threads which push row batches.
-    // TODO: this is wasteful and makes a copy that's unnecessary.  Think about cleaning
-    // this up.
-    // TODO: rename this or unify with TransferResourceOwnership()
-    void acquire_state(RowBatch* src);
-
-    // Deep copy all rows this row batch into dst, using memory allocated from
-    // dst's _tuple_data_pool. Only valid when dst is empty.
-    // TODO: the current implementation of deep copy can produce an oversized
-    // row batch if there are duplicate tuples in this row batch.
-    void deep_copy_to(RowBatch* dst);
-
-    // Create a serialized version of this row batch in output_batch, attaching all of the
-    // data it references to output_batch.tuple_data. output_batch.tuple_data will be
-    // snappy-compressed unless the compressed data is larger than the uncompressed
-    // data. Use output_batch.is_compressed to determine whether tuple_data is compressed.
-    // If an in-flight row is present in this row batch, it is ignored.
-    // This function does not reset().
-    // Returns the uncompressed serialized size (this will be the true size of output_batch
-    // if tuple_data is actually uncompressed).
-    Status serialize(PRowBatch* output_batch, size_t* uncompressed_size, size_t* compressed_size,
-                     bool allow_transfer_large_data = false);
-
-    // Utility function: returns total size of batch.
-    static size_t get_batch_size(const PRowBatch& batch);
-
-    int num_rows() const { return _num_rows; }
-    int capacity() const { return _capacity; }
-
-    int num_buffers() const { return _buffers.size(); }
-
-    const RowDescriptor& row_desc() const { return _row_desc; }
-
-    // Max memory that this row batch can accumulate in _tuple_data_pool before it
-    // is considered at capacity.
-    /// This is a soft capacity: row batches may exceed the capacity, preferably only by a
-    /// row's worth of data.
-    static const int AT_CAPACITY_MEM_USAGE;
-
-    // Max memory out of AT_CAPACITY_MEM_USAGE that should be used for fixed-length data,
-    // in order to leave room for variable-length data.
-    static const int FIXED_LEN_BUFFER_LIMIT;
-
-    /// Allocates a buffer large enough for the fixed-length portion of 'capacity_' rows in
-    /// this batch from 'tuple_data_pool_'. 'capacity_' is reduced if the allocation would
-    /// exceed FIXED_LEN_BUFFER_LIMIT. Always returns enough space for at least one row.
-    /// Returns Status::MemoryLimitExceeded("Memory limit exceeded") and sets 'buffer' to nullptr if a memory limit would
-    /// have been exceeded. 'state' is used to log the error.
-    /// On success, sets 'buffer_size' to the size in bytes and 'buffer' to the buffer.
-    Status resize_and_allocate_tuple_buffer(RuntimeState* state, int64_t* buffer_size,
-                                            uint8_t** buffer);
-
-    void set_scanner_id(int id) { _scanner_id = id; }
-    int scanner_id() const { return _scanner_id; }
-
-    static const int MAX_MEM_POOL_SIZE = 32 * 1024 * 1024;
-    std::string to_string();
-
-private:
-    // All members need to be handled in RowBatch::swap()
-
-    bool _has_in_flight_row;   // if true, last row hasn't been committed yet
-    int _num_rows;             // # of committed rows
-    int _num_uncommitted_rows; // # of uncommited rows in row batch mem pool
-    int _capacity;             // maximum # of rows
-
-    /// If FLUSH_RESOURCES, the resources attached to this batch should be freed or
-    /// acquired by a new owner as soon as possible. See MarkFlushResources(). If
-    /// FLUSH_RESOURCES, AtCapacity() is also true.
-    FlushMode _flush;
-
-    /// If true, this batch references unowned memory that will be cleaned up soon.
-    /// See MarkNeedsDeepCopy(). If true, 'flush_' is FLUSH_RESOURCES and
-    /// AtCapacity() is true.
-    bool _needs_deep_copy;
-
-    int _num_tuples_per_row;
-    RowDescriptor _row_desc;
-
-    // Array of pointers with _capacity * _num_tuples_per_row elements.
-    // The memory ownership depends on whether legacy joins and aggs are enabled.
-    //
-    // Memory is malloc'd and owned by RowBatch:
-    // If enable_partitioned_hash_join=true and enable_partitioned_aggregation=true
-    // then the memory is owned by this RowBatch and is freed upon its destruction.
-    // This mode is more performant especially with SubplanNodes in the ExecNode tree
-    // because the tuple pointers are not transferred and do not have to be re-created
-    // in every Reset().
-    //
-    // Memory is allocated from MemPool:
-    // Otherwise, the memory is allocated from _tuple_data_pool. As a result, the
-    // pointer memory is transferred just like tuple data, and must be re-created
-    // in Reset(). This mode is required for the legacy join and agg which rely on
-    // the tuple pointers being allocated from the _tuple_data_pool, so they can
-    // acquire ownership of the tuple pointers.
-    Tuple** _tuple_ptrs;
-    int _tuple_ptrs_size;
-
-    // Sum of all auxiliary bytes. This includes IoBuffers and memory from
-    // TransferResourceOwnership().
-    int64_t _auxiliary_mem_usage;
-
-    // If true, this batch is considered at capacity. This is explicitly set by streaming
-    // components that return rows via row batches.
-    bool _need_to_return;
-
-    // holding (some of the) data referenced by rows
-    MemPool _tuple_data_pool;
-
-    // holding some complex agg object data (bitmap, hll)
-    ObjectPool _agg_object_pool;
-
-    // IO buffers current owned by this row batch. Ownership of IO buffers transfer
-    // between row batches. Any IO buffer will be owned by at most one row batch
-    // (i.e. they are not ref counted) so most row batches don't own any.
-    std::vector<DiskIoMgr::BufferDescriptor*> _io_buffers;
-
-    struct BufferInfo {
-        BufferPool::ClientHandle* client;
-        BufferPool::BufferHandle buffer;
-    };
-    /// Pages attached to this row batch. See AddBuffer() for ownership semantics.
-    std::vector<BufferInfo> _buffers;
-
-    // String to write compressed tuple data to in serialize().
-    // This is a string so we can swap() with the string in the PRowBatch we're serializing
-    // to (we don't compress directly into the PRowBatch in case the compressed data is
-    // longer than the uncompressed data). Swapping avoids copying data to the PRowBatch and
-    // avoids excess memory allocations: since we reuse RowBatches and PRowBatchs, and
-    // assuming all row batches are roughly the same size, all strings will eventually be
-    // allocated to the right size.
-    std::string _compression_scratch;
-
-    int _scanner_id;
-    bool _cleared = false;
-};
-
-/// Macros for iterating through '_row_batch', starting at '_start_row_idx'.
-/// '_row_batch' is the row batch to iterate through.
-/// '_start_row_idx' is the starting row index.
-/// '_iter' is the iterator.
-/// '_limit' is the max number of rows to iterate over.
-#define FOREACH_ROW(_row_batch, _start_row_idx, _iter) \
-    for (RowBatch::Iterator _iter(_row_batch, _start_row_idx); !_iter.at_end(); _iter.next())
-
-#define FOREACH_ROW_LIMIT(_row_batch, _start_row_idx, _limit, _iter)                    \
-    for (RowBatch::Iterator _iter(_row_batch, _start_row_idx, _limit); !_iter.at_end(); \
-         _iter.next())
-
-} // namespace doris
diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp
index 089b5ba4be..23045b997a 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -22,7 +22,6 @@
 #include "olap/memtable.h"
 #include "olap/storage_engine.h"
 #include "runtime/load_channel.h"
-#include "runtime/row_batch.h"
 #include "util/doris_metrics.h"
 
 namespace doris {
diff --git a/be/src/util/arrow/block_convertor.h b/be/src/util/arrow/block_convertor.h
index bc4aae037e..c76ae73c9f 100644
--- a/be/src/util/arrow/block_convertor.h
+++ b/be/src/util/arrow/block_convertor.h
@@ -22,8 +22,8 @@
 #include "common/status.h"
 #include "vec/core/block.h"
 
-// This file will convert Doris RowBatch to/from Arrow's RecordBatch
-// RowBatch is used by Doris query engine to exchange data between
+// This file will convert Doris Block to/from Arrow's RecordBatch
+// Block is used by Doris query engine to exchange data between
 // each execute node.
 
 namespace arrow {
@@ -37,7 +37,6 @@ class Schema;
 namespace doris {
 
 class ObjectPool;
-class RowBatch;
 class RowDescriptor;
 
 Status convert_to_arrow_batch(const vectorized::Block& block,
diff --git a/be/src/util/arrow/row_batch.cpp b/be/src/util/arrow/row_batch.cpp
index 46eca71b6a..01b930b52e 100644
--- a/be/src/util/arrow/row_batch.cpp
+++ b/be/src/util/arrow/row_batch.cpp
@@ -40,7 +40,6 @@
 #include "runtime/descriptor_helper.h"
 #include "runtime/descriptors.h"
 #include "runtime/large_int_value.h"
-#include "runtime/row_batch.h"
 #include "util/arrow/utils.h"
 #include "util/types.h"
 
@@ -120,349 +119,6 @@ Status convert_to_arrow_schema(const RowDescriptor& row_desc,
     return Status::OK();
 }
 
-Status convert_to_doris_type(const arrow::DataType& type, TSlotDescriptorBuilder* builder) {
-    switch (type.id()) {
-    case arrow::Type::INT8:
-        builder->type(TYPE_TINYINT);
-        break;
-    case arrow::Type::INT16:
-        builder->type(TYPE_SMALLINT);
-        break;
-    case arrow::Type::INT32:
-        builder->type(TYPE_INT);
-        break;
-    case arrow::Type::INT64:
-        builder->type(TYPE_BIGINT);
-        break;
-    case arrow::Type::FLOAT:
-        builder->type(TYPE_FLOAT);
-        break;
-    case arrow::Type::DOUBLE:
-        builder->type(TYPE_DOUBLE);
-        break;
-    case arrow::Type::BOOL:
-        builder->type(TYPE_BOOLEAN);
-        break;
-    default:
-        return Status::InvalidArgument("Unknown arrow type id({})", type.id());
-    }
-    return Status::OK();
-}
-
-Status convert_to_slot_desc(const arrow::Field& field, int column_pos,
-                            TSlotDescriptorBuilder* builder) {
-    RETURN_IF_ERROR(convert_to_doris_type(*field.type(), builder));
-    builder->column_name(field.name()).nullable(field.nullable()).column_pos(column_pos);
-    return Status::OK();
-}
-
-Status convert_to_row_desc(ObjectPool* pool, const arrow::Schema& schema,
-                           RowDescriptor** row_desc) {
-    TDescriptorTableBuilder builder;
-    TTupleDescriptorBuilder tuple_builder;
-    for (int i = 0; i < schema.num_fields(); ++i) {
-        auto field = schema.field(i);
-        TSlotDescriptorBuilder slot_builder;
-        RETURN_IF_ERROR(convert_to_slot_desc(*field, i, &slot_builder));
-        tuple_builder.add_slot(slot_builder.build());
-    }
-    tuple_builder.build(&builder);
-    DescriptorTbl* tbl = nullptr;
-    RETURN_IF_ERROR(DescriptorTbl::create(pool, builder.desc_tbl(), &tbl));
-    auto tuple_desc = tbl->get_tuple_descriptor(0);
-    *row_desc = pool->add(new RowDescriptor(tuple_desc, false));
-    return Status::OK();
-}
-
-// Convert RowBatch to an Arrow::Array
-// We should keep this function to keep compatible with arrow's type visitor
-// Now we inherit TypeVisitor to use default Visit implementation
-class FromRowBatchConverter : public arrow::TypeVisitor {
-public:
-    FromRowBatchConverter(const RowBatch& batch, const std::shared_ptr<arrow::Schema>& schema,
-                          arrow::MemoryPool* pool)
-            : _batch(batch), _schema(schema), _pool(pool), _cur_field_idx(-1) {
-        // obtain local time zone
-        time_t ts = 0;
-        struct tm t;
-        char buf[16];
-        localtime_r(&ts, &t);
-        strftime(buf, sizeof(buf), "%Z", &t);
-        _time_zone = buf;
-    }
-
-    ~FromRowBatchConverter() override {}
-
-    // Use base class function
-    using arrow::TypeVisitor::Visit;
-
-#define PRIMITIVE_VISIT(TYPE) \
-    arrow::Status Visit(const arrow::TYPE& type) override { return _visit(type); }
-
-    PRIMITIVE_VISIT(Int8Type);
-    PRIMITIVE_VISIT(Int16Type);
-    PRIMITIVE_VISIT(Int32Type);
-    PRIMITIVE_VISIT(Int64Type);
-    PRIMITIVE_VISIT(FloatType);
-    PRIMITIVE_VISIT(DoubleType);
-
-#undef PRIMITIVE_VISIT
-
-    // process string-transformable field
-    arrow::Status Visit(const arrow::StringType& type) override {
-        arrow::StringBuilder builder(_pool);
-        size_t num_rows = _batch.num_rows();
-        ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
-        for (size_t i = 0; i < num_rows; ++i) {
-            bool is_null = _cur_slot_ref->is_null_bit_set(_batch.get_row(i));
-            if (is_null) {
-                ARROW_RETURN_NOT_OK(builder.AppendNull());
-                continue;
-            }
-            auto cell_ptr = _cur_slot_ref->get_slot(_batch.get_row(i));
-            PrimitiveType primitive_type = _cur_slot_ref->type().type;
-            switch (primitive_type) {
-            case TYPE_VARCHAR:
-            case TYPE_CHAR:
-            case TYPE_HLL:
-            case TYPE_STRING: {
-                const StringValue* string_val = (const StringValue*)(cell_ptr);
-                if (string_val->len == 0) {
-                    // 0x01 is a magic num, not useful actually, just for present ""
-                    //char* tmp_val = reinterpret_cast<char*>(0x01);
-                    ARROW_RETURN_NOT_OK(builder.Append(""));
-                } else {
-                    ARROW_RETURN_NOT_OK(builder.Append(string_val->ptr, string_val->len));
-                }
-                break;
-            }
-            case TYPE_DATE:
-            case TYPE_DATETIME: {
-                char buf[64];
-                const DateTimeValue* time_val = (const DateTimeValue*)(cell_ptr);
-                int len = time_val->to_buffer(buf);
-                ARROW_RETURN_NOT_OK(builder.Append(buf, len));
-                break;
-            }
-            case TYPE_LARGEINT: {
-                auto string_temp = LargeIntValue::to_string(
-                        reinterpret_cast<const PackedInt128*>(cell_ptr)->value);
-                ARROW_RETURN_NOT_OK(builder.Append(string_temp.data(), string_temp.size()));
-                break;
-            }
-            default: {
-                LOG(WARNING) << "can't convert this type = " << primitive_type << "to arrow type";
-                return arrow::Status::TypeError("unsupported column type");
-            }
-            }
-        }
-        return builder.Finish(&_arrays[_cur_field_idx]);
-    }
-
-    // process doris DecimalV2
-    arrow::Status Visit(const arrow::Decimal128Type& type) override {
-        std::shared_ptr<arrow::DataType> s_decimal_ptr =
-                std::make_shared<arrow::Decimal128Type>(27, 9);
-        arrow::Decimal128Builder builder(s_decimal_ptr, _pool);
-        size_t num_rows = _batch.num_rows();
-        ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
-        for (size_t i = 0; i < num_rows; ++i) {
-            bool is_null = _cur_slot_ref->is_null_bit_set(_batch.get_row(i));
-            if (is_null) {
-                ARROW_RETURN_NOT_OK(builder.AppendNull());
-                continue;
-            }
-            auto cell_ptr = _cur_slot_ref->get_slot(_batch.get_row(i));
-            PackedInt128* p_value = reinterpret_cast<PackedInt128*>(cell_ptr);
-            int64_t high = (p_value->value) >> 64;
-            uint64 low = p_value->value;
-            arrow::Decimal128 value(high, low);
-            ARROW_RETURN_NOT_OK(builder.Append(value));
-        }
-        return builder.Finish(&_arrays[_cur_field_idx]);
-    }
-    // process boolean
-    arrow::Status Visit(const arrow::BooleanType& type) override {
-        arrow::BooleanBuilder builder(_pool);
-        size_t num_rows = _batch.num_rows();
-        ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
-        for (size_t i = 0; i < num_rows; ++i) {
-            bool is_null = _cur_slot_ref->is_null_bit_set(_batch.get_row(i));
-            if (is_null) {
-                ARROW_RETURN_NOT_OK(builder.AppendNull());
-                continue;
-            }
-            auto cell_ptr = _cur_slot_ref->get_slot(_batch.get_row(i));
-            ARROW_RETURN_NOT_OK(builder.Append(*(bool*)cell_ptr));
-        }
-        return builder.Finish(&_arrays[_cur_field_idx]);
-    }
-
-    Status convert(std::shared_ptr<arrow::RecordBatch>* out);
-
-private:
-    template <typename T>
-    typename std::enable_if<std::is_base_of<arrow::PrimitiveCType, T>::value, arrow::Status>::type
-    _visit(const T& type) {
-        arrow::NumericBuilder<T> builder(_pool);
-
-        size_t num_rows = _batch.num_rows();
-        ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
-        for (size_t i = 0; i < num_rows; ++i) {
-            bool is_null = _cur_slot_ref->is_null_bit_set(_batch.get_row(i));
-            if (is_null) {
-                ARROW_RETURN_NOT_OK(builder.AppendNull());
-                continue;
-            }
-            auto cell_ptr = _cur_slot_ref->get_slot(_batch.get_row(i));
-            ARROW_RETURN_NOT_OK(builder.Append(*(typename T::c_type*)cell_ptr));
-        }
-        return builder.Finish(&_arrays[_cur_field_idx]);
-    }
-
-private:
-    const RowBatch& _batch;
-    const std::shared_ptr<arrow::Schema>& _schema;
-    arrow::MemoryPool* _pool;
-
-    size_t _cur_field_idx;
-    std::unique_ptr<SlotRef> _cur_slot_ref;
-
-    std::string _time_zone;
-
-    std::vector<std::shared_ptr<arrow::Array>> _arrays;
-};
-
-Status FromRowBatchConverter::convert(std::shared_ptr<arrow::RecordBatch>* out) {
-    std::vector<SlotDescriptor*> slot_descs;
-    for (auto tuple_desc : _batch.row_desc().tuple_descriptors()) {
-        for (auto desc : tuple_desc->slots()) {
-            slot_descs.push_back(desc);
-        }
-    }
-    size_t num_fields = _schema->num_fields();
-    if (slot_descs.size() != num_fields) {
-        return Status::InvalidArgument("number fields not match");
-    }
-
-    _arrays.resize(num_fields);
-
-    for (size_t idx = 0; idx < num_fields; ++idx) {
-        _cur_field_idx = idx;
-        _cur_slot_ref.reset(new SlotRef(slot_descs[idx]));
-        RETURN_IF_ERROR(_cur_slot_ref->prepare(slot_descs[idx], _batch.row_desc()));
-        auto arrow_st = arrow::VisitTypeInline(*_schema->field(idx)->type(), this);
-        if (!arrow_st.ok()) {
-            return to_status(arrow_st);
-        }
-    }
-    *out = arrow::RecordBatch::Make(_schema, _batch.num_rows(), std::move(_arrays));
-    return Status::OK();
-}
-
-Status convert_to_arrow_batch(const RowBatch& batch, const std::shared_ptr<arrow::Schema>& schema,
-                              arrow::MemoryPool* pool,
-                              std::shared_ptr<arrow::RecordBatch>* result) {
-    FromRowBatchConverter converter(batch, schema, pool);
-    return converter.convert(result);
-}
-
-// Convert Arrow Array to RowBatch
-class ToRowBatchConverter : public arrow::ArrayVisitor {
-public:
-    using arrow::ArrayVisitor::Visit;
-
-    ToRowBatchConverter(const arrow::RecordBatch& batch, const RowDescriptor& row_desc)
-            : _batch(batch), _row_desc(row_desc) {}
-
-#define PRIMITIVE_VISIT(TYPE) \
-    arrow::Status Visit(const arrow::TYPE& array) override { return _visit(array); }
-
-    PRIMITIVE_VISIT(Int8Array);
-    PRIMITIVE_VISIT(Int16Array);
-    PRIMITIVE_VISIT(Int32Array);
-    PRIMITIVE_VISIT(Int64Array);
-    PRIMITIVE_VISIT(FloatArray);
-    PRIMITIVE_VISIT(DoubleArray);
-
-#undef PRIMITIVE_VISIT
-
-    // Convert to a RowBatch
-    Status convert(std::shared_ptr<RowBatch>* result);
-
-private:
-    template <typename T>
-    typename std::enable_if<std::is_base_of<arrow::PrimitiveCType, typename T::TypeClass>::value,
-                            arrow::Status>::type
-    _visit(const T& array) {
-        auto raw_values = array.raw_values();
-        for (size_t i = 0; i < array.length(); ++i) {
-            auto row = _output->get_row(i);
-            auto tuple = _cur_slot_ref->get_tuple(row);
-            if (array.IsValid(i)) {
-                tuple->set_not_null(_cur_slot_ref->null_indicator_offset());
-                auto slot = _cur_slot_ref->get_slot(row);
-                *(typename T::TypeClass::c_type*)slot = raw_values[i];
-            } else {
-                tuple->set_null(_cur_slot_ref->null_indicator_offset());
-            }
-        }
-        return arrow::Status::OK();
-    }
-
-private:
-    const arrow::RecordBatch& _batch;
-    const RowDescriptor& _row_desc;
-
-    std::unique_ptr<SlotRef> _cur_slot_ref;
-    std::shared_ptr<RowBatch> _output;
-};
-
-Status ToRowBatchConverter::convert(std::shared_ptr<RowBatch>* result) {
-    std::vector<SlotDescriptor*> slot_descs;
-    for (auto tuple_desc : _row_desc.tuple_descriptors()) {
-        for (auto desc : tuple_desc->slots()) {
-            slot_descs.push_back(desc);
-        }
-    }
-    size_t num_fields = slot_descs.size();
-    if (num_fields != _batch.schema()->num_fields()) {
-        return Status::InvalidArgument("Schema not match");
-    }
-    // TODO(zc): check if field type match
-
-    size_t num_rows = _batch.num_rows();
-    _output.reset(new RowBatch(_row_desc, num_rows));
-    _output->commit_rows(num_rows);
-    auto pool = _output->tuple_data_pool();
-    for (size_t row_id = 0; row_id < num_rows; ++row_id) {
-        auto row = _output->get_row(row_id);
-        for (int tuple_id = 0; tuple_id < _row_desc.tuple_descriptors().size(); ++tuple_id) {
-            auto tuple_desc = _row_desc.tuple_descriptors()[tuple_id];
-            auto tuple = pool->allocate(tuple_desc->byte_size());
-            row->set_tuple(tuple_id, (Tuple*)tuple);
-        }
-    }
-    for (size_t idx = 0; idx < num_fields; ++idx) {
-        _cur_slot_ref.reset(new SlotRef(slot_descs[idx]));
-        RETURN_IF_ERROR(_cur_slot_ref->prepare(slot_descs[idx], _row_desc));
-        auto arrow_st = arrow::VisitArrayInline(*_batch.column(idx), this);
-        if (!arrow_st.ok()) {
-            return to_status(arrow_st);
-        }
-    }
-
-    *result = std::move(_output);
-
-    return Status::OK();
-}
-
-Status convert_to_row_batch(const arrow::RecordBatch& batch, const RowDescriptor& row_desc,
-                            std::shared_ptr<RowBatch>* result) {
-    ToRowBatchConverter converter(batch, row_desc);
-    return converter.convert(result);
-}
-
 Status serialize_record_batch(const arrow::RecordBatch& record_batch, std::string* result) {
     // create sink memory buffer outputstream with the computed capacity
     int64_t capacity;
diff --git a/be/src/util/arrow/row_batch.h b/be/src/util/arrow/row_batch.h
index f75b060502..25203ec170 100644
--- a/be/src/util/arrow/row_batch.h
+++ b/be/src/util/arrow/row_batch.h
@@ -36,29 +36,12 @@ class Schema;
 namespace doris {
 
 class ObjectPool;
-class RowBatch;
 class RowDescriptor;
 
 // Convert Doris RowDescriptor to Arrow Schema.
 Status convert_to_arrow_schema(const RowDescriptor& row_desc,
                                std::shared_ptr<arrow::Schema>* result);
 
-// Convert an Arrow Schema to a Doris RowDescriptor which will be add to
-// input pool.
-// Why we should
-Status convert_to_row_desc(ObjectPool* pool, const arrow::Schema& schema, RowDescriptor** row_desc);
-
-// Convert a Doris RowBatch to an Arrow RecordBatch. A valid Arrow Schema
-// who should match RowBatch's schema is given. Memory used by result RecordBatch
-// will be allocated from input pool.
-Status convert_to_arrow_batch(const RowBatch& batch, const std::shared_ptr<arrow::Schema>& schema,
-                              arrow::MemoryPool* pool, std::shared_ptr<arrow::RecordBatch>* result);
-
-// Convert an Arrow RecordBatch to a Doris RowBatch. A valid RowDescriptor
-// whose schema is the same with RecordBatch's should be given.
-Status convert_to_row_batch(const arrow::RecordBatch& batch, const RowDescriptor& row_desc,
-                            std::shared_ptr<RowBatch>* result);
-
 Status serialize_record_batch(const arrow::RecordBatch& record_batch, std::string* result);
 
 } // namespace doris
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 817a2ac643..ebe46fdd0c 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -26,7 +26,6 @@
 #include "agent/be_exec_version_manager.h"
 #include "common/status.h"
 #include "runtime/descriptors.h"
-#include "runtime/row_batch.h"
 #include "runtime/tuple.h"
 #include "runtime/tuple_row.h"
 #include "udf/udf.h"
diff --git a/be/src/vec/exec/data_gen_functions/vnumbers_tvf.cpp b/be/src/vec/exec/data_gen_functions/vnumbers_tvf.cpp
index 33a179c3ae..d8107ba925 100644
--- a/be/src/vec/exec/data_gen_functions/vnumbers_tvf.cpp
+++ b/be/src/vec/exec/data_gen_functions/vnumbers_tvf.cpp
@@ -21,7 +21,6 @@
 
 #include "exec/exec_node.h"
 #include "gen_cpp/PlanNodes_types.h"
-#include "runtime/row_batch.h"
 #include "runtime/runtime_state.h"
 #include "runtime/string_value.h"
 #include "runtime/tuple_row.h"
diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp b/be/src/vec/exec/join/vnested_loop_join_node.cpp
index 7125149494..39ae99a36e 100644
--- a/be/src/vec/exec/join/vnested_loop_join_node.cpp
+++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp
@@ -25,7 +25,6 @@
 #include "exprs/expr.h"
 #include "exprs/runtime_filter_slots_cross.h"
 #include "gen_cpp/PlanNodes_types.h"
-#include "runtime/row_batch.h"
 #include "runtime/runtime_state.h"
 #include "util/runtime_profile.h"
 #include "util/simd/bits.h"
diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp
index 2e3330f0a5..ecf4d87081 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -21,7 +21,6 @@
 
 #include "exec/exec_node.h"
 #include "runtime/mem_pool.h"
-#include "runtime/row_batch.h"
 #include "vec/core/block.h"
 #include "vec/data_types/data_type_nullable.h"
 #include "vec/data_types/data_type_string.h"
diff --git a/be/src/vec/exec/vanalytic_eval_node.cpp b/be/src/vec/exec/vanalytic_eval_node.cpp
index acd8fafe53..b2235de9ab 100644
--- a/be/src/vec/exec/vanalytic_eval_node.cpp
+++ b/be/src/vec/exec/vanalytic_eval_node.cpp
@@ -18,7 +18,6 @@
 #include "vec/exec/vanalytic_eval_node.h"
 
 #include "runtime/descriptors.h"
-#include "runtime/row_batch.h"
 #include "runtime/runtime_state.h"
 #include "vec/exprs/vexpr.h"
 
diff --git a/be/src/vec/exec/vdata_gen_scan_node.cpp b/be/src/vec/exec/vdata_gen_scan_node.cpp
index ae3d7d0986..e91626d49c 100644
--- a/be/src/vec/exec/vdata_gen_scan_node.cpp
+++ b/be/src/vec/exec/vdata_gen_scan_node.cpp
@@ -21,7 +21,6 @@
 
 #include "common/status.h"
 #include "gen_cpp/PlanNodes_types.h"
-#include "runtime/row_batch.h"
 #include "runtime/runtime_state.h"
 #include "runtime/string_value.h"
 #include "runtime/tuple_row.h"
diff --git a/be/src/vec/exec/vmysql_scan_node.cpp b/be/src/vec/exec/vmysql_scan_node.cpp
index e0f520f6ae..3fb03bd19c 100644
--- a/be/src/vec/exec/vmysql_scan_node.cpp
+++ b/be/src/vec/exec/vmysql_scan_node.cpp
@@ -20,7 +20,6 @@
 #include "exec/text_converter.h"
 #include "exec/text_converter.hpp"
 #include "gen_cpp/PlanNodes_types.h"
-#include "runtime/row_batch.h"
 #include "runtime/runtime_state.h"
 #include "runtime/string_value.h"
 #include "runtime/tuple_row.h"
diff --git a/be/src/vec/exec/vschema_scan_node.cpp b/be/src/vec/exec/vschema_scan_node.cpp
index 1bdd5c8563..9e3e6a1c42 100644
--- a/be/src/vec/exec/vschema_scan_node.cpp
+++ b/be/src/vec/exec/vschema_scan_node.cpp
@@ -20,7 +20,6 @@
 #include "exec/text_converter.h"
 #include "exec/text_converter.hpp"
 #include "gen_cpp/PlanNodes_types.h"
-#include "runtime/row_batch.h"
 #include "runtime/runtime_state.h"
 #include "runtime/string_value.h"
 #include "runtime/tuple_row.h"
diff --git a/be/src/vec/exec/vset_operation_node.h b/be/src/vec/exec/vset_operation_node.h
index 1e339e3a80..ad9cbab7e7 100644
--- a/be/src/vec/exec/vset_operation_node.h
+++ b/be/src/vec/exec/vset_operation_node.h
@@ -18,7 +18,6 @@
 #pragma once
 
 #include "exec/exec_node.h"
-#include "runtime/row_batch.h"
 #include "runtime/runtime_state.h"
 #include "vec/core/materialize_block.h"
 #include "vec/exec/join/join_op.h"
diff --git a/be/src/vec/exec/vsort_node.cpp b/be/src/vec/exec/vsort_node.cpp
index 97916632a9..4ebdd05506 100644
--- a/be/src/vec/exec/vsort_node.cpp
+++ b/be/src/vec/exec/vsort_node.cpp
@@ -19,7 +19,6 @@
 
 #include "common/config.h"
 #include "pipeline/pipeline.h"
-#include "runtime/row_batch.h"
 #include "runtime/runtime_state.h"
 #include "util/debug_util.h"
 #include "vec/common/sort/heap_sorter.h"
diff --git a/be/src/vec/runtime/vsorted_run_merger.cpp b/be/src/vec/runtime/vsorted_run_merger.cpp
index 21b7a0f521..17f87aac2e 100644
--- a/be/src/vec/runtime/vsorted_run_merger.cpp
+++ b/be/src/vec/runtime/vsorted_run_merger.cpp
@@ -20,7 +20,6 @@
 #include <vector>
 
 #include "runtime/descriptors.h"
-#include "runtime/row_batch.h"
 #include "util/debug_util.h"
 #include "util/defer_op.h"
 #include "util/runtime_profile.h"
diff --git a/be/src/vec/sink/vresult_file_sink.cpp b/be/src/vec/sink/vresult_file_sink.cpp
index a25683e6da..2c8f79de48 100644
--- a/be/src/vec/sink/vresult_file_sink.cpp
+++ b/be/src/vec/sink/vresult_file_sink.cpp
@@ -21,7 +21,6 @@
 #include "runtime/buffer_control_block.h"
 #include "runtime/exec_env.h"
 #include "runtime/result_buffer_mgr.h"
-#include "runtime/row_batch.h"
 #include "runtime/runtime_state.h"
 #include "util/uid_util.h"
 #include "vec/exprs/vexpr.h"
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index 03a0181e55..7f756aff83 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -27,7 +27,6 @@
 #include "exprs/expr_context.h"
 #include "olap/hll.h"
 #include "runtime/exec_env.h"
-#include "runtime/row_batch.h"
 #include "runtime/runtime_state.h"
 #include "runtime/thread_context.h"
 #include "runtime/tuple_row.h"
diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h
index c299fdca55..b62544cb7c 100644
--- a/be/src/vec/sink/vtablet_sink.h
+++ b/be/src/vec/sink/vtablet_sink.h
@@ -33,7 +33,6 @@
 #include "exprs/expr_context.h"
 #include "gen_cpp/Types_types.h"
 #include "gen_cpp/internal_service.pb.h"
-#include "runtime/row_batch.h"
 #include "runtime/thread_context.h"
 #include "util/bitmap.h"
 #include "util/countdown_latch.h"
diff --git a/be/test/exprs/binary_predicate_test.cpp b/be/test/exprs/binary_predicate_test.cpp
index a53d3754fc..af96e332cb 100644
--- a/be/test/exprs/binary_predicate_test.cpp
+++ b/be/test/exprs/binary_predicate_test.cpp
@@ -24,7 +24,6 @@
 #include "exprs/expr.h"
 #include "exprs/int_literal.h"
 #include "gen_cpp/Exprs_types.h"
-#include "runtime/row_batch.h"
 #include "runtime/runtime_state.h"
 #include "util/debug_util.h"
 
diff --git a/be/test/exprs/in_op_test.cpp b/be/test/exprs/in_op_test.cpp
index c1de71f4e9..fc49b4dd5b 100644
--- a/be/test/exprs/in_op_test.cpp
+++ b/be/test/exprs/in_op_test.cpp
@@ -23,7 +23,6 @@
 #include "exprs/in_predicate.h"
 #include "exprs/int_literal.h"
 #include "gen_cpp/Exprs_types.h"
-#include "runtime/row_batch.h"
 #include "runtime/runtime_state.h"
 #include "util/debug_util.h"
 
diff --git a/be/test/runtime/data_spliter_test.cpp b/be/test/runtime/data_spliter_test.cpp
index 1f0ab50c5c..b497978e3b 100644
--- a/be/test/runtime/data_spliter_test.cpp
+++ b/be/test/runtime/data_spliter_test.cpp
@@ -26,7 +26,6 @@
 #include "olap/olap_main.cpp"
 #include "runtime/descriptors.h"
 #include "runtime/dpp_sink_internal.h"
-#include "runtime/row_batch.h"
 #include "runtime/runtime_state.h"
 #include "runtime/tuple.h"
 #include "runtime/tuple_row.h"
diff --git a/be/test/runtime/fragment_mgr_test.cpp b/be/test/runtime/fragment_mgr_test.cpp
index 55939cc04f..3dfc8b0a37 100644
--- a/be/test/runtime/fragment_mgr_test.cpp
+++ b/be/test/runtime/fragment_mgr_test.cpp
@@ -22,7 +22,6 @@
 #include "common/config.h"
 #include "exec/data_sink.h"
 #include "runtime/plan_fragment_executor.h"
-#include "runtime/row_batch.h"
 
 namespace doris {
 
diff --git a/be/test/vec/core/block_test.cpp b/be/test/vec/core/block_test.cpp
index 51a743e738..a4de5e9129 100644
--- a/be/test/vec/core/block_test.cpp
+++ b/be/test/vec/core/block_test.cpp
@@ -26,7 +26,6 @@
 #include "agent/be_exec_version_manager.h"
 #include "exec/schema_scanner.h"
 #include "gen_cpp/data.pb.h"
-#include "runtime/row_batch.h"
 #include "runtime/string_value.h"
 #include "runtime/tuple_row.h"
 #include "vec/columns/column_array.h"
diff --git a/be/test/vec/exec/vjson_scanner_test.cpp b/be/test/vec/exec/vjson_scanner_test.cpp
index 9149fb3dd7..a4d7a6c5d5 100644
--- a/be/test/vec/exec/vjson_scanner_test.cpp
+++ b/be/test/vec/exec/vjson_scanner_test.cpp
@@ -32,7 +32,6 @@
 #include "io/local_file_reader.h"
 #include "runtime/descriptors.h"
 #include "runtime/exec_env.h"
-#include "runtime/row_batch.h"
 #include "runtime/runtime_state.h"
 #include "runtime/tuple.h"
 #include "runtime/user_function_cache.h"
diff --git a/be/test/vec/exec/vorc_scanner_test.cpp b/be/test/vec/exec/vorc_scanner_test.cpp
index 39e3bb56ed..d47052efce 100644
--- a/be/test/vec/exec/vorc_scanner_test.cpp
+++ b/be/test/vec/exec/vorc_scanner_test.cpp
@@ -32,7 +32,6 @@
 #include "gen_cpp/PlanNodes_types.h"
 #include "io/local_file_reader.h"
 #include "runtime/descriptors.h"
-#include "runtime/row_batch.h"
 #include "runtime/runtime_state.h"
 #include "runtime/tuple.h"
 #include "runtime/user_function_cache.h"
diff --git a/be/test/vec/exec/vparquet_scanner_test.cpp b/be/test/vec/exec/vparquet_scanner_test.cpp
index c08a69a005..49b3ba3c56 100644
--- a/be/test/vec/exec/vparquet_scanner_test.cpp
+++ b/be/test/vec/exec/vparquet_scanner_test.cpp
@@ -28,7 +28,6 @@
 #include "gen_cpp/PlanNodes_types.h"
 #include "io/local_file_reader.h"
 #include "runtime/descriptors.h"
-#include "runtime/row_batch.h"
 #include "runtime/runtime_state.h"
 #include "runtime/tuple.h"
 #include "runtime/user_function_cache.h"
diff --git a/be/test/vec/exprs/vexpr_test.cpp b/be/test/vec/exprs/vexpr_test.cpp
index edea60aa12..8eb195535e 100644
--- a/be/test/vec/exprs/vexpr_test.cpp
+++ b/be/test/vec/exprs/vexpr_test.cpp
@@ -30,7 +30,6 @@
 #include "runtime/large_int_value.h"
 #include "runtime/memory/chunk_allocator.h"
 #include "runtime/primitive_type.h"
-#include "runtime/row_batch.h"
 #include "runtime/runtime_state.h"
 #include "runtime/tuple.h"
 #include "runtime/tuple_row.h"
@@ -48,25 +47,12 @@ TEST(TEST_VEXPR, ABSTEST) {
 
     auto tuple_desc = const_cast<doris::TupleDescriptor*>(desc_tbl->get_tuple_descriptor(0));
     doris::RowDescriptor row_desc(tuple_desc, false);
-    doris::RowBatch row_batch(row_desc, 1024);
     std::string expr_json =
             R"|({"1":{"lst":["rec",2,{"1":{"i32":20},"2":{"rec":{"1":{"lst":["rec",1,{"1":{"i32":0},"2":{"rec":{"1":{"i32":6}}}}]}}},"4":{"i32":1},"20":{"i32":-1},"26":{"rec":{"1":{"rec":{"2":{"str":"abs"}}},"2":{"i32":0},"3":{"lst":["rec",1,{"1":{"lst":["rec",1,{"1":{"i32":0},"2":{"rec":{"1":{"i32":5}}}}]}}]},"4":{"rec":{"1":{"lst":["rec",1,{"1":{"i32":0},"2":{"rec":{"1":{"i32":6}}}}]}}},"5":{"tf":0},"7":{"str":"abs(INT)"},"9":{"rec":{"1":{"str":"_ZN5doris13MathFunctions3absEPN9doris_ud [...]
     doris::TExpr exprx = apache::thrift::from_json_string<doris::TExpr>(expr_json);
     doris::vectorized::VExprContext* context = nullptr;
     doris::vectorized::VExpr::create_expr_tree(&object_pool, exprx, &context);
 
-    int32_t k1 = -100;
-    for (int i = 0; i < 1024; ++i, k1++) {
-        auto idx = row_batch.add_row();
-        doris::TupleRow* tuple_row = row_batch.get_row(idx);
-        auto tuple =
-                (doris::Tuple*)(row_batch.tuple_data_pool()->allocate(tuple_desc->byte_size()));
-        auto slot_desc = tuple_desc->slots()[0];
-        memcpy(tuple->get_slot(slot_desc->tuple_offset()), &k1, slot_desc->slot_size());
-        tuple_row->set_tuple(0, tuple);
-        row_batch.commit_last_row();
-    }
-
     doris::RuntimeState runtime_stat(doris::TUniqueId(), doris::TQueryOptions(),
                                      doris::TQueryGlobals(), nullptr);
     runtime_stat.init_mem_trackers();
@@ -85,7 +71,6 @@ TEST(TEST_VEXPR, ABSTEST2) {
     schema_scanner.init(&param, &object_pool);
     auto tuple_desc = const_cast<TupleDescriptor*>(schema_scanner.tuple_desc());
     RowDescriptor row_desc(tuple_desc, false);
-    RowBatch row_batch(row_desc, 1024);
     std::string expr_json =
             R"|({"1":{"lst":["rec",2,{"1":{"i32":20},"2":{"rec":{"1":{"lst":["rec",1,{"1":{"i32":0},"2":{"rec":{"1":{"i32":6}}}}]}}},"4":{"i32":1},"20":{"i32":-1},"26":{"rec":{"1":{"rec":{"2":{"str":"abs"}}},"2":{"i32":0},"3":{"lst":["rec",1,{"1":{"lst":["rec",1,{"1":{"i32":0},"2":{"rec":{"1":{"i32":5}}}}]}}]},"4":{"rec":{"1":{"lst":["rec",1,{"1":{"i32":0},"2":{"rec":{"1":{"i32":6}}}}]}}},"5":{"tf":0},"7":{"str":"abs(INT)"},"9":{"rec":{"1":{"str":"_ZN5doris13MathFunctions3absEPN9doris_ud [...]
     TExpr exprx = apache::thrift::from_json_string<TExpr>(expr_json);
@@ -93,18 +78,6 @@ TEST(TEST_VEXPR, ABSTEST2) {
     doris::vectorized::VExprContext* context = nullptr;
     doris::vectorized::VExpr::create_expr_tree(&object_pool, exprx, &context);
 
-    int32_t k1 = -100;
-    for (int i = 0; i < 1024; ++i, k1++) {
-        auto idx = row_batch.add_row();
-        doris::TupleRow* tuple_row = row_batch.get_row(idx);
-        auto tuple =
-                (doris::Tuple*)(row_batch.tuple_data_pool()->allocate(tuple_desc->byte_size()));
-        auto slot_desc = tuple_desc->slots()[0];
-        memcpy(tuple->get_slot(slot_desc->tuple_offset()), &k1, slot_desc->slot_size());
-        tuple_row->set_tuple(0, tuple);
-        row_batch.commit_last_row();
-    }
-
     doris::RuntimeState runtime_stat(doris::TUniqueId(), doris::TQueryOptions(),
                                      doris::TQueryGlobals(), nullptr);
     runtime_stat.init_mem_trackers();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org