You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/10/19 06:25:38 UTC

[doris] branch master updated: [refactor](new-scan) remove old file scan node (#13433)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5423de68dd [refactor](new-scan) remove old file scan node (#13433)
5423de68dd is described below

commit 5423de68dd9c7bf480823ea0a0faca84c8fd3f82
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Wed Oct 19 14:25:32 2022 +0800

    [refactor](new-scan) remove old file scan node (#13433)
    
    None of these files are used anymore, so they can be removed.
---
 be/src/common/config.h                             |   6 -
 be/src/exec/exec_node.cpp                          |   7 +-
 be/src/vec/CMakeLists.txt                          |   7 -
 be/src/vec/exec/file_arrow_scanner.cpp             | 259 -----------
 be/src/vec/exec/file_arrow_scanner.h               | 118 -----
 be/src/vec/exec/file_scan_node.cpp                 | 508 ---------------------
 be/src/vec/exec/file_scan_node.h                   | 148 ------
 be/src/vec/exec/file_scanner.cpp                   | 199 --------
 be/src/vec/exec/file_scanner.h                     | 112 -----
 be/src/vec/exec/file_text_scanner.cpp              | 294 ------------
 be/src/vec/exec/file_text_scanner.h                |  72 ---
 be/src/vec/exec/scan/new_file_arrow_scanner.cpp    | 265 -----------
 be/src/vec/exec/scan/new_file_arrow_scanner.h      |  89 ----
 be/src/vec/exec/scan/new_file_scan_node.cpp        |  27 +-
 be/src/vec/exec/scan/new_file_scanner.cpp          | 317 -------------
 be/src/vec/exec/scan/new_file_scanner.h            | 100 ----
 be/src/vec/exec/scan/new_file_text_scanner.cpp     | 263 -----------
 be/src/vec/exec/scan/new_file_text_scanner.h       |  66 ---
 .../apache/doris/planner/StreamLoadPlanner.java    |   2 +-
 .../load_p0/broker_load/test_broker_load.groovy    |  28 +-
 20 files changed, 18 insertions(+), 2869 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 046ad3b234..ecbbd7b64d 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -864,12 +864,6 @@ CONF_mInt64(nodechannel_pending_queue_max_bytes, "67108864");
 // so as to avoid occupying the execution thread for a long time.
 CONF_mInt32(max_fragment_start_wait_time_seconds, "30");
 
-// Temp config. True to use new file scan node to do load job. Will remove after fully test.
-CONF_mBool(enable_new_load_scan_node, "false");
-
-// Temp config. True to use new file scanner. Will remove after fully test.
-CONF_mBool(enable_new_file_scanner, "false");
-
 // Hide webserver page for safety.
 // Hide the be config page for webserver.
 CONF_Bool(hide_webserver_config_page, "false");
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index 39e917ec6b..f04f832328 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -59,7 +59,6 @@
 #include "util/debug_util.h"
 #include "util/runtime_profile.h"
 #include "vec/core/block.h"
-#include "vec/exec/file_scan_node.h"
 #include "vec/exec/join/vhash_join_node.h"
 #include "vec/exec/scan/new_es_scan_node.h"
 #include "vec/exec/scan/new_file_scan_node.h"
@@ -613,13 +612,11 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN
         return Status::OK();
 
     case TPlanNodeType::FILE_SCAN_NODE:
-        //        *node = pool->add(new vectorized::FileScanNode(pool, tnode, descs));
-        if (config::enable_new_scan_node) {
+        if (state->enable_vectorized_exec()) {
             *node = pool->add(new vectorized::NewFileScanNode(pool, tnode, descs));
         } else {
-            *node = pool->add(new vectorized::FileScanNode(pool, tnode, descs));
+            return Status::InternalError("Not support file scan node in non-vec engine");
         }
-
         return Status::OK();
 
     case TPlanNodeType::REPEAT_NODE:
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index 58a9701c90..d5739e25b4 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -230,10 +230,6 @@ set(VEC_FILES
   runtime/vorc_writer.cpp
   utils/arrow_column_to_doris_column.cpp
   runtime/vsorted_run_merger.cpp
-  exec/file_arrow_scanner.cpp
-  exec/file_scanner.cpp
-  exec/file_scan_node.cpp
-  exec/file_text_scanner.cpp
   exec/format/parquet/vparquet_column_chunk_reader.cpp
   exec/format/parquet/vparquet_group_reader.cpp
   exec/format/parquet/vparquet_page_index.cpp
@@ -250,10 +246,7 @@ set(VEC_FILES
   exec/scan/scanner_scheduler.cpp
   exec/scan/new_olap_scan_node.cpp
   exec/scan/new_olap_scanner.cpp
-  exec/scan/new_file_arrow_scanner.cpp
   exec/scan/new_file_scan_node.cpp
-  exec/scan/new_file_scanner.cpp
-  exec/scan/new_file_text_scanner.cpp
   exec/scan/vfile_scanner.cpp
   exec/scan/new_odbc_scanner.cpp
   exec/scan/new_odbc_scan_node.cpp
diff --git a/be/src/vec/exec/file_arrow_scanner.cpp b/be/src/vec/exec/file_arrow_scanner.cpp
deleted file mode 100644
index 3a5d6c2f5b..0000000000
--- a/be/src/vec/exec/file_arrow_scanner.cpp
+++ /dev/null
@@ -1,259 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "vec/exec/file_arrow_scanner.h"
-
-#include <memory>
-
-#include "exec/arrow/parquet_reader.h"
-#include "io/buffered_reader.h"
-#include "io/file_factory.h"
-#include "io/hdfs_reader_writer.h"
-#include "runtime/descriptors.h"
-#include "vec/utils/arrow_column_to_doris_column.h"
-
-namespace doris::vectorized {
-
-FileArrowScanner::FileArrowScanner(RuntimeState* state, RuntimeProfile* profile,
-                                   const TFileScanRangeParams& params,
-                                   const std::vector<TFileRangeDesc>& ranges,
-                                   const std::vector<TExpr>& pre_filter_texprs,
-                                   ScannerCounter* counter)
-        : FileScanner(state, profile, params, ranges, pre_filter_texprs, counter),
-          _cur_file_reader(nullptr),
-          _cur_file_eof(false),
-          _batch(nullptr),
-          _arrow_batch_cur_idx(0) {
-    _convert_arrow_block_timer = ADD_TIMER(_profile, "ConvertArrowBlockTimer");
-}
-
-FileArrowScanner::~FileArrowScanner() {
-    FileArrowScanner::close();
-}
-
-Status FileArrowScanner::_open_next_reader() {
-    // open_file_reader
-    if (_cur_file_reader != nullptr) {
-        delete _cur_file_reader;
-        _cur_file_reader = nullptr;
-    }
-
-    while (true) {
-        if (_next_range >= _ranges.size()) {
-            _scanner_eof = true;
-            return Status::OK();
-        }
-        const TFileRangeDesc& range = _ranges[_next_range++];
-        std::unique_ptr<FileReader> file_reader;
-        RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _params, range.path,
-                                                        range.start_offset, range.file_size, 0,
-                                                        file_reader));
-        RETURN_IF_ERROR(file_reader->open());
-        if (file_reader->size() == 0) {
-            file_reader->close();
-            continue;
-        }
-
-        int32_t num_of_columns_from_file = _file_slot_descs.size();
-
-        _cur_file_reader =
-                _new_arrow_reader(_file_slot_descs, file_reader.release(), num_of_columns_from_file,
-                                  range.start_offset, range.size);
-
-        auto tuple_desc = _state->desc_tbl().get_tuple_descriptor(_tupleId);
-        Status status =
-                _cur_file_reader->init_reader(tuple_desc, _conjunct_ctxs, _state->timezone());
-        if (status.is_end_of_file()) {
-            continue;
-        } else {
-            if (!status.ok()) {
-                std::stringstream ss;
-                ss << " file: " << range.path << " error:" << status.get_error_msg();
-                return Status::InternalError(ss.str());
-            } else {
-                _update_profile(_cur_file_reader->statistics());
-                return status;
-            }
-        }
-    }
-}
-
-Status FileArrowScanner::open() {
-    RETURN_IF_ERROR(FileScanner::open());
-    if (_ranges.empty()) {
-        return Status::OK();
-    }
-    return Status::OK();
-}
-
-// get next available arrow batch
-Status FileArrowScanner::_next_arrow_batch() {
-    _arrow_batch_cur_idx = 0;
-    // first, init file reader
-    if (_cur_file_reader == nullptr || _cur_file_eof) {
-        RETURN_IF_ERROR(_open_next_reader());
-        _cur_file_eof = false;
-    }
-    // second, loop until find available arrow batch or EOF
-    while (!_scanner_eof) {
-        RETURN_IF_ERROR(_cur_file_reader->next_batch(&_batch, &_cur_file_eof));
-        if (_cur_file_eof) {
-            RETURN_IF_ERROR(_open_next_reader());
-            _cur_file_eof = false;
-            continue;
-        }
-        if (_batch->num_rows() == 0) {
-            continue;
-        }
-        return Status::OK();
-    }
-    return Status::EndOfFile("EOF");
-}
-
-Status FileArrowScanner::_init_arrow_batch_if_necessary() {
-    // 1. init batch if first time
-    // 2. reset reader if end of file
-    Status status = Status::OK();
-    if (_scanner_eof) {
-        return Status::EndOfFile("EOF");
-    }
-    if (_batch == nullptr || _arrow_batch_cur_idx >= _batch->num_rows()) {
-        return _next_arrow_batch();
-    }
-    return status;
-}
-
-Status FileArrowScanner::get_next(vectorized::Block* block, bool* eof) {
-    SCOPED_TIMER(_read_timer);
-    // init arrow batch
-    {
-        Status st = _init_arrow_batch_if_necessary();
-        if (!st.ok()) {
-            if (!st.is_end_of_file()) {
-                return st;
-            }
-            *eof = true;
-            return Status::OK();
-        }
-    }
-
-    RETURN_IF_ERROR(init_block(block));
-    // convert arrow batch to block until reach the batch_size
-    while (!_scanner_eof) {
-        // cast arrow type to PT0 and append it to block
-        // for example: arrow::Type::INT16 => TYPE_SMALLINT
-        RETURN_IF_ERROR(_append_batch_to_block(block));
-        // finalize the block if full
-        if (_rows >= _state->batch_size()) {
-            break;
-        }
-        auto status = _next_arrow_batch();
-        // if ok, append the batch to the columns
-        if (status.ok()) {
-            continue;
-        }
-        // return error if not EOF
-        if (!status.is_end_of_file()) {
-            return status;
-        }
-        _cur_file_eof = true;
-        break;
-    }
-
-    return finalize_block(block, eof);
-}
-
-Status FileArrowScanner::_append_batch_to_block(Block* block) {
-    SCOPED_TIMER(_convert_arrow_block_timer);
-    size_t num_elements = std::min<size_t>((_state->batch_size() - block->rows()),
-                                           (_batch->num_rows() - _arrow_batch_cur_idx));
-    for (auto i = 0; i < _file_slot_descs.size(); ++i) {
-        SlotDescriptor* slot_desc = _file_slot_descs[i];
-        if (slot_desc == nullptr) {
-            continue;
-        }
-        std::string real_column_name = _cur_file_reader->is_case_sensitive()
-                                               ? slot_desc->col_name()
-                                               : slot_desc->col_name_lower_case();
-        auto* array = _batch->GetColumnByName(real_column_name).get();
-        auto& column_with_type_and_name = block->get_by_name(slot_desc->col_name());
-        RETURN_IF_ERROR(arrow_column_to_doris_column(
-                array, _arrow_batch_cur_idx, column_with_type_and_name.column,
-                column_with_type_and_name.type, num_elements, _state->timezone_obj()));
-    }
-    _rows += num_elements;
-    _arrow_batch_cur_idx += num_elements;
-    return _fill_columns_from_path(block, num_elements);
-}
-
-void VFileParquetScanner::_update_profile(std::shared_ptr<Statistics>& statistics) {
-    COUNTER_UPDATE(_filtered_row_groups_counter, statistics->filtered_row_groups);
-    COUNTER_UPDATE(_filtered_rows_counter, statistics->filtered_rows);
-    COUNTER_UPDATE(_filtered_bytes_counter, statistics->filtered_total_bytes);
-    COUNTER_UPDATE(_total_rows_counter, statistics->total_rows);
-    COUNTER_UPDATE(_total_groups_counter, statistics->total_groups);
-    COUNTER_UPDATE(_total_bytes_counter, statistics->total_bytes);
-}
-
-void FileArrowScanner::close() {
-    FileScanner::close();
-    if (_cur_file_reader != nullptr) {
-        delete _cur_file_reader;
-        _cur_file_reader = nullptr;
-    }
-}
-
-VFileParquetScanner::VFileParquetScanner(RuntimeState* state, RuntimeProfile* profile,
-                                         const TFileScanRangeParams& params,
-                                         const std::vector<TFileRangeDesc>& ranges,
-                                         const std::vector<TExpr>& pre_filter_texprs,
-                                         ScannerCounter* counter)
-        : FileArrowScanner(state, profile, params, ranges, pre_filter_texprs, counter) {
-    _init_profiles(profile);
-}
-
-ArrowReaderWrap* VFileParquetScanner::_new_arrow_reader(
-        const std::vector<SlotDescriptor*>& file_slot_descs, FileReader* file_reader,
-        int32_t num_of_columns_from_file, int64_t range_start_offset, int64_t range_size) {
-    return new ParquetReaderWrap(_state, file_slot_descs, file_reader, num_of_columns_from_file,
-                                 range_start_offset, range_size, false);
-}
-
-void VFileParquetScanner::_init_profiles(RuntimeProfile* profile) {
-    _filtered_row_groups_counter = ADD_COUNTER(_profile, "ParquetRowGroupsFiltered", TUnit::UNIT);
-    _filtered_rows_counter = ADD_COUNTER(_profile, "ParquetRowsFiltered", TUnit::UNIT);
-    _filtered_bytes_counter = ADD_COUNTER(_profile, "ParquetBytesFiltered", TUnit::BYTES);
-    _total_rows_counter = ADD_COUNTER(_profile, "ParquetRowsTotal", TUnit::UNIT);
-    _total_groups_counter = ADD_COUNTER(_profile, "ParquetRowGroupsTotal", TUnit::UNIT);
-    _total_bytes_counter = ADD_COUNTER(_profile, "ParquetBytesTotal", TUnit::BYTES);
-}
-
-VFileORCScanner::VFileORCScanner(RuntimeState* state, RuntimeProfile* profile,
-                                 const TFileScanRangeParams& params,
-                                 const std::vector<TFileRangeDesc>& ranges,
-                                 const std::vector<TExpr>& pre_filter_texprs,
-                                 ScannerCounter* counter)
-        : FileArrowScanner(state, profile, params, ranges, pre_filter_texprs, counter) {}
-
-ArrowReaderWrap* VFileORCScanner::_new_arrow_reader(
-        const std::vector<SlotDescriptor*>& file_slot_descs, FileReader* file_reader,
-        int32_t num_of_columns_from_file, int64_t range_start_offset, int64_t range_size) {
-    return new ORCReaderWrap(_state, file_slot_descs, file_reader, num_of_columns_from_file,
-                             range_start_offset, range_size, false);
-}
-
-} // namespace doris::vectorized
diff --git a/be/src/vec/exec/file_arrow_scanner.h b/be/src/vec/exec/file_arrow_scanner.h
deleted file mode 100644
index 113bd54d6e..0000000000
--- a/be/src/vec/exec/file_arrow_scanner.h
+++ /dev/null
@@ -1,118 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <arrow/array.h>
-#include <exec/arrow/arrow_reader.h>
-#include <exec/arrow/orc_reader.h>
-
-#include <map>
-#include <memory>
-#include <sstream>
-#include <string>
-#include <unordered_map>
-#include <vector>
-
-#include "common/status.h"
-#include "exec/base_scanner.h"
-#include "util/runtime_profile.h"
-#include "vec/exec/file_scanner.h"
-
-namespace doris::vectorized {
-
-// VArrow scanner convert the data read from orc|parquet to doris's columns.
-class FileArrowScanner : public FileScanner {
-public:
-    FileArrowScanner(RuntimeState* state, RuntimeProfile* profile,
-                     const TFileScanRangeParams& params, const std::vector<TFileRangeDesc>& ranges,
-                     const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter);
-
-    ~FileArrowScanner() override;
-
-    // Open this scanner, will initialize information need to
-    Status open() override;
-
-    Status get_next(Block* block, bool* eof) override;
-
-    void close() override;
-
-protected:
-    virtual ArrowReaderWrap* _new_arrow_reader(const std::vector<SlotDescriptor*>& file_slot_descs,
-                                               FileReader* file_reader,
-                                               int32_t num_of_columns_from_file,
-                                               int64_t range_start_offset, int64_t range_size) = 0;
-    virtual void _update_profile(std::shared_ptr<Statistics>& statistics) {}
-
-private:
-    // Read next buffer from reader
-    Status _open_next_reader();
-    Status _next_arrow_batch();
-    Status _init_arrow_batch_if_necessary();
-    Status _append_batch_to_block(Block* block);
-
-private:
-    // Reader
-    ArrowReaderWrap* _cur_file_reader;
-    bool _cur_file_eof; // is read over?
-    std::shared_ptr<arrow::RecordBatch> _batch;
-    size_t _arrow_batch_cur_idx;
-    RuntimeProfile::Counter* _convert_arrow_block_timer;
-};
-
-class VFileParquetScanner final : public FileArrowScanner {
-public:
-    VFileParquetScanner(RuntimeState* state, RuntimeProfile* profile,
-                        const TFileScanRangeParams& params,
-                        const std::vector<TFileRangeDesc>& ranges,
-                        const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter);
-
-    ~VFileParquetScanner() override = default;
-
-protected:
-    ArrowReaderWrap* _new_arrow_reader(const std::vector<SlotDescriptor*>& file_slot_descs,
-                                       FileReader* file_reader, int32_t num_of_columns_from_file,
-                                       int64_t range_start_offset, int64_t range_size) override;
-
-    void _init_profiles(RuntimeProfile* profile) override;
-    void _update_profile(std::shared_ptr<Statistics>& statistics) override;
-
-private:
-    RuntimeProfile::Counter* _filtered_row_groups_counter;
-    RuntimeProfile::Counter* _filtered_rows_counter;
-    RuntimeProfile::Counter* _filtered_bytes_counter;
-    RuntimeProfile::Counter* _total_rows_counter;
-    RuntimeProfile::Counter* _total_groups_counter;
-    RuntimeProfile::Counter* _total_bytes_counter;
-};
-
-class VFileORCScanner final : public FileArrowScanner {
-public:
-    VFileORCScanner(RuntimeState* state, RuntimeProfile* profile,
-                    const TFileScanRangeParams& params, const std::vector<TFileRangeDesc>& ranges,
-                    const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter);
-
-    ~VFileORCScanner() override = default;
-
-protected:
-    ArrowReaderWrap* _new_arrow_reader(const std::vector<SlotDescriptor*>& file_slot_descs,
-                                       FileReader* file_reader, int32_t num_of_columns_from_file,
-                                       int64_t range_start_offset, int64_t range_size) override;
-    void _init_profiles(RuntimeProfile* profile) override {};
-};
-
-} // namespace doris::vectorized
diff --git a/be/src/vec/exec/file_scan_node.cpp b/be/src/vec/exec/file_scan_node.cpp
deleted file mode 100644
index 3a3f9634e9..0000000000
--- a/be/src/vec/exec/file_scan_node.cpp
+++ /dev/null
@@ -1,508 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "vec/exec/file_scan_node.h"
-
-#include "common/config.h"
-#include "gen_cpp/PlanNodes_types.h"
-#include "runtime/memory/mem_tracker.h"
-#include "runtime/runtime_filter_mgr.h"
-#include "runtime/runtime_state.h"
-#include "runtime/string_value.h"
-#include "runtime/tuple.h"
-#include "runtime/tuple_row.h"
-#include "util/priority_thread_pool.hpp"
-#include "util/runtime_profile.h"
-#include "util/thread.h"
-#include "util/types.h"
-#include "vec/exec/file_arrow_scanner.h"
-#include "vec/exec/file_text_scanner.h"
-#include "vec/exprs/vcompound_pred.h"
-#include "vec/exprs/vexpr.h"
-#include "vec/exprs/vexpr_context.h"
-
-namespace doris::vectorized {
-
-FileScanNode::FileScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
-        : ScanNode(pool, tnode, descs),
-          _tuple_id(tnode.file_scan_node.tuple_id),
-          _runtime_state(nullptr),
-          _tuple_desc(nullptr),
-          _num_running_scanners(0),
-          _scan_finished(false),
-          _max_buffered_batches(32),
-          _wait_scanner_timer(nullptr),
-          _runtime_filter_descs(tnode.runtime_filters) {}
-
-Status FileScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
-    RETURN_IF_ERROR(ScanNode::init(tnode, state));
-
-    int filter_size = _runtime_filter_descs.size();
-    _runtime_filter_ctxs.resize(filter_size);
-    _runtime_filter_ready_flag.resize(filter_size);
-    for (int i = 0; i < filter_size; ++i) {
-        IRuntimeFilter* runtime_filter = nullptr;
-        const auto& filter_desc = _runtime_filter_descs[i];
-        RETURN_IF_ERROR(state->runtime_filter_mgr()->regist_filter(
-                RuntimeFilterRole::CONSUMER, filter_desc, state->query_options(), id()));
-        RETURN_IF_ERROR(state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id,
-                                                                        &runtime_filter));
-
-        _runtime_filter_ctxs[i].runtimefilter = runtime_filter;
-        _runtime_filter_ready_flag[i] = false;
-        _rf_locks.push_back(std::make_unique<std::mutex>());
-    }
-
-    return Status::OK();
-}
-
-Status FileScanNode::prepare(RuntimeState* state) {
-    VLOG_QUERY << "FileScanNode prepare";
-    RETURN_IF_ERROR(ScanNode::prepare(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
-    // get tuple desc
-    _runtime_state = state;
-    _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
-    if (_tuple_desc == nullptr) {
-        std::stringstream ss;
-        ss << "Failed to get tuple descriptor, _tuple_id=" << _tuple_id;
-        return Status::InternalError(ss.str());
-    }
-
-    // Initialize slots map
-    for (auto slot : _tuple_desc->slots()) {
-        auto pair = _slots_map.emplace(slot->col_name(), slot);
-        if (!pair.second) {
-            std::stringstream ss;
-            ss << "Failed to insert slot, col_name=" << slot->col_name();
-            return Status::InternalError(ss.str());
-        }
-    }
-
-    // Profile
-    _wait_scanner_timer = ADD_TIMER(runtime_profile(), "WaitScannerTime");
-    _filter_timer = ADD_TIMER(runtime_profile(), "PredicateFilteredTime");
-    _num_rows_filtered = ADD_COUNTER(runtime_profile(), "PredicateFilteredRows", TUnit::UNIT);
-    _num_scanners = ADD_COUNTER(runtime_profile(), "NumScanners", TUnit::UNIT);
-
-    return Status::OK();
-}
-
-Status FileScanNode::open(RuntimeState* state) {
-    SCOPED_TIMER(_runtime_profile->total_time_counter());
-    RETURN_IF_ERROR(ExecNode::open(state));
-    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
-    RETURN_IF_CANCELLED(state);
-
-    RETURN_IF_ERROR(_acquire_and_build_runtime_filter(state));
-
-    RETURN_IF_ERROR(start_scanners());
-
-    return Status::OK();
-}
-
-Status FileScanNode::_acquire_and_build_runtime_filter(RuntimeState* state) {
-    // acquire runtime filter
-    _runtime_filter_ctxs.resize(_runtime_filter_descs.size());
-    for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
-        auto& filter_desc = _runtime_filter_descs[i];
-        IRuntimeFilter* runtime_filter = nullptr;
-        state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id, &runtime_filter);
-        DCHECK(runtime_filter != nullptr);
-        if (runtime_filter == nullptr) {
-            continue;
-        }
-        bool ready = runtime_filter->is_ready();
-        if (!ready) {
-            ready = runtime_filter->await();
-        }
-        if (ready) {
-            _runtime_filter_ctxs[i].apply_mark = true;
-            _runtime_filter_ctxs[i].runtimefilter = runtime_filter;
-
-            // TODO: currently, after calling get_push_expr_ctxs(), the func ptr in runtime_filter
-            // will be released, and it will not be used again for building vexpr.
-            //
-            // std::list<ExprContext*> expr_context;
-            // RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(&expr_context));
-            // for (auto ctx : expr_context) {
-            //     ctx->prepare(state, row_desc());
-            //     ctx->open(state);
-            //     int index = _conjunct_ctxs.size();
-            //     _conjunct_ctxs.push_back(ctx);
-            //     // it's safe to store address from a fix-resized vector
-            //     _conjunctid_to_runtime_filter_ctxs[index] = &_runtime_filter_ctxs[i];
-            // }
-        }
-    }
-
-    // rebuild vexpr
-    for (int i = 0; i < _runtime_filter_ctxs.size(); ++i) {
-        if (!_runtime_filter_ctxs[i].apply_mark) {
-            continue;
-        }
-        IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtimefilter;
-        std::vector<VExpr*> vexprs;
-        runtime_filter->get_prepared_vexprs(&vexprs, _row_descriptor);
-        if (vexprs.empty()) {
-            continue;
-        }
-        auto last_expr = _vconjunct_ctx_ptr ? (*_vconjunct_ctx_ptr)->root() : vexprs[0];
-        for (size_t j = _vconjunct_ctx_ptr ? 0 : 1; j < vexprs.size(); j++) {
-            TExprNode texpr_node;
-            texpr_node.__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
-            texpr_node.__set_node_type(TExprNodeType::COMPOUND_PRED);
-            texpr_node.__set_opcode(TExprOpcode::COMPOUND_AND);
-            VExpr* new_node = _pool->add(new VcompoundPred(texpr_node));
-            new_node->add_child(last_expr);
-            new_node->add_child(vexprs[j]);
-            last_expr = new_node;
-        }
-        auto new_vconjunct_ctx_ptr = _pool->add(new VExprContext(last_expr));
-        auto expr_status = new_vconjunct_ctx_ptr->prepare(state, _row_descriptor);
-        if (UNLIKELY(!expr_status.OK())) {
-            LOG(WARNING) << "Something wrong for runtime filters: " << expr_status;
-            vexprs.clear();
-            break;
-        }
-
-        expr_status = new_vconjunct_ctx_ptr->open(state);
-        if (UNLIKELY(!expr_status.OK())) {
-            LOG(WARNING) << "Something wrong for runtime filters: " << expr_status;
-            vexprs.clear();
-            break;
-        }
-        if (_vconjunct_ctx_ptr) {
-            _stale_vexpr_ctxs.push_back(std::move(_vconjunct_ctx_ptr));
-        }
-        _vconjunct_ctx_ptr.reset(new doris::vectorized::VExprContext*);
-        *(_vconjunct_ctx_ptr.get()) = new_vconjunct_ctx_ptr;
-        _runtime_filter_ready_flag[i] = true;
-    }
-    return Status::OK();
-}
-
-Status FileScanNode::start_scanners() {
-    {
-        std::unique_lock<std::mutex> l(_batch_queue_lock);
-        _num_running_scanners = _scan_ranges.size();
-    }
-
-    _scanners_status.resize(_scan_ranges.size());
-    COUNTER_UPDATE(_num_scanners, _scan_ranges.size());
-    ThreadPoolToken* thread_token = _runtime_state->get_query_fragments_ctx()->get_token();
-    PriorityThreadPool* thread_pool = _runtime_state->exec_env()->scan_thread_pool();
-    for (int i = 0; i < _scan_ranges.size(); ++i) {
-        Status submit_status = Status::OK();
-        if (thread_token != nullptr) {
-            submit_status = thread_token->submit_func(std::bind(&FileScanNode::scanner_worker, this,
-                                                                i, _scan_ranges.size(),
-                                                                std::ref(_scanners_status[i])));
-        } else {
-            PriorityThreadPool::WorkFunction task =
-                    std::bind(&FileScanNode::scanner_worker, this, i, _scan_ranges.size(),
-                              std::ref(_scanners_status[i]));
-            if (!thread_pool->offer(task)) {
-                submit_status = Status::Cancelled("Failed to submit scan task");
-            }
-        }
-        if (!submit_status.ok()) {
-            LOG(WARNING) << "Failed to assign file scanner task to thread pool! "
-                         << submit_status.get_error_msg();
-            _scanners_status[i].set_value(submit_status);
-            for (int j = i + 1; j < _scan_ranges.size(); ++j) {
-                _scanners_status[j].set_value(Status::Cancelled("Cancelled"));
-            }
-            {
-                std::lock_guard<std::mutex> l(_batch_queue_lock);
-                update_status(submit_status);
-                _num_running_scanners -= _scan_ranges.size() - i;
-            }
-            _queue_writer_cond.notify_all();
-            break;
-        }
-    }
-    return Status::OK();
-}
-
-// Hands the next block of scanned rows to the caller.
-//
-// Blocks produced by the scanner workers are popped from _block_queue and
-// accumulated in _mutable_block. A block is returned as soon as the
-// accumulated rows plus the freshly popped rows would reach the runtime
-// batch size.
-//
-// state: checked for cancellation on entry.
-// block: receives the output rows.
-// eos:   set to true once all scanners have finished and the queue is
-//        drained, or when reached_limit() reports it.
-// Returns _process_status if a scanner failed or the query was cancelled,
-// Status::OK() otherwise.
-Status FileScanNode::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) {
-    SCOPED_TIMER(_runtime_profile->total_time_counter());
-    // check if CANCELLED.
-    if (state->is_cancelled()) {
-        std::unique_lock<std::mutex> l(_batch_queue_lock);
-        if (update_status(Status::Cancelled("Cancelled"))) {
-            // Notify all scanners
-            _queue_writer_cond.notify_all();
-        }
-    }
-
-    // A previous call already reported end of stream.
-    if (_scan_finished.load()) {
-        *eos = true;
-        return Status::OK();
-    }
-
-    const int batch_size = _runtime_state->batch_size();
-    while (true) {
-        std::shared_ptr<vectorized::Block> scanner_block;
-        {
-            std::unique_lock<std::mutex> l(_batch_queue_lock);
-            // Sleep until there is a block to consume, every scanner has
-            // exited, or the scan failed / was cancelled. The one second
-            // timeout makes the cancellation flag get re-checked even if no
-            // notification arrives.
-            while (_process_status.ok() && !_runtime_state->is_cancelled() &&
-                   _num_running_scanners > 0 && _block_queue.empty()) {
-                SCOPED_TIMER(_wait_scanner_timer);
-                _queue_reader_cond.wait_for(l, std::chrono::seconds(1));
-            }
-            if (!_process_status.ok()) {
-                // Some scanner process failed.
-                return _process_status;
-            }
-            if (_runtime_state->is_cancelled()) {
-                if (update_status(Status::Cancelled("Cancelled"))) {
-                    _queue_writer_cond.notify_all();
-                }
-                return _process_status;
-            }
-            if (!_block_queue.empty()) {
-                scanner_block = _block_queue.front();
-                _block_queue.pop_front();
-            }
-        }
-
-        // All scanner has been finished, and all cached batch has been read
-        if (!scanner_block) {
-            // Flush whatever is still accumulated before signalling eos.
-            if (_mutable_block && !_mutable_block->empty()) {
-                *block = _mutable_block->to_block();
-                reached_limit(block, eos);
-                LOG_IF(INFO, *eos) << "FileScanNode ReachedLimit.";
-            }
-            _scan_finished.store(true);
-            *eos = true;
-            return Status::OK();
-        }
-        // notify one scanner
-        _queue_writer_cond.notify_one();
-
-        // Lazily create the accumulator with the same column layout as the
-        // first block received.
-        if (UNLIKELY(!_mutable_block)) {
-            _mutable_block.reset(new MutableBlock(scanner_block->clone_empty()));
-        }
-
-        if (_mutable_block->rows() + scanner_block->rows() < batch_size) {
-            // merge scanner_block into _mutable_block
-            _mutable_block->add_rows(scanner_block.get(), 0, scanner_block->rows());
-            continue;
-        } else {
-            if (_mutable_block->empty()) {
-                // directly use scanner_block
-                *block = std::move(*scanner_block);
-            } else {
-                // copy _mutable_block firstly, then merge scanner_block into _mutable_block for next.
-                *block = _mutable_block->to_block();
-                _mutable_block->set_muatable_columns(scanner_block->clone_empty_columns());
-                _mutable_block->add_rows(scanner_block.get(), 0, scanner_block->rows());
-            }
-            break;
-        }
-    }
-
-    // reached_limit() is defined outside this file; presumably it trims
-    // `block` to the node's row limit and sets *eos - TODO confirm.
-    reached_limit(block, eos);
-    if (*eos) {
-        // Limit hit: stop the scan and wake any scanner waiting to push.
-        _scan_finished.store(true);
-        _queue_writer_cond.notify_all();
-        LOG(INFO) << "FileScanNode ReachedLimit.";
-    } else {
-        *eos = false;
-    }
-
-    return Status::OK();
-}
-
-// Stops all scanner workers, waits for them to exit and releases the
-// resources held by this node.
-//
-// Every cleanup step runs even when a scanner reported a failure, so the
-// runtime filters, the stale expression contexts and the base ExecNode are
-// always closed. Returns the first scanner error if there is one, otherwise
-// the result of ExecNode::close().
-Status FileScanNode::close(RuntimeState* state) {
-    if (is_closed()) {
-        return Status::OK();
-    }
-    SCOPED_TIMER(_runtime_profile->total_time_counter());
-    // Tell the workers to stop and wake anybody blocked on either condition.
-    _scan_finished.store(true);
-    _queue_writer_cond.notify_all();
-    _queue_reader_cond.notify_all();
-    {
-        std::unique_lock<std::mutex> l(_batch_queue_lock);
-        _queue_reader_cond.wait(l, [this] { return _num_running_scanners == 0; });
-    }
-
-    // Collect the worker results. Only the first failure is remembered; the
-    // loop keeps going so that the cleanup below is never skipped.
-    Status first_error = Status::OK();
-    for (auto& scanner_promise : _scanners_status) {
-        Status scanner_status = scanner_promise.get_future().get();
-        if (!scanner_status.ok() && first_error.ok()) {
-            first_error = scanner_status;
-        }
-    }
-
-    // Drop anything still buffered. _block_queue is the queue the scanner
-    // workers push into, so it has to be released as well.
-    _batch_queue.clear();
-    {
-        std::lock_guard<std::mutex> l(_batch_queue_lock);
-        _block_queue.clear();
-    }
-
-    for (auto& filter_desc : _runtime_filter_descs) {
-        IRuntimeFilter* runtime_filter = nullptr;
-        state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id, &runtime_filter);
-        DCHECK(runtime_filter != nullptr);
-        // DCHECK compiles away in release builds, so guard the dereference.
-        if (runtime_filter != nullptr) {
-            runtime_filter->consumer_close();
-        }
-    }
-
-    for (auto& ctx : _stale_vexpr_ctxs) {
-        (*ctx)->close(state);
-    }
-
-    Status close_status = ExecNode::close(state);
-    return first_error.ok() ? close_status : first_error;
-}
-
-// Scans one file scan range and pushes the resulting blocks to _block_queue.
-//
-// scan_range: the range to read; passed to create_scanner().
-// counter:    receives the number of rows removed by the conjunct filter
-//             (num_rows_unselected); also handed to the scanner.
-// Returns the first error from the scanner or the filter, Cancelled if the
-// runtime state is cancelled, and OK when the scanner reaches eof or the
-// scan was stopped early (_scan_finished set or _process_status not ok).
-//
-// NOTE(review): scanner->close() is never called in this function; confirm
-// that the concrete scanners release their resources in their destructors.
-Status FileScanNode::scanner_scan(const TFileScanRange& scan_range, ScannerCounter* counter) {
-    //create scanner object and open
-    std::unique_ptr<FileScanner> scanner = create_scanner(scan_range, counter);
-    RETURN_IF_ERROR(scanner->open());
-    bool scanner_eof = false;
-    while (!scanner_eof) {
-        RETURN_IF_CANCELLED(_runtime_state);
-        // If we have finished all works
-        // NOTE(review): _process_status is read here without holding
-        // _batch_queue_lock, unlike the other accesses in this file.
-        if (_scan_finished.load() || !_process_status.ok()) {
-            return Status::OK();
-        }
-
-        std::shared_ptr<vectorized::Block> block(new vectorized::Block());
-        RETURN_IF_ERROR(scanner->get_next(block.get(), &scanner_eof));
-        // Nothing read this round; ask the scanner again (or leave on eof).
-        if (block->rows() == 0) {
-            continue;
-        }
-        auto old_rows = block->rows();
-        {
-            SCOPED_TIMER(_filter_timer);
-            RETURN_IF_ERROR(VExprContext::filter_block(_vconjunct_ctx_ptr, block.get(),
-                                                       _tuple_desc->slots().size()));
-        }
-        // Rows dropped by the conjuncts are accounted as "unselected".
-        counter->num_rows_unselected += old_rows - block->rows();
-        if (block->rows() == 0) {
-            continue;
-        }
-
-        std::unique_lock<std::mutex> l(_batch_queue_lock);
-        // NOTE(review): the second clause (!_block_queue.empty()) already
-        // holds whenever the queue is non-empty, so as written a scanner
-        // waits until the queue is completely empty before pushing. The
-        // comment below mentions a memory-limit check that is not present
-        // in the condition.
-        while (_process_status.ok() && !_scan_finished.load() && !_runtime_state->is_cancelled() &&
-               // stop pushing more batch if
-               // 1. too many batches in queue, or
-               // 2. at least one batch in queue and memory exceed limit.
-               (_block_queue.size() >= _max_buffered_batches || !_block_queue.empty())) {
-            _queue_writer_cond.wait_for(l, std::chrono::seconds(1));
-        }
-        // Process already set failed, so we just return OK
-        if (!_process_status.ok()) {
-            return Status::OK();
-        }
-        // Scan already finished, just return
-        if (_scan_finished.load()) {
-            return Status::OK();
-        }
-        // Runtime state is canceled, just return cancel
-        if (_runtime_state->is_cancelled()) {
-            return Status::Cancelled("Cancelled");
-        }
-        // Queue size Must be smaller than _max_buffered_batches
-        _block_queue.push_back(block);
-
-        // Notify reader to process
-        _queue_reader_cond.notify_one();
-    }
-    return Status::OK();
-}
-
-// Thread-pool entry point for one scanner.
-//
-// Scans _scan_ranges[start_idx], publishes the row statistics, records a
-// failure in the node status, decrements the running-scanner count and
-// finally fulfils `p_status` with the scan result. `length` is not used.
-void FileScanNode::scanner_worker(int start_idx, int length, std::promise<Status>& p_status) {
-    Thread::set_self_name("file_scanner");
-
-    ScannerCounter counter;
-    const TFileScanRange& file_range =
-            _scan_ranges[start_idx].scan_range.ext_scan_range.file_scan_range;
-    Status status = scanner_scan(file_range, &counter);
-    const bool failed = !status.ok();
-    if (failed) {
-        LOG(WARNING) << "Scanner[" << start_idx
-                     << "] process failed. status=" << status.get_error_msg();
-    }
-
-    // Publish the per-scanner statistics.
-    _runtime_state->update_num_rows_load_filtered(counter.num_rows_filtered);
-    _runtime_state->update_num_rows_load_unselected(counter.num_rows_unselected);
-    COUNTER_UPDATE(_num_rows_filtered, counter.num_rows_unselected);
-
-    // Mark this scanner as finished while holding the queue lock.
-    {
-        std::lock_guard<std::mutex> guard(_batch_queue_lock);
-        if (failed) {
-            update_status(status);
-        }
-        --_num_running_scanners;
-    }
-
-    // The reader must re-check the running-scanner count.
-    _queue_reader_cond.notify_all();
-    if (failed) {
-        // A failure makes the remaining scanners pointless; wake them up so
-        // they can observe the failed status.
-        _queue_writer_cond.notify_all();
-    }
-    p_status.set_value(status);
-}
-
-// Builds the scanner that matches the file format of `scan_range`.
-//
-// Parquet and ORC get their dedicated scanners; every other format falls
-// back to FileTextScanner. The node's conjunct contexts are registered on
-// the scanner before it is returned.
-//
-// The scanner is owned by a unique_ptr from the moment it is created, so it
-// cannot leak if reg_conjunct_ctxs() throws.
-std::unique_ptr<FileScanner> FileScanNode::create_scanner(const TFileScanRange& scan_range,
-                                                          ScannerCounter* counter) {
-    std::unique_ptr<FileScanner> scanner;
-    switch (scan_range.params.format_type) {
-    case TFileFormatType::FORMAT_PARQUET:
-        scanner = std::make_unique<VFileParquetScanner>(_runtime_state, runtime_profile(),
-                                                        scan_range.params, scan_range.ranges,
-                                                        _pre_filter_texprs, counter);
-        break;
-    case TFileFormatType::FORMAT_ORC:
-        scanner = std::make_unique<VFileORCScanner>(_runtime_state, runtime_profile(),
-                                                    scan_range.params, scan_range.ranges,
-                                                    _pre_filter_texprs, counter);
-        break;
-
-    default:
-        scanner = std::make_unique<FileTextScanner>(_runtime_state, runtime_profile(),
-                                                    scan_range.params, scan_range.ranges,
-                                                    _pre_filter_texprs, counter);
-        break;
-    }
-    scanner->reg_conjunct_ctxs(_tuple_id, _conjunct_ctxs);
-    return scanner;
-}
-
-// This function is called after plan node has been prepared.
-// Stores the scan ranges this node has to read.
-//
-// At most one scanner per scanner-pool thread is useful, so when more ranges
-// than threads are supplied the surplus ranges are merged round-robin into
-// the first `max_scanners` ranges (only their file range lists are appended).
-//
-// A configured thread count of zero or less is treated as 1, so the merge
-// below never indexes an empty _scan_ranges.
-Status FileScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) {
-    const int configured_threads = config::doris_scanner_thread_pool_thread_num;
-    const size_t max_scanners =
-            configured_threads > 0 ? static_cast<size_t>(configured_threads) : 1;
-
-    if (scan_ranges.size() <= max_scanners) {
-        _scan_ranges = scan_ranges;
-        return Status::OK();
-    }
-
-    // There is no need for the number of scanners to exceed the number of threads in thread pool.
-    _scan_ranges.clear();
-    _scan_ranges.reserve(max_scanners);
-    for (size_t i = 0; i < scan_ranges.size(); ++i) {
-        if (i < max_scanners) {
-            // The first max_scanners ranges become the scanners themselves.
-            _scan_ranges.push_back(scan_ranges[i]);
-            continue;
-        }
-        // Remaining ranges are spread over the existing scanners in turn.
-        auto& ranges =
-                _scan_ranges[i % max_scanners].scan_range.ext_scan_range.file_scan_range.ranges;
-        const auto& merged_ranges = scan_ranges[i].scan_range.ext_scan_range.file_scan_range.ranges;
-        ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end());
-    }
-    _scan_ranges.shrink_to_fit();
-    LOG(INFO) << "Merge " << scan_ranges.size() << " scan ranges to " << _scan_ranges.size();
-    return Status::OK();
-}
-
-// Appends this node's name to `out`. `ident_level` is not used.
-void FileScanNode::debug_string(int ident_level, std::stringstream* out) const {
-    std::stringstream& stream = *out;
-    stream << "FileScanNode";
-}
-
-} // namespace doris::vectorized
diff --git a/be/src/vec/exec/file_scan_node.h b/be/src/vec/exec/file_scan_node.h
deleted file mode 100644
index 850c74db0c..0000000000
--- a/be/src/vec/exec/file_scan_node.h
+++ /dev/null
@@ -1,148 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <atomic>
-#include <condition_variable>
-#include <future>
-#include <map>
-#include <mutex>
-#include <string>
-#include <thread>
-#include <vector>
-
-#include "common/status.h"
-#include "exec/base_scanner.h"
-#include "exec/scan_node.h"
-#include "exprs/runtime_filter.h"
-#include "gen_cpp/PaloInternalService_types.h"
-#include "runtime/descriptors.h"
-#include "vec/exec/file_scanner.h"
-namespace doris {
-
-class RuntimeState;
-class Status;
-
-namespace vectorized {
-// Scan node that reads external files through FileScanner instances.
-//
-// Scanner workers push filtered blocks into _block_queue; get_next() pops
-// them and returns them to the caller. All queue state is protected by
-// _batch_queue_lock.
-class FileScanNode final : public ScanNode {
-public:
-    FileScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
-    ~FileScanNode() override = default;
-
-    // Called after create this scan node
-    Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override;
-
-    // Prepare partition infos & set up timer
-    Status prepare(RuntimeState* state) override;
-
-    // Start the file scan. The scanners themselves are built by
-    // create_scanner(), which picks a Parquet, ORC or text scanner.
-    Status open(RuntimeState* state) override;
-
-    // Row-batch interface; not supported by this node, always returns
-    // Status::NotSupported.
-    virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) override {
-        return Status::NotSupported("Not Implemented FileScanNode::get_next.");
-    }
-
-    // Return the next block of scanned rows; sets *eos when the scan is done.
-    Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos) override;
-
-    // Close the scanner, and report errors.
-    Status close(RuntimeState* state) override;
-
-    // Store the scan ranges to read. When there are more ranges than scanner
-    // threads, the surplus ranges are merged into the first ones.
-    Status set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override;
-
-private:
-    // Write debug string of this into out.
-    void debug_string(int indentation_level, std::stringstream* out) const override;
-
-    // Update process status to one failed status,
-    // NOTE: Must hold the mutex of this scan node
-    // Returns true if new_status was recorded, false if a failure had
-    // already been recorded before.
-    bool update_status(const Status& new_status) {
-        if (_process_status.ok()) {
-            _process_status = new_status;
-            return true;
-        }
-        return false;
-    }
-
-    // Create the scanner matching the file format of scan_range.
-    std::unique_ptr<FileScanner> create_scanner(const TFileScanRange& scan_range,
-                                                ScannerCounter* counter);
-
-    Status start_scanners();
-
-    // Worker entry point: scans _scan_ranges[start_idx] and fulfils p_status
-    // with the result. `length` is not used by the implementation.
-    void scanner_worker(int start_idx, int length, std::promise<Status>& p_status);
-    // Scan one range
-    Status scanner_scan(const TFileScanRange& scan_range, ScannerCounter* counter);
-
-    Status _acquire_and_build_runtime_filter(RuntimeState* state);
-
-    TupleId _tuple_id;
-    RuntimeState* _runtime_state;
-    TupleDescriptor* _tuple_desc;
-    std::map<std::string, SlotDescriptor*> _slots_map;
-    // Ranges to scan, one scanner worker per entry.
-    std::vector<TScanRangeParams> _scan_ranges;
-
-    // Protects _block_queue, _process_status and _num_running_scanners.
-    std::mutex _batch_queue_lock;
-    // Signalled when a block is queued or a scanner finishes.
-    std::condition_variable _queue_reader_cond;
-    // Signalled when a block is consumed or the scan stops.
-    std::condition_variable _queue_writer_cond;
-    // Row-batch queue. In the visible implementation it is only cleared in
-    // close(); the scanners use _block_queue instead.
-    std::deque<std::shared_ptr<RowBatch>> _batch_queue;
-
-    // Number of scanner workers that have not finished yet.
-    int _num_running_scanners;
-
-    // Set once the scan is over (eos reported, limit reached, or close()).
-    std::atomic<bool> _scan_finished;
-
-    // First failure recorded through update_status(); OK while healthy.
-    Status _process_status;
-
-    // One promise per scanner worker, fulfilled with the worker's result.
-    std::vector<std::promise<Status>> _scanners_status;
-
-    // Queue size threshold checked by scanner_scan() before pushing a block.
-    int _max_buffered_batches;
-
-    // The origin preceding filter exprs.
-    // These exprs will be converted to expr context
-    // in XXXScanner.
-    // Because the row descriptor used for these exprs is `src_row_desc`,
-    // which is initialized in XXXScanner.
-    std::vector<TExpr> _pre_filter_texprs;
-
-    RuntimeProfile::Counter* _wait_scanner_timer;
-    RuntimeProfile::Counter* _num_rows_filtered;
-    RuntimeProfile::Counter* _filter_timer;
-    RuntimeProfile::Counter* _num_scanners;
-
-    // Blocks produced by the scanner workers, waiting for get_next().
-    std::deque<std::shared_ptr<vectorized::Block>> _block_queue;
-    // Accumulates small blocks in get_next() until a batch is full.
-    std::unique_ptr<MutableBlock> _mutable_block;
-
-protected:
-    struct RuntimeFilterContext {
-        RuntimeFilterContext() : apply_mark(false), runtimefilter(nullptr) {}
-        bool apply_mark;
-        IRuntimeFilter* runtimefilter;
-    };
-
-    const std::vector<TRuntimeFilterDesc>& runtime_filter_descs() const {
-        return _runtime_filter_descs;
-    }
-    std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
-    std::vector<RuntimeFilterContext> _runtime_filter_ctxs;
-    std::vector<bool> _runtime_filter_ready_flag;
-    std::vector<std::unique_ptr<std::mutex>> _rf_locks;
-    std::map<int, RuntimeFilterContext*> _conjunctid_to_runtime_filter_ctxs;
-    // Expression contexts that are closed in close().
-    std::vector<std::unique_ptr<VExprContext*>> _stale_vexpr_ctxs;
-};
-} // namespace vectorized
-} // namespace doris
diff --git a/be/src/vec/exec/file_scanner.cpp b/be/src/vec/exec/file_scanner.cpp
deleted file mode 100644
index 04eb03db83..0000000000
--- a/be/src/vec/exec/file_scanner.cpp
+++ /dev/null
@@ -1,199 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "file_scanner.h"
-
-#include <fmt/format.h>
-
-#include <vec/data_types/data_type_factory.hpp>
-
-#include "common/logging.h"
-#include "common/utils.h"
-#include "exec/exec_node.h"
-#include "exec/text_converter.hpp"
-#include "exprs/expr_context.h"
-#include "runtime/descriptors.h"
-#include "runtime/raw_value.h"
-#include "runtime/runtime_state.h"
-#include "runtime/tuple.h"
-
-namespace doris::vectorized {
-
-// Builds a scanner over `ranges`.
-//
-// `params` and `ranges` are kept as references, so the objects passed in
-// must outlive the scanner; `pre_filter_texprs` is copied. Profile counters
-// are registered later, in open().
-//
-// Every member is given a defined value here, including
-// _num_of_columns_from_file and _parent, and the text converter is
-// allocated with make_unique so it can never be silently left null.
-FileScanner::FileScanner(RuntimeState* state, RuntimeProfile* profile,
-                         const TFileScanRangeParams& params,
-                         const std::vector<TFileRangeDesc>& ranges,
-                         const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter)
-        : _text_converter(std::make_unique<TextConverter>('\\')),
-          _state(state),
-          _params(params),
-          _ranges(ranges),
-          _next_range(0),
-          _counter(counter),
-          _mem_pool(std::make_unique<MemPool>()),
-          _pre_filter_texprs(pre_filter_texprs),
-          _profile(profile),
-          _rows_read_counter(nullptr),
-          _read_timer(nullptr),
-          _scanner_eof(false),
-          _num_of_columns_from_file(0),
-          _parent(nullptr) {}
-
-// Registers the "RowsRead" counter and the raw read timer on the profile,
-// then builds the slot mappings and the preceding-filter context.
-// Returns the status of _init_expr_ctxes().
-Status FileScanner::open() {
-    _rows_read_counter = ADD_COUNTER(_profile, "RowsRead", TUnit::UNIT);
-    _read_timer = ADD_TIMER(_profile, "TotalRawReadTime(*)");
-
-    Status init_status = _init_expr_ctxes();
-    return init_status;
-}
-
-// Remembers the conjunct contexts and the tuple id they refer to.
-// The vector of contexts is copied.
-void FileScanner::reg_conjunct_ctxs(const TupleId& tupleId,
-                                    const std::vector<ExprContext*>& conjunct_ctxs) {
-    this->_conjunct_ctxs = conjunct_ctxs;
-    this->_tupleId = tupleId;
-}
-
-// Resolves the required slots against the source tuple descriptor and
-// prepares the preceding-filter expression, if one was supplied.
-//
-// Fills _required_slot_descs, the file / partition slot lists and their
-// index maps, and creates _row_desc for the source tuple.
-// Returns InternalError when the source tuple or a required slot is unknown,
-// or the error from creating / preparing / opening the filter expression.
-Status FileScanner::_init_expr_ctxes() {
-    const TupleDescriptor* source_desc =
-            _state->desc_tbl().get_tuple_descriptor(_params.src_tuple_id);
-    if (source_desc == nullptr) {
-        std::stringstream msg;
-        msg << "Unknown source tuple descriptor, tuple_id=" << _params.src_tuple_id;
-        return Status::InternalError(msg.str());
-    }
-    DCHECK(!_ranges.empty());
-
-    // Index every slot of the source tuple by id: its descriptor and its
-    // position inside the tuple.
-    std::map<SlotId, SlotDescriptor*> slot_by_id;
-    std::map<SlotId, int> position_by_id;
-    int position = 0;
-    for (const auto& desc : source_desc->slots()) {
-        slot_by_id.emplace(desc->id(), desc);
-        position_by_id.emplace(desc->id(), position);
-        ++position;
-    }
-
-    _num_of_columns_from_file = _params.num_of_columns_from_file;
-    for (const auto& required : _params.required_slots) {
-        const auto slot_id = required.slot_id;
-        const auto found = slot_by_id.find(slot_id);
-        if (found == slot_by_id.end()) {
-            std::stringstream msg;
-            msg << "Unknown source slot descriptor, slot_id=" << slot_id;
-            return Status::InternalError(msg.str());
-        }
-        _required_slot_descs.emplace_back(found->second);
-
-        // Both maps were filled from the same slots, so the id is present.
-        const int src_index = position_by_id.find(slot_id)->second;
-        if (required.is_file_slot) {
-            _file_slot_descs.emplace_back(found->second);
-            _file_slot_index_map.emplace(slot_id, src_index);
-        } else {
-            // Partition slot indexes are relative to the first column that
-            // does not come from the file.
-            _partition_slot_descs.emplace_back(found->second);
-            _partition_slot_index_map.emplace(slot_id, src_index - _num_of_columns_from_file);
-        }
-    }
-
-    _row_desc.reset(new RowDescriptor(_state->desc_tbl(),
-                                      std::vector<TupleId>({_params.src_tuple_id}),
-                                      std::vector<bool>({false})));
-
-    if (_pre_filter_texprs.empty()) {
-        return Status::OK();
-    }
-
-    // preceding filter expr should be initialized by using `_row_desc`, which is the source row descriptor
-    // for vectorized, preceding filter exprs should be compounded to one passed from fe.
-    DCHECK(_pre_filter_texprs.size() == 1);
-    _vpre_filter_ctx_ptr.reset(new doris::vectorized::VExprContext*);
-    RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(
-            _state->obj_pool(), _pre_filter_texprs[0], _vpre_filter_ctx_ptr.get()));
-    RETURN_IF_ERROR((*_vpre_filter_ctx_ptr)->prepare(_state, *_row_desc));
-    RETURN_IF_ERROR((*_vpre_filter_ctx_ptr)->open(_state));
-    return Status::OK();
-}
-
-// Closes the preceding-filter context (when one was created) and adds the
-// number of rows read to the "RowsRead" counter (when it was registered).
-void FileScanner::close() {
-    if (_vpre_filter_ctx_ptr != nullptr) {
-        VExprContext* pre_filter_ctx = *_vpre_filter_ctx_ptr;
-        pre_filter_ctx->close(_state);
-    }
-    if (_rows_read_counter != nullptr) {
-        COUNTER_UPDATE(_rows_read_counter, _read_row_counter);
-    }
-}
-
-// Resets `block` and gives it one empty column per required slot, in the
-// order of _required_slot_descs. Also resets the _rows counter.
-// Returns NotSupported when no data type can be created for a slot.
-Status FileScanner::init_block(vectorized::Block* block) {
-    block->clear();
-    _rows = 0;
-    for (const auto& desc : _required_slot_descs) {
-        if (desc == nullptr) {
-            continue;
-        }
-        const bool nullable = desc->is_nullable();
-        auto column_type =
-                vectorized::DataTypeFactory::instance().create_data_type(desc->type(), nullable);
-        if (column_type == nullptr) {
-            return Status::NotSupported(
-                    fmt::format("Not support type for column:{}", desc->col_name()));
-        }
-        block->insert(ColumnWithTypeAndName(column_type->create_column(), column_type,
-                                            desc->col_name()));
-    }
-    return Status::OK();
-}
-
-// Applies the preceding filter to `_block`, keeping all of its columns, and
-// adds the number of removed rows to _counter->num_rows_unselected.
-Status FileScanner::_filter_block(vectorized::Block* _block) {
-    auto rows_before = _block->rows();
-    auto column_count = _block->columns();
-    RETURN_IF_ERROR(
-            vectorized::VExprContext::filter_block(_vpre_filter_ctx_ptr, _block, column_count));
-    auto rows_after = _block->rows();
-    _counter->num_rows_unselected += rows_before - rows_after;
-    return Status::OK();
-}
-
-// Finishes a block before it is returned to the caller: reports the eof
-// flag, counts the rows read (before filtering) and, when rows were
-// produced, runs the preceding filter.
-Status FileScanner::finalize_block(vectorized::Block* _block, bool* eof) {
-    *eof = _scanner_eof;
-    _read_row_counter += _block->rows();
-    if (LIKELY(_rows > 0)) {
-        return _filter_block(_block);
-    }
-    return Status::OK();
-}
-
-// Fills the partition columns of `_block` with the values carried in the
-// path of the range currently being read (_ranges[_next_range - 1]).
-//
-// _block: block whose partition columns are appended to.
-// rows:   number of rows to append to every partition column.
-// Returns InternalError when no range is current, when a partition slot has
-// no index, or when the index does not address an entry of
-// columns_from_path. Does nothing when the range carries no path columns.
-Status FileScanner::_fill_columns_from_path(vectorized::Block* _block, size_t rows) {
-    // Report a bad range index as a Status instead of letting at() throw.
-    if (_next_range < 1 || static_cast<size_t>(_next_range) > _ranges.size()) {
-        return Status::InternalError(
-                fmt::format("Invalid current range index {}, range count={}", _next_range - 1,
-                            _ranges.size()));
-    }
-    const TFileRangeDesc& range = _ranges[_next_range - 1];
-    if (!range.__isset.columns_from_path || _partition_slot_descs.empty()) {
-        return Status::OK();
-    }
-
-    for (const auto& slot_desc : _partition_slot_descs) {
-        if (slot_desc == nullptr) continue;
-        auto it = _partition_slot_index_map.find(slot_desc->id());
-        if (it == std::end(_partition_slot_index_map)) {
-            std::stringstream ss;
-            ss << "Unknown source slot descriptor, slot_id=" << slot_desc->id();
-            return Status::InternalError(ss.str());
-        }
-        // The index is computed as (slot position - columns from file), so
-        // it can be negative or past the end; check before indexing.
-        const int path_index = it->second;
-        if (path_index < 0 ||
-            static_cast<size_t>(path_index) >= range.columns_from_path.size()) {
-            return Status::InternalError(
-                    fmt::format("Invalid path column index {} for slot_id={}, path column count={}",
-                                path_index, slot_desc->id(), range.columns_from_path.size()));
-        }
-        const std::string& column_from_path = range.columns_from_path[path_index];
-
-        auto doris_column = _block->get_by_name(slot_desc->col_name()).column;
-        IColumn* col_ptr = const_cast<IColumn*>(doris_column.get());
-
-        // The same path value is written once per row.
-        for (size_t j = 0; j < rows; ++j) {
-            _text_converter->write_vec_column(slot_desc, col_ptr,
-                                              const_cast<char*>(column_from_path.c_str()),
-                                              column_from_path.size(), true, false);
-        }
-    }
-    return Status::OK();
-}
-
-} // namespace doris::vectorized
diff --git a/be/src/vec/exec/file_scanner.h b/be/src/vec/exec/file_scanner.h
deleted file mode 100644
index df4c1d4ef6..0000000000
--- a/be/src/vec/exec/file_scanner.h
+++ /dev/null
@@ -1,112 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include "common/status.h"
-#include "exec/base_scanner.h"
-#include "exec/text_converter.h"
-#include "exprs/expr.h"
-#include "util/runtime_profile.h"
-#include "vec/exprs/vexpr.h"
-#include "vec/exprs/vexpr_context.h"
-
-namespace doris::vectorized {
-class FileScanNode;
-// Base class of the file scanners created by FileScanNode.
-//
-// It resolves the required slots, owns the preceding-filter expression and
-// provides helpers to build, fill and filter output blocks. Concrete
-// scanners implement get_next(), close() and _init_profiles().
-class FileScanner {
-public:
-    // `params` and `ranges` are stored as references and must outlive the
-    // scanner; `pre_filter_texprs` is copied.
-    FileScanner(RuntimeState* state, RuntimeProfile* profile, const TFileScanRangeParams& params,
-                const std::vector<TFileRangeDesc>& ranges,
-                const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter);
-
-    virtual ~FileScanner() = default;
-
-    // Remember the conjunct contexts and the tuple id they belong to.
-    virtual void reg_conjunct_ctxs(const TupleId& tupleId,
-                                   const std::vector<ExprContext*>& conjunct_ctxs);
-
-    // Open this scanner: registers the profile counters, resolves the
-    // required slots and prepares the preceding-filter expression.
-    virtual Status open();
-
-    // Get next block
-    // The base implementation returns NotSupported.
-    virtual Status get_next(vectorized::Block* block, bool* eof) {
-        return Status::NotSupported("Not Implemented get block");
-    }
-
-    // Close this scanner
-    // Pure virtual, but a base implementation is also defined in
-    // file_scanner.cpp (closes the pre-filter context, updates RowsRead).
-    virtual void close() = 0;
-
-    std::vector<bool>* mutable_runtime_filter_marks() { return &_runtime_filter_marks; }
-
-protected:
-    virtual void _init_profiles(RuntimeProfile* profile) = 0;
-
-    // Report eof, count the rows read and apply the preceding filter.
-    Status finalize_block(vectorized::Block* dest_block, bool* eof);
-    // Append `rows` copies of each path value to the partition columns.
-    Status _fill_columns_from_path(vectorized::Block* output_block, size_t rows);
-    // Reset `block` to one empty column per required slot.
-    Status init_block(vectorized::Block* block);
-
-    std::unique_ptr<TextConverter> _text_converter;
-
-    RuntimeState* _state;
-    // Reference to the constructor argument; not owned.
-    const TFileScanRangeParams& _params;
-
-    // Reference to the constructor argument; not owned.
-    const std::vector<TFileRangeDesc>& _ranges;
-    // _fill_columns_from_path() reads _ranges[_next_range - 1], so this is
-    // presumably one past the range currently being read - TODO confirm
-    // against the concrete scanners.
-    int _next_range;
-    // used for process stat
-    ScannerCounter* _counter;
-
-    // Used for constructing tuple
-    // All slots listed in _params.required_slots, in that order.
-    std::vector<SlotDescriptor*> _required_slot_descs;
-    // Required slots whose is_file_slot flag is set.
-    std::vector<SlotDescriptor*> _file_slot_descs;
-    // slot id -> position of the slot in the source tuple.
-    std::map<SlotId, int> _file_slot_index_map;
-    // Required slots whose is_file_slot flag is not set.
-    std::vector<SlotDescriptor*> _partition_slot_descs;
-    // slot id -> position in the source tuple minus _num_of_columns_from_file.
-    std::map<SlotId, int> _partition_slot_index_map;
-
-    // Row descriptor of the source tuple; used to prepare the pre-filter.
-    std::unique_ptr<RowDescriptor> _row_desc;
-
-    // Mem pool used to allocate _src_tuple and _src_tuple_row
-    // NOTE(review): no such members exist in this class; the pool is not
-    // used by the visible base-class code.
-    std::unique_ptr<MemPool> _mem_pool;
-
-    // Copy of the preceding filter exprs passed to the constructor.
-    const std::vector<TExpr> _pre_filter_texprs;
-
-    // Profile
-    RuntimeProfile* _profile;
-    // Both counters are registered in open() and are null before that.
-    RuntimeProfile::Counter* _rows_read_counter;
-    RuntimeProfile::Counter* _read_timer;
-
-    bool _scanner_eof = false;
-    // finalize_block() only filters when this is positive; reset to 0 by
-    // init_block().
-    int _rows = 0;
-    // Rows seen by finalize_block() before filtering; flushed to the
-    // RowsRead counter in close().
-    long _read_row_counter = 0;
-
-    // Preceding filter context; null when no pre-filter expr was supplied.
-    std::unique_ptr<vectorized::VExprContext*> _vpre_filter_ctx_ptr;
-    // Copied from _params.num_of_columns_from_file in _init_expr_ctxes().
-    int _num_of_columns_from_file;
-
-    // File formats based push down predicate
-    std::vector<ExprContext*> _conjunct_ctxs;
-    // slot_ids for parquet predicate push down are in tuple desc
-    TupleId _tupleId;
-
-    // to record which runtime filters have been used
-    std::vector<bool> _runtime_filter_marks;
-
-    // NOTE(review): never assigned in the visible base-class code.
-    FileScanNode* _parent;
-
-private:
-    Status _init_expr_ctxes();
-    Status _filter_block(vectorized::Block* output_block);
-};
-
-} // namespace doris::vectorized
diff --git a/be/src/vec/exec/file_text_scanner.cpp b/be/src/vec/exec/file_text_scanner.cpp
deleted file mode 100644
index aee756868e..0000000000
--- a/be/src/vec/exec/file_text_scanner.cpp
+++ /dev/null
@@ -1,294 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "vec/exec/file_text_scanner.h"
-
-#include <fmt/format.h>
-#include <gen_cpp/internal_service.pb.h>
-
-#include <iostream>
-
-#include "exec/exec_node.h"
-#include "exec/plain_text_line_reader.h"
-#include "exec/text_converter.h"
-#include "exec/text_converter.hpp"
-#include "exprs/expr_context.h"
-#include "io/buffered_reader.h"
-#include "io/file_factory.h"
-#include "io/hdfs_reader_writer.h"
-#include "util/types.h"
-#include "util/utf8_check.h"
-
-namespace doris::vectorized {
-
-FileTextScanner::FileTextScanner(RuntimeState* state, RuntimeProfile* profile,
-                                 const TFileScanRangeParams& params,
-                                 const std::vector<TFileRangeDesc>& ranges,
-                                 const std::vector<TExpr>& pre_filter_texprs,
-                                 ScannerCounter* counter)
-        : FileScanner(state, profile, params, ranges, pre_filter_texprs, counter),
-          _cur_file_reader(nullptr),
-          _cur_line_reader(nullptr),
-          _cur_line_reader_eof(false),
-          _skip_lines(0),
-          _success(false)
-
-{
-    _init_profiles(profile);
-    if (params.file_attributes.__isset.text_params) {
-        auto text_params = params.file_attributes.text_params;
-        if (text_params.__isset.column_separator) {
-            _value_separator = text_params.column_separator;
-            _value_separator_length = _value_separator.length();
-        }
-        if (text_params.__isset.line_delimiter) {
-            _line_delimiter = text_params.line_delimiter;
-            _line_delimiter_length = _line_delimiter.length();
-        }
-    }
-}
-
-FileTextScanner::~FileTextScanner() {
-    close();
-}
-
-Status FileTextScanner::open() {
-    RETURN_IF_ERROR(FileScanner::open());
-
-    if (_ranges.empty()) {
-        return Status::OK();
-    }
-    _split_values.reserve(sizeof(Slice) * _file_slot_descs.size());
-    return Status::OK();
-}
-
-void FileTextScanner::close() {
-    FileScanner::close();
-
-    if (_cur_line_reader != nullptr) {
-        delete _cur_line_reader;
-        _cur_line_reader = nullptr;
-    }
-}
-
-Status FileTextScanner::get_next(Block* block, bool* eof) {
-    SCOPED_TIMER(_read_timer);
-    RETURN_IF_ERROR(init_block(block));
-
-    const int batch_size = _state->batch_size();
-
-    int current_rows = _rows;
-    while (_rows < batch_size && !_scanner_eof) {
-        if (_cur_line_reader == nullptr || _cur_line_reader_eof) {
-            RETURN_IF_ERROR(_open_next_reader());
-            // If there isn't any more reader, break this
-            if (_scanner_eof) {
-                continue;
-            }
-        }
-        const uint8_t* ptr = nullptr;
-        size_t size = 0;
-        RETURN_IF_ERROR(_cur_line_reader->read_line(&ptr, &size, &_cur_line_reader_eof));
-        if (_skip_lines > 0) {
-            _skip_lines--;
-            continue;
-        }
-        if (size == 0) {
-            // Read empty row, just continue
-            continue;
-        }
-        {
-            COUNTER_UPDATE(_rows_read_counter, 1);
-            RETURN_IF_ERROR(_fill_file_columns(Slice(ptr, size), block));
-        }
-        if (_cur_line_reader_eof) {
-            RETURN_IF_ERROR(_fill_columns_from_path(block, _rows - current_rows));
-            current_rows = _rows;
-        }
-    }
-
-    return finalize_block(block, eof);
-}
-
-Status FileTextScanner::_fill_file_columns(const Slice& line, vectorized::Block* _block) {
-    RETURN_IF_ERROR(_line_split_to_values(line));
-    if (!_success) {
-        // If not success, which means we met an invalid row, return.
-        return Status::OK();
-    }
-
-    for (int i = 0; i < _split_values.size(); ++i) {
-        auto slot_desc = _file_slot_descs[i];
-        const Slice& value = _split_values[i];
-
-        auto doris_column = _block->get_by_name(slot_desc->col_name()).column;
-        IColumn* col_ptr = const_cast<IColumn*>(doris_column.get());
-        _text_converter->write_vec_column(slot_desc, col_ptr, value.data, value.size, true, false);
-    }
-    _rows++;
-    return Status::OK();
-}
-
-Status FileTextScanner::_open_next_reader() {
-    if (_next_range >= _ranges.size()) {
-        _scanner_eof = true;
-        return Status::OK();
-    }
-
-    RETURN_IF_ERROR(_open_file_reader());
-    RETURN_IF_ERROR(_open_line_reader());
-    _next_range++;
-
-    return Status::OK();
-}
-
-Status FileTextScanner::_open_file_reader() {
-    const TFileRangeDesc& range = _ranges[_next_range];
-    RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _params, range.path,
-                                                    range.start_offset, range.file_size, 0,
-                                                    _cur_file_reader));
-    return _cur_file_reader->open();
-}
-
-Status FileTextScanner::_open_line_reader() {
-    if (_cur_line_reader != nullptr) {
-        delete _cur_line_reader;
-        _cur_line_reader = nullptr;
-    }
-
-    const TFileRangeDesc& range = _ranges[_next_range];
-    int64_t size = range.size;
-    if (range.start_offset != 0) {
-        if (_params.format_type != TFileFormatType::FORMAT_CSV_PLAIN) {
-            std::stringstream ss;
-            ss << "For now we do not support split compressed file";
-            return Status::InternalError(ss.str());
-        }
-        size += 1;
-        // not first range will always skip one line
-        _skip_lines = 1;
-    }
-
-    // open line reader
-    switch (_params.format_type) {
-    case TFileFormatType::FORMAT_CSV_PLAIN:
-        _cur_line_reader = new PlainTextLineReader(_profile, _cur_file_reader.get(), nullptr, size,
-                                                   _line_delimiter, _line_delimiter_length);
-        break;
-    default: {
-        std::stringstream ss;
-        ss << "Unknown format type, cannot init line reader, type=" << _params.format_type;
-        return Status::InternalError(ss.str());
-    }
-    }
-
-    _cur_line_reader_eof = false;
-
-    return Status::OK();
-}
-
-Status FileTextScanner::_line_split_to_values(const Slice& line) {
-    if (!validate_utf8(line.data, line.size)) {
-        RETURN_IF_ERROR(_state->append_error_msg_to_file(
-                []() -> std::string { return "Unable to display"; },
-                []() -> std::string {
-                    fmt::memory_buffer error_msg;
-                    fmt::format_to(error_msg, "{}", "Unable to display");
-                    return fmt::to_string(error_msg);
-                },
-                &_scanner_eof));
-        _counter->num_rows_filtered++;
-        _success = false;
-        return Status::OK();
-    }
-
-    RETURN_IF_ERROR(_split_line(line));
-
-    _success = true;
-    return Status::OK();
-}
-
-Status FileTextScanner::_split_line(const Slice& line) {
-    _split_values.clear();
-    std::vector<Slice> tmp_split_values;
-    tmp_split_values.reserve(_num_of_columns_from_file);
-
-    const char* value = line.data;
-    size_t start = 0;     // point to the start pos of next col value.
-    size_t curpos = 0;    // point to the start pos of separator matching sequence.
-    size_t p1 = 0;        // point to the current pos of separator matching sequence.
-    size_t non_space = 0; // point to the last pos of non_space charactor.
-
-    // Separator: AAAA
-    //
-    //    p1
-    //     ▼
-    //     AAAA
-    //   1000AAAA2000AAAA
-    //   ▲   ▲
-    // Start │
-    //     curpos
-
-    while (curpos < line.size) {
-        if (*(value + curpos + p1) != _value_separator[p1]) {
-            // Not match, move forward:
-            curpos += (p1 == 0 ? 1 : p1);
-            p1 = 0;
-        } else {
-            p1++;
-            if (p1 == _value_separator_length) {
-                // Match a separator
-                non_space = curpos;
-                // Trim tailing spaces. Be consistent with hive and trino's behavior.
-                if (_state->trim_tailing_spaces_for_external_table_query()) {
-                    while (non_space > start && *(value + non_space - 1) == ' ') {
-                        non_space--;
-                    }
-                }
-                tmp_split_values.emplace_back(value + start, non_space - start);
-                start = curpos + _value_separator_length;
-                curpos = start;
-                p1 = 0;
-                non_space = 0;
-            }
-        }
-    }
-
-    CHECK(curpos == line.size) << curpos << " vs " << line.size;
-    non_space = curpos;
-    if (_state->trim_tailing_spaces_for_external_table_query()) {
-        while (non_space > start && *(value + non_space - 1) == ' ') {
-            non_space--;
-        }
-    }
-
-    tmp_split_values.emplace_back(value + start, non_space - start);
-    for (const auto& slot : _file_slot_descs) {
-        auto it = _file_slot_index_map.find(slot->id());
-        if (it == std::end(_file_slot_index_map)) {
-            std::stringstream ss;
-            ss << "Unknown _file_slot_index_map, slot_id=" << slot->id();
-            return Status::InternalError(ss.str());
-        }
-        int index = it->second;
-        CHECK(index < _num_of_columns_from_file) << index << " vs " << _num_of_columns_from_file;
-        _split_values.emplace_back(tmp_split_values[index]);
-    }
-    return Status::OK();
-}
-
-} // namespace doris::vectorized
diff --git a/be/src/vec/exec/file_text_scanner.h b/be/src/vec/exec/file_text_scanner.h
deleted file mode 100644
index c632a842cb..0000000000
--- a/be/src/vec/exec/file_text_scanner.h
+++ /dev/null
@@ -1,72 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include "common/consts.h"
-#include "exec/base_scanner.h"
-#include "exec/decompressor.h"
-#include "exec/line_reader.h"
-#include "exec/plain_binary_line_reader.h"
-#include "exec/plain_text_line_reader.h"
-#include "gen_cpp/PlanNodes_types.h"
-#include "io/file_factory.h"
-#include "io/file_reader.h"
-#include "vec/exec/file_scanner.h"
-
-namespace doris::vectorized {
-class FileTextScanner final : public FileScanner {
-public:
-    FileTextScanner(RuntimeState* state, RuntimeProfile* profile,
-                    const TFileScanRangeParams& params, const std::vector<TFileRangeDesc>& ranges,
-                    const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter);
-
-    ~FileTextScanner() override;
-
-    Status open() override;
-
-    Status get_next(Block* block, bool* eof) override;
-    void close() override;
-
-protected:
-    void _init_profiles(RuntimeProfile* profile) override {}
-
-private:
-    Status _fill_file_columns(const Slice& line, vectorized::Block* _block);
-    Status _open_next_reader();
-    Status _open_file_reader();
-    Status _open_line_reader();
-    Status _line_split_to_values(const Slice& line);
-    Status _split_line(const Slice& line);
-    // Reader
-    std::unique_ptr<FileReader> _cur_file_reader;
-    LineReader* _cur_line_reader;
-    bool _cur_line_reader_eof;
-
-    // When we fetch range start from 0, header_type="csv_with_names" skip first line
-    // When we fetch range start from 0, header_type="csv_with_names_and_types" skip first two line
-    // When we fetch range doesn't start
-    int _skip_lines;
-    std::vector<Slice> _split_values;
-    std::string _value_separator;
-    std::string _line_delimiter;
-    int _value_separator_length;
-    int _line_delimiter_length;
-
-    bool _success;
-};
-} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/new_file_arrow_scanner.cpp b/be/src/vec/exec/scan/new_file_arrow_scanner.cpp
deleted file mode 100644
index 5fa6c88764..0000000000
--- a/be/src/vec/exec/scan/new_file_arrow_scanner.cpp
+++ /dev/null
@@ -1,265 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "vec/exec/scan/new_file_arrow_scanner.h"
-
-#include "exec/arrow/orc_reader.h"
-#include "exec/arrow/parquet_reader.h"
-#include "io/file_factory.h"
-#include "vec/exec/scan/vscan_node.h"
-#include "vec/functions/simple_function_factory.h"
-#include "vec/utils/arrow_column_to_doris_column.h"
-
-namespace doris::vectorized {
-
-NewFileArrowScanner::NewFileArrowScanner(RuntimeState* state, NewFileScanNode* parent,
-                                         int64_t limit, const TFileScanRange& scan_range,
-                                         RuntimeProfile* profile,
-                                         const std::vector<TExpr>& pre_filter_texprs)
-        : NewFileScanner(state, parent, limit, scan_range, profile, pre_filter_texprs),
-          _cur_file_reader(nullptr),
-          _cur_file_eof(false),
-          _batch(nullptr),
-          _arrow_batch_cur_idx(0) {}
-
-Status NewFileArrowScanner::open(RuntimeState* state) {
-    RETURN_IF_ERROR(NewFileScanner::open(state));
-    // SCOPED_TIMER(_parent->_reader_init_timer);
-
-    // _runtime_filter_marks.resize(_parent->runtime_filter_descs().size(), false);
-    return Status::OK();
-}
-
-Status NewFileArrowScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) {
-    // init arrow batch
-    {
-        Status st = _init_arrow_batch_if_necessary();
-        if (!st.ok()) {
-            if (!st.is_end_of_file()) {
-                return st;
-            }
-            *eof = true;
-            return Status::OK();
-        }
-    }
-
-    *eof = false;
-    if (!_is_load) {
-        RETURN_IF_ERROR(init_block(block));
-    }
-    // convert arrow batch to block until reach the batch_size
-    while (!_scanner_eof) {
-        // cast arrow type to PT0 and append it to block
-        // for example: arrow::Type::INT16 => TYPE_SMALLINT
-        RETURN_IF_ERROR(_append_batch_to_block(block));
-        // finalize the block if full
-        if (_rows >= _state->batch_size()) {
-            break;
-        }
-        auto status = _next_arrow_batch();
-        // if ok, append the batch to the columns
-        if (status.ok()) {
-            continue;
-        }
-        // return error if not EOF
-        if (!status.is_end_of_file()) {
-            return status;
-        }
-        _cur_file_eof = true;
-        break;
-    }
-
-    if (_scanner_eof && block->rows() == 0) {
-        *eof = true;
-    }
-    //        return finalize_block(block, eof);
-    return Status::OK();
-}
-
-Status NewFileArrowScanner::_init_arrow_batch_if_necessary() {
-    // 1. init batch if first time
-    // 2. reset reader if end of file
-    Status status = Status::OK();
-    if (_scanner_eof) {
-        return Status::EndOfFile("EOF");
-    }
-    if (_batch == nullptr || _arrow_batch_cur_idx >= _batch->num_rows()) {
-        return _next_arrow_batch();
-    }
-    return status;
-}
-
-Status NewFileArrowScanner::_append_batch_to_block(Block* block) {
-    size_t num_elements = std::min<size_t>((_state->batch_size() - block->rows()),
-                                           (_batch->num_rows() - _arrow_batch_cur_idx));
-    for (auto i = 0; i < _file_slot_descs.size(); ++i) {
-        SlotDescriptor* slot_desc = _file_slot_descs[i];
-        if (slot_desc == nullptr) {
-            continue;
-        }
-        std::string real_column_name = _cur_file_reader->is_case_sensitive()
-                                               ? slot_desc->col_name()
-                                               : slot_desc->col_name_lower_case();
-        auto* array = _batch->GetColumnByName(real_column_name).get();
-        auto& column_with_type_and_name = block->get_by_name(slot_desc->col_name());
-        RETURN_IF_ERROR(arrow_column_to_doris_column(
-                array, _arrow_batch_cur_idx, column_with_type_and_name.column,
-                column_with_type_and_name.type, num_elements, _state->timezone_obj()));
-    }
-    _rows += num_elements;
-    _arrow_batch_cur_idx += num_elements;
-    return _fill_columns_from_path(block, num_elements);
-}
-
-Status NewFileArrowScanner::_convert_to_output_block(Block* output_block) {
-    if (!config::enable_new_load_scan_node) {
-        return Status::OK();
-    }
-    if (_input_block_ptr == output_block) {
-        return Status::OK();
-    }
-    RETURN_IF_ERROR(_cast_src_block(_input_block_ptr));
-    if (LIKELY(_input_block_ptr->rows() > 0)) {
-        RETURN_IF_ERROR(_materialize_dest_block(output_block));
-    }
-
-    return Status::OK();
-}
-
-// arrow type ==arrow_column_to_doris_column==> primitive type(PT0) ==cast_src_block==>
-// primitive type(PT1) ==materialize_block==> dest primitive type
-Status NewFileArrowScanner::_cast_src_block(Block* block) {
-    // cast primitive type(PT0) to primitive type(PT1)
-    for (size_t i = 0; i < _num_of_columns_from_file; ++i) {
-        SlotDescriptor* slot_desc = _required_slot_descs[i];
-        if (slot_desc == nullptr) {
-            continue;
-        }
-        auto& arg = block->get_by_name(slot_desc->col_name());
-        // remove nullable here, let the get_function decide whether nullable
-        auto return_type = slot_desc->get_data_type_ptr();
-        ColumnsWithTypeAndName arguments {
-                arg,
-                {DataTypeString().create_column_const(
-                         arg.column->size(), remove_nullable(return_type)->get_family_name()),
-                 std::make_shared<DataTypeString>(), ""}};
-        auto func_cast =
-                SimpleFunctionFactory::instance().get_function("CAST", arguments, return_type);
-        RETURN_IF_ERROR(func_cast->execute(nullptr, *block, {i}, i, arg.column->size()));
-        block->get_by_position(i).type = std::move(return_type);
-    }
-    return Status::OK();
-}
-
-Status NewFileArrowScanner::_next_arrow_batch() {
-    _arrow_batch_cur_idx = 0;
-    // first, init file reader
-    if (_cur_file_reader == nullptr || _cur_file_eof) {
-        RETURN_IF_ERROR(_open_next_reader());
-        _cur_file_eof = false;
-    }
-    // second, loop until find available arrow batch or EOF
-    while (!_scanner_eof) {
-        RETURN_IF_ERROR(_cur_file_reader->next_batch(&_batch, &_cur_file_eof));
-        if (_cur_file_eof) {
-            RETURN_IF_ERROR(_open_next_reader());
-            _cur_file_eof = false;
-            continue;
-        }
-        if (_batch->num_rows() == 0) {
-            continue;
-        }
-        return Status::OK();
-    }
-    return Status::EndOfFile("EOF");
-}
-
-Status NewFileArrowScanner::_open_next_reader() {
-    // open_file_reader
-    if (_cur_file_reader != nullptr) {
-        delete _cur_file_reader;
-        _cur_file_reader = nullptr;
-    }
-
-    while (true) {
-        if (_next_range >= _ranges.size()) {
-            _scanner_eof = true;
-            return Status::OK();
-        }
-        const TFileRangeDesc& range = _ranges[_next_range++];
-        std::unique_ptr<FileReader> file_reader;
-        RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _params, range.path,
-                                                        range.start_offset, range.file_size, 0,
-                                                        file_reader));
-        RETURN_IF_ERROR(file_reader->open());
-        if (file_reader->size() == 0) {
-            file_reader->close();
-            continue;
-        }
-
-        int32_t num_of_columns_from_file = _file_slot_descs.size();
-
-        _cur_file_reader =
-                _new_arrow_reader(_file_slot_descs, file_reader.release(), num_of_columns_from_file,
-                                  range.start_offset, range.size);
-
-        auto tuple_desc = _state->desc_tbl().get_tuple_descriptor(_parent->output_tuple_id());
-        // TODO _conjunct_ctxs is empty for now. _conjunct_ctxs is not empty.
-        Status status =
-                _cur_file_reader->init_reader(tuple_desc, _conjunct_ctxs, _state->timezone());
-        if (status.is_end_of_file()) {
-            continue;
-        } else {
-            if (!status.ok()) {
-                std::stringstream ss;
-                ss << " file: " << range.path << " error:" << status.get_error_msg();
-                return Status::InternalError(ss.str());
-            } else {
-                //                    _update_profile(_cur_file_reader->statistics());
-                return status;
-            }
-        }
-    }
-}
-
-NewFileParquetScanner::NewFileParquetScanner(RuntimeState* state, NewFileScanNode* parent,
-                                             int64_t limit, const TFileScanRange& scan_range,
-                                             RuntimeProfile* profile,
-                                             const std::vector<TExpr>& pre_filter_texprs)
-        : NewFileArrowScanner(state, parent, limit, scan_range, profile, pre_filter_texprs) {
-    //        _init_profiles(profile);
-}
-
-ArrowReaderWrap* NewFileParquetScanner::_new_arrow_reader(
-        const std::vector<SlotDescriptor*>& file_slot_descs, FileReader* file_reader,
-        int32_t num_of_columns_from_file, int64_t range_start_offset, int64_t range_size) {
-    return new ParquetReaderWrap(_state, file_slot_descs, file_reader, num_of_columns_from_file,
-                                 range_start_offset, range_size, false);
-}
-
-NewFileORCScanner::NewFileORCScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit,
-                                     const TFileScanRange& scan_range, RuntimeProfile* profile,
-                                     const std::vector<TExpr>& pre_filter_texprs)
-        : NewFileArrowScanner(state, parent, limit, scan_range, profile, pre_filter_texprs) {}
-
-ArrowReaderWrap* NewFileORCScanner::_new_arrow_reader(
-        const std::vector<SlotDescriptor*>& file_slot_descs, FileReader* file_reader,
-        int32_t num_of_columns_from_file, int64_t range_start_offset, int64_t range_size) {
-    return new ORCReaderWrap(_state, file_slot_descs, file_reader, num_of_columns_from_file,
-                             range_start_offset, range_size, false);
-}
-} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/new_file_arrow_scanner.h b/be/src/vec/exec/scan/new_file_arrow_scanner.h
deleted file mode 100644
index 281373a70d..0000000000
--- a/be/src/vec/exec/scan/new_file_arrow_scanner.h
+++ /dev/null
@@ -1,89 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <exec/arrow/arrow_reader.h>
-
-#include "exprs/bloomfilter_predicate.h"
-#include "exprs/function_filter.h"
-#include "vec/exec/scan/new_file_scanner.h"
-#include "vec/exec/scan/vscanner.h"
-
-namespace doris::vectorized {
-class NewFileArrowScanner : public NewFileScanner {
-public:
-    NewFileArrowScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit,
-                        const TFileScanRange& scan_range, RuntimeProfile* profile,
-                        const std::vector<TExpr>& pre_filter_texprs);
-    Status open(RuntimeState* state) override;
-
-protected:
-    Status _get_block_impl(RuntimeState* state, Block* block, bool* eos) override;
-    virtual ArrowReaderWrap* _new_arrow_reader(const std::vector<SlotDescriptor*>& file_slot_descs,
-                                               FileReader* file_reader,
-                                               int32_t num_of_columns_from_file,
-                                               int64_t range_start_offset, int64_t range_size) = 0;
-    // Convert input block to output block, if needed.
-    Status _convert_to_output_block(Block* output_block);
-
-private:
-    Status _open_next_reader();
-    Status _next_arrow_batch();
-    Status _init_arrow_batch_if_necessary();
-    Status _append_batch_to_block(Block* block);
-    Status _cast_src_block(Block* block);
-
-private:
-    // Reader
-    ArrowReaderWrap* _cur_file_reader;
-    bool _cur_file_eof; // is read over?
-    std::shared_ptr<arrow::RecordBatch> _batch;
-    size_t _arrow_batch_cur_idx;
-};
-
-class NewFileParquetScanner final : public NewFileArrowScanner {
-public:
-    NewFileParquetScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit,
-                          const TFileScanRange& scan_range, RuntimeProfile* profile,
-                          const std::vector<TExpr>& pre_filter_texprs);
-
-    ~NewFileParquetScanner() override = default;
-
-protected:
-    ArrowReaderWrap* _new_arrow_reader(const std::vector<SlotDescriptor*>& file_slot_descs,
-                                       FileReader* file_reader, int32_t num_of_columns_from_file,
-                                       int64_t range_start_offset, int64_t range_size) override;
-
-    void _init_profiles(RuntimeProfile* profile) override {};
-};
-
-class NewFileORCScanner final : public NewFileArrowScanner {
-public:
-    NewFileORCScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit,
-                      const TFileScanRange& scan_range, RuntimeProfile* profile,
-                      const std::vector<TExpr>& pre_filter_texprs);
-
-    ~NewFileORCScanner() override = default;
-
-protected:
-    ArrowReaderWrap* _new_arrow_reader(const std::vector<SlotDescriptor*>& file_slot_descs,
-                                       FileReader* file_reader, int32_t num_of_columns_from_file,
-                                       int64_t range_start_offset, int64_t range_size) override;
-    void _init_profiles(RuntimeProfile* profile) override {};
-};
-} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/new_file_scan_node.cpp b/be/src/vec/exec/scan/new_file_scan_node.cpp
index 039082ab1c..49319f1e6b 100644
--- a/be/src/vec/exec/scan/new_file_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_file_scan_node.cpp
@@ -18,8 +18,6 @@
 #include "vec/exec/scan/new_file_scan_node.h"
 
 #include "vec/columns/column_const.h"
