You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/06/08 07:07:45 UTC

[incubator-doris] branch master updated: [Refactor] Use file factory to replace create file reader/writer (#9505)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 94089b9192 [Refactor] Use file factory to replace create file reader/writer (#9505)
94089b9192 is described below

commit 94089b919211403de7e6d8e664715d44ab4fd4f3
Author: HappenLee <ha...@hotmail.com>
AuthorDate: Wed Jun 8 15:07:39 2022 +0800

    [Refactor] Use file factory to replace create file reader/writer (#9505)
    
    1. Simplify code logic and improve abstraction
    2. Fix memory leaks caused by owning raw pointers
    
    Co-authored-by: lihaopeng <li...@baidu.com>
---
 be/src/exec/CMakeLists.txt                         |  22 ++--
 be/src/exec/arrow/arrow_reader.cpp                 |   2 +-
 be/src/exec/arrow/orc_reader.cpp                   |   2 +-
 be/src/exec/arrow/parquet_reader.cpp               |   6 +-
 be/src/exec/broker_scanner.cpp                     |  94 +++------------
 be/src/exec/broker_scanner.h                       |   4 +-
 be/src/exec/json_scanner.cpp                       |  73 ++----------
 be/src/exec/json_scanner.h                         |   5 +-
 be/src/exec/orc_scanner.cpp                        |  54 ++-------
 be/src/exec/parquet_scanner.cpp                    |  49 ++------
 be/src/exec/parquet_writer.cpp                     |  11 +-
 be/src/exec/plain_binary_line_reader.cpp           |   2 +-
 be/src/exec/plain_text_line_reader.cpp             |   2 +-
 be/src/io/CMakeLists.txt                           |  46 +++++++
 be/src/{exec => io}/broker_reader.cpp              |   2 +-
 be/src/{exec => io}/broker_reader.h                |   2 +-
 be/src/{exec => io}/broker_writer.cpp              |   2 +-
 be/src/{exec => io}/broker_writer.h                |   2 +-
 be/src/{exec => io}/buffered_reader.cpp            |   2 +-
 be/src/{exec => io}/buffered_reader.h              |   2 +-
 be/src/io/file_factory.cpp                         | 132 +++++++++++++++++++++
 be/src/io/file_factory.h                           |  77 ++++++++++++
 be/src/{exec => io}/file_reader.h                  |   0
 be/src/{exec => io}/file_writer.h                  |   0
 be/src/{exec => io}/hdfs_file_reader.cpp           |   2 +-
 be/src/{exec => io}/hdfs_file_reader.h             |   2 +-
 be/src/{exec => io}/hdfs_reader_writer.cpp         |  11 +-
 be/src/{exec => io}/hdfs_reader_writer.h           |   6 +-
 be/src/{exec => io}/hdfs_writer.cpp                |   2 +-
 be/src/{exec => io}/hdfs_writer.h                  |   2 +-
 be/src/{exec => io}/local_file_reader.cpp          |   2 +-
 be/src/{exec => io}/local_file_reader.h            |   2 +-
 be/src/{exec => io}/local_file_writer.cpp          |  20 +++-
 be/src/{exec => io}/local_file_writer.h            |   7 +-
 be/src/{exec => io}/s3_reader.cpp                  |   2 +-
 be/src/{exec => io}/s3_reader.h                    |   2 +-
 be/src/{exec => io}/s3_writer.cpp                  |   2 +-
 be/src/{exec => io}/s3_writer.h                    |   2 +-
 be/src/runtime/export_sink.cpp                     |  53 ++-------
 be/src/runtime/file_result_writer.cpp              |  40 ++-----
 be/src/runtime/file_result_writer.h                |   2 +-
 be/src/runtime/routine_load/kafka_consumer_pipe.h  |   2 +-
 be/src/runtime/stream_load/stream_load_pipe.h      |   2 +-
 be/src/util/broker_load_error_hub.cpp              |   2 +-
 be/src/util/broker_storage_backend.cpp             |   4 +-
 be/src/vec/exec/varrow_scanner.cpp                 |  44 +------
 be/src/vec/exec/vjson_scanner.cpp                  |   2 +-
 be/test/exec/broker_reader_test.cpp                |   2 +-
 be/test/exec/broker_scan_node_test.cpp             |   2 +-
 be/test/exec/broker_scanner_test.cpp               |   2 +-
 be/test/exec/buffered_reader_test.cpp              |   4 +-
 be/test/exec/hdfs_file_reader_test.cpp             |   4 +-
 be/test/exec/json_scanner_test.cpp                 |   2 +-
 be/test/exec/json_scanner_with_jsonpath_test.cpp   |   2 +-
 be/test/exec/multi_bytes_separator_test.cpp        |   2 +-
 be/test/exec/orc_scanner_test.cpp                  |   2 +-
 be/test/exec/parquet_scanner_test.cpp              |   2 +-
 be/test/exec/plain_text_line_reader_bzip_test.cpp  |   2 +-
 be/test/exec/plain_text_line_reader_gzip_test.cpp  |   2 +-
 .../exec/plain_text_line_reader_lz4frame_test.cpp  |   2 +-
 be/test/exec/plain_text_line_reader_lzop_test.cpp  |   2 +-
 .../plain_text_line_reader_uncompressed_test.cpp   |   2 +-
 be/test/exec/s3_reader_test.cpp                    |   4 +-
 be/test/vec/exec/vbroker_scan_node_test.cpp        |   2 +-
 be/test/vec/exec/vbroker_scanner_test.cpp          |   2 +-
 be/test/vec/exec/vjson_scanner_test.cpp            |   2 +-
 be/test/vec/exec/vorc_scanner_test.cpp             |   2 +-
 be/test/vec/exec/vparquet_scanner_test.cpp         |   2 +-
 68 files changed, 427 insertions(+), 429 deletions(-)

diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index 7ce4c4caf3..5708eb0465 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -28,8 +28,7 @@ set(EXEC_FILES
     analytic_eval_node.cpp
     blocking_join_node.cpp
     broker_scan_node.cpp
-    broker_reader.cpp
-    buffered_reader.cpp
+        ../io/buffered_reader.cpp
     base_scanner.cpp
     broker_scanner.cpp
     cross_join_node.cpp
@@ -42,7 +41,6 @@ set(EXEC_FILES
     exchange_node.cpp
     hash_join_node.cpp
     hash_table.cpp
-    local_file_reader.cpp
     merge_node.cpp
     scan_node.cpp
     select_node.cpp
@@ -93,24 +91,28 @@ set(EXEC_FILES
     partitioned_hash_table.cc
     partitioned_aggregation_node.cc
     odbc_scan_node.cpp
-    local_file_writer.cpp
-    broker_writer.cpp
     parquet_scanner.cpp
     parquet_writer.cpp
     orc_scanner.cpp
     odbc_connector.cpp
     json_scanner.cpp
     assert_num_rows_node.cpp
-    s3_reader.cpp
-    s3_writer.cpp
-    hdfs_reader_writer.cpp
+
+        ../io/local_file_reader.cpp
+        ../io/local_file_writer.cpp
+        ../io/broker_reader.cpp
+        ../io/broker_writer.cpp
+        ../io/s3_reader.cpp
+        ../io/s3_writer.cpp
+        ../io/hdfs_reader_writer.cpp
+        ../io/file_factory.cpp
 )
 
 if (ARCH_AMD64)
     set(EXEC_FILES
         ${EXEC_FILES}
-        hdfs_file_reader.cpp
-        hdfs_writer.cpp
+            ../io/hdfs_file_reader.cpp
+            ../io/hdfs_writer.cpp
         )
 endif()
 
diff --git a/be/src/exec/arrow/arrow_reader.cpp b/be/src/exec/arrow/arrow_reader.cpp
index 789c66be2c..94289a990b 100644
--- a/be/src/exec/arrow/arrow_reader.cpp
+++ b/be/src/exec/arrow/arrow_reader.cpp
@@ -21,9 +21,9 @@
 #include <time.h>
 
 #include "common/logging.h"
-#include "exec/file_reader.h"
 #include "gen_cpp/PaloBrokerService_types.h"
 #include "gen_cpp/TPaloBrokerService.h"
+#include "io/file_reader.h"
 #include "runtime/broker_mgr.h"
 #include "runtime/client_cache.h"
 #include "runtime/descriptors.h"
diff --git a/be/src/exec/arrow/orc_reader.cpp b/be/src/exec/arrow/orc_reader.cpp
index 5815f008df..536b852ad6 100644
--- a/be/src/exec/arrow/orc_reader.cpp
+++ b/be/src/exec/arrow/orc_reader.cpp
@@ -21,7 +21,7 @@
 #include <time.h>
 
 #include "common/logging.h"
-#include "exec/file_reader.h"
+#include "io/file_reader.h"
 #include "runtime/mem_pool.h"
 #include "runtime/tuple.h"
 
diff --git a/be/src/exec/arrow/parquet_reader.cpp b/be/src/exec/arrow/parquet_reader.cpp
index 330d983c36..dad3e9e5e8 100644
--- a/be/src/exec/arrow/parquet_reader.cpp
+++ b/be/src/exec/arrow/parquet_reader.cpp
@@ -27,7 +27,11 @@
 
 #include "common/logging.h"
 #include "common/status.h"
-#include "exec/file_reader.h"
+#include "gen_cpp/PaloBrokerService_types.h"
+#include "gen_cpp/TPaloBrokerService.h"
+#include "io/file_reader.h"
+#include "runtime/broker_mgr.h"
+#include "runtime/client_cache.h"
 #include "runtime/descriptors.h"
 #include "runtime/mem_pool.h"
 #include "runtime/string_value.h"
diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp
index 2c21fd3d54..8942c4fd67 100644
--- a/be/src/exec/broker_scanner.cpp
+++ b/be/src/exec/broker_scanner.cpp
@@ -23,22 +23,20 @@
 #include <sstream>
 
 #include "common/consts.h"
-#include "exec/broker_reader.h"
-#include "exec/buffered_reader.h"
 #include "exec/decompressor.h"
 #include "exec/exec_node.h"
-#include "exec/hdfs_reader_writer.h"
-#include "exec/local_file_reader.h"
 #include "exec/plain_binary_line_reader.h"
 #include "exec/plain_text_line_reader.h"
-#include "exec/s3_reader.h"
 #include "exprs/expr.h"
+#include "io/buffered_reader.h"
+#include "io/file_factory.h"
 #include "runtime/descriptors.h"
 #include "runtime/exec_env.h"
 #include "runtime/raw_value.h"
 #include "runtime/stream_load/load_stream_mgr.h"
 #include "runtime/stream_load/stream_load_pipe.h"
 #include "runtime/tuple.h"
+#include "util/string_util.h"
 #include "util/utf8_check.h"
 
 namespace doris {
@@ -109,11 +107,8 @@ Status BrokerScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, boo
             break; // break always
         }
     }
-    if (_scanner_eof) {
-        *eof = true;
-    } else {
-        *eof = false;
-    }
+
+    *eof = _scanner_eof;
     return Status::OK();
 }
 
@@ -131,16 +126,6 @@ Status BrokerScanner::open_next_reader() {
 }
 
 Status BrokerScanner::open_file_reader() {
-    if (_cur_file_reader != nullptr) {
-        if (_stream_load_pipe != nullptr) {
-            _stream_load_pipe.reset();
-            _cur_file_reader = nullptr;
-        } else {
-            delete _cur_file_reader;
-            _cur_file_reader = nullptr;
-        }
-    }
-
     const TBrokerRangeDesc& range = _ranges[_next_range];
     int64_t start_offset = range.start_offset;
     if (start_offset != 0) {
@@ -155,53 +140,11 @@ Status BrokerScanner::open_file_reader() {
             _skip_lines = 2;
         }
     }
-    switch (range.file_type) {
-    case TFileType::FILE_LOCAL: {
-        LocalFileReader* file_reader = new LocalFileReader(range.path, start_offset);
-        RETURN_IF_ERROR(file_reader->open());
-        _cur_file_reader = file_reader;
-        break;
-    }
-    case TFileType::FILE_HDFS: {
-        FileReader* hdfs_file_reader;
-        RETURN_IF_ERROR(HdfsReaderWriter::create_reader(range.hdfs_params, range.path, start_offset,
-                                                        &hdfs_file_reader));
-        BufferedReader* file_reader = new BufferedReader(_profile, hdfs_file_reader);
-        RETURN_IF_ERROR(file_reader->open());
-        _cur_file_reader = file_reader;
-        break;
-    }
-    case TFileType::FILE_BROKER: {
-        BrokerReader* broker_reader =
-                new BrokerReader(_state->exec_env(), _broker_addresses, _params.properties,
-                                 range.path, start_offset);
-        RETURN_IF_ERROR(broker_reader->open());
-        _cur_file_reader = broker_reader;
-        break;
-    }
-    case TFileType::FILE_S3: {
-        BufferedReader* s3_reader = new BufferedReader(
-                _profile, new S3Reader(_params.properties, range.path, start_offset));
-        RETURN_IF_ERROR(s3_reader->open());
-        _cur_file_reader = s3_reader;
-        break;
-    }
-    case TFileType::FILE_STREAM: {
-        _stream_load_pipe = _state->exec_env()->load_stream_mgr()->get(range.load_id);
-        if (_stream_load_pipe == nullptr) {
-            VLOG_NOTICE << "unknown stream load id: " << UniqueId(range.load_id);
-            return Status::InternalError("unknown stream load id");
-        }
-        _cur_file_reader = _stream_load_pipe.get();
-        break;
-    }
-    default: {
-        std::stringstream ss;
-        ss << "Unknown file type, type=" << range.file_type;
-        return Status::InternalError(ss.str());
-    }
-    }
-    return Status::OK();
+
+    RETURN_IF_ERROR(FileFactory::create_file_reader(range.file_type, _state->exec_env(), _profile,
+                                                    _broker_addresses, _params.properties, range,
+                                                    start_offset, _cur_file_reader));
+    return _cur_file_reader->open();
 }
 
 Status BrokerScanner::create_decompressor(TFileFormatType::type type) {
@@ -280,11 +223,12 @@ Status BrokerScanner::open_line_reader() {
     case TFileFormatType::FORMAT_CSV_LZ4FRAME:
     case TFileFormatType::FORMAT_CSV_LZOP:
     case TFileFormatType::FORMAT_CSV_DEFLATE:
-        _cur_line_reader = new PlainTextLineReader(_profile, _cur_file_reader, _cur_decompressor,
-                                                   size, _line_delimiter, _line_delimiter_length);
+        _cur_line_reader =
+                new PlainTextLineReader(_profile, _cur_file_reader.get(), _cur_decompressor, size,
+                                        _line_delimiter, _line_delimiter_length);
         break;
     case TFileFormatType::FORMAT_PROTO:
-        _cur_line_reader = new PlainBinaryLineReader(_cur_file_reader);
+        _cur_line_reader = new PlainBinaryLineReader(_cur_file_reader.get());
         break;
     default: {
         std::stringstream ss;
@@ -309,16 +253,6 @@ void BrokerScanner::close() {
         delete _cur_line_reader;
         _cur_line_reader = nullptr;
     }
-
-    if (_cur_file_reader != nullptr) {
-        if (_stream_load_pipe != nullptr) {
-            _stream_load_pipe.reset();
-            _cur_file_reader = nullptr;
-        } else {
-            delete _cur_file_reader;
-            _cur_file_reader = nullptr;
-        }
-    }
 }
 
 void BrokerScanner::split_line(const Slice& line) {
diff --git a/be/src/exec/broker_scanner.h b/be/src/exec/broker_scanner.h
index f10ce68518..379b1630cd 100644
--- a/be/src/exec/broker_scanner.h
+++ b/be/src/exec/broker_scanner.h
@@ -107,7 +107,7 @@ protected:
     int _line_delimiter_length;
 
     // Reader
-    FileReader* _cur_file_reader;
+    std::shared_ptr<FileReader> _cur_file_reader;
     LineReader* _cur_line_reader;
     Decompressor* _cur_decompressor;
     bool _cur_line_reader_eof;
@@ -117,8 +117,6 @@ protected:
     // When we fetch range doesn't start from 0 will always skip the first line
     int _skip_lines;
 
-    // used to hold current StreamLoadPipe
-    std::shared_ptr<StreamLoadPipe> _stream_load_pipe;
     std::vector<Slice> _split_values;
 };
 
diff --git a/be/src/exec/json_scanner.cpp b/be/src/exec/json_scanner.cpp
index 0be3d4c089..5e7b8ba309 100644
--- a/be/src/exec/json_scanner.cpp
+++ b/be/src/exec/json_scanner.cpp
@@ -22,14 +22,12 @@
 #include <algorithm>
 
 #include "env/env.h"
-#include "exec/broker_reader.h"
-#include "exec/buffered_reader.h"
-#include "exec/local_file_reader.h"
 #include "exec/plain_text_line_reader.h"
-#include "exec/s3_reader.h"
 #include "exprs/expr.h"
 #include "exprs/json_functions.h"
 #include "gutil/strings/split.h"
+#include "io/buffered_reader.h"
+#include "io/file_factory.h"
 #include "runtime/exec_env.h"
 #include "runtime/runtime_state.h"
 
@@ -67,7 +65,7 @@ Status JsonScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bool*
     SCOPED_TIMER(_read_timer);
     // Get one line
     while (!_scanner_eof) {
-        if (_cur_file_reader == nullptr || _cur_reader_eof) {
+        if (!_cur_file_reader || _cur_reader_eof) {
             RETURN_IF_ERROR(open_next_reader());
             // If there isn't any more reader, break this
             if (_scanner_eof) {
@@ -124,16 +122,6 @@ Status JsonScanner::open_based_reader() {
 }
 
 Status JsonScanner::open_file_reader() {
-    if (_cur_file_reader != nullptr) {
-        if (_stream_load_pipe != nullptr) {
-            _stream_load_pipe.reset();
-            _cur_file_reader = nullptr;
-        } else {
-            delete _cur_file_reader;
-            _cur_file_reader = nullptr;
-        }
-    }
-
     const TBrokerRangeDesc& range = _ranges[_next_range];
     int64_t start_offset = range.start_offset;
     if (start_offset != 0) {
@@ -142,45 +130,12 @@ Status JsonScanner::open_file_reader() {
     if (range.__isset.read_json_by_line) {
         _read_json_by_line = range.read_json_by_line;
     }
-    switch (range.file_type) {
-    case TFileType::FILE_LOCAL: {
-        LocalFileReader* file_reader = new LocalFileReader(range.path, start_offset);
-        RETURN_IF_ERROR(file_reader->open());
-        _cur_file_reader = file_reader;
-        break;
-    }
-    case TFileType::FILE_BROKER: {
-        BrokerReader* broker_reader =
-                new BrokerReader(_state->exec_env(), _broker_addresses, _params.properties,
-                                 range.path, start_offset);
-        RETURN_IF_ERROR(broker_reader->open());
-        _cur_file_reader = broker_reader;
-        break;
-    }
-    case TFileType::FILE_S3: {
-        BufferedReader* s3_reader = new BufferedReader(
-                _profile, new S3Reader(_params.properties, range.path, start_offset));
-        RETURN_IF_ERROR(s3_reader->open());
-        _cur_file_reader = s3_reader;
-        break;
-    }
-    case TFileType::FILE_STREAM: {
-        _stream_load_pipe = _state->exec_env()->load_stream_mgr()->get(range.load_id);
-        if (_stream_load_pipe == nullptr) {
-            VLOG_NOTICE << "unknown stream load id: " << UniqueId(range.load_id);
-            return Status::InternalError("unknown stream load id");
-        }
-        _cur_file_reader = _stream_load_pipe.get();
-        break;
-    }
-    default: {
-        std::stringstream ss;
-        ss << "Unknown file type, type=" << range.file_type;
-        return Status::InternalError(ss.str());
-    }
-    }
+
+    RETURN_IF_ERROR(FileFactory::create_file_reader(range.file_type, _state->exec_env(), _profile,
+                                                    _broker_addresses, _params.properties, range,
+                                                    start_offset, _cur_file_reader));
     _cur_reader_eof = false;
-    return Status::OK();
+    return _cur_file_reader->open();
 }
 
 Status JsonScanner::open_line_reader() {
@@ -197,7 +152,7 @@ Status JsonScanner::open_line_reader() {
     } else {
         _skip_next_line = false;
     }
-    _cur_line_reader = new PlainTextLineReader(_profile, _cur_file_reader, nullptr, size,
+    _cur_line_reader = new PlainTextLineReader(_profile, _cur_file_reader.get(), nullptr, size,
                                                _line_delimiter, _line_delimiter_length);
     _cur_reader_eof = false;
     return Status::OK();
@@ -224,7 +179,7 @@ Status JsonScanner::open_json_reader() {
     } else {
         _cur_json_reader =
                 new JsonReader(_state, _counter, _profile, strip_outer_array, num_as_string,
-                               fuzzy_parse, &_scanner_eof, _cur_file_reader);
+                               fuzzy_parse, &_scanner_eof, _cur_file_reader.get());
     }
 
     RETURN_IF_ERROR(_cur_json_reader->init(jsonpath, json_root));
@@ -264,14 +219,6 @@ void JsonScanner::close() {
         delete _cur_line_reader;
         _cur_line_reader = nullptr;
     }
-    if (_cur_file_reader != nullptr) {
-        if (_stream_load_pipe != nullptr) {
-            _stream_load_pipe.reset();
-        } else {
-            delete _cur_file_reader;
-        }
-        _cur_file_reader = nullptr;
-    }
 }
 
 ////// class JsonDataInternal
diff --git a/be/src/exec/json_scanner.h b/be/src/exec/json_scanner.h
index 5dc61c18cb..981e242d96 100644
--- a/be/src/exec/json_scanner.h
+++ b/be/src/exec/json_scanner.h
@@ -89,7 +89,7 @@ protected:
     int _line_delimiter_length;
 
     // Reader
-    FileReader* _cur_file_reader;
+    std::shared_ptr<FileReader> _cur_file_reader;
     LineReader* _cur_line_reader;
     JsonReader* _cur_json_reader;
     bool _cur_reader_eof;
@@ -98,9 +98,6 @@ protected:
     // When we fetch range doesn't start from 0,
     // we will read to one ahead, and skip the first line
     bool _skip_next_line;
-
-    // used to hold current StreamLoadPipe
-    std::shared_ptr<StreamLoadPipe> _stream_load_pipe;
 };
 
 class JsonDataInternal {
diff --git a/be/src/exec/orc_scanner.cpp b/be/src/exec/orc_scanner.cpp
index 138eb729ef..c20aef7d05 100644
--- a/be/src/exec/orc_scanner.cpp
+++ b/be/src/exec/orc_scanner.cpp
@@ -17,21 +17,14 @@
 
 #include "exec/orc_scanner.h"
 
-#include "exec/broker_reader.h"
-#include "exec/buffered_reader.h"
-#include "exec/local_file_reader.h"
-#include "exec/s3_reader.h"
-#include "exprs/expr.h"
-#include "runtime/descriptors.h"
+#include "io/buffered_reader.h"
+#include "io/file_factory.h"
+#include "io/local_file_reader.h"
 #include "runtime/exec_env.h"
 #include "runtime/raw_value.h"
 #include "runtime/runtime_state.h"
 #include "runtime/tuple.h"
 
-#if defined(__x86_64__)
-#include "exec/hdfs_file_reader.h"
-#endif
-
 // orc include file didn't expose orc::TimezoneError
 // we have to declare it by hand, following is the source code in orc link
 // https://github.com/apache/orc/blob/84353fbfc447b06e0924024a8e03c1aaebd3e7a5/c%2B%2B/src/Timezone.hh#L104-L109
@@ -387,44 +380,11 @@ Status ORCScanner::open_next_reader() {
         }
         const TBrokerRangeDesc& range = _ranges[_next_range++];
         std::unique_ptr<FileReader> file_reader;
-        switch (range.file_type) {
-        case TFileType::FILE_LOCAL: {
-            file_reader.reset(new LocalFileReader(range.path, range.start_offset));
-            break;
-        }
-        case TFileType::FILE_BROKER: {
-            int64_t file_size = 0;
-            // for compatibility
-            if (range.__isset.file_size) {
-                file_size = range.file_size;
-            }
-            file_reader.reset(new BufferedReader(
-                    _profile,
-                    new BrokerReader(_state->exec_env(), _broker_addresses, _params.properties,
-                                     range.path, range.start_offset, file_size)));
-            break;
-        }
-        case TFileType::FILE_S3: {
-            file_reader.reset(new BufferedReader(
-                    _profile, new S3Reader(_params.properties, range.path, range.start_offset)));
-            break;
-        }
-        case TFileType::FILE_HDFS: {
-#if defined(__x86_64__)
-            file_reader.reset(
-                    new HdfsFileReader(range.hdfs_params, range.path, range.start_offset));
-            break;
-#else
-            return Status::InternalError("HdfsFileReader do not support on non x86 platform");
-#endif
-        }
-        default: {
-            std::stringstream ss;
-            ss << "Unknown file type, type=" << range.file_type;
-            return Status::InternalError(ss.str());
-        }
-        }
+        RETURN_IF_ERROR(FileFactory::create_file_reader(
+                range.file_type, _state->exec_env(), _profile, _broker_addresses,
+                _params.properties, range, range.start_offset, file_reader));
         RETURN_IF_ERROR(file_reader->open());
+
         if (file_reader->size() == 0) {
             file_reader->close();
             continue;
diff --git a/be/src/exec/parquet_scanner.cpp b/be/src/exec/parquet_scanner.cpp
index 880313b6f0..a47e965de5 100644
--- a/be/src/exec/parquet_scanner.cpp
+++ b/be/src/exec/parquet_scanner.cpp
@@ -18,13 +18,13 @@
 #include "exec/parquet_scanner.h"
 
 #include "exec/arrow/parquet_reader.h"
-#include "exec/broker_reader.h"
-#include "exec/buffered_reader.h"
 #include "exec/decompressor.h"
-#include "exec/hdfs_reader_writer.h"
-#include "exec/local_file_reader.h"
-#include "exec/s3_reader.h"
 #include "exec/text_converter.h"
+#include "exec/text_converter.hpp"
+#include "exprs/expr.h"
+#include "io/buffered_reader.h"
+#include "io/file_factory.h"
+#include "runtime/descriptors.h"
 #include "runtime/exec_env.h"
 #include "runtime/raw_value.h"
 #include "runtime/stream_load/load_stream_mgr.h"
@@ -101,42 +101,11 @@ Status ParquetScanner::open_next_reader() {
         }
         const TBrokerRangeDesc& range = _ranges[_next_range++];
         std::unique_ptr<FileReader> file_reader;
-        switch (range.file_type) {
-        case TFileType::FILE_LOCAL: {
-            file_reader.reset(new LocalFileReader(range.path, range.start_offset));
-            break;
-        }
-        case TFileType::FILE_HDFS: {
-            FileReader* reader;
-            RETURN_IF_ERROR(HdfsReaderWriter::create_reader(range.hdfs_params, range.path,
-                                                            range.start_offset, &reader));
-            file_reader.reset(reader);
-            break;
-        }
-        case TFileType::FILE_BROKER: {
-            int64_t file_size = 0;
-            // for compatibility
-            if (range.__isset.file_size) {
-                file_size = range.file_size;
-            }
-            file_reader.reset(new BufferedReader(
-                    _profile,
-                    new BrokerReader(_state->exec_env(), _broker_addresses, _params.properties,
-                                     range.path, range.start_offset, file_size)));
-            break;
-        }
-        case TFileType::FILE_S3: {
-            file_reader.reset(new BufferedReader(
-                    _profile, new S3Reader(_params.properties, range.path, range.start_offset)));
-            break;
-        }
-        default: {
-            std::stringstream ss;
-            ss << "Unknown file type, type=" << range.file_type;
-            return Status::InternalError(ss.str());
-        }
-        }
+        RETURN_IF_ERROR(FileFactory::create_file_reader(
+                range.file_type, _state->exec_env(), _profile, _broker_addresses,
+                _params.properties, range, range.start_offset, file_reader));
         RETURN_IF_ERROR(file_reader->open());
+
         if (file_reader->size() == 0) {
             file_reader->close();
             continue;
diff --git a/be/src/exec/parquet_writer.cpp b/be/src/exec/parquet_writer.cpp
index befdb1cc54..5f1da82469 100644
--- a/be/src/exec/parquet_writer.cpp
+++ b/be/src/exec/parquet_writer.cpp
@@ -21,8 +21,17 @@
 #include <arrow/status.h>
 #include <time.h>
 
-#include "exec/file_writer.h"
+#include "common/logging.h"
+#include "gen_cpp/PaloBrokerService_types.h"
+#include "gen_cpp/TPaloBrokerService.h"
+#include "io/file_writer.h"
+#include "runtime/broker_mgr.h"
+#include "runtime/client_cache.h"
+#include "runtime/descriptors.h"
+#include "runtime/exec_env.h"
+#include "runtime/mem_pool.h"
 #include "util/mysql_global.h"
+#include "util/thrift_util.h"
 #include "util/types.h"
 
 namespace doris {
diff --git a/be/src/exec/plain_binary_line_reader.cpp b/be/src/exec/plain_binary_line_reader.cpp
index 9cf1ff473f..f63671c622 100644
--- a/be/src/exec/plain_binary_line_reader.cpp
+++ b/be/src/exec/plain_binary_line_reader.cpp
@@ -18,7 +18,7 @@
 #include "exec/plain_binary_line_reader.h"
 
 #include "common/status.h"
-#include "exec/file_reader.h"
+#include "io/file_reader.h"
 
 namespace doris {
 
diff --git a/be/src/exec/plain_text_line_reader.cpp b/be/src/exec/plain_text_line_reader.cpp
index 8932e1c492..825982c5b1 100644
--- a/be/src/exec/plain_text_line_reader.cpp
+++ b/be/src/exec/plain_text_line_reader.cpp
@@ -19,7 +19,7 @@
 
 #include "common/status.h"
 #include "exec/decompressor.h"
-#include "exec/file_reader.h"
+#include "io/file_reader.h"
 
 // INPUT_CHUNK must
 //  larger than 15B for correct lz4 file decompressing
diff --git a/be/src/io/CMakeLists.txt b/be/src/io/CMakeLists.txt
new file mode 100644
index 0000000000..1be0e05176
--- /dev/null
+++ b/be/src/io/CMakeLists.txt
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# where to put generated libraries
+set(LIBRARY_OUTPUT_PATH "${BUILD_DIR}/src/io")
+
+# where to put generated binaries
+set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/io")
+
+set(EXEC_FILES
+     buffered_reader.cpp
+     local_file_reader.cpp
+     local_file_writer.cpp
+     broker_reader.cpp
+     broker_writer.cpp
+     s3_reader.cpp
+     s3_writer.cpp
+     hdfs_reader_writer.cpp
+     file_factory.cpp
+)
+
+if (ARCH_AMD64)
+    set(EXEC_FILES
+        ${EXEC_FILES}
+        hdfs_file_reader.cpp
+        hdfs_writer.cpp
+        )
+endif()
+
+add_library(IO STATIC
+    ${EXEC_FILES}
+)
diff --git a/be/src/exec/broker_reader.cpp b/be/src/io/broker_reader.cpp
similarity index 99%
rename from be/src/exec/broker_reader.cpp
rename to be/src/io/broker_reader.cpp
index 0d65bfc6f5..1745a7bbc2 100644
--- a/be/src/exec/broker_reader.cpp
+++ b/be/src/io/broker_reader.cpp
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "exec/broker_reader.h"
+#include "broker_reader.h"
 
 #include <sstream>
 
diff --git a/be/src/exec/broker_reader.h b/be/src/io/broker_reader.h
similarity index 98%
rename from be/src/exec/broker_reader.h
rename to be/src/io/broker_reader.h
index 05a4b24c79..94f86ca8df 100644
--- a/be/src/exec/broker_reader.h
+++ b/be/src/io/broker_reader.h
@@ -23,7 +23,7 @@
 #include <string>
 
 #include "common/status.h"
-#include "exec/file_reader.h"
+#include "file_reader.h"
 #include "gen_cpp/PaloBrokerService_types.h"
 #include "gen_cpp/Types_types.h"
 
diff --git a/be/src/exec/broker_writer.cpp b/be/src/io/broker_writer.cpp
similarity index 99%
rename from be/src/exec/broker_writer.cpp
rename to be/src/io/broker_writer.cpp
index 58f9dd4931..3975abe664 100644
--- a/be/src/exec/broker_writer.cpp
+++ b/be/src/io/broker_writer.cpp
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "exec/broker_writer.h"
+#include "broker_writer.h"
 
 #include <sstream>
 
diff --git a/be/src/exec/broker_writer.h b/be/src/io/broker_writer.h
similarity index 98%
rename from be/src/exec/broker_writer.h
rename to be/src/io/broker_writer.h
index 9bb8c4c5a5..c4b8bac17c 100644
--- a/be/src/exec/broker_writer.h
+++ b/be/src/io/broker_writer.h
@@ -23,7 +23,7 @@
 #include <string>
 
 #include "common/status.h"
-#include "exec/file_writer.h"
+#include "file_writer.h"
 #include "gen_cpp/PaloBrokerService_types.h"
 #include "gen_cpp/Types_types.h"
 
diff --git a/be/src/exec/buffered_reader.cpp b/be/src/io/buffered_reader.cpp
similarity index 99%
rename from be/src/exec/buffered_reader.cpp
rename to be/src/io/buffered_reader.cpp
index 5c599d41b7..5a80c4f842 100644
--- a/be/src/exec/buffered_reader.cpp
+++ b/be/src/io/buffered_reader.cpp
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "exec/buffered_reader.h"
+#include "buffered_reader.h"
 
 #include <algorithm>
 #include <sstream>
diff --git a/be/src/exec/buffered_reader.h b/be/src/io/buffered_reader.h
similarity index 98%
rename from be/src/exec/buffered_reader.h
rename to be/src/io/buffered_reader.h
index 937e154ca7..8ffd5cd0ab 100644
--- a/be/src/exec/buffered_reader.h
+++ b/be/src/io/buffered_reader.h
@@ -22,7 +22,7 @@
 #include <memory>
 
 #include "common/status.h"
-#include "exec/file_reader.h"
+#include "file_reader.h"
 #include "olap/olap_define.h"
 #include "util/runtime_profile.h"
 
diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
new file mode 100644
index 0000000000..f8c86a9146
--- /dev/null
+++ b/be/src/io/file_factory.cpp
@@ -0,0 +1,151 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "file_factory.h"
+
+#include "broker_reader.h"
+#include "broker_writer.h"
+#include "buffered_reader.h"
+#include "common/logging.h"
+#include "hdfs_reader_writer.h"
+#include "local_file_reader.h"
+#include "local_file_writer.h"
+#include "runtime/exec_env.h"
+#include "runtime/stream_load/load_stream_mgr.h"
+#include "s3_reader.h"
+#include "s3_writer.h"
+#include "util/uid_util.h"
+
+namespace doris {
+
+// Creates (but does not open) a writer for `path` on the storage selected by `type`.
+// On success `file_writer` owns the new writer; unsupported types yield InternalError.
+Status FileFactory::create_file_writer(TFileType::type type, ExecEnv* env,
+                                       const std::vector<TNetworkAddress>& broker_addresses,
+                                       const std::map<std::string, std::string>& properties,
+                                       const std::string& path, int64_t start_offset,
+                                       std::unique_ptr<FileWriter>& file_writer) {
+    switch (type) {
+    case TFileType::FILE_LOCAL: {
+        file_writer = std::make_unique<LocalFileWriter>(path, start_offset);
+        break;
+    }
+    case TFileType::FILE_BROKER: {
+        file_writer = std::make_unique<BrokerWriter>(env, broker_addresses, properties, path,
+                                                     start_offset);
+        break;
+    }
+    case TFileType::FILE_S3: {
+        file_writer = std::make_unique<S3Writer>(properties, path, start_offset);
+        break;
+    }
+    case TFileType::FILE_HDFS: {
+        // HdfsReaderWriter::create_writer takes the properties by non-const reference.
+        RETURN_IF_ERROR(HdfsReaderWriter::create_writer(
+                const_cast<std::map<std::string, std::string>&>(properties), path, file_writer));
+        break;
+    }
+    default:
+        return Status::InternalError("UnSupport File Writer Type: " + std::to_string(type));
+    }
+
+    return Status::OK();
+}
+
+// Allocates a raw reader for `range` on the storage selected by `type`.
+// BROKER, S3 and HDFS readers are wrapped in a BufferedReader (assumed to take ownership of
+// the inner reader -- confirm in buffered_reader.h). FILE_STREAM is not handled here.
+// On OK the caller owns `file_reader`; on error `file_reader` is left unchanged.
+Status FileFactory::_new_file_reader(TFileType::type type, ExecEnv* env, RuntimeProfile* profile,
+                                     const std::vector<TNetworkAddress>& broker_addresses,
+                                     const std::map<std::string, std::string>& properties,
+                                     const TBrokerRangeDesc& range, int64_t start_offset,
+                                     FileReader*& file_reader) {
+    switch (type) {
+    case TFileType::FILE_LOCAL: {
+        file_reader = new LocalFileReader(range.path, start_offset);
+        break;
+    }
+    case TFileType::FILE_BROKER: {
+        // file_size is optional in the range; 0 is passed when it is not set.
+        const int64_t file_size = range.__isset.file_size ? range.file_size : 0;
+        file_reader = new BufferedReader(
+                profile, new BrokerReader(env, broker_addresses, properties, range.path,
+                                          start_offset, file_size));
+        break;
+    }
+    case TFileType::FILE_S3: {
+        file_reader =
+                new BufferedReader(profile, new S3Reader(properties, range.path, start_offset));
+        break;
+    }
+    case TFileType::FILE_HDFS: {
+        FileReader* hdfs_reader = nullptr;
+        RETURN_IF_ERROR(HdfsReaderWriter::create_reader(range.hdfs_params, range.path, start_offset,
+                                                        &hdfs_reader));
+        file_reader = new BufferedReader(profile, hdfs_reader);
+        break;
+    }
+    default:
+        return Status::InternalError("UnSupport File Reader Type: " + std::to_string(type));
+    }
+
+    return Status::OK();
+}
+
+// Creates a uniquely-owned reader for `range`. FILE_STREAM is rejected because stream-load
+// pipes are held by std::shared_ptr; use the shared_ptr overload for that type.
+Status FileFactory::create_file_reader(TFileType::type type, ExecEnv* env, RuntimeProfile* profile,
+                                       const std::vector<TNetworkAddress>& broker_addresses,
+                                       const std::map<std::string, std::string>& properties,
+                                       const TBrokerRangeDesc& range, int64_t start_offset,
+                                       std::unique_ptr<FileReader>& file_reader) {
+    if (type == TFileType::FILE_STREAM) {
+        return Status::InternalError("UnSupport UniquePtr For FileStream type");
+    }
+
+    FileReader* file_reader_ptr = nullptr;
+    RETURN_IF_ERROR(_new_file_reader(type, env, profile, broker_addresses, properties, range,
+                                     start_offset, file_reader_ptr));
+    file_reader.reset(file_reader_ptr);
+    return Status::OK();
+}
+
+// Creates a shared reader for `range`. For FILE_STREAM the reader is looked up in the
+// ExecEnv's load stream manager by range.load_id; other types go through _new_file_reader.
+Status FileFactory::create_file_reader(TFileType::type type, ExecEnv* env, RuntimeProfile* profile,
+                                       const std::vector<TNetworkAddress>& broker_addresses,
+                                       const std::map<std::string, std::string>& properties,
+                                       const TBrokerRangeDesc& range, int64_t start_offset,
+                                       std::shared_ptr<FileReader>& file_reader) {
+    if (type == TFileType::FILE_STREAM) {
+        file_reader = env->load_stream_mgr()->get(range.load_id);
+        if (!file_reader) {
+            VLOG_NOTICE << "unknown stream load id: " << UniqueId(range.load_id);
+            return Status::InternalError("unknown stream load id");
+        }
+        return Status::OK();
+    }
+
+    FileReader* file_reader_ptr = nullptr;
+    RETURN_IF_ERROR(_new_file_reader(type, env, profile, broker_addresses, properties, range,
+                                     start_offset, file_reader_ptr));
+    file_reader.reset(file_reader_ptr);
+    return Status::OK();
+}
+
+} // namespace doris
diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h
new file mode 100644
index 0000000000..36c2871599
--- /dev/null
+++ b/be/src/io/file_factory.h
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <map>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "common/logging.h"
+#include "common/status.h"
+#include "file_reader.h"
+#include "file_writer.h"
+#include "gen_cpp/PlanNodes_types.h"
+#include "gen_cpp/Types_types.h"
+
+namespace doris {
+class ExecEnv;
+class TNetworkAddress;
+class RuntimeProfile;
+
+// Single entry point for building FileReader / FileWriter instances for the storage
+// types described by TFileType (local, broker, S3, HDFS and, for readers, stream load).
+class FileFactory {
+public:
+    // Creates (but does not open) a writer for `path`. On success `file_writer` owns it.
+    static Status create_file_writer(TFileType::type type, ExecEnv* env,
+                                     const std::vector<TNetworkAddress>& broker_addresses,
+                                     const std::map<std::string, std::string>& properties,
+                                     const std::string& path, int64_t start_offset,
+                                     std::unique_ptr<FileWriter>& file_writer);
+
+    // Because StreamLoadPipe uses std::shared_ptr, create_file_reader is provided for both
+    // unique_ptr and shared_ptr. The unique_ptr overload rejects TFileType::FILE_STREAM.
+    static Status create_file_reader(TFileType::type type, ExecEnv* env, RuntimeProfile* profile,
+                                     const std::vector<TNetworkAddress>& broker_addresses,
+                                     const std::map<std::string, std::string>& properties,
+                                     const TBrokerRangeDesc& range, int64_t start_offset,
+                                     std::unique_ptr<FileReader>& file_reader);
+
+    static Status create_file_reader(TFileType::type type, ExecEnv* env, RuntimeProfile* profile,
+                                     const std::vector<TNetworkAddress>& broker_addresses,
+                                     const std::map<std::string, std::string>& properties,
+                                     const TBrokerRangeDesc& range, int64_t start_offset,
+                                     std::shared_ptr<FileReader>& file_reader);
+
+    // Maps a TStorageBackendType to the matching TFileType; unknown values are fatal.
+    static TFileType::type convert_storage_type(TStorageBackendType::type type) {
+        switch (type) {
+        case TStorageBackendType::LOCAL:
+            return TFileType::FILE_LOCAL;
+        case TStorageBackendType::S3:
+            return TFileType::FILE_S3;
+        case TStorageBackendType::BROKER:
+            return TFileType::FILE_BROKER;
+        case TStorageBackendType::HDFS:
+            return TFileType::FILE_HDFS;
+        default:
+            LOG(FATAL) << "not match type to convert, from type:" << type;
+        }
+        __builtin_unreachable();
+    }
+
+private:
+    // On Status::OK() `file_reader` points to a newly allocated reader that the caller must
+    // delete or hand to a smart pointer; on error it is left unchanged.
+    static Status _new_file_reader(TFileType::type type, ExecEnv* env, RuntimeProfile* profile,
+                                   const std::vector<TNetworkAddress>& broker_addresses,
+                                   const std::map<std::string, std::string>& properties,
+                                   const TBrokerRangeDesc& range, int64_t start_offset,
+                                   FileReader*& file_reader);
+};
+
+} // namespace doris
diff --git a/be/src/exec/file_reader.h b/be/src/io/file_reader.h
similarity index 100%
rename from be/src/exec/file_reader.h
rename to be/src/io/file_reader.h
diff --git a/be/src/exec/file_writer.h b/be/src/io/file_writer.h
similarity index 100%
rename from be/src/exec/file_writer.h
rename to be/src/io/file_writer.h
diff --git a/be/src/exec/hdfs_file_reader.cpp b/be/src/io/hdfs_file_reader.cpp
similarity index 99%
rename from be/src/exec/hdfs_file_reader.cpp
rename to be/src/io/hdfs_file_reader.cpp
index 25e3a09638..5d4dc2cc5e 100644
--- a/be/src/exec/hdfs_file_reader.cpp
+++ b/be/src/io/hdfs_file_reader.cpp
@@ -14,7 +14,7 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-#include "exec/hdfs_file_reader.h"
+#include "hdfs_file_reader.h"
 
 #include <sys/stat.h>
 #include <unistd.h>
diff --git a/be/src/exec/hdfs_file_reader.h b/be/src/io/hdfs_file_reader.h
similarity index 98%
rename from be/src/exec/hdfs_file_reader.h
rename to be/src/io/hdfs_file_reader.h
index d4430de3e2..83c8efb7dc 100644
--- a/be/src/exec/hdfs_file_reader.h
+++ b/be/src/io/hdfs_file_reader.h
@@ -19,7 +19,7 @@
 
 #include <hdfs/hdfs.h>
 
-#include "exec/file_reader.h"
+#include "file_reader.h"
 #include "gen_cpp/PlanNodes_types.h"
 
 namespace doris {
diff --git a/be/src/exec/hdfs_reader_writer.cpp b/be/src/io/hdfs_reader_writer.cpp
similarity index 84%
rename from be/src/exec/hdfs_reader_writer.cpp
rename to be/src/io/hdfs_reader_writer.cpp
index 956fc9e40e..9a072512c0 100644
--- a/be/src/exec/hdfs_reader_writer.cpp
+++ b/be/src/io/hdfs_reader_writer.cpp
@@ -15,11 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "exec/hdfs_reader_writer.h"
+#include "hdfs_reader_writer.h"
 
 #if defined(__x86_64__)
-#include "exec/hdfs_file_reader.h"
-#include "exec/hdfs_writer.h"
+#include "hdfs_file_reader.h"
+#include "hdfs_writer.h"
 #endif
 
 namespace doris {
@@ -35,9 +35,10 @@ Status HdfsReaderWriter::create_reader(const THdfsParams& hdfs_params, const std
 }
 
 Status HdfsReaderWriter::create_writer(std::map<std::string, std::string>& properties,
-                                       const std::string& path, FileWriter** writer) {
+                                       const std::string& path,
+                                       std::unique_ptr<FileWriter>& writer) {
 #if defined(__x86_64__)
-    *writer = new HDFSWriter(properties, path);
+    writer.reset(new HDFSWriter(properties, path));
     return Status::OK();
 #else
     return Status::InternalError("HdfsWriter do not support on non x86 platform");
diff --git a/be/src/exec/hdfs_reader_writer.h b/be/src/io/hdfs_reader_writer.h
similarity index 91%
rename from be/src/exec/hdfs_reader_writer.h
rename to be/src/io/hdfs_reader_writer.h
index e0decf2035..160306ffce 100644
--- a/be/src/exec/hdfs_reader_writer.h
+++ b/be/src/io/hdfs_reader_writer.h
@@ -17,8 +17,8 @@
 
 #pragma once
 
-#include "exec/file_reader.h"
-#include "exec/file_writer.h"
+#include "file_reader.h"
+#include "file_writer.h"
 #include "gen_cpp/PlanNodes_types.h"
 
 namespace doris {
@@ -35,7 +35,7 @@ public:
                                 int64_t start_offset, FileReader** reader);
 
     static Status create_writer(std::map<std::string, std::string>& properties,
-                                const std::string& path, FileWriter** writer);
+                                const std::string& path, std::unique_ptr<FileWriter>& writer);
 };
 
 } // namespace doris
diff --git a/be/src/exec/hdfs_writer.cpp b/be/src/io/hdfs_writer.cpp
similarity index 99%
rename from be/src/exec/hdfs_writer.cpp
rename to be/src/io/hdfs_writer.cpp
index a362488d8b..16b2516ca8 100644
--- a/be/src/exec/hdfs_writer.cpp
+++ b/be/src/io/hdfs_writer.cpp
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "exec/hdfs_writer.h"
+#include "hdfs_writer.h"
 
 #include <filesystem>
 
diff --git a/be/src/exec/hdfs_writer.h b/be/src/io/hdfs_writer.h
similarity index 98%
rename from be/src/exec/hdfs_writer.h
rename to be/src/io/hdfs_writer.h
index 8bc9060c2a..15d9a1fde8 100644
--- a/be/src/exec/hdfs_writer.h
+++ b/be/src/io/hdfs_writer.h
@@ -22,7 +22,7 @@
 #include <map>
 #include <string>
 
-#include "exec/file_writer.h"
+#include "file_writer.h"
 
 namespace doris {
 class HDFSWriter : public FileWriter {
diff --git a/be/src/exec/local_file_reader.cpp b/be/src/io/local_file_reader.cpp
similarity index 99%
rename from be/src/exec/local_file_reader.cpp
rename to be/src/io/local_file_reader.cpp
index d5c8454532..e8e2d38ea2 100644
--- a/be/src/exec/local_file_reader.cpp
+++ b/be/src/io/local_file_reader.cpp
@@ -14,7 +14,7 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-#include "exec/local_file_reader.h"
+#include "local_file_reader.h"
 
 #include <sys/stat.h>
 #include <unistd.h>
diff --git a/be/src/exec/local_file_reader.h b/be/src/io/local_file_reader.h
similarity index 98%
rename from be/src/exec/local_file_reader.h
rename to be/src/io/local_file_reader.h
index 3224f94562..c525804395 100644
--- a/be/src/exec/local_file_reader.h
+++ b/be/src/io/local_file_reader.h
@@ -20,7 +20,7 @@
 #define _FILE_OFFSET_BITS 64
 #include <stdio.h>
 
-#include "exec/file_reader.h"
+#include "file_reader.h"
 
 namespace doris {
 
diff --git a/be/src/exec/local_file_writer.cpp b/be/src/io/local_file_writer.cpp
similarity index 75%
rename from be/src/exec/local_file_writer.cpp
rename to be/src/io/local_file_writer.cpp
index 056d4b0cea..3c425a46aa 100644
--- a/be/src/exec/local_file_writer.cpp
+++ b/be/src/io/local_file_writer.cpp
@@ -15,9 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "exec/local_file_writer.h"
+#include "local_file_writer.h"
 
+#include "service/backend_options.h"
 #include "util/error_util.h"
+#include "util/file_utils.h"
 
 namespace doris {
 
@@ -29,6 +31,8 @@ LocalFileWriter::~LocalFileWriter() {
 }
 
 Status LocalFileWriter::open() {
+    RETURN_IF_ERROR(_check_file_path(_path));
+
     _fp = fopen(_path.c_str(), "w+");
     if (_fp == nullptr) {
         std::stringstream ss;
@@ -72,4 +76,18 @@ Status LocalFileWriter::close() {
     return Status::OK();
 }
 
+Status LocalFileWriter::_check_file_path(const std::string& file_path) {
+    // The path handed to a local file writer is chosen freely by the user, so Doris does
+    // not (and cannot) vouch for it. As a lightweight safeguard, refuse to continue when
+    // something already exists at that path, so that an existing file is never overwritten
+    // by the fopen("w+") in open(). This is a check-then-open sequence, not an atomic
+    // guarantee against a file appearing in between.
+    if (!FileUtils::check_exist(file_path)) {
+        return Status::OK();
+    }
+
+    return Status::InternalError("File already exists: " + file_path +
+                                 ". Host: " + BackendOptions::get_localhost());
+}
+
 } // end namespace doris
diff --git a/be/src/exec/local_file_writer.h b/be/src/io/local_file_writer.h
similarity index 90%
rename from be/src/exec/local_file_writer.h
rename to be/src/io/local_file_writer.h
index 63be5e9d79..7d9da485c3 100644
--- a/be/src/exec/local_file_writer.h
+++ b/be/src/io/local_file_writer.h
@@ -19,7 +19,7 @@
 
 #include <stdio.h>
 
-#include "exec/file_writer.h"
+#include "file_writer.h"
 
 namespace doris {
 
@@ -28,7 +28,8 @@ class RuntimeState;
 class LocalFileWriter : public FileWriter {
 public:
     LocalFileWriter(const std::string& path, int64_t start_offset);
-    virtual ~LocalFileWriter();
+
+    ~LocalFileWriter() override;
 
     Status open() override;
 
@@ -37,6 +38,8 @@ public:
     virtual Status close() override;
 
 private:
+    static Status _check_file_path(const std::string& file_path);
+
     std::string _path;
     int64_t _start_offset;
     FILE* _fp;
diff --git a/be/src/exec/s3_reader.cpp b/be/src/io/s3_reader.cpp
similarity index 99%
rename from be/src/exec/s3_reader.cpp
rename to be/src/io/s3_reader.cpp
index 30e6daaa59..c932e2d886 100644
--- a/be/src/exec/s3_reader.cpp
+++ b/be/src/io/s3_reader.cpp
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "exec/s3_reader.h"
+#include "s3_reader.h"
 
 #include <aws/s3/S3Client.h>
 #include <aws/s3/model/GetObjectRequest.h>
diff --git a/be/src/exec/s3_reader.h b/be/src/io/s3_reader.h
similarity index 98%
rename from be/src/exec/s3_reader.h
rename to be/src/io/s3_reader.h
index 0de0b0944e..a1464324df 100644
--- a/be/src/exec/s3_reader.h
+++ b/be/src/io/s3_reader.h
@@ -20,7 +20,7 @@
 #include <map>
 #include <string>
 
-#include "exec/file_reader.h"
+#include "file_reader.h"
 #include "util/s3_uri.h"
 
 namespace Aws {
diff --git a/be/src/exec/s3_writer.cpp b/be/src/io/s3_writer.cpp
similarity index 99%
rename from be/src/exec/s3_writer.cpp
rename to be/src/io/s3_writer.cpp
index 8b44c621d5..df37e7d820 100644
--- a/be/src/exec/s3_writer.cpp
+++ b/be/src/io/s3_writer.cpp
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "exec/s3_writer.h"
+#include "s3_writer.h"
 
 #include <aws/core/utils/FileSystemUtils.h>
 #include <aws/s3/S3Client.h>
diff --git a/be/src/exec/s3_writer.h b/be/src/io/s3_writer.h
similarity index 98%
rename from be/src/exec/s3_writer.h
rename to be/src/io/s3_writer.h
index 09084ac67b..ae2756da08 100644
--- a/be/src/exec/s3_writer.h
+++ b/be/src/io/s3_writer.h
@@ -20,7 +20,7 @@
 #include <map>
 #include <string>
 
-#include "exec/file_writer.h"
+#include "file_writer.h"
 #include "util/s3_uri.h"
 
 namespace Aws {
diff --git a/be/src/runtime/export_sink.cpp b/be/src/runtime/export_sink.cpp
index 8b60d0f5f9..9f72c365de 100644
--- a/be/src/runtime/export_sink.cpp
+++ b/be/src/runtime/export_sink.cpp
@@ -21,13 +21,10 @@
 
 #include <sstream>
 
-#include "exec/broker_writer.h"
-#include "exec/hdfs_reader_writer.h"
-#include "exec/local_file_writer.h"
-#include "exec/s3_writer.h"
 #include "exprs/expr.h"
 #include "exprs/expr_context.h"
 #include "gutil/strings/numbers.h"
+#include "io/file_factory.h"
 #include "runtime/large_int_value.h"
 #include "runtime/row_batch.h"
 #include "runtime/runtime_state.h"
@@ -247,50 +244,14 @@ Status ExportSink::open_file_writer() {
     }
 
     std::string file_name = gen_file_name();
-
     // TODO(lingbin): gen file path
-    switch (_t_export_sink.file_type) {
-    case TFileType::FILE_LOCAL: {
-        LocalFileWriter* file_writer =
-                new LocalFileWriter(_t_export_sink.export_path + "/" + file_name, 0);
-        RETURN_IF_ERROR(file_writer->open());
-        _file_writer.reset(file_writer);
-        break;
-    }
-    case TFileType::FILE_BROKER: {
-        BrokerWriter* broker_writer = new BrokerWriter(
-                _state->exec_env(), _t_export_sink.broker_addresses, _t_export_sink.properties,
-                _t_export_sink.export_path + "/" + file_name, 0 /* offset */);
-        RETURN_IF_ERROR(broker_writer->open());
-        _file_writer.reset(broker_writer);
-        break;
-    }
-    case TFileType::FILE_S3: {
-        S3Writer* s3_writer =
-                new S3Writer(_t_export_sink.properties,
-                             _t_export_sink.export_path + "/" + file_name, 0 /* offset */);
-        RETURN_IF_ERROR(s3_writer->open());
-        _file_writer.reset(s3_writer);
-        break;
-    }
-    case TFileType::FILE_HDFS: {
-        FileWriter* hdfs_writer;
-        RETURN_IF_ERROR(HdfsReaderWriter::create_writer(
-                const_cast<std::map<std::string, std::string>&>(_t_export_sink.properties),
-                _t_export_sink.export_path + "/" + file_name, &hdfs_writer));
-        RETURN_IF_ERROR(hdfs_writer->open());
-        _file_writer.reset(hdfs_writer);
-        break;
-    }
-    default: {
-        std::stringstream ss;
-        ss << "Unknown file type, type=" << _t_export_sink.file_type;
-        return Status::InternalError(ss.str());
-    }
-    }
-
+    RETURN_IF_ERROR(FileFactory::create_file_writer(
+            _t_export_sink.file_type, _state->exec_env(), _t_export_sink.broker_addresses,
+            _t_export_sink.properties, _t_export_sink.export_path + "/" + file_name, 0,
+            _file_writer));
     _state->add_export_output_file(_t_export_sink.export_path + "/" + file_name);
-    return Status::OK();
+
+    return _file_writer->open();
 }
 
 // TODO(lingbin): add some other info to file name, like partition
diff --git a/be/src/runtime/file_result_writer.cpp b/be/src/runtime/file_result_writer.cpp
index f2351476a3..8f4e0c0d03 100644
--- a/be/src/runtime/file_result_writer.cpp
+++ b/be/src/runtime/file_result_writer.cpp
@@ -18,15 +18,12 @@
 #include "runtime/file_result_writer.h"
 
 #include "common/consts.h"
-#include "exec/broker_writer.h"
-#include "exec/hdfs_reader_writer.h"
-#include "exec/local_file_writer.h"
 #include "exec/parquet_writer.h"
-#include "exec/s3_writer.h"
 #include "exprs/expr_context.h"
 #include "gen_cpp/PaloInternalService_types.h"
 #include "gutil/strings/numbers.h"
 #include "gutil/strings/substitute.h"
+#include "io/file_factory.h"
 #include "runtime/buffer_control_block.h"
 #include "runtime/large_int_value.h"
 #include "runtime/primitive_type.h"
@@ -95,15 +92,6 @@ Status FileResultWriter::_get_success_file_name(std::string* file_name) {
     ss << _file_opts->file_path << _file_opts->success_file_name;
     *file_name = ss.str();
     if (_storage_type == TStorageBackendType::LOCAL) {
-        // For local file writer, the file_path is a local dir.
-        // Here we do a simple security verification by checking whether the file exists.
-        // Because the file path is currently arbitrarily specified by the user,
-        // Doris is not responsible for ensuring the correctness of the path.
-        // This is just to prevent overwriting the existing file.
-        if (FileUtils::check_exist(*file_name)) {
-            return Status::InternalError("File already exists: " + *file_name +
-                                         ". Host: " + BackendOptions::get_localhost());
-        }
     }
 
     return Status::OK();
@@ -116,26 +104,18 @@ Status FileResultWriter::_create_next_file_writer() {
 }
 
 Status FileResultWriter::_create_file_writer(const std::string& file_name) {
-    if (_storage_type == TStorageBackendType::LOCAL) {
-        _file_writer = new LocalFileWriter(file_name, 0 /* start offset */);
-    } else if (_storage_type == TStorageBackendType::BROKER) {
-        _file_writer =
-                new BrokerWriter(_state->exec_env(), _file_opts->broker_addresses,
-                                 _file_opts->broker_properties, file_name, 0 /*start offset*/);
-    } else if (_storage_type == TStorageBackendType::S3) {
-        _file_writer = new S3Writer(_file_opts->broker_properties, file_name, 0 /* offset */);
-    } else if (_storage_type == TStorageBackendType::HDFS) {
-        RETURN_IF_ERROR(HdfsReaderWriter::create_writer(
-                const_cast<std::map<std::string, std::string>&>(_file_opts->broker_properties),
-                file_name, &_file_writer));
-    }
+    RETURN_IF_ERROR(FileFactory::create_file_writer(
+            FileFactory::convert_storage_type(_storage_type), _state->exec_env(),
+            _file_opts->broker_addresses, _file_opts->broker_properties, file_name, 0,
+            _file_writer));
     RETURN_IF_ERROR(_file_writer->open());
+
     switch (_file_opts->file_format) {
     case TFileFormatType::FORMAT_CSV_PLAIN:
         // just use file writer is enough
         break;
     case TFileFormatType::FORMAT_PARQUET:
-        _parquet_writer = new ParquetWriterWrapper(_file_writer, _output_expr_ctxs,
+        _parquet_writer = new ParquetWriterWrapper(_file_writer.get(), _output_expr_ctxs,
                                                    _file_opts->file_properties, _file_opts->schema,
                                                    _output_object_data);
         break;
@@ -417,12 +397,8 @@ Status FileResultWriter::_close_file_writer(bool done, bool only_close) {
         COUNTER_UPDATE(_written_data_bytes, _current_written_bytes);
         delete _parquet_writer;
         _parquet_writer = nullptr;
-        delete _file_writer;
-        _file_writer = nullptr;
-    } else if (_file_writer != nullptr) {
+    } else if (_file_writer) {
         _file_writer->close();
-        delete _file_writer;
-        _file_writer = nullptr;
     }
 
     if (only_close) {
diff --git a/be/src/runtime/file_result_writer.h b/be/src/runtime/file_result_writer.h
index b8d9f71d77..03f53bd165 100644
--- a/be/src/runtime/file_result_writer.h
+++ b/be/src/runtime/file_result_writer.h
@@ -132,7 +132,7 @@ private:
 
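-    // If the result file format is plain text, like CSV, this _file_writer is owned by this FileResultWriter.
-    // If the result file format is Parquet, this _file_writer is owned by _parquet_writer.
+    // Always owned by this FileResultWriter. For Parquet output, _parquet_writer only borrows
+    // it as a raw pointer (_file_writer.get()), so _parquet_writer must not be used once it is reset.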
-    FileWriter* _file_writer = nullptr;
+    std::unique_ptr<FileWriter> _file_writer;
     // parquet file writer
     ParquetWriterWrapper* _parquet_writer = nullptr;
     // Used to buffer the export data of plain text
diff --git a/be/src/runtime/routine_load/kafka_consumer_pipe.h b/be/src/runtime/routine_load/kafka_consumer_pipe.h
index 6e7fc43eec..6d01bbe091 100644
--- a/be/src/runtime/routine_load/kafka_consumer_pipe.h
+++ b/be/src/runtime/routine_load/kafka_consumer_pipe.h
@@ -23,7 +23,7 @@
 #include <string>
 #include <vector>
 
-#include "exec/file_reader.h"
+#include "io/file_reader.h"
 #include "librdkafka/rdkafka.h"
 #include "runtime/message_body_sink.h"
 #include "runtime/stream_load/stream_load_pipe.h"
diff --git a/be/src/runtime/stream_load/stream_load_pipe.h b/be/src/runtime/stream_load/stream_load_pipe.h
index d793d1cbd6..716a145f61 100644
--- a/be/src/runtime/stream_load/stream_load_pipe.h
+++ b/be/src/runtime/stream_load/stream_load_pipe.h
@@ -21,8 +21,8 @@
 #include <deque>
 #include <mutex>
 
-#include "exec/file_reader.h"
 #include "gen_cpp/internal_service.pb.h"
+#include "io/file_reader.h"
 #include "runtime/message_body_sink.h"
 #include "util/bit_util.h"
 #include "util/byte_buffer.h"
diff --git a/be/src/util/broker_load_error_hub.cpp b/be/src/util/broker_load_error_hub.cpp
index 84b75d51f9..a8c0441b27 100644
--- a/be/src/util/broker_load_error_hub.cpp
+++ b/be/src/util/broker_load_error_hub.cpp
@@ -17,7 +17,7 @@
 
 #include "util/broker_load_error_hub.h"
 
-#include "exec/broker_writer.h"
+#include "io/broker_writer.h"
 #include "util/defer_op.h"
 
 namespace doris {
diff --git a/be/src/util/broker_storage_backend.cpp b/be/src/util/broker_storage_backend.cpp
index c812e98f48..fc653bcb2c 100644
--- a/be/src/util/broker_storage_backend.cpp
+++ b/be/src/util/broker_storage_backend.cpp
@@ -18,13 +18,13 @@
 #include "util/broker_storage_backend.h"
 
 #include "env/env.h"
-#include "exec/broker_reader.h"
-#include "exec/broker_writer.h"
 #include "gen_cpp/FrontendService.h"
 #include "gen_cpp/FrontendService_types.h"
 #include "gen_cpp/HeartbeatService_types.h"
 #include "gen_cpp/PaloBrokerService_types.h"
 #include "gen_cpp/TPaloBrokerService.h"
+#include "io/broker_reader.h"
+#include "io/broker_writer.h"
 #include "olap/file_helper.h"
 #include "runtime/client_cache.h"
 #include "runtime/exec_env.h"
diff --git a/be/src/vec/exec/varrow_scanner.cpp b/be/src/vec/exec/varrow_scanner.cpp
index b44475ce19..971f1b45bc 100644
--- a/be/src/vec/exec/varrow_scanner.cpp
+++ b/be/src/vec/exec/varrow_scanner.cpp
@@ -16,12 +16,8 @@
 // under the License.
 
 #include "exec/arrow/parquet_reader.h"
-#include "exec/broker_reader.h"
-#include "exec/buffered_reader.h"
-#include "exec/hdfs_reader_writer.h"
-#include "exec/local_file_reader.h"
-#include "exec/s3_reader.h"
 #include "exprs/expr.h"
+#include "io/file_factory.h"
 #include "runtime/descriptors.h"
 #include "runtime/exec_env.h"
 #include "vec/data_types/data_type_factory.hpp"
@@ -61,41 +57,9 @@ Status VArrowScanner::_open_next_reader() {
         }
         const TBrokerRangeDesc& range = _ranges[_next_range++];
         std::unique_ptr<FileReader> file_reader;
-        switch (range.file_type) {
-        case TFileType::FILE_LOCAL: {
-            file_reader.reset(new LocalFileReader(range.path, range.start_offset));
-            break;
-        }
-        case TFileType::FILE_HDFS: {
-            FileReader* reader;
-            RETURN_IF_ERROR(HdfsReaderWriter::create_reader(range.hdfs_params, range.path,
-                                                            range.start_offset, &reader));
-            file_reader.reset(reader);
-            break;
-        }
-        case TFileType::FILE_BROKER: {
-            int64_t file_size = 0;
-            // for compatibility
-            if (range.__isset.file_size) {
-                file_size = range.file_size;
-            }
-            file_reader.reset(new BufferedReader(
-                    _profile,
-                    new BrokerReader(_state->exec_env(), _broker_addresses, _params.properties,
-                                     range.path, range.start_offset, file_size)));
-            break;
-        }
-        case TFileType::FILE_S3: {
-            file_reader.reset(new BufferedReader(
-                    _profile, new S3Reader(_params.properties, range.path, range.start_offset)));
-            break;
-        }
-        default: {
-            std::stringstream ss;
-            ss << "Unknown file type, type=" << range.file_type;
-            return Status::InternalError(ss.str());
-        }
-        }
+        RETURN_IF_ERROR(FileFactory::create_file_reader(
+                range.file_type, _state->exec_env(), _profile, _broker_addresses,
+                _params.properties, range, range.start_offset, file_reader));
         RETURN_IF_ERROR(file_reader->open());
         if (file_reader->size() == 0) {
             file_reader->close();
diff --git a/be/src/vec/exec/vjson_scanner.cpp b/be/src/vec/exec/vjson_scanner.cpp
index 837b3ec289..1acd304e77 100644
--- a/be/src/vec/exec/vjson_scanner.cpp
+++ b/be/src/vec/exec/vjson_scanner.cpp
@@ -105,7 +105,7 @@ Status VJsonScanner::open_vjson_reader() {
     } else {
         _cur_vjson_reader.reset(new VJsonReader(_state, _counter, _profile, strip_outer_array,
                                                 num_as_string, fuzzy_parse, &_scanner_eof,
-                                                _cur_file_reader));
+                                                _cur_file_reader.get()));
     }
 
     RETURN_IF_ERROR(_cur_vjson_reader->init(jsonpath, json_root));
diff --git a/be/test/exec/broker_reader_test.cpp b/be/test/exec/broker_reader_test.cpp
index c9f19279e4..0923b4edb4 100644
--- a/be/test/exec/broker_reader_test.cpp
+++ b/be/test/exec/broker_reader_test.cpp
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "exec/broker_reader.h"
+#include "io/broker_reader.h"
 
 #include <gtest/gtest.h>
 
diff --git a/be/test/exec/broker_scan_node_test.cpp b/be/test/exec/broker_scan_node_test.cpp
index 18128200ea..bc7c91dff3 100644
--- a/be/test/exec/broker_scan_node_test.cpp
+++ b/be/test/exec/broker_scan_node_test.cpp
@@ -24,10 +24,10 @@
 #include <vector>
 
 #include "common/object_pool.h"
-#include "exec/local_file_reader.h"
 #include "exprs/cast_functions.h"
 #include "gen_cpp/Descriptors_types.h"
 #include "gen_cpp/PlanNodes_types.h"
+#include "io/local_file_reader.h"
 #include "runtime/descriptors.h"
 #include "runtime/row_batch.h"
 #include "runtime/runtime_state.h"
diff --git a/be/test/exec/broker_scanner_test.cpp b/be/test/exec/broker_scanner_test.cpp
index 689e876754..0a65982058 100644
--- a/be/test/exec/broker_scanner_test.cpp
+++ b/be/test/exec/broker_scanner_test.cpp
@@ -24,10 +24,10 @@
 #include <vector>
 
 #include "common/object_pool.h"
-#include "exec/local_file_reader.h"
 #include "exprs/cast_functions.h"
 #include "gen_cpp/Descriptors_types.h"
 #include "gen_cpp/PlanNodes_types.h"
+#include "io/local_file_reader.h"
 #include "runtime/descriptors.h"
 #include "runtime/mem_tracker.h"
 #include "runtime/runtime_state.h"
diff --git a/be/test/exec/buffered_reader_test.cpp b/be/test/exec/buffered_reader_test.cpp
index 7fe6c47fee..940635a7f1 100644
--- a/be/test/exec/buffered_reader_test.cpp
+++ b/be/test/exec/buffered_reader_test.cpp
@@ -15,11 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "exec/buffered_reader.h"
+#include "io/buffered_reader.h"
 
 #include <gtest/gtest.h>
 
-#include "exec/local_file_reader.h"
+#include "io/local_file_reader.h"
 #include "util/stopwatch.hpp"
 
 namespace doris {
diff --git a/be/test/exec/hdfs_file_reader_test.cpp b/be/test/exec/hdfs_file_reader_test.cpp
index 6807272bac..a5e85f2bc8 100644
--- a/be/test/exec/hdfs_file_reader_test.cpp
+++ b/be/test/exec/hdfs_file_reader_test.cpp
@@ -15,11 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "exec/hdfs_file_reader.h"
+#include "io/hdfs_file_reader.h"
 
 #include <gtest/gtest.h>
 
-#include "exec/hdfs_reader_writer.h"
+#include "io/hdfs_reader_writer.h"
 
 namespace doris {
 
diff --git a/be/test/exec/json_scanner_test.cpp b/be/test/exec/json_scanner_test.cpp
index 582e26b7c5..744746f7b7 100644
--- a/be/test/exec/json_scanner_test.cpp
+++ b/be/test/exec/json_scanner_test.cpp
@@ -24,11 +24,11 @@
 
 #include "common/object_pool.h"
 #include "exec/broker_scan_node.h"
-#include "exec/local_file_reader.h"
 #include "exprs/cast_functions.h"
 #include "exprs/decimalv2_operators.h"
 #include "gen_cpp/Descriptors_types.h"
 #include "gen_cpp/PlanNodes_types.h"
+#include "io/local_file_reader.h"
 #include "runtime/descriptors.h"
 #include "runtime/exec_env.h"
 #include "runtime/row_batch.h"
diff --git a/be/test/exec/json_scanner_with_jsonpath_test.cpp b/be/test/exec/json_scanner_with_jsonpath_test.cpp
index 9db0bd37b4..0394de1035 100644
--- a/be/test/exec/json_scanner_with_jsonpath_test.cpp
+++ b/be/test/exec/json_scanner_with_jsonpath_test.cpp
@@ -24,10 +24,10 @@
 
 #include "common/object_pool.h"
 #include "exec/broker_scan_node.h"
-#include "exec/local_file_reader.h"
 #include "exprs/cast_functions.h"
 #include "gen_cpp/Descriptors_types.h"
 #include "gen_cpp/PlanNodes_types.h"
+#include "io/local_file_reader.h"
 #include "runtime/descriptors.h"
 #include "runtime/exec_env.h"
 #include "runtime/row_batch.h"
diff --git a/be/test/exec/multi_bytes_separator_test.cpp b/be/test/exec/multi_bytes_separator_test.cpp
index 3712f6b141..614d9f279f 100644
--- a/be/test/exec/multi_bytes_separator_test.cpp
+++ b/be/test/exec/multi_bytes_separator_test.cpp
@@ -23,10 +23,10 @@
 
 #include "common/object_pool.h"
 #include "exec/broker_scanner.h"
-#include "exec/local_file_reader.h"
 #include "exprs/cast_functions.h"
 #include "gen_cpp/Descriptors_types.h"
 #include "gen_cpp/PlanNodes_types.h"
+#include "io/local_file_reader.h"
 #include "runtime/descriptors.h"
 #include "runtime/mem_tracker.h"
 #include "runtime/runtime_state.h"
diff --git a/be/test/exec/orc_scanner_test.cpp b/be/test/exec/orc_scanner_test.cpp
index f5f99924f2..cd1023e692 100644
--- a/be/test/exec/orc_scanner_test.cpp
+++ b/be/test/exec/orc_scanner_test.cpp
@@ -27,11 +27,11 @@
 
 #include "common/object_pool.h"
 #include "exec/broker_scan_node.h"
-#include "exec/local_file_reader.h"
 #include "exprs/cast_functions.h"
 #include "exprs/decimalv2_operators.h"
 #include "gen_cpp/Descriptors_types.h"
 #include "gen_cpp/PlanNodes_types.h"
+#include "io/local_file_reader.h"
 #include "runtime/descriptors.h"
 #include "runtime/row_batch.h"
 #include "runtime/runtime_state.h"
diff --git a/be/test/exec/parquet_scanner_test.cpp b/be/test/exec/parquet_scanner_test.cpp
index 8db72313ba..cbd673d4dc 100644
--- a/be/test/exec/parquet_scanner_test.cpp
+++ b/be/test/exec/parquet_scanner_test.cpp
@@ -24,10 +24,10 @@
 
 #include "common/object_pool.h"
 #include "exec/broker_scan_node.h"
-#include "exec/local_file_reader.h"
 #include "exprs/cast_functions.h"
 #include "gen_cpp/Descriptors_types.h"
 #include "gen_cpp/PlanNodes_types.h"
+#include "io/local_file_reader.h"
 #include "runtime/descriptors.h"
 #include "runtime/row_batch.h"
 #include "runtime/runtime_state.h"
diff --git a/be/test/exec/plain_text_line_reader_bzip_test.cpp b/be/test/exec/plain_text_line_reader_bzip_test.cpp
index 747313c803..900f261673 100644
--- a/be/test/exec/plain_text_line_reader_bzip_test.cpp
+++ b/be/test/exec/plain_text_line_reader_bzip_test.cpp
@@ -18,8 +18,8 @@
 #include <gtest/gtest.h>
 
 #include "exec/decompressor.h"
-#include "exec/local_file_reader.h"
 #include "exec/plain_text_line_reader.h"
+#include "io/local_file_reader.h"
 #include "util/runtime_profile.h"
 
 namespace doris {
diff --git a/be/test/exec/plain_text_line_reader_gzip_test.cpp b/be/test/exec/plain_text_line_reader_gzip_test.cpp
index 1e461e8b82..fea15d00c4 100644
--- a/be/test/exec/plain_text_line_reader_gzip_test.cpp
+++ b/be/test/exec/plain_text_line_reader_gzip_test.cpp
@@ -18,8 +18,8 @@
 #include <gtest/gtest.h>
 
 #include "exec/decompressor.h"
-#include "exec/local_file_reader.h"
 #include "exec/plain_text_line_reader.h"
+#include "io/local_file_reader.h"
 #include "util/runtime_profile.h"
 
 namespace doris {
diff --git a/be/test/exec/plain_text_line_reader_lz4frame_test.cpp b/be/test/exec/plain_text_line_reader_lz4frame_test.cpp
index 0ebd218b28..f6d0844455 100644
--- a/be/test/exec/plain_text_line_reader_lz4frame_test.cpp
+++ b/be/test/exec/plain_text_line_reader_lz4frame_test.cpp
@@ -18,8 +18,8 @@
 #include <gtest/gtest.h>
 
 #include "exec/decompressor.h"
-#include "exec/local_file_reader.h"
 #include "exec/plain_text_line_reader.h"
+#include "io/local_file_reader.h"
 #include "util/runtime_profile.h"
 
 namespace doris {
diff --git a/be/test/exec/plain_text_line_reader_lzop_test.cpp b/be/test/exec/plain_text_line_reader_lzop_test.cpp
index f8f8b6090d..99aa6336e0 100644
--- a/be/test/exec/plain_text_line_reader_lzop_test.cpp
+++ b/be/test/exec/plain_text_line_reader_lzop_test.cpp
@@ -18,8 +18,8 @@
 #include <gtest/gtest.h>
 
 #include "exec/decompressor.h"
-#include "exec/local_file_reader.h"
 #include "exec/plain_text_line_reader.h"
+#include "io/local_file_reader.h"
 #include "util/runtime_profile.h"
 
 namespace doris {
diff --git a/be/test/exec/plain_text_line_reader_uncompressed_test.cpp b/be/test/exec/plain_text_line_reader_uncompressed_test.cpp
index b9361999b4..815d119ba1 100644
--- a/be/test/exec/plain_text_line_reader_uncompressed_test.cpp
+++ b/be/test/exec/plain_text_line_reader_uncompressed_test.cpp
@@ -18,8 +18,8 @@
 #include <gtest/gtest.h>
 
 #include "exec/decompressor.h"
-#include "exec/local_file_reader.h"
 #include "exec/plain_text_line_reader.h"
+#include "io/local_file_reader.h"
 #include "util/runtime_profile.h"
 
 namespace doris {
diff --git a/be/test/exec/s3_reader_test.cpp b/be/test/exec/s3_reader_test.cpp
index 3c11a19180..d41a78975a 100644
--- a/be/test/exec/s3_reader_test.cpp
+++ b/be/test/exec/s3_reader_test.cpp
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "exec/s3_reader.h"
+#include "io/s3_reader.h"
 
 #include <aws/core/Aws.h>
 #include <gtest/gtest.h>
@@ -28,7 +28,7 @@
 #include <string>
 #include <vector>
 
-#include "exec/s3_writer.h"
+#include "io/s3_writer.h"
 
 namespace doris {
 static const std::string AK = "";
diff --git a/be/test/vec/exec/vbroker_scan_node_test.cpp b/be/test/vec/exec/vbroker_scan_node_test.cpp
index 0683cf4b7a..719d5014ea 100644
--- a/be/test/vec/exec/vbroker_scan_node_test.cpp
+++ b/be/test/vec/exec/vbroker_scan_node_test.cpp
@@ -23,13 +23,13 @@
 #include <vector>
 
 #include "common/object_pool.h"
-#include "exec/local_file_reader.h"
 #include "exprs/binary_predicate.h"
 #include "exprs/cast_functions.h"
 #include "exprs/literal.h"
 #include "exprs/slot_ref.h"
 #include "gen_cpp/Descriptors_types.h"
 #include "gen_cpp/PlanNodes_types.h"
+#include "io/local_file_reader.h"
 #include "runtime/descriptors.h"
 #include "runtime/mem_tracker.h"
 #include "runtime/primitive_type.h"
diff --git a/be/test/vec/exec/vbroker_scanner_test.cpp b/be/test/vec/exec/vbroker_scanner_test.cpp
index 428d82343c..27c4643050 100644
--- a/be/test/vec/exec/vbroker_scanner_test.cpp
+++ b/be/test/vec/exec/vbroker_scanner_test.cpp
@@ -23,10 +23,10 @@
 #include <vector>
 
 #include "common/object_pool.h"
-#include "exec/local_file_reader.h"
 #include "exprs/cast_functions.h"
 #include "gen_cpp/Descriptors_types.h"
 #include "gen_cpp/PlanNodes_types.h"
+#include "io/local_file_reader.h"
 #include "runtime/descriptors.h"
 #include "runtime/mem_tracker.h"
 #include "runtime/runtime_state.h"
diff --git a/be/test/vec/exec/vjson_scanner_test.cpp b/be/test/vec/exec/vjson_scanner_test.cpp
index 393ff80f16..b059faf010 100644
--- a/be/test/vec/exec/vjson_scanner_test.cpp
+++ b/be/test/vec/exec/vjson_scanner_test.cpp
@@ -26,11 +26,11 @@
 
 #include "common/object_pool.h"
 #include "exec/broker_scan_node.h"
-#include "exec/local_file_reader.h"
 #include "exprs/cast_functions.h"
 #include "exprs/decimalv2_operators.h"
 #include "gen_cpp/Descriptors_types.h"
 #include "gen_cpp/PlanNodes_types.h"
+#include "io/local_file_reader.h"
 #include "runtime/descriptors.h"
 #include "runtime/exec_env.h"
 #include "runtime/row_batch.h"
diff --git a/be/test/vec/exec/vorc_scanner_test.cpp b/be/test/vec/exec/vorc_scanner_test.cpp
index 92a9013786..e6b3b2a96f 100644
--- a/be/test/vec/exec/vorc_scanner_test.cpp
+++ b/be/test/vec/exec/vorc_scanner_test.cpp
@@ -26,12 +26,12 @@
 #include <vector>
 
 #include "common/object_pool.h"
-#include "exec/local_file_reader.h"
 #include "exec/orc_scanner.h"
 #include "exprs/cast_functions.h"
 #include "exprs/decimalv2_operators.h"
 #include "gen_cpp/Descriptors_types.h"
 #include "gen_cpp/PlanNodes_types.h"
+#include "io/local_file_reader.h"
 #include "runtime/descriptors.h"
 #include "runtime/row_batch.h"
 #include "runtime/runtime_state.h"
diff --git a/be/test/vec/exec/vparquet_scanner_test.cpp b/be/test/vec/exec/vparquet_scanner_test.cpp
index ba8f70ce70..6d3810cc73 100644
--- a/be/test/vec/exec/vparquet_scanner_test.cpp
+++ b/be/test/vec/exec/vparquet_scanner_test.cpp
@@ -23,10 +23,10 @@
 #include <vector>
 
 #include "common/object_pool.h"
-#include "exec/local_file_reader.h"
 #include "exprs/cast_functions.h"
 #include "gen_cpp/Descriptors_types.h"
 #include "gen_cpp/PlanNodes_types.h"
+#include "io/local_file_reader.h"
 #include "runtime/descriptors.h"
 #include "runtime/row_batch.h"
 #include "runtime/runtime_state.h"


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org