You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2023/04/05 07:52:00 UTC
[doris] branch master updated: [fix](file_cache) turn on file cache by FE session variable (#18340)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 47aa8a6d8a [fix](file_cache) turn on file cache by FE session variable (#18340)
47aa8a6d8a is described below
commit 47aa8a6d8ae05c2c064f86d3e2b56276d2297d69
Author: Ashin Gau <As...@users.noreply.github.com>
AuthorDate: Wed Apr 5 15:51:47 2023 +0800
[fix](file_cache) turn on file cache by FE session variable (#18340)
Fix two bugs:
1. Enabling file caching requires both `FE session` and `BE` configurations(enable_file_cache=true) to be enabled.
2. `ParquetReader` has not used `IOContext` previously, but `CachedRemoteFileReader::read_at` needs `IOContext` after PR(#17586).
---
be/src/io/file_factory.cpp | 22 +++++++++-------
be/src/io/file_factory.h | 12 +++++----
be/src/io/fs/buffered_reader.cpp | 10 ++++----
be/src/io/fs/buffered_reader.h | 10 +++++---
be/src/io/io_common.h | 4 +--
be/src/vec/exec/format/csv/csv_reader.cpp | 10 +++++---
be/src/vec/exec/format/json/new_json_reader.cpp | 6 +++--
be/src/vec/exec/format/orc/vorc_reader.cpp | 14 +++++++----
be/src/vec/exec/format/orc/vorc_reader.h | 3 ++-
.../vec/exec/format/parquet/parquet_thrift_util.h | 10 +++++---
.../parquet/vparquet_column_chunk_reader.cpp | 7 +++---
.../format/parquet/vparquet_column_chunk_reader.h | 3 ++-
.../exec/format/parquet/vparquet_column_reader.cpp | 21 ++++++++--------
.../exec/format/parquet/vparquet_column_reader.h | 29 ++++++++++++++--------
.../exec/format/parquet/vparquet_group_reader.cpp | 6 +++--
.../exec/format/parquet/vparquet_group_reader.h | 3 ++-
.../exec/format/parquet/vparquet_page_reader.cpp | 9 ++++---
.../vec/exec/format/parquet/vparquet_page_reader.h | 4 ++-
be/src/vec/exec/format/parquet/vparquet_reader.cpp | 23 +++++++++++------
be/src/vec/exec/scan/vfile_scanner.cpp | 5 ++--
be/src/vec/exec/varrow_scanner.cpp | 7 +++---
be/test/vec/exec/parquet/parquet_thrift_test.cpp | 17 ++++++++-----
22 files changed, 141 insertions(+), 94 deletions(-)
diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
index 64bbcfd1d9..c539f9ae50 100644
--- a/be/src/io/file_factory.cpp
+++ b/be/src/io/file_factory.cpp
@@ -26,16 +26,23 @@
#include "io/fs/hdfs_file_system.h"
#include "io/fs/hdfs_file_writer.h"
#include "io/fs/local_file_system.h"
-#include "io/fs/local_file_writer.h"
-#include "io/fs/remote_file_system.h"
#include "io/fs/s3_file_system.h"
-#include "io/fs/s3_file_writer.h"
#include "runtime/exec_env.h"
+#include "runtime/runtime_state.h"
#include "runtime/stream_load/new_load_stream_mgr.h"
#include "runtime/stream_load/stream_load_context.h"
+#include "util/runtime_profile.h"
#include "util/s3_uri.h"
namespace doris {
+io::FileCachePolicy FileFactory::get_cache_policy(RuntimeState* state) {
+ if (state != nullptr) {
+ if (config::enable_file_cache && state->query_options().enable_file_cache) {
+ return io::FileCachePolicy::FILE_BLOCK_CACHE;
+ }
+ }
+ return io::FileCachePolicy::NO_CACHE;
+}
Status FileFactory::create_file_writer(TFileType::type type, ExecEnv* env,
const std::vector<TNetworkAddress>& broker_addresses,
@@ -78,16 +85,13 @@ Status FileFactory::create_file_writer(TFileType::type type, ExecEnv* env,
return Status::OK();
}
-Status FileFactory::create_file_reader(RuntimeProfile* /*profile*/,
+Status FileFactory::create_file_reader(RuntimeProfile* profile,
const FileSystemProperties& system_properties,
const FileDescription& file_description,
std::shared_ptr<io::FileSystem>* file_system,
- io::FileReaderSPtr* file_reader) {
+ io::FileReaderSPtr* file_reader,
+ io::FileCachePolicy cache_policy) {
TFileType::type type = system_properties.system_type;
- auto cache_policy = io::FileCachePolicy::NO_CACHE;
- if (config::enable_file_cache) {
- cache_policy = io::FileCachePolicy::FILE_BLOCK_CACHE;
- }
io::FileBlockCachePathPolicy file_block_cache;
io::FileReaderOptions reader_options(cache_policy, file_block_cache);
reader_options.file_size = file_description.file_size;
diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h
index 978241f8d6..01b13cc521 100644
--- a/be/src/io/file_factory.h
+++ b/be/src/io/file_factory.h
@@ -45,6 +45,8 @@ struct FileDescription {
class FileFactory {
public:
+ static io::FileCachePolicy get_cache_policy(RuntimeState* state);
+
/// Create FileWriter
static Status create_file_writer(TFileType::type type, ExecEnv* env,
const std::vector<TNetworkAddress>& broker_addresses,
@@ -53,11 +55,11 @@ public:
std::unique_ptr<io::FileWriter>& file_writer);
/// Create FileReader
- static Status create_file_reader(RuntimeProfile* profile,
- const FileSystemProperties& system_properties,
- const FileDescription& file_description,
- std::shared_ptr<io::FileSystem>* file_system,
- io::FileReaderSPtr* file_reader);
+ static Status create_file_reader(
+ RuntimeProfile* profile, const FileSystemProperties& system_properties,
+ const FileDescription& file_description, std::shared_ptr<io::FileSystem>* file_system,
+ io::FileReaderSPtr* file_reader,
+ io::FileCachePolicy cache_policy = io::FileCachePolicy::NO_CACHE);
// Create FileReader for stream load pipe
static Status create_pipe_reader(const TUniqueId& load_id, io::FileReaderSPtr* file_reader);
diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp
index 4f47ff5671..d29ba2fe0d 100644
--- a/be/src/io/fs/buffered_reader.cpp
+++ b/be/src/io/fs/buffered_reader.cpp
@@ -23,7 +23,6 @@
#include "common/config.h"
#include "olap/iterators.h"
#include "olap/olap_define.h"
-#include "util/bit_util.h"
namespace doris {
namespace io {
@@ -191,7 +190,7 @@ BufferedFileStreamReader::BufferedFileStreamReader(io::FileReaderSPtr file, uint
_max_buf_size(max_buf_size) {}
Status BufferedFileStreamReader::read_bytes(const uint8_t** buf, uint64_t offset,
- const size_t bytes_to_read) {
+ const size_t bytes_to_read, const IOContext* io_ctx) {
if (offset < _file_start_offset || offset >= _file_end_offset) {
return Status::IOError("Out-of-bounds Access");
}
@@ -223,7 +222,7 @@ Status BufferedFileStreamReader::read_bytes(const uint8_t** buf, uint64_t offset
while (has_read < to_read) {
size_t loop_read = 0;
Slice result(_buf.get() + buf_remaining + has_read, to_read - has_read);
- RETURN_IF_ERROR(_file->read_at(_buf_end_offset + has_read, result, &loop_read));
+ RETURN_IF_ERROR(_file->read_at(_buf_end_offset + has_read, result, &loop_read, io_ctx));
_statistics.read_calls++;
if (loop_read == 0) {
break;
@@ -239,8 +238,9 @@ Status BufferedFileStreamReader::read_bytes(const uint8_t** buf, uint64_t offset
return Status::OK();
}
-Status BufferedFileStreamReader::read_bytes(Slice& slice, uint64_t offset) {
- return read_bytes((const uint8_t**)&slice.data, offset, slice.size);
+Status BufferedFileStreamReader::read_bytes(Slice& slice, uint64_t offset,
+ const IOContext* io_ctx) {
+ return read_bytes((const uint8_t**)&slice.data, offset, slice.size, io_ctx);
}
} // namespace io
diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h
index 2b3b6054b9..434058e32e 100644
--- a/be/src/io/fs/buffered_reader.h
+++ b/be/src/io/fs/buffered_reader.h
@@ -156,11 +156,12 @@ public:
* @param offset start offset ot read in stream
* @param bytes_to_read bytes to read
*/
- virtual Status read_bytes(const uint8_t** buf, uint64_t offset, const size_t bytes_to_read) = 0;
+ virtual Status read_bytes(const uint8_t** buf, uint64_t offset, const size_t bytes_to_read,
+ const IOContext* io_ctx) = 0;
/**
* Save the data address to slice.data, and the slice.size is the bytes to read.
*/
- virtual Status read_bytes(Slice& slice, uint64_t offset) = 0;
+ virtual Status read_bytes(Slice& slice, uint64_t offset, const IOContext* io_ctx) = 0;
Statistics& statistics() { return _statistics; }
virtual ~BufferedStreamReader() = default;
// return the file path
@@ -176,8 +177,9 @@ public:
size_t max_buf_size);
~BufferedFileStreamReader() override = default;
- Status read_bytes(const uint8_t** buf, uint64_t offset, const size_t bytes_to_read) override;
- Status read_bytes(Slice& slice, uint64_t offset) override;
+ Status read_bytes(const uint8_t** buf, uint64_t offset, const size_t bytes_to_read,
+ const IOContext* io_ctx) override;
+ Status read_bytes(Slice& slice, uint64_t offset, const IOContext* io_ctx) override;
std::string path() override { return _file->path(); }
private:
diff --git a/be/src/io/io_common.h b/be/src/io/io_common.h
index 39a059b7cb..d99b4bb1fb 100644
--- a/be/src/io/io_common.h
+++ b/be/src/io/io_common.h
@@ -55,15 +55,13 @@ public:
is_persistent(is_presistent_),
use_disposable_cache(use_disposable_cache_),
read_segment_index(read_segment_index_),
- file_cache_stats(stats_),
- enable_file_cache(enable_file_cache) {}
+ file_cache_stats(stats_) {}
ReaderType reader_type;
const TUniqueId* query_id = nullptr;
bool is_persistent = false;
bool use_disposable_cache = false;
bool read_segment_index = false;
FileCacheStatistics* file_cache_stats = nullptr;
- bool enable_file_cache = true;
};
} // namespace io
diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp
index f11a5c2273..7f114167ea 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -142,8 +142,10 @@ Status CsvReader::init_reader(bool is_load) {
if (_params.file_type == TFileType::FILE_STREAM) {
RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &csv_file_reader));
} else {
- RETURN_IF_ERROR(FileFactory::create_file_reader(
- _profile, _system_properties, _file_description, &_file_system, &csv_file_reader));
+ io::FileCachePolicy cache_policy = FileFactory::get_cache_policy(_state);
+ RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _system_properties,
+ _file_description, &_file_system,
+ &_file_reader, cache_policy));
}
if (typeid_cast<io::S3FileReader*>(csv_file_reader.get()) != nullptr ||
typeid_cast<io::BrokerFileReader*>(csv_file_reader.get()) != nullptr) {
@@ -634,9 +636,9 @@ Status CsvReader::_prepare_parse(size_t* read_line, bool* is_parse_name) {
}
_file_description.start_offset = start_offset;
-
+ io::FileCachePolicy cache_policy = FileFactory::get_cache_policy(_state);
RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _system_properties, _file_description,
- &_file_system, &_file_reader));
+ &_file_system, &_file_reader, cache_policy));
if (_file_reader->size() == 0 && _params.file_type != TFileType::FILE_STREAM &&
_params.file_type != TFileType::FILE_BROKER) {
return Status::EndOfFile("get parsed schema failed, empty csv file: " + _range.path);
diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp b/be/src/vec/exec/format/json/new_json_reader.cpp
index 6ae0c74e85..cea79dad75 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -339,8 +339,10 @@ Status NewJsonReader::_open_file_reader() {
if (_params.file_type == TFileType::FILE_STREAM) {
RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &json_file_reader));
} else {
- RETURN_IF_ERROR(FileFactory::create_file_reader(
- _profile, _system_properties, _file_description, &_file_system, &json_file_reader));
+ io::FileCachePolicy cache_policy = FileFactory::get_cache_policy(_state);
+ RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _system_properties,
+ _file_description, &_file_system,
+ &_file_reader, cache_policy));
}
if (typeid_cast<io::S3FileReader*>(json_file_reader.get()) != nullptr ||
typeid_cast<io::BrokerFileReader*>(json_file_reader.get()) != nullptr) {
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp
index 293e1233d1..c4fd3c2409 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -72,10 +72,12 @@ void ORCFileInputStream::read(void* buf, uint64_t length, uint64_t offset) {
}
}
-OrcReader::OrcReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
- const TFileRangeDesc& range, const std::vector<std::string>& column_names,
- size_t batch_size, const std::string& ctz, io::IOContext* io_ctx)
+OrcReader::OrcReader(RuntimeProfile* profile, RuntimeState* state,
+ const TFileScanRangeParams& params, const TFileRangeDesc& range,
+ const std::vector<std::string>& column_names, size_t batch_size,
+ const std::string& ctz, io::IOContext* io_ctx)
: _profile(profile),
+ _state(state),
_scan_params(params),
_scan_range(range),
_batch_size(std::max(batch_size, _MIN_BATCH_SIZE)),
@@ -153,8 +155,10 @@ void OrcReader::_init_profile() {
Status OrcReader::_create_file_reader() {
if (_file_input_stream == nullptr) {
io::FileReaderSPtr inner_reader;
- RETURN_IF_ERROR(FileFactory::create_file_reader(
- _profile, _system_properties, _file_description, &_file_system, &inner_reader));
+ io::FileCachePolicy cache_policy = FileFactory::get_cache_policy(_state);
+ RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _system_properties,
+ _file_description, &_file_system,
+ &inner_reader, cache_policy));
_file_input_stream.reset(
new ORCFileInputStream(_scan_range.path, inner_reader, &_statistics, _io_ctx));
}
diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h
index 4fc2fd5ec1..5430922833 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.h
+++ b/be/src/vec/exec/format/orc/vorc_reader.h
@@ -45,7 +45,7 @@ public:
int64_t decode_null_map_time = 0;
};
- OrcReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
+ OrcReader(RuntimeProfile* profile, RuntimeState* state, const TFileScanRangeParams& params,
const TFileRangeDesc& range, const std::vector<std::string>& column_names,
size_t batch_size, const std::string& ctz, io::IOContext* io_ctx);
@@ -248,6 +248,7 @@ private:
private:
RuntimeProfile* _profile;
+ RuntimeState* _state;
const TFileScanRangeParams& _scan_params;
const TFileRangeDesc& _scan_range;
FileSystemProperties _system_properties;
diff --git a/be/src/vec/exec/format/parquet/parquet_thrift_util.h b/be/src/vec/exec/format/parquet/parquet_thrift_util.h
index cccbe0f9c2..98b02609db 100644
--- a/be/src/vec/exec/format/parquet/parquet_thrift_util.h
+++ b/be/src/vec/exec/format/parquet/parquet_thrift_util.h
@@ -35,12 +35,13 @@ namespace doris::vectorized {
constexpr uint8_t PARQUET_VERSION_NUMBER[4] = {'P', 'A', 'R', '1'};
constexpr uint32_t PARQUET_FOOTER_SIZE = 8;
-static Status parse_thrift_footer(io::FileReaderSPtr file, FileMetaData** file_metadata) {
+static Status parse_thrift_footer(io::FileReaderSPtr file, FileMetaData** file_metadata,
+ size_t* meta_size, io::IOContext* io_ctx) {
uint8_t footer[PARQUET_FOOTER_SIZE];
int64_t file_size = file->size();
size_t bytes_read = 0;
Slice result(footer, PARQUET_FOOTER_SIZE);
- RETURN_IF_ERROR(file->read_at(file_size - PARQUET_FOOTER_SIZE, result, &bytes_read));
+ RETURN_IF_ERROR(file->read_at(file_size - PARQUET_FOOTER_SIZE, result, &bytes_read, io_ctx));
DCHECK_EQ(bytes_read, PARQUET_FOOTER_SIZE);
// validate magic
@@ -60,12 +61,13 @@ static Status parse_thrift_footer(io::FileReaderSPtr file, FileMetaData** file_m
// deserialize footer
std::unique_ptr<uint8_t[]> meta_buff(new uint8_t[metadata_size]);
Slice res(meta_buff.get(), metadata_size);
- RETURN_IF_ERROR(
- file->read_at(file_size - PARQUET_FOOTER_SIZE - metadata_size, res, &bytes_read));
+ RETURN_IF_ERROR(file->read_at(file_size - PARQUET_FOOTER_SIZE - metadata_size, res, &bytes_read,
+ io_ctx));
DCHECK_EQ(bytes_read, metadata_size);
RETURN_IF_ERROR(deserialize_thrift_msg(meta_buff.get(), &metadata_size, true, &t_metadata));
*file_metadata = new FileMetaData(t_metadata);
RETURN_IF_ERROR((*file_metadata)->init_schema());
+ *meta_size = PARQUET_FOOTER_SIZE + metadata_size;
return Status::OK();
}
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
index b08e316c22..d0e5f25d71 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
@@ -21,20 +21,21 @@ namespace doris::vectorized {
ColumnChunkReader::ColumnChunkReader(io::BufferedStreamReader* reader,
tparquet::ColumnChunk* column_chunk, FieldSchema* field_schema,
- cctz::time_zone* ctz)
+ cctz::time_zone* ctz, io::IOContext* io_ctx)
: _field_schema(field_schema),
_max_rep_level(field_schema->repetition_level),
_max_def_level(field_schema->definition_level),
_stream_reader(reader),
_metadata(column_chunk->meta_data),
- _ctz(ctz) {}
+ _ctz(ctz),
+ _io_ctx(io_ctx) {}
Status ColumnChunkReader::init() {
size_t start_offset = _metadata.__isset.dictionary_page_offset
? _metadata.dictionary_page_offset
: _metadata.data_page_offset;
size_t chunk_size = _metadata.total_compressed_size;
- _page_reader = std::make_unique<PageReader>(_stream_reader, start_offset, chunk_size);
+ _page_reader = std::make_unique<PageReader>(_stream_reader, _io_ctx, start_offset, chunk_size);
// get the block compression codec
RETURN_IF_ERROR(get_block_compression_codec(_metadata.codec, &_block_compress_codec));
if (_metadata.__isset.dictionary_page_offset) {
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
index 6075275a7e..ad37e13815 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
@@ -65,7 +65,7 @@ public:
};
ColumnChunkReader(io::BufferedStreamReader* reader, tparquet::ColumnChunk* column_chunk,
- FieldSchema* field_schema, cctz::time_zone* ctz);
+ FieldSchema* field_schema, cctz::time_zone* ctz, io::IOContext* io_ctx);
~ColumnChunkReader() = default;
// Initialize chunk reader, will generate the decoder and codec.
@@ -175,6 +175,7 @@ private:
io::BufferedStreamReader* _stream_reader;
tparquet::ColumnMetaData _metadata;
cctz::time_zone* _ctz;
+ io::IOContext* _io_ctx;
std::unique_ptr<PageReader> _page_reader = nullptr;
BlockCompressionCodec* _block_compress_codec = nullptr;
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
index 02bb4e1d70..1657f5ab3a 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -97,43 +97,44 @@ static void fill_array_offset(FieldSchema* field, ColumnArray::Offsets64& offset
Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field,
const tparquet::RowGroup& row_group,
const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz,
+ io::IOContext* io_ctx,
std::unique_ptr<ParquetColumnReader>& reader,
size_t max_buf_size) {
if (field->type.type == TYPE_ARRAY) {
std::unique_ptr<ParquetColumnReader> element_reader;
- RETURN_IF_ERROR(create(file, &field->children[0], row_group, row_ranges, ctz,
+ RETURN_IF_ERROR(create(file, &field->children[0], row_group, row_ranges, ctz, io_ctx,
element_reader, max_buf_size));
element_reader->set_nested_column();
- ArrayColumnReader* array_reader = new ArrayColumnReader(row_ranges, ctz);
+ ArrayColumnReader* array_reader = new ArrayColumnReader(row_ranges, ctz, io_ctx);
RETURN_IF_ERROR(array_reader->init(std::move(element_reader), field));
reader.reset(array_reader);
} else if (field->type.type == TYPE_MAP) {
std::unique_ptr<ParquetColumnReader> key_reader;
std::unique_ptr<ParquetColumnReader> value_reader;
RETURN_IF_ERROR(create(file, &field->children[0].children[0], row_group, row_ranges, ctz,
- key_reader, max_buf_size));
+ io_ctx, key_reader, max_buf_size));
RETURN_IF_ERROR(create(file, &field->children[0].children[1], row_group, row_ranges, ctz,
- value_reader, max_buf_size));
+ io_ctx, value_reader, max_buf_size));
key_reader->set_nested_column();
value_reader->set_nested_column();
- MapColumnReader* map_reader = new MapColumnReader(row_ranges, ctz);
+ MapColumnReader* map_reader = new MapColumnReader(row_ranges, ctz, io_ctx);
RETURN_IF_ERROR(map_reader->init(std::move(key_reader), std::move(value_reader), field));
reader.reset(map_reader);
} else if (field->type.type == TYPE_STRUCT) {
std::vector<std::unique_ptr<ParquetColumnReader>> child_readers;
for (int i = 0; i < field->children.size(); ++i) {
std::unique_ptr<ParquetColumnReader> child_reader;
- RETURN_IF_ERROR(create(file, &field->children[i], row_group, row_ranges, ctz,
+ RETURN_IF_ERROR(create(file, &field->children[i], row_group, row_ranges, ctz, io_ctx,
child_reader, max_buf_size));
child_reader->set_nested_column();
child_readers.emplace_back(std::move(child_reader));
}
- StructColumnReader* struct_reader = new StructColumnReader(row_ranges, ctz);
+ StructColumnReader* struct_reader = new StructColumnReader(row_ranges, ctz, io_ctx);
RETURN_IF_ERROR(struct_reader->init(std::move(child_readers), field));
reader.reset(struct_reader);
} else {
const tparquet::ColumnChunk& chunk = row_group.columns[field->physical_column_index];
- ScalarColumnReader* scalar_reader = new ScalarColumnReader(row_ranges, chunk, ctz);
+ ScalarColumnReader* scalar_reader = new ScalarColumnReader(row_ranges, chunk, ctz, io_ctx);
RETURN_IF_ERROR(scalar_reader->init(file, field, max_buf_size));
reader.reset(scalar_reader);
}
@@ -173,8 +174,8 @@ Status ScalarColumnReader::init(io::FileReaderSPtr file, FieldSchema* field, siz
size_t chunk_len = chunk_meta.total_compressed_size;
_stream_reader = std::make_unique<io::BufferedFileStreamReader>(
file, chunk_start, chunk_len, std::min(chunk_len, max_buf_size));
- _chunk_reader =
- std::make_unique<ColumnChunkReader>(_stream_reader.get(), &_chunk_meta, field, _ctz);
+ _chunk_reader = std::make_unique<ColumnChunkReader>(_stream_reader.get(), &_chunk_meta, field,
+ _ctz, _io_ctx);
RETURN_IF_ERROR(_chunk_reader->init());
return Status::OK();
}
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
index 181f93d974..26cdbd9dd3 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
@@ -77,8 +77,9 @@ public:
}
};
- ParquetColumnReader(const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz)
- : _row_ranges(row_ranges), _ctz(ctz) {}
+ ParquetColumnReader(const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz,
+ io::IOContext* io_ctx)
+ : _row_ranges(row_ranges), _ctz(ctz), _io_ctx(io_ctx) {}
virtual ~ParquetColumnReader() = default;
virtual Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
ColumnSelectVector& select_vector, size_t batch_size,
@@ -100,7 +101,8 @@ public:
static Status create(io::FileReaderSPtr file, FieldSchema* field,
const tparquet::RowGroup& row_group,
const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz,
- std::unique_ptr<ParquetColumnReader>& reader, size_t max_buf_size);
+ io::IOContext* io_ctx, std::unique_ptr<ParquetColumnReader>& reader,
+ size_t max_buf_size);
void add_offset_index(tparquet::OffsetIndex* offset_index) { _offset_index = offset_index; }
void set_nested_column() { _nested_column = true; }
virtual const std::vector<level_t>& get_rep_level() const = 0;
@@ -117,6 +119,7 @@ protected:
bool _nested_column = false;
const std::vector<RowRange>& _row_ranges;
cctz::time_zone* _ctz;
+ io::IOContext* _io_ctx;
tparquet::OffsetIndex* _offset_index;
int64_t _current_row_index = 0;
int _row_range_index = 0;
@@ -126,8 +129,9 @@ protected:
class ScalarColumnReader : public ParquetColumnReader {
public:
ScalarColumnReader(const std::vector<RowRange>& row_ranges,
- const tparquet::ColumnChunk& chunk_meta, cctz::time_zone* ctz)
- : ParquetColumnReader(row_ranges, ctz), _chunk_meta(chunk_meta) {}
+ const tparquet::ColumnChunk& chunk_meta, cctz::time_zone* ctz,
+ io::IOContext* io_ctx)
+ : ParquetColumnReader(row_ranges, ctz, io_ctx), _chunk_meta(chunk_meta) {}
~ScalarColumnReader() override { close(); }
Status init(io::FileReaderSPtr file, FieldSchema* field, size_t max_buf_size);
Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
@@ -163,8 +167,9 @@ private:
class ArrayColumnReader : public ParquetColumnReader {
public:
- ArrayColumnReader(const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz)
- : ParquetColumnReader(row_ranges, ctz) {}
+ ArrayColumnReader(const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz,
+ io::IOContext* io_ctx)
+ : ParquetColumnReader(row_ranges, ctz, io_ctx) {}
~ArrayColumnReader() override { close(); }
Status init(std::unique_ptr<ParquetColumnReader> element_reader, FieldSchema* field);
Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
@@ -185,8 +190,9 @@ private:
class MapColumnReader : public ParquetColumnReader {
public:
- MapColumnReader(const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz)
- : ParquetColumnReader(row_ranges, ctz) {}
+ MapColumnReader(const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz,
+ io::IOContext* io_ctx)
+ : ParquetColumnReader(row_ranges, ctz, io_ctx) {}
~MapColumnReader() override { close(); }
Status init(std::unique_ptr<ParquetColumnReader> key_reader,
@@ -218,8 +224,9 @@ private:
class StructColumnReader : public ParquetColumnReader {
public:
- StructColumnReader(const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz)
- : ParquetColumnReader(row_ranges, ctz) {}
+ StructColumnReader(const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz,
+ io::IOContext* io_ctx)
+ : ParquetColumnReader(row_ranges, ctz, io_ctx) {}
~StructColumnReader() override { close(); }
Status init(std::vector<std::unique_ptr<ParquetColumnReader>>&& child_readers,
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index a25cce7171..0604106b03 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -34,7 +34,7 @@ const std::vector<int64_t> RowGroupReader::NO_DELETE = {};
RowGroupReader::RowGroupReader(io::FileReaderSPtr file_reader,
const std::vector<ParquetReadColumn>& read_columns,
const int32_t row_group_id, const tparquet::RowGroup& row_group,
- cctz::time_zone* ctz,
+ cctz::time_zone* ctz, io::IOContext* io_ctx,
const PositionDeleteContext& position_delete_ctx,
const LazyReadContext& lazy_read_ctx, RuntimeState* state)
: _file_reader(file_reader),
@@ -43,6 +43,7 @@ RowGroupReader::RowGroupReader(io::FileReaderSPtr file_reader,
_row_group_meta(row_group),
_remaining_rows(row_group.num_rows),
_ctz(ctz),
+ _io_ctx(io_ctx),
_position_delete_ctx(position_delete_ctx),
_lazy_read_ctx(lazy_read_ctx),
_state(state),
@@ -85,7 +86,8 @@ Status RowGroupReader::init(
auto field = const_cast<FieldSchema*>(schema.get_column(read_col._file_slot_name));
std::unique_ptr<ParquetColumnReader> reader;
RETURN_IF_ERROR(ParquetColumnReader::create(_file_reader, field, _row_group_meta,
- _read_ranges, _ctz, reader, max_buf_size));
+ _read_ranges, _ctz, _io_ctx, reader,
+ max_buf_size));
auto col_iter = col_offsets.find(read_col._parquet_col_id);
if (col_iter != col_offsets.end()) {
tparquet::OffsetIndex oi = col_iter->second;
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
index 49545e1051..33f8371699 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -107,7 +107,7 @@ public:
RowGroupReader(io::FileReaderSPtr file_reader,
const std::vector<ParquetReadColumn>& read_columns, const int32_t row_group_id,
- const tparquet::RowGroup& row_group, cctz::time_zone* ctz,
+ const tparquet::RowGroup& row_group, cctz::time_zone* ctz, io::IOContext* io_ctx,
const PositionDeleteContext& position_delete_ctx,
const LazyReadContext& lazy_read_ctx, RuntimeState* state);
@@ -167,6 +167,7 @@ private:
const tparquet::RowGroup& _row_group_meta;
int64_t _remaining_rows;
cctz::time_zone* _ctz;
+ io::IOContext* _io_ctx;
PositionDeleteContext _position_delete_ctx;
// merge the row ranges generated from page index and position delete.
std::vector<RowRange> _read_ranges;
diff --git a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
index a74b328dee..12ef8d0427 100644
--- a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
@@ -26,8 +26,9 @@ namespace doris::vectorized {
static constexpr size_t INIT_PAGE_HEADER_SIZE = 128;
-PageReader::PageReader(io::BufferedStreamReader* reader, uint64_t offset, uint64_t length)
- : _reader(reader), _start_offset(offset), _end_offset(offset + length) {}
+PageReader::PageReader(io::BufferedStreamReader* reader, io::IOContext* io_ctx, uint64_t offset,
+ uint64_t length)
+ : _reader(reader), _io_ctx(io_ctx), _start_offset(offset), _end_offset(offset + length) {}
Status PageReader::next_page_header() {
if (UNLIKELY(_offset < _start_offset || _offset >= _end_offset)) {
@@ -47,7 +48,7 @@ Status PageReader::next_page_header() {
uint32_t real_header_size = 0;
while (true) {
header_size = std::min(header_size, max_size);
- RETURN_IF_ERROR(_reader->read_bytes(&page_header_buf, _offset, header_size));
+ RETURN_IF_ERROR(_reader->read_bytes(&page_header_buf, _offset, header_size, _io_ctx));
real_header_size = header_size;
SCOPED_RAW_TIMER(&_statistics.decode_header_time);
auto st =
@@ -87,7 +88,7 @@ Status PageReader::get_page_data(Slice& slice) {
} else {
slice.size = _cur_page_header.compressed_page_size;
}
- RETURN_IF_ERROR(_reader->read_bytes(slice, _offset));
+ RETURN_IF_ERROR(_reader->read_bytes(slice, _offset, _io_ctx));
_offset += slice.size;
_state = INITIALIZED;
return Status::OK();
diff --git a/be/src/vec/exec/format/parquet/vparquet_page_reader.h b/be/src/vec/exec/format/parquet/vparquet_page_reader.h
index ab42b45d6b..846ab96f36 100644
--- a/be/src/vec/exec/format/parquet/vparquet_page_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_page_reader.h
@@ -32,7 +32,8 @@ public:
int64_t decode_header_time = 0;
};
- PageReader(io::BufferedStreamReader* reader, uint64_t offset, uint64_t length);
+ PageReader(io::BufferedStreamReader* reader, io::IOContext* io_ctx, uint64_t offset,
+ uint64_t length);
~PageReader() = default;
bool has_next_page() const { return _offset < _end_offset; }
@@ -57,6 +58,7 @@ private:
enum PageReaderState { INITIALIZED, HEADER_PARSED };
io::BufferedStreamReader* _reader;
+ io::IOContext* _io_ctx;
tparquet::PageHeader _cur_page_header;
Statistics _statistics;
PageReaderState _state = INITIALIZED;
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 50f01b6b86..1c34ea8412 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -170,23 +170,27 @@ Status ParquetReader::_open_file() {
if (_file_reader == nullptr) {
SCOPED_RAW_TIMER(&_statistics.open_file_time);
++_statistics.open_file_num;
- RETURN_IF_ERROR(FileFactory::create_file_reader(
- _profile, _system_properties, _file_description, &_file_system, &_file_reader));
+ io::FileCachePolicy cache_policy = FileFactory::get_cache_policy(_state);
+ RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _system_properties,
+ _file_description, &_file_system,
+ &_file_reader, cache_policy));
}
if (_file_metadata == nullptr) {
SCOPED_RAW_TIMER(&_statistics.parse_footer_time);
if (_file_reader->size() == 0) {
return Status::EndOfFile("open file failed, empty parquet file: " + _scan_range.path);
}
+ size_t meta_size = 0;
if (_kv_cache == nullptr) {
_is_file_metadata_owned = true;
- RETURN_IF_ERROR(parse_thrift_footer(_file_reader, &_file_metadata));
+ RETURN_IF_ERROR(
+ parse_thrift_footer(_file_reader, &_file_metadata, &meta_size, _io_ctx));
} else {
_is_file_metadata_owned = false;
_file_metadata = _kv_cache->get<FileMetaData>(
_meta_cache_key(_file_reader->path()), [&]() -> FileMetaData* {
FileMetaData* meta;
- Status st = parse_thrift_footer(_file_reader, &meta);
+ Status st = parse_thrift_footer(_file_reader, &meta, &meta_size, _io_ctx);
if (!st) {
LOG(INFO) << "failed to parse parquet footer for "
<< _file_description.path << ", err: " << st;
@@ -200,6 +204,8 @@ Status ParquetReader::_open_file() {
return Status::InternalError("failed to get file meta data: {}",
_file_description.path);
}
+ _column_statistics.read_bytes += meta_size;
+ _column_statistics.read_calls += 2;
}
return Status::OK();
}
@@ -519,9 +525,9 @@ Status ParquetReader::_next_row_group_reader() {
RowGroupReader::PositionDeleteContext position_delete_ctx =
_get_position_delete_ctx(row_group, row_group_index);
- _current_group_reader.reset(new RowGroupReader(_file_reader, _read_columns,
- row_group_index.row_group_id, row_group, _ctz,
- position_delete_ctx, _lazy_read_ctx, _state));
+ _current_group_reader.reset(
+ new RowGroupReader(_file_reader, _read_columns, row_group_index.row_group_id, row_group,
+ _ctz, _io_ctx, position_delete_ctx, _lazy_read_ctx, _state));
_row_group_eof = false;
return _current_group_reader->init(_file_metadata->schema(), candidate_row_ranges, _col_offsets,
_tuple_descriptor, _row_descriptor, _colname_to_slot_id,
@@ -617,12 +623,15 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group,
Slice result(col_index_buff, page_index._column_index_size);
RETURN_IF_ERROR(
_file_reader->read_at(page_index._column_index_start, result, &bytes_read, _io_ctx));
+ _column_statistics.read_bytes += bytes_read;
auto& schema_desc = _file_metadata->schema();
std::vector<RowRange> skipped_row_ranges;
uint8_t off_index_buff[page_index._offset_index_size];
Slice res(off_index_buff, page_index._offset_index_size);
RETURN_IF_ERROR(
_file_reader->read_at(page_index._offset_index_start, res, &bytes_read, _io_ctx));
+ _column_statistics.read_bytes += bytes_read;
+ _column_statistics.read_calls += 2;
for (auto& read_col : _read_columns) {
auto conjunct_iter = _colname_to_value_range->find(read_col._file_slot_name);
if (_colname_to_value_range->end() == conjunct_iter) {
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index 298fad4d11..44c567ec3d 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -81,7 +81,6 @@ Status VFileScanner::prepare(
_io_ctx.reset(new io::IOContext());
_io_ctx->file_cache_stats = _file_cache_statistics.get();
_io_ctx->query_id = &_state->query_id();
- _io_ctx->enable_file_cache = _state->query_options().enable_file_cache;
if (_is_load) {
_src_row_desc.reset(new RowDescriptor(_state->desc_tbl(),
@@ -566,7 +565,7 @@ Status VFileScanner::_get_next_reader() {
break;
}
case TFileFormatType::FORMAT_ORC: {
- _cur_reader.reset(new OrcReader(_profile, _params, range, _file_col_names,
+ _cur_reader.reset(new OrcReader(_profile, _state, _params, range, _file_col_names,
_state->query_options().batch_size, _state->timezone(),
_io_ctx.get()));
init_status = ((OrcReader*)(_cur_reader.get()))->init_reader(_colname_to_value_range);
@@ -832,7 +831,7 @@ Status VFileScanner::close(RuntimeState* state) {
}
}
- if (config::enable_file_cache) {
+ if (config::enable_file_cache && _state->query_options().enable_file_cache) {
io::FileCacheProfileReporter cache_profile(_profile);
cache_profile.update(_file_cache_statistics.get());
}
diff --git a/be/src/vec/exec/varrow_scanner.cpp b/be/src/vec/exec/varrow_scanner.cpp
index 388a0e78ce..9e3137f7c6 100644
--- a/be/src/vec/exec/varrow_scanner.cpp
+++ b/be/src/vec/exec/varrow_scanner.cpp
@@ -80,9 +80,10 @@ Status VArrowScanner::_open_next_reader() {
io::FileReaderSPtr file_reader;
_init_system_properties(range);
_init_file_description(range);
- // no use
- RETURN_IF_ERROR(FileFactory::create_file_reader(
- _profile, _system_properties, _file_description, &_file_system, &file_reader));
+ io::FileCachePolicy cache_policy = FileFactory::get_cache_policy(_state);
+ RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _system_properties,
+ _file_description, &_file_system,
+ &file_reader, cache_policy));
if (file_reader->size() == 0) {
continue;
diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
index 033606f310..3db30dfc2b 100644
--- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
@@ -55,7 +55,8 @@ TEST_F(ParquetThriftReaderTest, normal) {
EXPECT_TRUE(st.ok());
FileMetaData* meta_data;
- parse_thrift_footer(reader, &meta_data);
+ size_t meta_size;
+ parse_thrift_footer(reader, &meta_data, &meta_size, nullptr);
tparquet::FileMetaData t_metadata = meta_data->to_thrift();
LOG(WARNING) << "=====================================";
@@ -88,7 +89,8 @@ TEST_F(ParquetThriftReaderTest, complex_nested_file) {
EXPECT_TRUE(st.ok());
FileMetaData* metadata;
- parse_thrift_footer(reader, &metadata);
+ size_t meta_size;
+ parse_thrift_footer(reader, &metadata, &meta_size, nullptr);
tparquet::FileMetaData t_metadata = metadata->to_thrift();
FieldDescriptor schemaDescriptor;
schemaDescriptor.parse_from_thrift(t_metadata.schema);
@@ -164,7 +166,7 @@ static Status get_column_values(io::FileReaderSPtr file_reader, tparquet::Column
cctz::time_zone ctz;
TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, ctz);
- ColumnChunkReader chunk_reader(&stream_reader, column_chunk, field_schema, &ctz);
+ ColumnChunkReader chunk_reader(&stream_reader, column_chunk, field_schema, &ctz, nullptr);
// initialize chunk reader
chunk_reader.init();
// seek to next page header
@@ -361,7 +363,8 @@ static void read_parquet_data_and_check(const std::string& parquet_file,
std::unique_ptr<vectorized::Block> block;
create_block(block);
FileMetaData* metadata;
- parse_thrift_footer(reader, &metadata);
+ size_t meta_size;
+ parse_thrift_footer(reader, &metadata, &meta_size, nullptr);
tparquet::FileMetaData t_metadata = metadata->to_thrift();
FieldDescriptor schema_descriptor;
schema_descriptor.parse_from_thrift(t_metadata.schema);
@@ -482,7 +485,8 @@ TEST_F(ParquetThriftReaderTest, group_reader) {
// prepare metadata
FileMetaData* meta_data;
- parse_thrift_footer(file_reader, &meta_data);
+ size_t meta_size;
+ parse_thrift_footer(file_reader, &meta_data, &meta_size, nullptr);
tparquet::FileMetaData t_metadata = meta_data->to_thrift();
cctz::time_zone ctz;
@@ -491,7 +495,8 @@ TEST_F(ParquetThriftReaderTest, group_reader) {
std::shared_ptr<RowGroupReader> row_group_reader;
RowGroupReader::PositionDeleteContext position_delete_ctx(row_group.num_rows, 0);
row_group_reader.reset(new RowGroupReader(file_reader, read_columns, 0, row_group, &ctz,
- position_delete_ctx, lazy_read_ctx, nullptr));
+ nullptr, position_delete_ctx, lazy_read_ctx,
+ nullptr));
std::vector<RowRange> row_ranges;
row_ranges.emplace_back(0, row_group.num_rows);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org