-#include "vec/exec/scan/new_file_arrow_scanner.h"
-#include "vec/exec/scan/new_file_text_scanner.h"
 #include "vec/exec/scan/new_olap_scanner.h"
 #include "vec/exec/scan/vfile_scanner.h"
 #include "vec/functions/in.h"
@@ -102,28 +100,9 @@ Status NewFileScanNode::_init_scanners(std::list<VScanner*>* scanners) {
 }
 
 VScanner* NewFileScanNode::_create_scanner(const TFileScanRange& scan_range) {
-    VScanner* scanner = nullptr;
-    if (config::enable_new_file_scanner) {
-        scanner = new VFileScanner(_state, this, _limit_per_scanner, scan_range, runtime_profile());
-        ((VFileScanner*)scanner)->prepare(_vconjunct_ctx_ptr.get(), &_colname_to_value_range);
-    } else {
-        switch (scan_range.params.format_type) {
-        case TFileFormatType::FORMAT_PARQUET:
-            scanner = new NewFileParquetScanner(_state, this, _limit_per_scanner, scan_range,
-                                                runtime_profile(), std::vector<TExpr>());
-            break;
-        case TFileFormatType::FORMAT_ORC:
-            scanner = new NewFileORCScanner(_state, this, _limit_per_scanner, scan_range,
-                                            runtime_profile(), std::vector<TExpr>());
-            break;
-
-        default:
-            scanner = new NewFileTextScanner(_state, this, _limit_per_scanner, scan_range,
-                                             runtime_profile(), std::vector<TExpr>());
-            break;
-        }
-        ((NewFileScanner*)scanner)->prepare(_vconjunct_ctx_ptr.get());
-    }
+    VScanner* scanner =
+            new VFileScanner(_state, this, _limit_per_scanner, scan_range, runtime_profile());
+    ((VFileScanner*)scanner)->prepare(_vconjunct_ctx_ptr.get(), &_colname_to_value_range);
     _scanner_pool.add(scanner);
     // TODO: Can we remove _conjunct_ctxs and use _vconjunct_ctx_ptr instead?
     scanner->reg_conjunct_ctxs(_conjunct_ctxs);
diff --git a/be/src/vec/exec/scan/new_file_scanner.cpp b/be/src/vec/exec/scan/new_file_scanner.cpp
deleted file mode 100644
index 6e511fe10f..0000000000
--- a/be/src/vec/exec/scan/new_file_scanner.cpp
+++ /dev/null
@@ -1,317 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "vec/exec/scan/new_file_scanner.h"
-
-#include <fmt/format.h>
-
-#include <vec/data_types/data_type_factory.hpp>
-
-#include "common/logging.h"
-#include "common/utils.h"
-#include "exec/exec_node.h"
-#include "exec/text_converter.hpp"
-#include "exprs/expr_context.h"
-#include "runtime/descriptors.h"
-#include "runtime/raw_value.h"
-#include "runtime/runtime_state.h"
-#include "runtime/tuple.h"
-#include "vec/exec/scan/new_file_scan_node.h"
-
-namespace doris::vectorized {
-
-NewFileScanner::NewFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit,
-                               const TFileScanRange& scan_range, RuntimeProfile* profile,
-                               const std::vector<TExpr>& pre_filter_texprs)
-        : VScanner(state, static_cast<VScanNode*>(parent), limit),
-          _params(scan_range.params),
-          _ranges(scan_range.ranges),
-          _next_range(0),
-          _mem_pool(std::make_unique<MemPool>()),
-          _profile(profile),
-          _pre_filter_texprs(pre_filter_texprs),
-          _strict_mode(false) {}
-
-Status NewFileScanner::open(RuntimeState* state) {
-    RETURN_IF_ERROR(VScanner::open(state));
-    RETURN_IF_ERROR(_init_expr_ctxes());
-    return Status::OK();
-}
-
-Status NewFileScanner::prepare(VExprContext** vconjunct_ctx_ptr) {
-    if (vconjunct_ctx_ptr != nullptr) {
-        // Copy vconjunct_ctx_ptr from scan node to this scanner's _vconjunct_ctx.
-        RETURN_IF_ERROR((*vconjunct_ctx_ptr)->clone(_state, &_vconjunct_ctx));
-    }
-
-    return Status::OK();
-}
-
-Status NewFileScanner::_init_expr_ctxes() {
-    if (_input_tuple_desc == nullptr) {
-        std::stringstream ss;
-        ss << "Unknown source tuple descriptor, tuple_id=" << _params.src_tuple_id;
-        return Status::InternalError(ss.str());
-    }
-    DCHECK(!_ranges.empty());
-
-    std::map<SlotId, int> _full_src_index_map;
-    std::map<SlotId, SlotDescriptor*> _full_src_slot_map;
-    int index = 0;
-    for (const auto& slot_desc : _input_tuple_desc->slots()) {
-        _full_src_slot_map.emplace(slot_desc->id(), slot_desc);
-        _full_src_index_map.emplace(slot_desc->id(), index++);
-    }
-
-    _num_of_columns_from_file = _params.num_of_columns_from_file;
-    for (const auto& slot_info : _params.required_slots) {
-        auto slot_id = slot_info.slot_id;
-        auto it = _full_src_slot_map.find(slot_id);
-        if (it == std::end(_full_src_slot_map)) {
-            std::stringstream ss;
-            ss << "Unknown source slot descriptor, slot_id=" << slot_id;
-            return Status::InternalError(ss.str());
-        }
-        _required_slot_descs.emplace_back(it->second);
-        if (slot_info.is_file_slot) {
-            _file_slot_descs.emplace_back(it->second);
-            auto iti = _full_src_index_map.find(slot_id);
-            _file_slot_index_map.emplace(slot_id, iti->second);
-        } else {
-            _partition_slot_descs.emplace_back(it->second);
-            auto iti = _full_src_index_map.find(slot_id);
-            _partition_slot_index_map.emplace(slot_id, iti->second - _num_of_columns_from_file);
-        }
-    }
-
-    _src_tuple = (doris::Tuple*)_mem_pool->allocate(_input_tuple_desc->byte_size());
-    _src_tuple_row = (TupleRow*)_mem_pool->allocate(sizeof(Tuple*));
-    _src_tuple_row->set_tuple(0, _src_tuple);
-    _row_desc.reset(new RowDescriptor(_state->desc_tbl(),
-                                      std::vector<TupleId>({_params.src_tuple_id}),
-                                      std::vector<bool>({false})));
-
-    // preceding filter expr should be initialized by using `_row_desc`, which is the source row descriptor
-    if (!_pre_filter_texprs.empty()) {
-        // for vectorized, preceding filter exprs should be compounded to one passed from fe.
-        DCHECK(_pre_filter_texprs.size() == 1);
-        _vpre_filter_ctx_ptr.reset(new doris::vectorized::VExprContext*);
-        RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(
-                _state->obj_pool(), _pre_filter_texprs[0], _vpre_filter_ctx_ptr.get()));
-        RETURN_IF_ERROR((*_vpre_filter_ctx_ptr)->prepare(_state, *_row_desc));
-        RETURN_IF_ERROR((*_vpre_filter_ctx_ptr)->open(_state));
-    }
-
-    // Construct dest slots information
-    if (config::enable_new_load_scan_node) {
-        if (_output_tuple_desc == nullptr) {
-            return Status::InternalError("Unknown dest tuple descriptor, tuple_id={}",
-                                         _params.dest_tuple_id);
-        }
-
-        bool has_slot_id_map = _params.__isset.dest_sid_to_src_sid_without_trans;
-        for (auto slot_desc : _output_tuple_desc->slots()) {
-            if (!slot_desc->is_materialized()) {
-                continue;
-            }
-            auto it = _params.expr_of_dest_slot.find(slot_desc->id());
-            if (it == std::end(_params.expr_of_dest_slot)) {
-                return Status::InternalError("No expr for dest slot, id={}, name={}",
-                                             slot_desc->id(), slot_desc->col_name());
-            }
-
-            vectorized::VExprContext* ctx = nullptr;
-            RETURN_IF_ERROR(
-                    vectorized::VExpr::create_expr_tree(_state->obj_pool(), it->second, &ctx));
-            RETURN_IF_ERROR(ctx->prepare(_state, *_row_desc.get()));
-            RETURN_IF_ERROR(ctx->open(_state));
-            _dest_vexpr_ctx.emplace_back(ctx);
-            if (has_slot_id_map) {
-                auto it1 = _params.dest_sid_to_src_sid_without_trans.find(slot_desc->id());
-                if (it1 == std::end(_params.dest_sid_to_src_sid_without_trans)) {
-                    _src_slot_descs_order_by_dest.emplace_back(nullptr);
-                } else {
-                    auto _src_slot_it = _full_src_slot_map.find(it1->second);
-                    if (_src_slot_it == std::end(_full_src_slot_map)) {
-                        return Status::InternalError("No src slot {} in src slot descs",
-                                                     it1->second);
-                    }
-                    _src_slot_descs_order_by_dest.emplace_back(_src_slot_it->second);
-                }
-            }
-        }
-    }
-
-    return Status::OK();
-}
-
-Status NewFileScanner::init_block(vectorized::Block* block) {
-    (*block).clear();
-    _rows = 0;
-    for (const auto& slot_desc : _required_slot_descs) {
-        if (slot_desc == nullptr) {
-            continue;
-        }
-        auto is_nullable = slot_desc->is_nullable();
-        auto data_type = vectorized::DataTypeFactory::instance().create_data_type(slot_desc->type(),
-                                                                                  is_nullable);
-        if (data_type == nullptr) {
-            return Status::NotSupported(
-                    fmt::format("Not support type for column:{}", slot_desc->col_name()));
-        }
-        MutableColumnPtr data_column = data_type->create_column();
-        (*block).insert(
-                ColumnWithTypeAndName(std::move(data_column), data_type, slot_desc->col_name()));
-    }
-    return Status::OK();
-}
-
-Status NewFileScanner::_fill_columns_from_path(vectorized::Block* _block, size_t rows) {
-    const TFileRangeDesc& range = _ranges.at(_next_range - 1);
-    if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) {
-        for (const auto& slot_desc : _partition_slot_descs) {
-            if (slot_desc == nullptr) continue;
-            auto it = _partition_slot_index_map.find(slot_desc->id());
-            if (it == std::end(_partition_slot_index_map)) {
-                std::stringstream ss;
-                ss << "Unknown source slot descriptor, slot_id=" << slot_desc->id();
-                return Status::InternalError(ss.str());
-            }
-            const std::string& column_from_path = range.columns_from_path[it->second];
-
-            auto doris_column = _block->get_by_name(slot_desc->col_name()).column;
-            IColumn* col_ptr = const_cast<IColumn*>(doris_column.get());
-
-            for (size_t j = 0; j < rows; ++j) {
-                _text_converter->write_vec_column(slot_desc, col_ptr,
-                                                  const_cast<char*>(column_from_path.c_str()),
-                                                  column_from_path.size(), true, false);
-            }
-        }
-    }
-    return Status::OK();
-}
-
-Status NewFileScanner::_filter_input_block(Block* block) {
-    if (!config::enable_new_load_scan_node) {
-        return Status::OK();
-    }
-    if (_is_load) {
-        auto origin_column_num = block->columns();
-        RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_vpre_filter_ctx_ptr, block,
-                                                               origin_column_num));
-    }
-    return Status::OK();
-}
-
-Status NewFileScanner::_materialize_dest_block(vectorized::Block* dest_block) {
-    // Do vectorized expr here
-    int ctx_idx = 0;
-    size_t rows = _input_block.rows();
-    auto filter_column = vectorized::ColumnUInt8::create(rows, 1);
-    auto& filter_map = filter_column->get_data();
-    auto origin_column_num = _input_block.columns();
-
-    for (auto slot_desc : _output_tuple_desc->slots()) {
-        if (!slot_desc->is_materialized()) {
-            continue;
-        }
-        int dest_index = ctx_idx++;
-
-        auto* ctx = _dest_vexpr_ctx[dest_index];
-        int result_column_id = -1;
-        // PT1 => dest primitive type
-        RETURN_IF_ERROR(ctx->execute(_input_block_ptr, &result_column_id));
-        bool is_origin_column = result_column_id < origin_column_num;
-        auto column_ptr =
-                is_origin_column && _src_block_mem_reuse
-                        ? _input_block.get_by_position(result_column_id).column->clone_resized(rows)
-                        : _input_block.get_by_position(result_column_id).column;
-
-        DCHECK(column_ptr != nullptr);
-
-        // because of src_slot_desc is always be nullable, so the column_ptr after do dest_expr
-        // is likely to be nullable
-        if (LIKELY(column_ptr->is_nullable())) {
-            auto nullable_column =
-                    reinterpret_cast<const vectorized::ColumnNullable*>(column_ptr.get());
-            for (int i = 0; i < rows; ++i) {
-                if (filter_map[i] && nullable_column->is_null_at(i)) {
-                    if (_strict_mode && (_src_slot_descs_order_by_dest[dest_index]) &&
-                        !_input_block.get_by_position(dest_index).column->is_null_at(i)) {
-                        RETURN_IF_ERROR(_state->append_error_msg_to_file(
-                                [&]() -> std::string {
-                                    return _input_block.dump_one_line(i, _num_of_columns_from_file);
-                                },
-                                [&]() -> std::string {
-                                    auto raw_value = _input_block.get_by_position(ctx_idx)
-                                                             .column->get_data_at(i);
-                                    std::string raw_string = raw_value.to_string();
-                                    fmt::memory_buffer error_msg;
-                                    fmt::format_to(error_msg,
-                                                   "column({}) value is incorrect while strict "
-                                                   "mode is {}, "
-                                                   "src value is {}",
-                                                   slot_desc->col_name(), _strict_mode, raw_string);
-                                    return fmt::to_string(error_msg);
-                                },
-                                &_scanner_eof));
-                        filter_map[i] = false;
-                    } else if (!slot_desc->is_nullable()) {
-                        RETURN_IF_ERROR(_state->append_error_msg_to_file(
-                                [&]() -> std::string {
-                                    return _input_block.dump_one_line(i, _num_of_columns_from_file);
-                                },
-                                [&]() -> std::string {
-                                    fmt::memory_buffer error_msg;
-                                    fmt::format_to(error_msg,
-                                                   "column({}) values is null while columns is not "
-                                                   "nullable",
-                                                   slot_desc->col_name());
-                                    return fmt::to_string(error_msg);
-                                },
-                                &_scanner_eof));
-                        filter_map[i] = false;
-                    }
-                }
-            }
-            if (!slot_desc->is_nullable()) column_ptr = nullable_column->get_nested_column_ptr();
-        } else if (slot_desc->is_nullable()) {
-            column_ptr = vectorized::make_nullable(column_ptr);
-        }
-        dest_block->insert(vectorized::ColumnWithTypeAndName(
-                std::move(column_ptr), slot_desc->get_data_type_ptr(), slot_desc->col_name()));
-    }
-
-    // after do the dest block insert operation, clear _src_block to remove the reference of origin column
-    if (_src_block_mem_reuse) {
-        _input_block.clear_column_data(origin_column_num);
-    } else {
-        _input_block.clear();
-    }
-
-    size_t dest_size = dest_block->columns();
-    // do filter
-    dest_block->insert(vectorized::ColumnWithTypeAndName(
-            std::move(filter_column), std::make_shared<vectorized::DataTypeUInt8>(),
-            "filter column"));
-    RETURN_IF_ERROR(vectorized::Block::filter_block(dest_block, dest_size, dest_size));
-
-    return Status::OK();
-}
-
-} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/new_file_scanner.h b/be/src/vec/exec/scan/new_file_scanner.h
deleted file mode 100644
index 50423bd3e6..0000000000
--- a/be/src/vec/exec/scan/new_file_scanner.h
+++ /dev/null
@@ -1,100 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include "exec/text_converter.h"
-#include "exprs/bloomfilter_predicate.h"
-#include "exprs/function_filter.h"
-#include "runtime/tuple.h"
-#include "vec/exec/scan/vscanner.h"
-
-namespace doris::vectorized {
-
-class NewFileScanNode;
-
-class NewFileScanner : public VScanner {
-public:
-    NewFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit,
-                   const TFileScanRange& scan_range, RuntimeProfile* profile,
-                   const std::vector<TExpr>& pre_filter_texprs);
-
-    Status open(RuntimeState* state) override;
-
-    Status prepare(VExprContext** vconjunct_ctx_ptr);
-
-protected:
-    // Use prefilters to filter input block
-    Status _filter_input_block(Block* block);
-    Status _materialize_dest_block(vectorized::Block* output_block);
-
-protected:
-    virtual void _init_profiles(RuntimeProfile* profile) = 0;
-
-    Status _fill_columns_from_path(vectorized::Block* output_block, size_t rows);
-    Status init_block(vectorized::Block* block);
-
-    std::unique_ptr<TextConverter> _text_converter;
-
-    const TFileScanRangeParams& _params;
-
-    const std::vector<TFileRangeDesc>& _ranges;
-    int _next_range;
-
-    // Used for constructing tuple
-    std::vector<SlotDescriptor*> _required_slot_descs;
-    // File source slot descriptors
-    std::vector<SlotDescriptor*> _file_slot_descs;
-    // File slot id to index map.
-    std::map<SlotId, int> _file_slot_index_map;
-    // Partition source slot descriptors
-    std::vector<SlotDescriptor*> _partition_slot_descs;
-    // Partition slot id to index map
-    std::map<SlotId, int> _partition_slot_index_map;
-    std::unique_ptr<RowDescriptor> _row_desc;
-    doris::Tuple* _src_tuple;
-    TupleRow* _src_tuple_row;
-
-    // Mem pool used to allocate _src_tuple and _src_tuple_row
-    std::unique_ptr<MemPool> _mem_pool;
-
-    // Profile
-    RuntimeProfile* _profile;
-    RuntimeProfile::Counter* _rows_read_counter;
-    RuntimeProfile::Counter* _read_timer;
-
-    bool _scanner_eof = false;
-    int _rows = 0;
-    int _num_of_columns_from_file;
-
-    const std::vector<TExpr> _pre_filter_texprs;
-
-    std::vector<vectorized::VExprContext*> _dest_vexpr_ctx;
-    // to filter src tuple directly.
-    std::unique_ptr<vectorized::VExprContext*> _vpre_filter_ctx_ptr;
-
-    // the map values of dest slot id to src slot desc
-    // if there is not key of dest slot id in dest_sid_to_src_sid_without_trans, it will be set to nullptr
-    std::vector<SlotDescriptor*> _src_slot_descs_order_by_dest;
-
-    bool _src_block_mem_reuse = false;
-    bool _strict_mode;
-
-private:
-    Status _init_expr_ctxes();
-};
-} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/new_file_text_scanner.cpp b/be/src/vec/exec/scan/new_file_text_scanner.cpp
deleted file mode 100644
index 2202dac76b..0000000000
--- a/be/src/vec/exec/scan/new_file_text_scanner.cpp
+++ /dev/null
@@ -1,263 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "vec/exec/scan/new_file_text_scanner.h"
-
-#include "exec/plain_text_line_reader.h"
-#include "io/file_factory.h"
-#include "util/utf8_check.h"
-#include "vec/exec/scan/vscan_node.h"
-
-namespace doris::vectorized {
-
-NewFileTextScanner::NewFileTextScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit,
-                                       const TFileScanRange& scan_range, RuntimeProfile* profile,
-                                       const std::vector<TExpr>& pre_filter_texprs)
-        : NewFileScanner(state, parent, limit, scan_range, profile, pre_filter_texprs),
-          _cur_file_reader(nullptr),
-          _cur_line_reader(nullptr),
-          _cur_line_reader_eof(false),
-          _skip_lines(0),
-          _success(false) {}
-
-Status NewFileTextScanner::open(RuntimeState* state) {
-    RETURN_IF_ERROR(NewFileScanner::open(state));
-    if (_ranges.empty()) {
-        return Status::OK();
-    }
-    _split_values.reserve(sizeof(Slice) * _file_slot_descs.size());
-    return Status::OK();
-}
-
-Status NewFileTextScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) {
-    SCOPED_TIMER(_read_timer);
-    if (!_is_load) {
-        RETURN_IF_ERROR(init_block(block));
-    }
-    const int batch_size = state->batch_size();
-    *eof = false;
-    int current_rows = _rows;
-    while (_rows < batch_size && !_scanner_eof) {
-        if (_cur_line_reader == nullptr || _cur_line_reader_eof) {
-            RETURN_IF_ERROR(_open_next_reader());
-            // If there isn't any more reader, break this
-            if (_scanner_eof) {
-                continue;
-            }
-        }
-        const uint8_t* ptr = nullptr;
-        size_t size = 0;
-        RETURN_IF_ERROR(_cur_line_reader->read_line(&ptr, &size, &_cur_line_reader_eof));
-        if (_skip_lines > 0) {
-            _skip_lines--;
-            continue;
-        }
-        if (size == 0) {
-            // Read empty row, just continue
-            continue;
-        }
-        {
-            COUNTER_UPDATE(_rows_read_counter, 1);
-            RETURN_IF_ERROR(_fill_file_columns(Slice(ptr, size), block));
-        }
-        if (_cur_line_reader_eof) {
-            RETURN_IF_ERROR(_fill_columns_from_path(block, _rows - current_rows));
-            current_rows = _rows;
-        }
-    }
-    if (_scanner_eof && block->rows() == 0) {
-        *eof = true;
-    }
-    return Status::OK();
-}
-
-Status NewFileTextScanner::_fill_file_columns(const Slice& line, vectorized::Block* _block) {
-    RETURN_IF_ERROR(_line_split_to_values(line));
-    if (!_success) {
-        // If not success, which means we met an invalid row, return.
-        return Status::OK();
-    }
-
-    for (int i = 0; i < _split_values.size(); ++i) {
-        auto slot_desc = _file_slot_descs[i];
-        const Slice& value = _split_values[i];
-
-        auto doris_column = _block->get_by_name(slot_desc->col_name()).column;
-        IColumn* col_ptr = const_cast<IColumn*>(doris_column.get());
-        _text_converter->write_vec_column(slot_desc, col_ptr, value.data, value.size, true, false);
-    }
-    _rows++;
-    return Status::OK();
-}
-
-Status NewFileTextScanner::_line_split_to_values(const Slice& line) {
-    if (!validate_utf8(line.data, line.size)) {
-        RETURN_IF_ERROR(_state->append_error_msg_to_file(
-                []() -> std::string { return "Unable to display"; },
-                []() -> std::string {
-                    fmt::memory_buffer error_msg;
-                    fmt::format_to(error_msg, "{}", "Unable to display");
-                    return fmt::to_string(error_msg);
-                },
-                &_scanner_eof));
-        _success = false;
-        return Status::OK();
-    }
-
-    RETURN_IF_ERROR(_split_line(line));
-
-    _success = true;
-    return Status::OK();
-}
-
-Status NewFileTextScanner::_open_next_reader() {
-    if (_next_range >= _ranges.size()) {
-        _scanner_eof = true;
-        return Status::OK();
-    }
-
-    RETURN_IF_ERROR(_open_file_reader());
-    RETURN_IF_ERROR(_open_line_reader());
-    _next_range++;
-
-    return Status::OK();
-}
-
-Status NewFileTextScanner::_open_file_reader() {
-    const TFileRangeDesc& range = _ranges[_next_range];
-    RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _params, range.path,
-                                                    range.start_offset, range.file_size, 0,
-                                                    _cur_file_reader));
-    return _cur_file_reader->open();
-}
-
-Status NewFileTextScanner::_open_line_reader() {
-    if (_cur_line_reader != nullptr) {
-        delete _cur_line_reader;
-        _cur_line_reader = nullptr;
-    }
-
-    const TFileRangeDesc& range = _ranges[_next_range];
-    int64_t size = range.size;
-    if (range.start_offset != 0) {
-        if (_params.format_type != TFileFormatType::FORMAT_CSV_PLAIN) {
-            std::stringstream ss;
-            ss << "For now we do not support split compressed file";
-            return Status::InternalError(ss.str());
-        }
-        size += 1;
-        // not first range will always skip one line
-        _skip_lines = 1;
-    }
-
-    // open line reader
-    switch (_params.format_type) {
-    case TFileFormatType::FORMAT_CSV_PLAIN:
-        _cur_line_reader = new PlainTextLineReader(_profile, _cur_file_reader.get(), nullptr, size,
-                                                   _line_delimiter, _line_delimiter_length);
-        break;
-    default: {
-        std::stringstream ss;
-        ss << "Unknown format type, cannot init line reader, type=" << _params.format_type;
-        return Status::InternalError(ss.str());
-    }
-    }
-
-    _cur_line_reader_eof = false;
-
-    return Status::OK();
-}
-
-Status NewFileTextScanner::_split_line(const Slice& line) {
-    _split_values.clear();
-    std::vector<Slice> tmp_split_values;
-    tmp_split_values.reserve(_num_of_columns_from_file);
-
-    const char* value = line.data;
-    size_t start = 0;     // point to the start pos of next col value.
-    size_t curpos = 0;    // point to the start pos of separator matching sequence.
-    size_t p1 = 0;        // point to the current pos of separator matching sequence.
-    size_t non_space = 0; // point to the last pos of non_space charactor.
-
-    // Separator: AAAA
-    //
-    //    p1
-    //     ▼
-    //     AAAA
-    //   1000AAAA2000AAAA
-    //   ▲   ▲
-    // Start │
-    //     curpos
-
-    while (curpos < line.size) {
-        if (*(value + curpos + p1) != _value_separator[p1]) {
-            // Not match, move forward:
-            curpos += (p1 == 0 ? 1 : p1);
-            p1 = 0;
-        } else {
-            p1++;
-            if (p1 == _value_separator_length) {
-                // Match a separator
-                non_space = curpos;
-                // Trim tailing spaces. Be consistent with hive and trino's behavior.
-                if (_state->trim_tailing_spaces_for_external_table_query()) {
-                    while (non_space > start && *(value + non_space - 1) == ' ') {
-                        non_space--;
-                    }
-                }
-                tmp_split_values.emplace_back(value + start, non_space - start);
-                start = curpos + _value_separator_length;
-                curpos = start;
-                p1 = 0;
-                non_space = 0;
-            }
-        }
-    }
-
-    CHECK(curpos == line.size) << curpos << " vs " << line.size;
-    non_space = curpos;
-    if (_state->trim_tailing_spaces_for_external_table_query()) {
-        while (non_space > start && *(value + non_space - 1) == ' ') {
-            non_space--;
-        }
-    }
-
-    tmp_split_values.emplace_back(value + start, non_space - start);
-    for (const auto& slot : _file_slot_descs) {
-        auto it = _file_slot_index_map.find(slot->id());
-        if (it == std::end(_file_slot_index_map)) {
-            std::stringstream ss;
-            ss << "Unknown _file_slot_index_map, slot_id=" << slot->id();
-            return Status::InternalError(ss.str());
-        }
-        int index = it->second;
-        CHECK(index < _num_of_columns_from_file) << index << " vs " << _num_of_columns_from_file;
-        _split_values.emplace_back(tmp_split_values[index]);
-    }
-    return Status::OK();
-}
-
-Status NewFileTextScanner::_convert_to_output_block(Block* output_block) {
-    if (_input_block_ptr == output_block) {
-        return Status::OK();
-    }
-    if (LIKELY(_input_block_ptr->rows() > 0)) {
-        RETURN_IF_ERROR(_materialize_dest_block(output_block));
-    }
-    return Status::OK();
-}
-} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/new_file_text_scanner.h b/be/src/vec/exec/scan/new_file_text_scanner.h
deleted file mode 100644
index 19cf6094f4..0000000000
--- a/be/src/vec/exec/scan/new_file_text_scanner.h
+++ /dev/null
@@ -1,66 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <exec/arrow/arrow_reader.h>
-
-#include "exec/line_reader.h"
-#include "exprs/bloomfilter_predicate.h"
-#include "exprs/function_filter.h"
-#include "vec/exec/scan/new_file_scanner.h"
-#include "vec/exec/scan/vscanner.h"
-
-namespace doris::vectorized {
-class NewFileTextScanner : public NewFileScanner {
-public:
-    NewFileTextScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit,
-                       const TFileScanRange& scan_range, RuntimeProfile* profile,
-                       const std::vector<TExpr>& pre_filter_texprs);
-
-    Status open(RuntimeState* state) override;
-
-protected:
-    void _init_profiles(RuntimeProfile* profile) override {}
-    Status _get_block_impl(RuntimeState* state, Block* block, bool* eos) override;
-    Status _convert_to_output_block(Block* output_block);
-
-private:
-    Status _fill_file_columns(const Slice& line, vectorized::Block* _block);
-    Status _open_next_reader();
-    Status _open_file_reader();
-    Status _open_line_reader();
-    Status _line_split_to_values(const Slice& line);
-    Status _split_line(const Slice& line);
-    // Reader
-    std::unique_ptr<FileReader> _cur_file_reader;
-    LineReader* _cur_line_reader;
-    bool _cur_line_reader_eof;
-
-    // When we fetch range start from 0, header_type="csv_with_names" skip first line
-    // When we fetch range start from 0, header_type="csv_with_names_and_types" skip first two line
-    // When we fetch range doesn't start
-    int _skip_lines;
-    std::vector<Slice> _split_values;
-    std::string _value_separator;
-    std::string _line_delimiter;
-    int _value_separator_length;
-    int _line_delimiter_length;
-
-    bool _success;
-};
-} // namespace doris::vectorized
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index 26447413d1..5d68f6edef 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -172,7 +172,7 @@ public class StreamLoadPlanner {
         }
 
         // create scan node
