You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/06/08 07:07:45 UTC
[incubator-doris] branch master updated: [Refactor] Use file factory to replace create file reader/writer (#9505)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 94089b9192 [Refactor] Use file factory to replace create file reader/writer (#9505)
94089b9192 is described below
commit 94089b919211403de7e6d8e664715d44ab4fd4f3
Author: HappenLee <ha...@hotmail.com>
AuthorDate: Wed Jun 8 15:07:39 2022 +0800
[Refactor] Use file factory to replace create file reader/writer (#9505)
1. Simplify code logic and improve abstraction
2. Fix memory leaks caused by raw owning pointers
Co-authored-by: lihaopeng <li...@baidu.com>
---
be/src/exec/CMakeLists.txt | 22 ++--
be/src/exec/arrow/arrow_reader.cpp | 2 +-
be/src/exec/arrow/orc_reader.cpp | 2 +-
be/src/exec/arrow/parquet_reader.cpp | 6 +-
be/src/exec/broker_scanner.cpp | 94 +++------------
be/src/exec/broker_scanner.h | 4 +-
be/src/exec/json_scanner.cpp | 73 ++----------
be/src/exec/json_scanner.h | 5 +-
be/src/exec/orc_scanner.cpp | 54 ++-------
be/src/exec/parquet_scanner.cpp | 49 ++------
be/src/exec/parquet_writer.cpp | 11 +-
be/src/exec/plain_binary_line_reader.cpp | 2 +-
be/src/exec/plain_text_line_reader.cpp | 2 +-
be/src/io/CMakeLists.txt | 46 +++++++
be/src/{exec => io}/broker_reader.cpp | 2 +-
be/src/{exec => io}/broker_reader.h | 2 +-
be/src/{exec => io}/broker_writer.cpp | 2 +-
be/src/{exec => io}/broker_writer.h | 2 +-
be/src/{exec => io}/buffered_reader.cpp | 2 +-
be/src/{exec => io}/buffered_reader.h | 2 +-
be/src/io/file_factory.cpp | 132 +++++++++++++++++++++
be/src/io/file_factory.h | 77 ++++++++++++
be/src/{exec => io}/file_reader.h | 0
be/src/{exec => io}/file_writer.h | 0
be/src/{exec => io}/hdfs_file_reader.cpp | 2 +-
be/src/{exec => io}/hdfs_file_reader.h | 2 +-
be/src/{exec => io}/hdfs_reader_writer.cpp | 11 +-
be/src/{exec => io}/hdfs_reader_writer.h | 6 +-
be/src/{exec => io}/hdfs_writer.cpp | 2 +-
be/src/{exec => io}/hdfs_writer.h | 2 +-
be/src/{exec => io}/local_file_reader.cpp | 2 +-
be/src/{exec => io}/local_file_reader.h | 2 +-
be/src/{exec => io}/local_file_writer.cpp | 20 +++-
be/src/{exec => io}/local_file_writer.h | 7 +-
be/src/{exec => io}/s3_reader.cpp | 2 +-
be/src/{exec => io}/s3_reader.h | 2 +-
be/src/{exec => io}/s3_writer.cpp | 2 +-
be/src/{exec => io}/s3_writer.h | 2 +-
be/src/runtime/export_sink.cpp | 53 ++-------
be/src/runtime/file_result_writer.cpp | 40 ++-----
be/src/runtime/file_result_writer.h | 2 +-
be/src/runtime/routine_load/kafka_consumer_pipe.h | 2 +-
be/src/runtime/stream_load/stream_load_pipe.h | 2 +-
be/src/util/broker_load_error_hub.cpp | 2 +-
be/src/util/broker_storage_backend.cpp | 4 +-
be/src/vec/exec/varrow_scanner.cpp | 44 +------
be/src/vec/exec/vjson_scanner.cpp | 2 +-
be/test/exec/broker_reader_test.cpp | 2 +-
be/test/exec/broker_scan_node_test.cpp | 2 +-
be/test/exec/broker_scanner_test.cpp | 2 +-
be/test/exec/buffered_reader_test.cpp | 4 +-
be/test/exec/hdfs_file_reader_test.cpp | 4 +-
be/test/exec/json_scanner_test.cpp | 2 +-
be/test/exec/json_scanner_with_jsonpath_test.cpp | 2 +-
be/test/exec/multi_bytes_separator_test.cpp | 2 +-
be/test/exec/orc_scanner_test.cpp | 2 +-
be/test/exec/parquet_scanner_test.cpp | 2 +-
be/test/exec/plain_text_line_reader_bzip_test.cpp | 2 +-
be/test/exec/plain_text_line_reader_gzip_test.cpp | 2 +-
.../exec/plain_text_line_reader_lz4frame_test.cpp | 2 +-
be/test/exec/plain_text_line_reader_lzop_test.cpp | 2 +-
.../plain_text_line_reader_uncompressed_test.cpp | 2 +-
be/test/exec/s3_reader_test.cpp | 4 +-
be/test/vec/exec/vbroker_scan_node_test.cpp | 2 +-
be/test/vec/exec/vbroker_scanner_test.cpp | 2 +-
be/test/vec/exec/vjson_scanner_test.cpp | 2 +-
be/test/vec/exec/vorc_scanner_test.cpp | 2 +-
be/test/vec/exec/vparquet_scanner_test.cpp | 2 +-
68 files changed, 427 insertions(+), 429 deletions(-)
diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index 7ce4c4caf3..5708eb0465 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -28,8 +28,7 @@ set(EXEC_FILES
analytic_eval_node.cpp
blocking_join_node.cpp
broker_scan_node.cpp
- broker_reader.cpp
- buffered_reader.cpp
+ ../io/buffered_reader.cpp
base_scanner.cpp
broker_scanner.cpp
cross_join_node.cpp
@@ -42,7 +41,6 @@ set(EXEC_FILES
exchange_node.cpp
hash_join_node.cpp
hash_table.cpp
- local_file_reader.cpp
merge_node.cpp
scan_node.cpp
select_node.cpp
@@ -93,24 +91,28 @@ set(EXEC_FILES
partitioned_hash_table.cc
partitioned_aggregation_node.cc
odbc_scan_node.cpp
- local_file_writer.cpp
- broker_writer.cpp
parquet_scanner.cpp
parquet_writer.cpp
orc_scanner.cpp
odbc_connector.cpp
json_scanner.cpp
assert_num_rows_node.cpp
- s3_reader.cpp
- s3_writer.cpp
- hdfs_reader_writer.cpp
+
+ ../io/local_file_reader.cpp
+ ../io/local_file_writer.cpp
+ ../io/broker_reader.cpp
+ ../io/broker_writer.cpp
+ ../io/s3_reader.cpp
+ ../io/s3_writer.cpp
+ ../io/hdfs_reader_writer.cpp
+ ../io/file_factory.cpp
)
if (ARCH_AMD64)
set(EXEC_FILES
${EXEC_FILES}
- hdfs_file_reader.cpp
- hdfs_writer.cpp
+ ../io/hdfs_file_reader.cpp
+ ../io/hdfs_writer.cpp
)
endif()
diff --git a/be/src/exec/arrow/arrow_reader.cpp b/be/src/exec/arrow/arrow_reader.cpp
index 789c66be2c..94289a990b 100644
--- a/be/src/exec/arrow/arrow_reader.cpp
+++ b/be/src/exec/arrow/arrow_reader.cpp
@@ -21,9 +21,9 @@
#include <time.h>
#include "common/logging.h"
-#include "exec/file_reader.h"
#include "gen_cpp/PaloBrokerService_types.h"
#include "gen_cpp/TPaloBrokerService.h"
+#include "io/file_reader.h"
#include "runtime/broker_mgr.h"
#include "runtime/client_cache.h"
#include "runtime/descriptors.h"
diff --git a/be/src/exec/arrow/orc_reader.cpp b/be/src/exec/arrow/orc_reader.cpp
index 5815f008df..536b852ad6 100644
--- a/be/src/exec/arrow/orc_reader.cpp
+++ b/be/src/exec/arrow/orc_reader.cpp
@@ -21,7 +21,7 @@
#include <time.h>
#include "common/logging.h"
-#include "exec/file_reader.h"
+#include "io/file_reader.h"
#include "runtime/mem_pool.h"
#include "runtime/tuple.h"
diff --git a/be/src/exec/arrow/parquet_reader.cpp b/be/src/exec/arrow/parquet_reader.cpp
index 330d983c36..dad3e9e5e8 100644
--- a/be/src/exec/arrow/parquet_reader.cpp
+++ b/be/src/exec/arrow/parquet_reader.cpp
@@ -27,7 +27,11 @@
#include "common/logging.h"
#include "common/status.h"
-#include "exec/file_reader.h"
+#include "gen_cpp/PaloBrokerService_types.h"
+#include "gen_cpp/TPaloBrokerService.h"
+#include "io/file_reader.h"
+#include "runtime/broker_mgr.h"
+#include "runtime/client_cache.h"
#include "runtime/descriptors.h"
#include "runtime/mem_pool.h"
#include "runtime/string_value.h"
diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp
index 2c21fd3d54..8942c4fd67 100644
--- a/be/src/exec/broker_scanner.cpp
+++ b/be/src/exec/broker_scanner.cpp
@@ -23,22 +23,20 @@
#include <sstream>
#include "common/consts.h"
-#include "exec/broker_reader.h"
-#include "exec/buffered_reader.h"
#include "exec/decompressor.h"
#include "exec/exec_node.h"
-#include "exec/hdfs_reader_writer.h"
-#include "exec/local_file_reader.h"
#include "exec/plain_binary_line_reader.h"
#include "exec/plain_text_line_reader.h"
-#include "exec/s3_reader.h"
#include "exprs/expr.h"
+#include "io/buffered_reader.h"
+#include "io/file_factory.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "runtime/raw_value.h"
#include "runtime/stream_load/load_stream_mgr.h"
#include "runtime/stream_load/stream_load_pipe.h"
#include "runtime/tuple.h"
+#include "util/string_util.h"
#include "util/utf8_check.h"
namespace doris {
@@ -109,11 +107,8 @@ Status BrokerScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, boo
break; // break always
}
}
- if (_scanner_eof) {
- *eof = true;
- } else {
- *eof = false;
- }
+
+ *eof = _scanner_eof;
return Status::OK();
}
@@ -131,16 +126,6 @@ Status BrokerScanner::open_next_reader() {
}
Status BrokerScanner::open_file_reader() {
- if (_cur_file_reader != nullptr) {
- if (_stream_load_pipe != nullptr) {
- _stream_load_pipe.reset();
- _cur_file_reader = nullptr;
- } else {
- delete _cur_file_reader;
- _cur_file_reader = nullptr;
- }
- }
-
const TBrokerRangeDesc& range = _ranges[_next_range];
int64_t start_offset = range.start_offset;
if (start_offset != 0) {
@@ -155,53 +140,11 @@ Status BrokerScanner::open_file_reader() {
_skip_lines = 2;
}
}
- switch (range.file_type) {
- case TFileType::FILE_LOCAL: {
- LocalFileReader* file_reader = new LocalFileReader(range.path, start_offset);
- RETURN_IF_ERROR(file_reader->open());
- _cur_file_reader = file_reader;
- break;
- }
- case TFileType::FILE_HDFS: {
- FileReader* hdfs_file_reader;
- RETURN_IF_ERROR(HdfsReaderWriter::create_reader(range.hdfs_params, range.path, start_offset,
- &hdfs_file_reader));
- BufferedReader* file_reader = new BufferedReader(_profile, hdfs_file_reader);
- RETURN_IF_ERROR(file_reader->open());
- _cur_file_reader = file_reader;
- break;
- }
- case TFileType::FILE_BROKER: {
- BrokerReader* broker_reader =
- new BrokerReader(_state->exec_env(), _broker_addresses, _params.properties,
- range.path, start_offset);
- RETURN_IF_ERROR(broker_reader->open());
- _cur_file_reader = broker_reader;
- break;
- }
- case TFileType::FILE_S3: {
- BufferedReader* s3_reader = new BufferedReader(
- _profile, new S3Reader(_params.properties, range.path, start_offset));
- RETURN_IF_ERROR(s3_reader->open());
- _cur_file_reader = s3_reader;
- break;
- }
- case TFileType::FILE_STREAM: {
- _stream_load_pipe = _state->exec_env()->load_stream_mgr()->get(range.load_id);
- if (_stream_load_pipe == nullptr) {
- VLOG_NOTICE << "unknown stream load id: " << UniqueId(range.load_id);
- return Status::InternalError("unknown stream load id");
- }
- _cur_file_reader = _stream_load_pipe.get();
- break;
- }
- default: {
- std::stringstream ss;
- ss << "Unknown file type, type=" << range.file_type;
- return Status::InternalError(ss.str());
- }
- }
- return Status::OK();
+
+ RETURN_IF_ERROR(FileFactory::create_file_reader(range.file_type, _state->exec_env(), _profile,
+ _broker_addresses, _params.properties, range,
+ start_offset, _cur_file_reader));
+ return _cur_file_reader->open();
}
Status BrokerScanner::create_decompressor(TFileFormatType::type type) {
@@ -280,11 +223,12 @@ Status BrokerScanner::open_line_reader() {
case TFileFormatType::FORMAT_CSV_LZ4FRAME:
case TFileFormatType::FORMAT_CSV_LZOP:
case TFileFormatType::FORMAT_CSV_DEFLATE:
- _cur_line_reader = new PlainTextLineReader(_profile, _cur_file_reader, _cur_decompressor,
- size, _line_delimiter, _line_delimiter_length);
+ _cur_line_reader =
+ new PlainTextLineReader(_profile, _cur_file_reader.get(), _cur_decompressor, size,
+ _line_delimiter, _line_delimiter_length);
break;
case TFileFormatType::FORMAT_PROTO:
- _cur_line_reader = new PlainBinaryLineReader(_cur_file_reader);
+ _cur_line_reader = new PlainBinaryLineReader(_cur_file_reader.get());
break;
default: {
std::stringstream ss;
@@ -309,16 +253,6 @@ void BrokerScanner::close() {
delete _cur_line_reader;
_cur_line_reader = nullptr;
}
-
- if (_cur_file_reader != nullptr) {
- if (_stream_load_pipe != nullptr) {
- _stream_load_pipe.reset();
- _cur_file_reader = nullptr;
- } else {
- delete _cur_file_reader;
- _cur_file_reader = nullptr;
- }
- }
}
void BrokerScanner::split_line(const Slice& line) {
diff --git a/be/src/exec/broker_scanner.h b/be/src/exec/broker_scanner.h
index f10ce68518..379b1630cd 100644
--- a/be/src/exec/broker_scanner.h
+++ b/be/src/exec/broker_scanner.h
@@ -107,7 +107,7 @@ protected:
int _line_delimiter_length;
// Reader
- FileReader* _cur_file_reader;
+ std::shared_ptr<FileReader> _cur_file_reader;
LineReader* _cur_line_reader;
Decompressor* _cur_decompressor;
bool _cur_line_reader_eof;
@@ -117,8 +117,6 @@ protected:
// When we fetch range doesn't start from 0 will always skip the first line
int _skip_lines;
- // used to hold current StreamLoadPipe
- std::shared_ptr<StreamLoadPipe> _stream_load_pipe;
std::vector<Slice> _split_values;
};
diff --git a/be/src/exec/json_scanner.cpp b/be/src/exec/json_scanner.cpp
index 0be3d4c089..5e7b8ba309 100644
--- a/be/src/exec/json_scanner.cpp
+++ b/be/src/exec/json_scanner.cpp
@@ -22,14 +22,12 @@
#include <algorithm>
#include "env/env.h"
-#include "exec/broker_reader.h"
-#include "exec/buffered_reader.h"
-#include "exec/local_file_reader.h"
#include "exec/plain_text_line_reader.h"
-#include "exec/s3_reader.h"
#include "exprs/expr.h"
#include "exprs/json_functions.h"
#include "gutil/strings/split.h"
+#include "io/buffered_reader.h"
+#include "io/file_factory.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
@@ -67,7 +65,7 @@ Status JsonScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bool*
SCOPED_TIMER(_read_timer);
// Get one line
while (!_scanner_eof) {
- if (_cur_file_reader == nullptr || _cur_reader_eof) {
+ if (!_cur_file_reader || _cur_reader_eof) {
RETURN_IF_ERROR(open_next_reader());
// If there isn't any more reader, break this
if (_scanner_eof) {
@@ -124,16 +122,6 @@ Status JsonScanner::open_based_reader() {
}
Status JsonScanner::open_file_reader() {
- if (_cur_file_reader != nullptr) {
- if (_stream_load_pipe != nullptr) {
- _stream_load_pipe.reset();
- _cur_file_reader = nullptr;
- } else {
- delete _cur_file_reader;
- _cur_file_reader = nullptr;
- }
- }
-
const TBrokerRangeDesc& range = _ranges[_next_range];
int64_t start_offset = range.start_offset;
if (start_offset != 0) {
@@ -142,45 +130,12 @@ Status JsonScanner::open_file_reader() {
if (range.__isset.read_json_by_line) {
_read_json_by_line = range.read_json_by_line;
}
- switch (range.file_type) {
- case TFileType::FILE_LOCAL: {
- LocalFileReader* file_reader = new LocalFileReader(range.path, start_offset);
- RETURN_IF_ERROR(file_reader->open());
- _cur_file_reader = file_reader;
- break;
- }
- case TFileType::FILE_BROKER: {
- BrokerReader* broker_reader =
- new BrokerReader(_state->exec_env(), _broker_addresses, _params.properties,
- range.path, start_offset);
- RETURN_IF_ERROR(broker_reader->open());
- _cur_file_reader = broker_reader;
- break;
- }
- case TFileType::FILE_S3: {
- BufferedReader* s3_reader = new BufferedReader(
- _profile, new S3Reader(_params.properties, range.path, start_offset));
- RETURN_IF_ERROR(s3_reader->open());
- _cur_file_reader = s3_reader;
- break;
- }
- case TFileType::FILE_STREAM: {
- _stream_load_pipe = _state->exec_env()->load_stream_mgr()->get(range.load_id);
- if (_stream_load_pipe == nullptr) {
- VLOG_NOTICE << "unknown stream load id: " << UniqueId(range.load_id);
- return Status::InternalError("unknown stream load id");
- }
- _cur_file_reader = _stream_load_pipe.get();
- break;
- }
- default: {
- std::stringstream ss;
- ss << "Unknown file type, type=" << range.file_type;
- return Status::InternalError(ss.str());
- }
- }
+
+ RETURN_IF_ERROR(FileFactory::create_file_reader(range.file_type, _state->exec_env(), _profile,
+ _broker_addresses, _params.properties, range,
+ start_offset, _cur_file_reader));
_cur_reader_eof = false;
- return Status::OK();
+ return _cur_file_reader->open();
}
Status JsonScanner::open_line_reader() {
@@ -197,7 +152,7 @@ Status JsonScanner::open_line_reader() {
} else {
_skip_next_line = false;
}
- _cur_line_reader = new PlainTextLineReader(_profile, _cur_file_reader, nullptr, size,
+ _cur_line_reader = new PlainTextLineReader(_profile, _cur_file_reader.get(), nullptr, size,
_line_delimiter, _line_delimiter_length);
_cur_reader_eof = false;
return Status::OK();
@@ -224,7 +179,7 @@ Status JsonScanner::open_json_reader() {
} else {
_cur_json_reader =
new JsonReader(_state, _counter, _profile, strip_outer_array, num_as_string,
- fuzzy_parse, &_scanner_eof, _cur_file_reader);
+ fuzzy_parse, &_scanner_eof, _cur_file_reader.get());
}
RETURN_IF_ERROR(_cur_json_reader->init(jsonpath, json_root));
@@ -264,14 +219,6 @@ void JsonScanner::close() {
delete _cur_line_reader;
_cur_line_reader = nullptr;
}
- if (_cur_file_reader != nullptr) {
- if (_stream_load_pipe != nullptr) {
- _stream_load_pipe.reset();
- } else {
- delete _cur_file_reader;
- }
- _cur_file_reader = nullptr;
- }
}
////// class JsonDataInternal
diff --git a/be/src/exec/json_scanner.h b/be/src/exec/json_scanner.h
index 5dc61c18cb..981e242d96 100644
--- a/be/src/exec/json_scanner.h
+++ b/be/src/exec/json_scanner.h
@@ -89,7 +89,7 @@ protected:
int _line_delimiter_length;
// Reader
- FileReader* _cur_file_reader;
+ std::shared_ptr<FileReader> _cur_file_reader;
LineReader* _cur_line_reader;
JsonReader* _cur_json_reader;
bool _cur_reader_eof;
@@ -98,9 +98,6 @@ protected:
// When we fetch range doesn't start from 0,
// we will read to one ahead, and skip the first line
bool _skip_next_line;
-
- // used to hold current StreamLoadPipe
- std::shared_ptr<StreamLoadPipe> _stream_load_pipe;
};
class JsonDataInternal {
diff --git a/be/src/exec/orc_scanner.cpp b/be/src/exec/orc_scanner.cpp
index 138eb729ef..c20aef7d05 100644
--- a/be/src/exec/orc_scanner.cpp
+++ b/be/src/exec/orc_scanner.cpp
@@ -17,21 +17,14 @@
#include "exec/orc_scanner.h"
-#include "exec/broker_reader.h"
-#include "exec/buffered_reader.h"
-#include "exec/local_file_reader.h"
-#include "exec/s3_reader.h"
-#include "exprs/expr.h"
-#include "runtime/descriptors.h"
+#include "io/buffered_reader.h"
+#include "io/file_factory.h"
+#include "io/local_file_reader.h"
#include "runtime/exec_env.h"
#include "runtime/raw_value.h"
#include "runtime/runtime_state.h"
#include "runtime/tuple.h"
-#if defined(__x86_64__)
-#include "exec/hdfs_file_reader.h"
-#endif
-
// orc include file didn't expose orc::TimezoneError
// we have to declare it by hand, following is the source code in orc link
// https://github.com/apache/orc/blob/84353fbfc447b06e0924024a8e03c1aaebd3e7a5/c%2B%2B/src/Timezone.hh#L104-L109
@@ -387,44 +380,11 @@ Status ORCScanner::open_next_reader() {
}
const TBrokerRangeDesc& range = _ranges[_next_range++];
std::unique_ptr<FileReader> file_reader;
- switch (range.file_type) {
- case TFileType::FILE_LOCAL: {
- file_reader.reset(new LocalFileReader(range.path, range.start_offset));
- break;
- }
- case TFileType::FILE_BROKER: {
- int64_t file_size = 0;
- // for compatibility
- if (range.__isset.file_size) {
- file_size = range.file_size;
- }
- file_reader.reset(new BufferedReader(
- _profile,
- new BrokerReader(_state->exec_env(), _broker_addresses, _params.properties,
- range.path, range.start_offset, file_size)));
- break;
- }
- case TFileType::FILE_S3: {
- file_reader.reset(new BufferedReader(
- _profile, new S3Reader(_params.properties, range.path, range.start_offset)));
- break;
- }
- case TFileType::FILE_HDFS: {
-#if defined(__x86_64__)
- file_reader.reset(
- new HdfsFileReader(range.hdfs_params, range.path, range.start_offset));
- break;
-#else
- return Status::InternalError("HdfsFileReader do not support on non x86 platform");
-#endif
- }
- default: {
- std::stringstream ss;
- ss << "Unknown file type, type=" << range.file_type;
- return Status::InternalError(ss.str());
- }
- }
+ RETURN_IF_ERROR(FileFactory::create_file_reader(
+ range.file_type, _state->exec_env(), _profile, _broker_addresses,
+ _params.properties, range, range.start_offset, file_reader));
RETURN_IF_ERROR(file_reader->open());
+
if (file_reader->size() == 0) {
file_reader->close();
continue;
diff --git a/be/src/exec/parquet_scanner.cpp b/be/src/exec/parquet_scanner.cpp
index 880313b6f0..a47e965de5 100644
--- a/be/src/exec/parquet_scanner.cpp
+++ b/be/src/exec/parquet_scanner.cpp
@@ -18,13 +18,13 @@
#include "exec/parquet_scanner.h"
#include "exec/arrow/parquet_reader.h"
-#include "exec/broker_reader.h"
-#include "exec/buffered_reader.h"
#include "exec/decompressor.h"
-#include "exec/hdfs_reader_writer.h"
-#include "exec/local_file_reader.h"
-#include "exec/s3_reader.h"
#include "exec/text_converter.h"
+#include "exec/text_converter.hpp"
+#include "exprs/expr.h"
+#include "io/buffered_reader.h"
+#include "io/file_factory.h"
+#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "runtime/raw_value.h"
#include "runtime/stream_load/load_stream_mgr.h"
@@ -101,42 +101,11 @@ Status ParquetScanner::open_next_reader() {
}
const TBrokerRangeDesc& range = _ranges[_next_range++];
std::unique_ptr<FileReader> file_reader;
- switch (range.file_type) {
- case TFileType::FILE_LOCAL: {
- file_reader.reset(new LocalFileReader(range.path, range.start_offset));
- break;
- }
- case TFileType::FILE_HDFS: {
- FileReader* reader;
- RETURN_IF_ERROR(HdfsReaderWriter::create_reader(range.hdfs_params, range.path,
- range.start_offset, &reader));
- file_reader.reset(reader);
- break;
- }
- case TFileType::FILE_BROKER: {
- int64_t file_size = 0;
- // for compatibility
- if (range.__isset.file_size) {
- file_size = range.file_size;
- }
- file_reader.reset(new BufferedReader(
- _profile,
- new BrokerReader(_state->exec_env(), _broker_addresses, _params.properties,
- range.path, range.start_offset, file_size)));
- break;
- }
- case TFileType::FILE_S3: {
- file_reader.reset(new BufferedReader(
- _profile, new S3Reader(_params.properties, range.path, range.start_offset)));
- break;
- }
- default: {
- std::stringstream ss;
- ss << "Unknown file type, type=" << range.file_type;
- return Status::InternalError(ss.str());
- }
- }
+ RETURN_IF_ERROR(FileFactory::create_file_reader(
+ range.file_type, _state->exec_env(), _profile, _broker_addresses,
+ _params.properties, range, range.start_offset, file_reader));
RETURN_IF_ERROR(file_reader->open());
+
if (file_reader->size() == 0) {
file_reader->close();
continue;
diff --git a/be/src/exec/parquet_writer.cpp b/be/src/exec/parquet_writer.cpp
index befdb1cc54..5f1da82469 100644
--- a/be/src/exec/parquet_writer.cpp
+++ b/be/src/exec/parquet_writer.cpp
@@ -21,8 +21,17 @@
#include <arrow/status.h>
#include <time.h>
-#include "exec/file_writer.h"
+#include "common/logging.h"
+#include "gen_cpp/PaloBrokerService_types.h"
+#include "gen_cpp/TPaloBrokerService.h"
+#include "io/file_writer.h"
+#include "runtime/broker_mgr.h"
+#include "runtime/client_cache.h"
+#include "runtime/descriptors.h"
+#include "runtime/exec_env.h"
+#include "runtime/mem_pool.h"
#include "util/mysql_global.h"
+#include "util/thrift_util.h"
#include "util/types.h"
namespace doris {
diff --git a/be/src/exec/plain_binary_line_reader.cpp b/be/src/exec/plain_binary_line_reader.cpp
index 9cf1ff473f..f63671c622 100644
--- a/be/src/exec/plain_binary_line_reader.cpp
+++ b/be/src/exec/plain_binary_line_reader.cpp
@@ -18,7 +18,7 @@
#include "exec/plain_binary_line_reader.h"
#include "common/status.h"
-#include "exec/file_reader.h"
+#include "io/file_reader.h"
namespace doris {
diff --git a/be/src/exec/plain_text_line_reader.cpp b/be/src/exec/plain_text_line_reader.cpp
index 8932e1c492..825982c5b1 100644
--- a/be/src/exec/plain_text_line_reader.cpp
+++ b/be/src/exec/plain_text_line_reader.cpp
@@ -19,7 +19,7 @@
#include "common/status.h"
#include "exec/decompressor.h"
-#include "exec/file_reader.h"
+#include "io/file_reader.h"
// INPUT_CHUNK must
// larger than 15B for correct lz4 file decompressing
diff --git a/be/src/io/CMakeLists.txt b/be/src/io/CMakeLists.txt
new file mode 100644
index 0000000000..1be0e05176
--- /dev/null
+++ b/be/src/io/CMakeLists.txt
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# where to put generated libraries
+set(LIBRARY_OUTPUT_PATH "${BUILD_DIR}/src/io")
+
+# where to put generated binaries
+set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/io")
+
+set(IO_FILES
+    buffered_reader.cpp
+    local_file_reader.cpp
+    local_file_writer.cpp
+    broker_reader.cpp
+    broker_writer.cpp
+    s3_reader.cpp
+    s3_writer.cpp
+    hdfs_reader_writer.cpp
+    file_factory.cpp
+)
+
+# The native HDFS reader/writer sources are only built on x86_64 (ARCH_AMD64).
+if (ARCH_AMD64)
+    list(APPEND IO_FILES
+        hdfs_file_reader.cpp
+        hdfs_writer.cpp
+    )
+endif()
+
+add_library(IO STATIC
+    ${IO_FILES}
+)
diff --git a/be/src/exec/broker_reader.cpp b/be/src/io/broker_reader.cpp
similarity index 99%
rename from be/src/exec/broker_reader.cpp
rename to be/src/io/broker_reader.cpp
index 0d65bfc6f5..1745a7bbc2 100644
--- a/be/src/exec/broker_reader.cpp
+++ b/be/src/io/broker_reader.cpp
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-#include "exec/broker_reader.h"
+#include "broker_reader.h"
#include <sstream>
diff --git a/be/src/exec/broker_reader.h b/be/src/io/broker_reader.h
similarity index 98%
rename from be/src/exec/broker_reader.h
rename to be/src/io/broker_reader.h
index 05a4b24c79..94f86ca8df 100644
--- a/be/src/exec/broker_reader.h
+++ b/be/src/io/broker_reader.h
@@ -23,7 +23,7 @@
#include <string>
#include "common/status.h"
-#include "exec/file_reader.h"
+#include "file_reader.h"
#include "gen_cpp/PaloBrokerService_types.h"
#include "gen_cpp/Types_types.h"
diff --git a/be/src/exec/broker_writer.cpp b/be/src/io/broker_writer.cpp
similarity index 99%
rename from be/src/exec/broker_writer.cpp
rename to be/src/io/broker_writer.cpp
index 58f9dd4931..3975abe664 100644
--- a/be/src/exec/broker_writer.cpp
+++ b/be/src/io/broker_writer.cpp
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-#include "exec/broker_writer.h"
+#include "broker_writer.h"
#include <sstream>
diff --git a/be/src/exec/broker_writer.h b/be/src/io/broker_writer.h
similarity index 98%
rename from be/src/exec/broker_writer.h
rename to be/src/io/broker_writer.h
index 9bb8c4c5a5..c4b8bac17c 100644
--- a/be/src/exec/broker_writer.h
+++ b/be/src/io/broker_writer.h
@@ -23,7 +23,7 @@
#include <string>
#include "common/status.h"
-#include "exec/file_writer.h"
+#include "file_writer.h"
#include "gen_cpp/PaloBrokerService_types.h"
#include "gen_cpp/Types_types.h"
diff --git a/be/src/exec/buffered_reader.cpp b/be/src/io/buffered_reader.cpp
similarity index 99%
rename from be/src/exec/buffered_reader.cpp
rename to be/src/io/buffered_reader.cpp
index 5c599d41b7..5a80c4f842 100644
--- a/be/src/exec/buffered_reader.cpp
+++ b/be/src/io/buffered_reader.cpp
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-#include "exec/buffered_reader.h"
+#include "buffered_reader.h"
#include <algorithm>
#include <sstream>
diff --git a/be/src/exec/buffered_reader.h b/be/src/io/buffered_reader.h
similarity index 98%
rename from be/src/exec/buffered_reader.h
rename to be/src/io/buffered_reader.h
index 937e154ca7..8ffd5cd0ab 100644
--- a/be/src/exec/buffered_reader.h
+++ b/be/src/io/buffered_reader.h
@@ -22,7 +22,7 @@
#include <memory>
#include "common/status.h"
-#include "exec/file_reader.h"
+#include "file_reader.h"
#include "olap/olap_define.h"
#include "util/runtime_profile.h"
diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
new file mode 100644
index 0000000000..f8c86a9146
--- /dev/null
+++ b/be/src/io/file_factory.cpp
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "file_factory.h"
+
+#include "broker_reader.h"
+#include "broker_writer.h"
+#include "buffered_reader.h"
+#include "hdfs_reader_writer.h"
+#include "local_file_reader.h"
+#include "local_file_writer.h"
+#include "runtime/exec_env.h"
+#include "runtime/stream_load/load_stream_mgr.h"
+#include "s3_reader.h"
+#include "s3_writer.h"
+
+doris::Status doris::FileFactory::create_file_writer(
+        TFileType::type type, doris::ExecEnv* env,
+        const std::vector<TNetworkAddress>& broker_addresses,
+        const std::map<std::string, std::string>& properties, const std::string& path,
+        int64_t start_offset, std::unique_ptr<FileWriter>& file_writer) {
+    // Construct the FileWriter matching `type` into `file_writer` (it is not opened here).
+    if (type == TFileType::FILE_LOCAL) {
+        file_writer.reset(new LocalFileWriter(path, start_offset));
+        return Status::OK();
+    }
+    if (type == TFileType::FILE_BROKER) {
+        file_writer.reset(new BrokerWriter(env, broker_addresses, properties, path, start_offset));
+        return Status::OK();
+    }
+    if (type == TFileType::FILE_S3) {
+        file_writer.reset(new S3Writer(properties, path, start_offset));
+        return Status::OK();
+    }
+    if (type == TFileType::FILE_HDFS) {
+        // NOTE(review): create_writer presumably takes a non-const map, hence the const_cast;
+        // confirm it never modifies `properties`.
+        auto& hdfs_props = const_cast<std::map<std::string, std::string>&>(properties);
+        RETURN_IF_ERROR(HdfsReaderWriter::create_writer(hdfs_props, path, file_writer));
+        return Status::OK();
+    }
+
+    // FILE_STREAM and any other type have no writer implementation.
+    return Status::InternalError("UnSupport File Writer Type: " + std::to_string(type));
+}
+
+doris::Status doris::FileFactory::_new_file_reader(
+        doris::TFileType::type type, doris::ExecEnv* env, RuntimeProfile* profile,
+        const std::vector<TNetworkAddress>& broker_addresses,
+        const std::map<std::string, std::string>& properties, const TBrokerRangeDesc& range,
+        int64_t start_offset, FileReader*& file_reader) {
+    // Build a raw, not-yet-opened reader for `range`; the caller takes ownership of it.
+    // Remote sources (broker / S3 / HDFS) are wrapped in a BufferedReader.
+    if (type == TFileType::FILE_LOCAL) {
+        file_reader = new LocalFileReader(range.path, start_offset);
+        return Status::OK();
+    }
+    if (type == TFileType::FILE_BROKER) {
+        // file_size is optional in the range descriptor; 0 is passed when it is unset.
+        const int64_t known_size = range.__isset.file_size ? range.file_size : 0;
+        auto* broker = new BrokerReader(env, broker_addresses, properties, range.path,
+                                        start_offset, known_size);
+        file_reader = new BufferedReader(profile, broker);
+        return Status::OK();
+    }
+    if (type == TFileType::FILE_S3) {
+        auto* s3 = new S3Reader(properties, range.path, start_offset);
+        file_reader = new BufferedReader(profile, s3);
+        return Status::OK();
+    }
+    if (type == TFileType::FILE_HDFS) {
+        FileReader* hdfs = nullptr;
+        RETURN_IF_ERROR(HdfsReaderWriter::create_reader(range.hdfs_params, range.path,
+                                                        start_offset, &hdfs));
+        file_reader = new BufferedReader(profile, hdfs);
+        return Status::OK();
+    }
+
+    // Any other type (FILE_STREAM included) has no standalone reader here.
+    return Status::InternalError("UnSupport File Reader Type: " + std::to_string(type));
+}
+
+doris::Status doris::FileFactory::create_file_reader(
+        doris::TFileType::type type, doris::ExecEnv* env, RuntimeProfile* profile,
+        const std::vector<TNetworkAddress>& broker_addresses,
+        const std::map<std::string, std::string>& properties, const doris::TBrokerRangeDesc& range,
+        int64_t start_offset, std::unique_ptr<FileReader>& file_reader) {
+    // Stream-load pipes are shared objects; they are only handed out via the shared_ptr overload.
+    if (type == TFileType::FILE_STREAM) {
+        return Status::InternalError("UnSupport UniquePtr For FileStream type");
+    }
+    // Initialised so an indeterminate pointer can never reach reset() below.
+    FileReader* file_reader_ptr = nullptr;
+    RETURN_IF_ERROR(_new_file_reader(type, env, profile, broker_addresses, properties, range,
+                                     start_offset, file_reader_ptr));
+    file_reader.reset(file_reader_ptr);
+    return Status::OK();
+}
+
+doris::Status doris::FileFactory::create_file_reader(
+        doris::TFileType::type type, doris::ExecEnv* env, RuntimeProfile* profile,
+        const std::vector<TNetworkAddress>& broker_addresses,
+        const std::map<std::string, std::string>& properties, const doris::TBrokerRangeDesc& range,
+        int64_t start_offset, std::shared_ptr<FileReader>& file_reader) {
+    if (type == TFileType::FILE_STREAM) {
+        file_reader = env->load_stream_mgr()->get(range.load_id);
+        if (!file_reader) {
+            VLOG_NOTICE << "unknown stream load id: " << UniqueId(range.load_id);
+            return Status::InternalError("unknown stream load id");
+        }
+        return Status::OK();
+    }
+    FileReader* file_reader_ptr = nullptr; // never leave the local indeterminate
+    RETURN_IF_ERROR(_new_file_reader(type, env, profile, broker_addresses, properties, range,
+                                     start_offset, file_reader_ptr));
+    file_reader.reset(file_reader_ptr);
+    return Status::OK();
+}
diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h
new file mode 100644
index 0000000000..36c2871599
--- /dev/null
+++ b/be/src/io/file_factory.h
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <map>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "common/logging.h"
+#include "common/status.h"
+#include "file_reader.h"
+#include "file_writer.h"
+#include "gen_cpp/PlanNodes_types.h"
+#include "gen_cpp/Types_types.h"
+
+namespace doris {
+class ExecEnv;
+class TNetworkAddress;
+class RuntimeProfile;
+
+// Builds FileReader / FileWriter objects for a given TFileType, so that scanners and
+// sinks do not each need their own switch over the storage type.
+class FileFactory {
+public:
+    // Creates a FileWriter for `path` on the storage selected by `type` into `file_writer`.
+    // Callers are expected to call open() on it (ExportSink and FileResultWriter do).
+    static Status create_file_writer(TFileType::type type, ExecEnv* env,
+                                     const std::vector<TNetworkAddress>& broker_addresses,
+                                     const std::map<std::string, std::string>& properties,
+                                     const std::string& path, int64_t start_offset,
+                                     std::unique_ptr<FileWriter>& file_writer);
+
+    // Because StreamLoadPipe uses std::shared_ptr, both a unique_ptr and a shared_ptr
+    // overload of create_file_reader are provided.
+    //
+    // unique_ptr overload: FILE_STREAM is not supported and returns an InternalError,
+    // since the stream load pipe is held through a std::shared_ptr.
+    static Status create_file_reader(TFileType::type type, ExecEnv* env, RuntimeProfile* profile,
+                                     const std::vector<TNetworkAddress>& broker_addresses,
+                                     const std::map<std::string, std::string>& properties,
+                                     const TBrokerRangeDesc& range, int64_t start_offset,
+                                     std::unique_ptr<FileReader>& file_reader);
+
+    // shared_ptr overload: for FILE_STREAM the reader is looked up in the load stream
+    // manager by `range.load_id`; every other type creates a new reader.
+    static Status create_file_reader(TFileType::type type, ExecEnv* env, RuntimeProfile* profile,
+                                     const std::vector<TNetworkAddress>& broker_addresses,
+                                     const std::map<std::string, std::string>& properties,
+                                     const TBrokerRangeDesc& range, int64_t start_offset,
+                                     std::shared_ptr<FileReader>& file_reader);
+
+    // Maps a storage backend type onto the matching file type. An unmapped value is
+    // treated as a programming error and aborts via LOG(FATAL).
+    static TFileType::type convert_storage_type(TStorageBackendType::type type) {
+        switch (type) {
+        case TStorageBackendType::LOCAL:
+            return TFileType::FILE_LOCAL;
+        case TStorageBackendType::S3:
+            return TFileType::FILE_S3;
+        case TStorageBackendType::BROKER:
+            return TFileType::FILE_BROKER;
+        case TStorageBackendType::HDFS:
+            return TFileType::FILE_HDFS;
+        default:
+            LOG(FATAL) << "not match type to convert, from type:" << type;
+        }
+        __builtin_unreachable();
+    }
+
+private:
+    // On Status::OK(), `file_reader` points to a newly allocated reader and the caller takes
+    // ownership of it: delete it or, preferably, hand it straight to a smart pointer.
+    static Status _new_file_reader(TFileType::type type, ExecEnv* env, RuntimeProfile* profile,
+                                   const std::vector<TNetworkAddress>& broker_addresses,
+                                   const std::map<std::string, std::string>& properties,
+                                   const TBrokerRangeDesc& range, int64_t start_offset,
+                                   FileReader*& file_reader);
+};
+
+} // namespace doris
diff --git a/be/src/exec/file_reader.h b/be/src/io/file_reader.h
similarity index 100%
rename from be/src/exec/file_reader.h
rename to be/src/io/file_reader.h
diff --git a/be/src/exec/file_writer.h b/be/src/io/file_writer.h
similarity index 100%
rename from be/src/exec/file_writer.h
rename to be/src/io/file_writer.h
diff --git a/be/src/exec/hdfs_file_reader.cpp b/be/src/io/hdfs_file_reader.cpp
similarity index 99%
rename from be/src/exec/hdfs_file_reader.cpp
rename to be/src/io/hdfs_file_reader.cpp
index 25e3a09638..5d4dc2cc5e 100644
--- a/be/src/exec/hdfs_file_reader.cpp
+++ b/be/src/io/hdfs_file_reader.cpp
@@ -14,7 +14,7 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-#include "exec/hdfs_file_reader.h"
+#include "hdfs_file_reader.h"
#include <sys/stat.h>
#include <unistd.h>
diff --git a/be/src/exec/hdfs_file_reader.h b/be/src/io/hdfs_file_reader.h
similarity index 98%
rename from be/src/exec/hdfs_file_reader.h
rename to be/src/io/hdfs_file_reader.h
index d4430de3e2..83c8efb7dc 100644
--- a/be/src/exec/hdfs_file_reader.h
+++ b/be/src/io/hdfs_file_reader.h
@@ -19,7 +19,7 @@
#include <hdfs/hdfs.h>
-#include "exec/file_reader.h"
+#include "file_reader.h"
#include "gen_cpp/PlanNodes_types.h"
namespace doris {
diff --git a/be/src/exec/hdfs_reader_writer.cpp b/be/src/io/hdfs_reader_writer.cpp
similarity index 84%
rename from be/src/exec/hdfs_reader_writer.cpp
rename to be/src/io/hdfs_reader_writer.cpp
index 956fc9e40e..9a072512c0 100644
--- a/be/src/exec/hdfs_reader_writer.cpp
+++ b/be/src/io/hdfs_reader_writer.cpp
@@ -15,11 +15,11 @@
// specific language governing permissions and limitations
// under the License.
-#include "exec/hdfs_reader_writer.h"
+#include "hdfs_reader_writer.h"
#if defined(__x86_64__)
-#include "exec/hdfs_file_reader.h"
-#include "exec/hdfs_writer.h"
+#include "hdfs_file_reader.h"
+#include "hdfs_writer.h"
#endif
namespace doris {
@@ -35,9 +35,10 @@ Status HdfsReaderWriter::create_reader(const THdfsParams& hdfs_params, const std
}
Status HdfsReaderWriter::create_writer(std::map<std::string, std::string>& properties,
- const std::string& path, FileWriter** writer) {
+ const std::string& path,
+ std::unique_ptr<FileWriter>& writer) {
#if defined(__x86_64__)
- *writer = new HDFSWriter(properties, path);
+ writer.reset(new HDFSWriter(properties, path));
return Status::OK();
#else
return Status::InternalError("HdfsWriter do not support on non x86 platform");
diff --git a/be/src/exec/hdfs_reader_writer.h b/be/src/io/hdfs_reader_writer.h
similarity index 91%
rename from be/src/exec/hdfs_reader_writer.h
rename to be/src/io/hdfs_reader_writer.h
index e0decf2035..160306ffce 100644
--- a/be/src/exec/hdfs_reader_writer.h
+++ b/be/src/io/hdfs_reader_writer.h
@@ -17,8 +17,8 @@
#pragma once
-#include "exec/file_reader.h"
-#include "exec/file_writer.h"
+#include "file_reader.h"
+#include "file_writer.h"
#include "gen_cpp/PlanNodes_types.h"
namespace doris {
@@ -35,7 +35,7 @@ public:
int64_t start_offset, FileReader** reader);
static Status create_writer(std::map<std::string, std::string>& properties,
- const std::string& path, FileWriter** writer);
+ const std::string& path, std::unique_ptr<FileWriter>& writer);
};
} // namespace doris
diff --git a/be/src/exec/hdfs_writer.cpp b/be/src/io/hdfs_writer.cpp
similarity index 99%
rename from be/src/exec/hdfs_writer.cpp
rename to be/src/io/hdfs_writer.cpp
index a362488d8b..16b2516ca8 100644
--- a/be/src/exec/hdfs_writer.cpp
+++ b/be/src/io/hdfs_writer.cpp
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-#include "exec/hdfs_writer.h"
+#include "hdfs_writer.h"
#include <filesystem>
diff --git a/be/src/exec/hdfs_writer.h b/be/src/io/hdfs_writer.h
similarity index 98%
rename from be/src/exec/hdfs_writer.h
rename to be/src/io/hdfs_writer.h
index 8bc9060c2a..15d9a1fde8 100644
--- a/be/src/exec/hdfs_writer.h
+++ b/be/src/io/hdfs_writer.h
@@ -22,7 +22,7 @@
#include <map>
#include <string>
-#include "exec/file_writer.h"
+#include "file_writer.h"
namespace doris {
class HDFSWriter : public FileWriter {
diff --git a/be/src/exec/local_file_reader.cpp b/be/src/io/local_file_reader.cpp
similarity index 99%
rename from be/src/exec/local_file_reader.cpp
rename to be/src/io/local_file_reader.cpp
index d5c8454532..e8e2d38ea2 100644
--- a/be/src/exec/local_file_reader.cpp
+++ b/be/src/io/local_file_reader.cpp
@@ -14,7 +14,7 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-#include "exec/local_file_reader.h"
+#include "local_file_reader.h"
#include <sys/stat.h>
#include <unistd.h>
diff --git a/be/src/exec/local_file_reader.h b/be/src/io/local_file_reader.h
similarity index 98%
rename from be/src/exec/local_file_reader.h
rename to be/src/io/local_file_reader.h
index 3224f94562..c525804395 100644
--- a/be/src/exec/local_file_reader.h
+++ b/be/src/io/local_file_reader.h
@@ -20,7 +20,7 @@
#define _FILE_OFFSET_BITS 64
#include <stdio.h>
-#include "exec/file_reader.h"
+#include "file_reader.h"
namespace doris {
diff --git a/be/src/exec/local_file_writer.cpp b/be/src/io/local_file_writer.cpp
similarity index 75%
rename from be/src/exec/local_file_writer.cpp
rename to be/src/io/local_file_writer.cpp
index 056d4b0cea..3c425a46aa 100644
--- a/be/src/exec/local_file_writer.cpp
+++ b/be/src/io/local_file_writer.cpp
@@ -15,9 +15,11 @@
// specific language governing permissions and limitations
// under the License.
-#include "exec/local_file_writer.h"
+#include "local_file_writer.h"
+#include "service/backend_options.h"
#include "util/error_util.h"
+#include "util/file_utils.h"
namespace doris {
@@ -29,6 +31,8 @@ LocalFileWriter::~LocalFileWriter() {
}
Status LocalFileWriter::open() {
+ RETURN_IF_ERROR(_check_file_path(_path));
+
_fp = fopen(_path.c_str(), "w+");
if (_fp == nullptr) {
std::stringstream ss;
@@ -72,4 +76,18 @@ Status LocalFileWriter::close() {
return Status::OK();
}
+Status LocalFileWriter::_check_file_path(const std::string& file_path) {
+    // For the local file writer, file_path is the full path of the file to be written
+    // (not a directory). Here we do a simple security verification by checking whether
+    // that file already exists: the path is arbitrarily specified by the user and Doris is
+    // not responsible for its correctness, so this only prevents overwriting an existing file.
+    // NOTE: this check is not atomic with the later fopen() in open().
+    if (FileUtils::check_exist(file_path)) {
+        return Status::InternalError("File already exists: " + file_path +
+                                     ". Host: " + BackendOptions::get_localhost());
+    }
+
+    return Status::OK();
+}
+
} // end namespace doris
diff --git a/be/src/exec/local_file_writer.h b/be/src/io/local_file_writer.h
similarity index 90%
rename from be/src/exec/local_file_writer.h
rename to be/src/io/local_file_writer.h
index 63be5e9d79..7d9da485c3 100644
--- a/be/src/exec/local_file_writer.h
+++ b/be/src/io/local_file_writer.h
@@ -19,7 +19,7 @@
#include <stdio.h>
-#include "exec/file_writer.h"
+#include "file_writer.h"
namespace doris {
@@ -28,7 +28,8 @@ class RuntimeState;
class LocalFileWriter : public FileWriter {
public:
LocalFileWriter(const std::string& path, int64_t start_offset);
- virtual ~LocalFileWriter();
+
+ ~LocalFileWriter() override;
Status open() override;
@@ -37,6 +38,8 @@ public:
virtual Status close() override;
private:
+ static Status _check_file_path(const std::string& file_path);
+
std::string _path;
int64_t _start_offset;
FILE* _fp;
diff --git a/be/src/exec/s3_reader.cpp b/be/src/io/s3_reader.cpp
similarity index 99%
rename from be/src/exec/s3_reader.cpp
rename to be/src/io/s3_reader.cpp
index 30e6daaa59..c932e2d886 100644
--- a/be/src/exec/s3_reader.cpp
+++ b/be/src/io/s3_reader.cpp
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-#include "exec/s3_reader.h"
+#include "s3_reader.h"
#include <aws/s3/S3Client.h>
#include <aws/s3/model/GetObjectRequest.h>
diff --git a/be/src/exec/s3_reader.h b/be/src/io/s3_reader.h
similarity index 98%
rename from be/src/exec/s3_reader.h
rename to be/src/io/s3_reader.h
index 0de0b0944e..a1464324df 100644
--- a/be/src/exec/s3_reader.h
+++ b/be/src/io/s3_reader.h
@@ -20,7 +20,7 @@
#include <map>
#include <string>
-#include "exec/file_reader.h"
+#include "file_reader.h"
#include "util/s3_uri.h"
namespace Aws {
diff --git a/be/src/exec/s3_writer.cpp b/be/src/io/s3_writer.cpp
similarity index 99%
rename from be/src/exec/s3_writer.cpp
rename to be/src/io/s3_writer.cpp
index 8b44c621d5..df37e7d820 100644
--- a/be/src/exec/s3_writer.cpp
+++ b/be/src/io/s3_writer.cpp
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-#include "exec/s3_writer.h"
+#include "s3_writer.h"
#include <aws/core/utils/FileSystemUtils.h>
#include <aws/s3/S3Client.h>
diff --git a/be/src/exec/s3_writer.h b/be/src/io/s3_writer.h
similarity index 98%
rename from be/src/exec/s3_writer.h
rename to be/src/io/s3_writer.h
index 09084ac67b..ae2756da08 100644
--- a/be/src/exec/s3_writer.h
+++ b/be/src/io/s3_writer.h
@@ -20,7 +20,7 @@
#include <map>
#include <string>
-#include "exec/file_writer.h"
+#include "file_writer.h"
#include "util/s3_uri.h"
namespace Aws {
diff --git a/be/src/runtime/export_sink.cpp b/be/src/runtime/export_sink.cpp
index 8b60d0f5f9..9f72c365de 100644
--- a/be/src/runtime/export_sink.cpp
+++ b/be/src/runtime/export_sink.cpp
@@ -21,13 +21,10 @@
#include <sstream>
-#include "exec/broker_writer.h"
-#include "exec/hdfs_reader_writer.h"
-#include "exec/local_file_writer.h"
-#include "exec/s3_writer.h"
#include "exprs/expr.h"
#include "exprs/expr_context.h"
#include "gutil/strings/numbers.h"
+#include "io/file_factory.h"
#include "runtime/large_int_value.h"
#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
@@ -247,50 +244,14 @@ Status ExportSink::open_file_writer() {
}
std::string file_name = gen_file_name();
-
// TODO(lingbin): gen file path
- switch (_t_export_sink.file_type) {
- case TFileType::FILE_LOCAL: {
- LocalFileWriter* file_writer =
- new LocalFileWriter(_t_export_sink.export_path + "/" + file_name, 0);
- RETURN_IF_ERROR(file_writer->open());
- _file_writer.reset(file_writer);
- break;
- }
- case TFileType::FILE_BROKER: {
- BrokerWriter* broker_writer = new BrokerWriter(
- _state->exec_env(), _t_export_sink.broker_addresses, _t_export_sink.properties,
- _t_export_sink.export_path + "/" + file_name, 0 /* offset */);
- RETURN_IF_ERROR(broker_writer->open());
- _file_writer.reset(broker_writer);
- break;
- }
- case TFileType::FILE_S3: {
- S3Writer* s3_writer =
- new S3Writer(_t_export_sink.properties,
- _t_export_sink.export_path + "/" + file_name, 0 /* offset */);
- RETURN_IF_ERROR(s3_writer->open());
- _file_writer.reset(s3_writer);
- break;
- }
- case TFileType::FILE_HDFS: {
- FileWriter* hdfs_writer;
- RETURN_IF_ERROR(HdfsReaderWriter::create_writer(
- const_cast<std::map<std::string, std::string>&>(_t_export_sink.properties),
- _t_export_sink.export_path + "/" + file_name, &hdfs_writer));
- RETURN_IF_ERROR(hdfs_writer->open());
- _file_writer.reset(hdfs_writer);
- break;
- }
- default: {
- std::stringstream ss;
- ss << "Unknown file type, type=" << _t_export_sink.file_type;
- return Status::InternalError(ss.str());
- }
- }
-
+ RETURN_IF_ERROR(FileFactory::create_file_writer(
+ _t_export_sink.file_type, _state->exec_env(), _t_export_sink.broker_addresses,
+ _t_export_sink.properties, _t_export_sink.export_path + "/" + file_name, 0,
+ _file_writer));
_state->add_export_output_file(_t_export_sink.export_path + "/" + file_name);
- return Status::OK();
+
+ return _file_writer->open();
}
// TODO(lingbin): add some other info to file name, like partition
diff --git a/be/src/runtime/file_result_writer.cpp b/be/src/runtime/file_result_writer.cpp
index f2351476a3..8f4e0c0d03 100644
--- a/be/src/runtime/file_result_writer.cpp
+++ b/be/src/runtime/file_result_writer.cpp
@@ -18,15 +18,12 @@
#include "runtime/file_result_writer.h"
#include "common/consts.h"
-#include "exec/broker_writer.h"
-#include "exec/hdfs_reader_writer.h"
-#include "exec/local_file_writer.h"
#include "exec/parquet_writer.h"
-#include "exec/s3_writer.h"
#include "exprs/expr_context.h"
#include "gen_cpp/PaloInternalService_types.h"
#include "gutil/strings/numbers.h"
#include "gutil/strings/substitute.h"
+#include "io/file_factory.h"
#include "runtime/buffer_control_block.h"
#include "runtime/large_int_value.h"
#include "runtime/primitive_type.h"
@@ -95,15 +92,6 @@ Status FileResultWriter::_get_success_file_name(std::string* file_name) {
ss << _file_opts->file_path << _file_opts->success_file_name;
*file_name = ss.str();
if (_storage_type == TStorageBackendType::LOCAL) {
- // For local file writer, the file_path is a local dir.
- // Here we do a simple security verification by checking whether the file exists.
- // Because the file path is currently arbitrarily specified by the user,
- // Doris is not responsible for ensuring the correctness of the path.
- // This is just to prevent overwriting the existing file.
- if (FileUtils::check_exist(*file_name)) {
- return Status::InternalError("File already exists: " + *file_name +
- ". Host: " + BackendOptions::get_localhost());
- }
}
return Status::OK();
@@ -116,26 +104,18 @@ Status FileResultWriter::_create_next_file_writer() {
}
Status FileResultWriter::_create_file_writer(const std::string& file_name) {
- if (_storage_type == TStorageBackendType::LOCAL) {
- _file_writer = new LocalFileWriter(file_name, 0 /* start offset */);
- } else if (_storage_type == TStorageBackendType::BROKER) {
- _file_writer =
- new BrokerWriter(_state->exec_env(), _file_opts->broker_addresses,
- _file_opts->broker_properties, file_name, 0 /*start offset*/);
- } else if (_storage_type == TStorageBackendType::S3) {
- _file_writer = new S3Writer(_file_opts->broker_properties, file_name, 0 /* offset */);
- } else if (_storage_type == TStorageBackendType::HDFS) {
- RETURN_IF_ERROR(HdfsReaderWriter::create_writer(
- const_cast<std::map<std::string, std::string>&>(_file_opts->broker_properties),
- file_name, &_file_writer));
- }
+ RETURN_IF_ERROR(FileFactory::create_file_writer(
+ FileFactory::convert_storage_type(_storage_type), _state->exec_env(),
+ _file_opts->broker_addresses, _file_opts->broker_properties, file_name, 0,
+ _file_writer));
RETURN_IF_ERROR(_file_writer->open());
+
switch (_file_opts->file_format) {
case TFileFormatType::FORMAT_CSV_PLAIN:
// just use file writer is enough
break;
case TFileFormatType::FORMAT_PARQUET:
- _parquet_writer = new ParquetWriterWrapper(_file_writer, _output_expr_ctxs,
+ _parquet_writer = new ParquetWriterWrapper(_file_writer.get(), _output_expr_ctxs,
_file_opts->file_properties, _file_opts->schema,
_output_object_data);
break;
@@ -417,12 +397,8 @@ Status FileResultWriter::_close_file_writer(bool done, bool only_close) {
COUNTER_UPDATE(_written_data_bytes, _current_written_bytes);
delete _parquet_writer;
_parquet_writer = nullptr;
- delete _file_writer;
- _file_writer = nullptr;
- } else if (_file_writer != nullptr) {
+ } else if (_file_writer) {
_file_writer->close();
- delete _file_writer;
- _file_writer = nullptr;
}
if (only_close) {
diff --git a/be/src/runtime/file_result_writer.h b/be/src/runtime/file_result_writer.h
index b8d9f71d77..03f53bd165 100644
--- a/be/src/runtime/file_result_writer.h
+++ b/be/src/runtime/file_result_writer.h
@@ -132,7 +132,7 @@ private:
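-    // If the result file format is plain text, like CSV, this _file_writer is owned by this FileResultWriter.
-    // If the result file format is Parquet, this _file_writer is owned by _parquet_writer.
+    // _file_writer is always owned by this FileResultWriter (held in a std::unique_ptr).
+    // If the result file format is Parquet, _parquet_writer only borrows it as a raw pointer.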
- FileWriter* _file_writer = nullptr;
+ std::unique_ptr<FileWriter> _file_writer;
// parquet file writer
ParquetWriterWrapper* _parquet_writer = nullptr;
// Used to buffer the export data of plain text
diff --git a/be/src/runtime/routine_load/kafka_consumer_pipe.h b/be/src/runtime/routine_load/kafka_consumer_pipe.h
index 6e7fc43eec..6d01bbe091 100644
--- a/be/src/runtime/routine_load/kafka_consumer_pipe.h
+++ b/be/src/runtime/routine_load/kafka_consumer_pipe.h
@@ -23,7 +23,7 @@
#include <string>
#include <vector>
-#include "exec/file_reader.h"
+#include "io/file_reader.h"
#include "librdkafka/rdkafka.h"
#include "runtime/message_body_sink.h"
#include "runtime/stream_load/stream_load_pipe.h"
diff --git a/be/src/runtime/stream_load/stream_load_pipe.h b/be/src/runtime/stream_load/stream_load_pipe.h
index d793d1cbd6..716a145f61 100644
--- a/be/src/runtime/stream_load/stream_load_pipe.h
+++ b/be/src/runtime/stream_load/stream_load_pipe.h
@@ -21,8 +21,8 @@
#include <deque>
#include <mutex>
-#include "exec/file_reader.h"
#include "gen_cpp/internal_service.pb.h"
+#include "io/file_reader.h"
#include "runtime/message_body_sink.h"
#include "util/bit_util.h"
#include "util/byte_buffer.h"
diff --git a/be/src/util/broker_load_error_hub.cpp b/be/src/util/broker_load_error_hub.cpp
index 84b75d51f9..a8c0441b27 100644
--- a/be/src/util/broker_load_error_hub.cpp
+++ b/be/src/util/broker_load_error_hub.cpp
@@ -17,7 +17,7 @@
#include "util/broker_load_error_hub.h"
-#include "exec/broker_writer.h"
+#include "io/broker_writer.h"
#include "util/defer_op.h"
namespace doris {
diff --git a/be/src/util/broker_storage_backend.cpp b/be/src/util/broker_storage_backend.cpp
index c812e98f48..fc653bcb2c 100644
--- a/be/src/util/broker_storage_backend.cpp
+++ b/be/src/util/broker_storage_backend.cpp
@@ -18,13 +18,13 @@
#include "util/broker_storage_backend.h"
#include "env/env.h"
-#include "exec/broker_reader.h"
-#include "exec/broker_writer.h"
#include "gen_cpp/FrontendService.h"
#include "gen_cpp/FrontendService_types.h"
#include "gen_cpp/HeartbeatService_types.h"
#include "gen_cpp/PaloBrokerService_types.h"
#include "gen_cpp/TPaloBrokerService.h"
+#include "io/broker_reader.h"
+#include "io/broker_writer.h"
#include "olap/file_helper.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
diff --git a/be/src/vec/exec/varrow_scanner.cpp b/be/src/vec/exec/varrow_scanner.cpp
index b44475ce19..971f1b45bc 100644
--- a/be/src/vec/exec/varrow_scanner.cpp
+++ b/be/src/vec/exec/varrow_scanner.cpp
@@ -16,12 +16,8 @@
// under the License.
#include "exec/arrow/parquet_reader.h"
-#include "exec/broker_reader.h"
-#include "exec/buffered_reader.h"
-#include "exec/hdfs_reader_writer.h"
-#include "exec/local_file_reader.h"
-#include "exec/s3_reader.h"
#include "exprs/expr.h"
+#include "io/file_factory.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "vec/data_types/data_type_factory.hpp"
@@ -61,41 +57,9 @@ Status VArrowScanner::_open_next_reader() {
}
const TBrokerRangeDesc& range = _ranges[_next_range++];
std::unique_ptr<FileReader> file_reader;
- switch (range.file_type) {
- case TFileType::FILE_LOCAL: {
- file_reader.reset(new LocalFileReader(range.path, range.start_offset));
- break;
- }
- case TFileType::FILE_HDFS: {
- FileReader* reader;
- RETURN_IF_ERROR(HdfsReaderWriter::create_reader(range.hdfs_params, range.path,
- range.start_offset, &reader));
- file_reader.reset(reader);
- break;
- }
- case TFileType::FILE_BROKER: {
- int64_t file_size = 0;
- // for compatibility
- if (range.__isset.file_size) {
- file_size = range.file_size;
- }
- file_reader.reset(new BufferedReader(
- _profile,
- new BrokerReader(_state->exec_env(), _broker_addresses, _params.properties,
- range.path, range.start_offset, file_size)));
- break;
- }
- case TFileType::FILE_S3: {
- file_reader.reset(new BufferedReader(
- _profile, new S3Reader(_params.properties, range.path, range.start_offset)));
- break;
- }
- default: {
- std::stringstream ss;
- ss << "Unknown file type, type=" << range.file_type;
- return Status::InternalError(ss.str());
- }
- }
+ RETURN_IF_ERROR(FileFactory::create_file_reader(
+ range.file_type, _state->exec_env(), _profile, _broker_addresses,
+ _params.properties, range, range.start_offset, file_reader));
RETURN_IF_ERROR(file_reader->open());
if (file_reader->size() == 0) {
file_reader->close();
diff --git a/be/src/vec/exec/vjson_scanner.cpp b/be/src/vec/exec/vjson_scanner.cpp
index 837b3ec289..1acd304e77 100644
--- a/be/src/vec/exec/vjson_scanner.cpp
+++ b/be/src/vec/exec/vjson_scanner.cpp
@@ -105,7 +105,7 @@ Status VJsonScanner::open_vjson_reader() {
} else {
_cur_vjson_reader.reset(new VJsonReader(_state, _counter, _profile, strip_outer_array,
num_as_string, fuzzy_parse, &_scanner_eof,
- _cur_file_reader));
+ _cur_file_reader.get()));
}
RETURN_IF_ERROR(_cur_vjson_reader->init(jsonpath, json_root));
diff --git a/be/test/exec/broker_reader_test.cpp b/be/test/exec/broker_reader_test.cpp
index c9f19279e4..0923b4edb4 100644
--- a/be/test/exec/broker_reader_test.cpp
+++ b/be/test/exec/broker_reader_test.cpp
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-#include "exec/broker_reader.h"
+#include "io/broker_reader.h"
#include <gtest/gtest.h>
diff --git a/be/test/exec/broker_scan_node_test.cpp b/be/test/exec/broker_scan_node_test.cpp
index 18128200ea..bc7c91dff3 100644
--- a/be/test/exec/broker_scan_node_test.cpp
+++ b/be/test/exec/broker_scan_node_test.cpp
@@ -24,10 +24,10 @@
#include <vector>
#include "common/object_pool.h"
-#include "exec/local_file_reader.h"
#include "exprs/cast_functions.h"
#include "gen_cpp/Descriptors_types.h"
#include "gen_cpp/PlanNodes_types.h"
+#include "io/local_file_reader.h"
#include "runtime/descriptors.h"
#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
diff --git a/be/test/exec/broker_scanner_test.cpp b/be/test/exec/broker_scanner_test.cpp
index 689e876754..0a65982058 100644
--- a/be/test/exec/broker_scanner_test.cpp
+++ b/be/test/exec/broker_scanner_test.cpp
@@ -24,10 +24,10 @@
#include <vector>
#include "common/object_pool.h"
-#include "exec/local_file_reader.h"
#include "exprs/cast_functions.h"
#include "gen_cpp/Descriptors_types.h"
#include "gen_cpp/PlanNodes_types.h"
+#include "io/local_file_reader.h"
#include "runtime/descriptors.h"
#include "runtime/mem_tracker.h"
#include "runtime/runtime_state.h"
diff --git a/be/test/exec/buffered_reader_test.cpp b/be/test/exec/buffered_reader_test.cpp
index 7fe6c47fee..940635a7f1 100644
--- a/be/test/exec/buffered_reader_test.cpp
+++ b/be/test/exec/buffered_reader_test.cpp
@@ -15,11 +15,11 @@
// specific language governing permissions and limitations
// under the License.
-#include "exec/buffered_reader.h"
+#include "io/buffered_reader.h"
#include <gtest/gtest.h>
-#include "exec/local_file_reader.h"
+#include "io/local_file_reader.h"
#include "util/stopwatch.hpp"
namespace doris {
diff --git a/be/test/exec/hdfs_file_reader_test.cpp b/be/test/exec/hdfs_file_reader_test.cpp
index 6807272bac..a5e85f2bc8 100644
--- a/be/test/exec/hdfs_file_reader_test.cpp
+++ b/be/test/exec/hdfs_file_reader_test.cpp
@@ -15,11 +15,11 @@
// specific language governing permissions and limitations
// under the License.
-#include "exec/hdfs_file_reader.h"
+#include "io/hdfs_file_reader.h"
#include <gtest/gtest.h>
-#include "exec/hdfs_reader_writer.h"
+#include "io/hdfs_reader_writer.h"
namespace doris {
diff --git a/be/test/exec/json_scanner_test.cpp b/be/test/exec/json_scanner_test.cpp
index 582e26b7c5..744746f7b7 100644
--- a/be/test/exec/json_scanner_test.cpp
+++ b/be/test/exec/json_scanner_test.cpp
@@ -24,11 +24,11 @@
#include "common/object_pool.h"
#include "exec/broker_scan_node.h"
-#include "exec/local_file_reader.h"
#include "exprs/cast_functions.h"
#include "exprs/decimalv2_operators.h"
#include "gen_cpp/Descriptors_types.h"
#include "gen_cpp/PlanNodes_types.h"
+#include "io/local_file_reader.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "runtime/row_batch.h"
diff --git a/be/test/exec/json_scanner_with_jsonpath_test.cpp b/be/test/exec/json_scanner_with_jsonpath_test.cpp
index 9db0bd37b4..0394de1035 100644
--- a/be/test/exec/json_scanner_with_jsonpath_test.cpp
+++ b/be/test/exec/json_scanner_with_jsonpath_test.cpp
@@ -24,10 +24,10 @@
#include "common/object_pool.h"
#include "exec/broker_scan_node.h"
-#include "exec/local_file_reader.h"
#include "exprs/cast_functions.h"
#include "gen_cpp/Descriptors_types.h"
#include "gen_cpp/PlanNodes_types.h"
+#include "io/local_file_reader.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "runtime/row_batch.h"
diff --git a/be/test/exec/multi_bytes_separator_test.cpp b/be/test/exec/multi_bytes_separator_test.cpp
index 3712f6b141..614d9f279f 100644
--- a/be/test/exec/multi_bytes_separator_test.cpp
+++ b/be/test/exec/multi_bytes_separator_test.cpp
@@ -23,10 +23,10 @@
#include "common/object_pool.h"
#include "exec/broker_scanner.h"
-#include "exec/local_file_reader.h"
#include "exprs/cast_functions.h"
#include "gen_cpp/Descriptors_types.h"
#include "gen_cpp/PlanNodes_types.h"
+#include "io/local_file_reader.h"
#include "runtime/descriptors.h"
#include "runtime/mem_tracker.h"
#include "runtime/runtime_state.h"
diff --git a/be/test/exec/orc_scanner_test.cpp b/be/test/exec/orc_scanner_test.cpp
index f5f99924f2..cd1023e692 100644
--- a/be/test/exec/orc_scanner_test.cpp
+++ b/be/test/exec/orc_scanner_test.cpp
@@ -27,11 +27,11 @@
#include "common/object_pool.h"
#include "exec/broker_scan_node.h"
-#include "exec/local_file_reader.h"
#include "exprs/cast_functions.h"
#include "exprs/decimalv2_operators.h"
#include "gen_cpp/Descriptors_types.h"
#include "gen_cpp/PlanNodes_types.h"
+#include "io/local_file_reader.h"
#include "runtime/descriptors.h"
#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
diff --git a/be/test/exec/parquet_scanner_test.cpp b/be/test/exec/parquet_scanner_test.cpp
index 8db72313ba..cbd673d4dc 100644
--- a/be/test/exec/parquet_scanner_test.cpp
+++ b/be/test/exec/parquet_scanner_test.cpp
@@ -24,10 +24,10 @@
#include "common/object_pool.h"
#include "exec/broker_scan_node.h"
-#include "exec/local_file_reader.h"
#include "exprs/cast_functions.h"
#include "gen_cpp/Descriptors_types.h"
#include "gen_cpp/PlanNodes_types.h"
+#include "io/local_file_reader.h"
#include "runtime/descriptors.h"
#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
diff --git a/be/test/exec/plain_text_line_reader_bzip_test.cpp b/be/test/exec/plain_text_line_reader_bzip_test.cpp
index 747313c803..900f261673 100644
--- a/be/test/exec/plain_text_line_reader_bzip_test.cpp
+++ b/be/test/exec/plain_text_line_reader_bzip_test.cpp
@@ -18,8 +18,8 @@
#include <gtest/gtest.h>
#include "exec/decompressor.h"
-#include "exec/local_file_reader.h"
#include "exec/plain_text_line_reader.h"
+#include "io/local_file_reader.h"
#include "util/runtime_profile.h"
namespace doris {
diff --git a/be/test/exec/plain_text_line_reader_gzip_test.cpp b/be/test/exec/plain_text_line_reader_gzip_test.cpp
index 1e461e8b82..fea15d00c4 100644
--- a/be/test/exec/plain_text_line_reader_gzip_test.cpp
+++ b/be/test/exec/plain_text_line_reader_gzip_test.cpp
@@ -18,8 +18,8 @@
#include <gtest/gtest.h>
#include "exec/decompressor.h"
-#include "exec/local_file_reader.h"
#include "exec/plain_text_line_reader.h"
+#include "io/local_file_reader.h"
#include "util/runtime_profile.h"
namespace doris {
diff --git a/be/test/exec/plain_text_line_reader_lz4frame_test.cpp b/be/test/exec/plain_text_line_reader_lz4frame_test.cpp
index 0ebd218b28..f6d0844455 100644
--- a/be/test/exec/plain_text_line_reader_lz4frame_test.cpp
+++ b/be/test/exec/plain_text_line_reader_lz4frame_test.cpp
@@ -18,8 +18,8 @@
#include <gtest/gtest.h>
#include "exec/decompressor.h"
-#include "exec/local_file_reader.h"
#include "exec/plain_text_line_reader.h"
+#include "io/local_file_reader.h"
#include "util/runtime_profile.h"
namespace doris {
diff --git a/be/test/exec/plain_text_line_reader_lzop_test.cpp b/be/test/exec/plain_text_line_reader_lzop_test.cpp
index f8f8b6090d..99aa6336e0 100644
--- a/be/test/exec/plain_text_line_reader_lzop_test.cpp
+++ b/be/test/exec/plain_text_line_reader_lzop_test.cpp
@@ -18,8 +18,8 @@
#include <gtest/gtest.h>
#include "exec/decompressor.h"
-#include "exec/local_file_reader.h"
#include "exec/plain_text_line_reader.h"
+#include "io/local_file_reader.h"
#include "util/runtime_profile.h"
namespace doris {
diff --git a/be/test/exec/plain_text_line_reader_uncompressed_test.cpp b/be/test/exec/plain_text_line_reader_uncompressed_test.cpp
index b9361999b4..815d119ba1 100644
--- a/be/test/exec/plain_text_line_reader_uncompressed_test.cpp
+++ b/be/test/exec/plain_text_line_reader_uncompressed_test.cpp
@@ -18,8 +18,8 @@
#include <gtest/gtest.h>
#include "exec/decompressor.h"
-#include "exec/local_file_reader.h"
#include "exec/plain_text_line_reader.h"
+#include "io/local_file_reader.h"
#include "util/runtime_profile.h"
namespace doris {
diff --git a/be/test/exec/s3_reader_test.cpp b/be/test/exec/s3_reader_test.cpp
index 3c11a19180..d41a78975a 100644
--- a/be/test/exec/s3_reader_test.cpp
+++ b/be/test/exec/s3_reader_test.cpp
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-#include "exec/s3_reader.h"
+#include "io/s3_reader.h"
#include <aws/core/Aws.h>
#include <gtest/gtest.h>
@@ -28,7 +28,7 @@
#include <string>
#include <vector>
-#include "exec/s3_writer.h"
+#include "io/s3_writer.h"
namespace doris {
static const std::string AK = "";
diff --git a/be/test/vec/exec/vbroker_scan_node_test.cpp b/be/test/vec/exec/vbroker_scan_node_test.cpp
index 0683cf4b7a..719d5014ea 100644
--- a/be/test/vec/exec/vbroker_scan_node_test.cpp
+++ b/be/test/vec/exec/vbroker_scan_node_test.cpp
@@ -23,13 +23,13 @@
#include <vector>
#include "common/object_pool.h"
-#include "exec/local_file_reader.h"
#include "exprs/binary_predicate.h"
#include "exprs/cast_functions.h"
#include "exprs/literal.h"
#include "exprs/slot_ref.h"
#include "gen_cpp/Descriptors_types.h"
#include "gen_cpp/PlanNodes_types.h"
+#include "io/local_file_reader.h"
#include "runtime/descriptors.h"
#include "runtime/mem_tracker.h"
#include "runtime/primitive_type.h"
diff --git a/be/test/vec/exec/vbroker_scanner_test.cpp b/be/test/vec/exec/vbroker_scanner_test.cpp
index 428d82343c..27c4643050 100644
--- a/be/test/vec/exec/vbroker_scanner_test.cpp
+++ b/be/test/vec/exec/vbroker_scanner_test.cpp
@@ -23,10 +23,10 @@
#include <vector>
#include "common/object_pool.h"
-#include "exec/local_file_reader.h"
#include "exprs/cast_functions.h"
#include "gen_cpp/Descriptors_types.h"
#include "gen_cpp/PlanNodes_types.h"
+#include "io/local_file_reader.h"
#include "runtime/descriptors.h"
#include "runtime/mem_tracker.h"
#include "runtime/runtime_state.h"
diff --git a/be/test/vec/exec/vjson_scanner_test.cpp b/be/test/vec/exec/vjson_scanner_test.cpp
index 393ff80f16..b059faf010 100644
--- a/be/test/vec/exec/vjson_scanner_test.cpp
+++ b/be/test/vec/exec/vjson_scanner_test.cpp
@@ -26,11 +26,11 @@
#include "common/object_pool.h"
#include "exec/broker_scan_node.h"
-#include "exec/local_file_reader.h"
#include "exprs/cast_functions.h"
#include "exprs/decimalv2_operators.h"
#include "gen_cpp/Descriptors_types.h"
#include "gen_cpp/PlanNodes_types.h"
+#include "io/local_file_reader.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "runtime/row_batch.h"
diff --git a/be/test/vec/exec/vorc_scanner_test.cpp b/be/test/vec/exec/vorc_scanner_test.cpp
index 92a9013786..e6b3b2a96f 100644
--- a/be/test/vec/exec/vorc_scanner_test.cpp
+++ b/be/test/vec/exec/vorc_scanner_test.cpp
@@ -26,12 +26,12 @@
#include <vector>
#include "common/object_pool.h"
-#include "exec/local_file_reader.h"
#include "exec/orc_scanner.h"
#include "exprs/cast_functions.h"
#include "exprs/decimalv2_operators.h"
#include "gen_cpp/Descriptors_types.h"
#include "gen_cpp/PlanNodes_types.h"
+#include "io/local_file_reader.h"
#include "runtime/descriptors.h"
#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
diff --git a/be/test/vec/exec/vparquet_scanner_test.cpp b/be/test/vec/exec/vparquet_scanner_test.cpp
index ba8f70ce70..6d3810cc73 100644
--- a/be/test/vec/exec/vparquet_scanner_test.cpp
+++ b/be/test/vec/exec/vparquet_scanner_test.cpp
@@ -23,10 +23,10 @@
#include <vector>
#include "common/object_pool.h"
-#include "exec/local_file_reader.h"
#include "exprs/cast_functions.h"
#include "gen_cpp/Descriptors_types.h"
#include "gen_cpp/PlanNodes_types.h"
+#include "io/local_file_reader.h"
#include "runtime/descriptors.h"
#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org