-        if (Config.enable_new_load_scan_node) {
+        if (Config.enable_new_load_scan_node && Config.enable_vectorized_load) {
             ExternalFileScanNode fileScanNode = new ExternalFileScanNode(new PlanNodeId(0), scanTupleDesc);
             if (!Util.isCsvFormat(taskInfo.getFormatType())) {
                 throw new AnalysisException(
diff --git a/regression-test/suites/load_p0/broker_load/test_broker_load.groovy b/regression-test/suites/load_p0/broker_load/test_broker_load.groovy
index fa4d2e0c07..52451a5207 100644
--- a/regression-test/suites/load_p0/broker_load/test_broker_load.groovy
+++ b/regression-test/suites/load_p0/broker_load/test_broker_load.groovy
@@ -185,22 +185,18 @@ suite("test_broker_load", "p0") {
         String[][] backends = sql """ show backends; """
         assertTrue(backends.size() > 0)
         for (String[] backend in backends) {
-            StringBuilder setConfigCommand = new StringBuilder();
-            setConfigCommand.append("curl -X POST http://")
-            setConfigCommand.append(backend[2])
-            setConfigCommand.append(":")
-            setConfigCommand.append(backend[5])
-            setConfigCommand.append("/api/update_config?")
-            String command1 = setConfigCommand.toString() + "enable_new_load_scan_node=$flag"
-            logger.info(command1)
-            String command2 = setConfigCommand.toString() + "enable_new_file_scanner=$flag"
-            logger.info(command2)
-            def process1 = command1.execute()
-            int code = process1.waitFor()
-            assertEquals(code, 0)
-            def process2 = command2.execute()
-            code = process1.waitFor()
-            assertEquals(code, 0)
+            // No need to set this config anymore, but leave this code sample here
+            // StringBuilder setConfigCommand = new StringBuilder();
+            // setConfigCommand.append("curl -X POST http://")
+            // setConfigCommand.append(backend[2])
+            // setConfigCommand.append(":")
+            // setConfigCommand.append(backend[5])
+            // setConfigCommand.append("/api/update_config?")
+            // String command1 = setConfigCommand.toString() + "enable_new_load_scan_node=$flag"
+            // logger.info(command1)
+            // def process1 = command1.execute()
+            // int code = process1.waitFor()
+            // assertEquals(code, 0)
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org