You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/10/02 07:11:54 UTC
[doris] branch master updated: [feature-wip](parquet-reader) add detail profile for parquet reader (#13095)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 026ffaf10d [feature-wip](parquet-reader) add detail profile for parquet reader (#13095)
026ffaf10d is described below
commit 026ffaf10db3069270568c051eff0474ed8d4b0c
Author: Ashin Gau <As...@users.noreply.github.com>
AuthorDate: Sun Oct 2 15:11:48 2022 +0800
[feature-wip](parquet-reader) add detail profile for parquet reader (#13095)
Add a more detailed profile for ParquetReader:
ParquetColumnReadTime: the total time of reading parquet columns
ParquetDecodeDictTime: time to parse dictionary page
ParquetDecodeHeaderTime: time to parse page header
ParquetDecodeLevelTime: time to parse page's definition/repetition level
ParquetDecodeValueTime: time to decode page data into doris column
ParquetDecompressCount: number of times page data was decompressed
ParquetDecompressTime: time to decompress page data
ParquetParseMetaTime: time to parse parquet meta data
---
be/src/io/buffered_reader.cpp | 6 ++
be/src/io/buffered_reader.h | 10 ++
be/src/io/hdfs_file_reader.cpp | 21 +++--
be/src/vec/exec/format/parquet/parquet_common.h | 16 ++++
.../parquet/vparquet_column_chunk_reader.cpp | 10 ++
.../format/parquet/vparquet_column_chunk_reader.h | 19 +++-
.../exec/format/parquet/vparquet_column_reader.cpp | 2 +-
.../exec/format/parquet/vparquet_column_reader.h | 53 ++++++++++-
.../exec/format/parquet/vparquet_group_reader.cpp | 19 ++--
.../exec/format/parquet/vparquet_group_reader.h | 10 +-
.../vec/exec/format/parquet/vparquet_page_index.h | 5 +-
.../exec/format/parquet/vparquet_page_reader.cpp | 1 +
.../vec/exec/format/parquet/vparquet_page_reader.h | 7 ++
be/src/vec/exec/format/parquet/vparquet_reader.cpp | 103 +++++++++++++++------
be/src/vec/exec/format/parquet/vparquet_reader.h | 98 +++++++++-----------
be/src/vec/exec/scan/vfile_scanner.cpp | 23 +++--
be/test/vec/exec/parquet/parquet_reader_test.cpp | 47 +++++-----
be/test/vec/exec/parquet/parquet_thrift_test.cpp | 1 +
18 files changed, 301 insertions(+), 150 deletions(-)
diff --git a/be/src/io/buffered_reader.cpp b/be/src/io/buffered_reader.cpp
index c98b365235..021f0b9d23 100644
--- a/be/src/io/buffered_reader.cpp
+++ b/be/src/io/buffered_reader.cpp
@@ -221,15 +221,21 @@ Status BufferedFileStreamReader::read_bytes(const uint8_t** buf, uint64_t offset
int64_t buf_remaining = _buf_end_offset - _buf_start_offset;
int64_t to_read = std::min(_buf_size - buf_remaining, _file_end_offset - _buf_end_offset);
int64_t has_read = 0;
+ SCOPED_RAW_TIMER(&_statistics.read_time);
while (has_read < to_read) {
int64_t loop_read = 0;
RETURN_IF_ERROR(_file->readat(_buf_end_offset + has_read, to_read - has_read, &loop_read,
_buf.get() + buf_remaining + has_read));
+ _statistics.read_calls++;
+ if (loop_read <= 0) {
+ break;
+ }
has_read += loop_read;
}
if (has_read != to_read) {
return Status::Corruption("Try to read {} bytes, but received {} bytes", to_read, has_read);
}
+ _statistics.read_bytes += to_read;
_buf_end_offset += to_read;
*buf = _buf.get();
return Status::OK();
diff --git a/be/src/io/buffered_reader.h b/be/src/io/buffered_reader.h
index 97ec01cc7d..abcf24916e 100644
--- a/be/src/io/buffered_reader.h
+++ b/be/src/io/buffered_reader.h
@@ -87,6 +87,12 @@ private:
*/
class BufferedStreamReader {
public:
+ struct Statistics {
+ int64_t read_time = 0;
+ int64_t read_calls = 0;
+ int64_t read_bytes = 0;
+ };
+
/**
* Return the address of underlying buffer that locates the start of data between [offset, offset + bytes_to_read)
* @param buf the buffer address to save the start address of data
@@ -98,7 +104,11 @@ public:
* Save the data address to slice.data, and the slice.size is the bytes to read.
*/
virtual Status read_bytes(Slice& slice, uint64_t offset) = 0;
+ Statistics& statistics() { return _statistics; }
virtual ~BufferedStreamReader() = default;
+
+protected:
+ Statistics _statistics;
};
class BufferedFileStreamReader : public BufferedStreamReader {
diff --git a/be/src/io/hdfs_file_reader.cpp b/be/src/io/hdfs_file_reader.cpp
index 37b2d73bba..de5d7ee8c7 100644
--- a/be/src/io/hdfs_file_reader.cpp
+++ b/be/src/io/hdfs_file_reader.cpp
@@ -144,13 +144,22 @@ Status HdfsFileReader::readat(int64_t position, int64_t nbytes, int64_t* bytes_r
seek(position);
}
- *bytes_read = hdfsRead(_hdfs_fs, _hdfs_file, out, nbytes);
- if (*bytes_read < 0) {
- return Status::InternalError(
- "Read hdfs file failed. (BE: {}) namenode:{}, path:{}, err: {}",
- BackendOptions::get_localhost(), _namenode, _path, hdfsGetLastError());
+ int64_t has_read = 0;
+ char* cast_out = reinterpret_cast<char*>(out);
+ while (has_read < nbytes) {
+ int64_t loop_read = hdfsRead(_hdfs_fs, _hdfs_file, cast_out + has_read, nbytes - has_read);
+ if (loop_read < 0) {
+ return Status::InternalError(
+ "Read hdfs file failed. (BE: {}) namenode:{}, path:{}, err: {}",
+ BackendOptions::get_localhost(), _namenode, _path, hdfsGetLastError());
+ }
+ if (loop_read == 0) {
+ break;
+ }
+ has_read += loop_read;
}
- _current_offset += *bytes_read; // save offset with file
+ *bytes_read = has_read;
+ _current_offset += has_read; // save offset with file
return Status::OK();
}
diff --git a/be/src/vec/exec/format/parquet/parquet_common.h b/be/src/vec/exec/format/parquet/parquet_common.h
index a56fdb6476..e08027a137 100644
--- a/be/src/vec/exec/format/parquet/parquet_common.h
+++ b/be/src/vec/exec/format/parquet/parquet_common.h
@@ -38,6 +38,22 @@ namespace doris::vectorized {
using level_t = int16_t;
+struct RowRange {
+ RowRange() {}
+ RowRange(int64_t first, int64_t last) : first_row(first), last_row(last) {}
+
+ int64_t first_row;
+ int64_t last_row;
+};
+
+struct ParquetReadColumn {
+ ParquetReadColumn(int parquet_col_id, const std::string& file_slot_name)
+ : _parquet_col_id(parquet_col_id), _file_slot_name(file_slot_name) {};
+
+ int _parquet_col_id;
+ const std::string& _file_slot_name;
+};
+
struct ParquetInt96 {
uint64_t lo; // time of nanoseconds in a day
uint32_t hi; // days from julian epoch
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
index 193200f28d..fc8b8cfdee 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
@@ -86,6 +86,8 @@ Status ColumnChunkReader::load_page_data() {
// check decompressed buffer size
_reserve_decompress_buf(uncompressed_size);
_page_data = Slice(_decompress_buf.get(), uncompressed_size);
+ SCOPED_RAW_TIMER(&_statistics.decompress_time);
+ _statistics.decompress_cnt++;
RETURN_IF_ERROR(_block_compress_codec->decompress(compressed_data, &_page_data));
} else {
RETURN_IF_ERROR(_page_reader->get_page_data(_page_data));
@@ -93,11 +95,13 @@ Status ColumnChunkReader::load_page_data() {
// Initialize repetition level and definition level. Skip when level = 0, which means required field.
if (_max_rep_level > 0) {
+ SCOPED_RAW_TIMER(&_statistics.decode_level_time);
RETURN_IF_ERROR(_rep_level_decoder.init(&_page_data,
header.data_page_header.repetition_level_encoding,
_max_rep_level, _remaining_num_values));
}
if (_max_def_level > 0) {
+ SCOPED_RAW_TIMER(&_statistics.decode_level_time);
RETURN_IF_ERROR(_def_level_decoder.init(&_page_data,
header.data_page_header.definition_level_encoding,
_max_def_level, _remaining_num_values));
@@ -132,6 +136,7 @@ Status ColumnChunkReader::load_page_data() {
Status ColumnChunkReader::_decode_dict_page() {
const tparquet::PageHeader& header = *_page_reader->get_page_header();
DCHECK_EQ(tparquet::PageType::DICTIONARY_PAGE, header.type);
+ SCOPED_RAW_TIMER(&_statistics.decode_dict_time);
// Using the PLAIN_DICTIONARY enum value is deprecated in the Parquet 2.0 specification.
// Prefer using RLE_DICTIONARY in a data page and PLAIN in a dictionary page for Parquet 2.0+ files.
@@ -187,6 +192,7 @@ Status ColumnChunkReader::skip_values(size_t num_values, bool skip_data) {
}
_remaining_num_values -= num_values;
if (skip_data) {
+ SCOPED_RAW_TIMER(&_statistics.decode_value_time);
return _page_decoder->skip_values(num_values);
} else {
return Status::OK();
@@ -194,6 +200,7 @@ Status ColumnChunkReader::skip_values(size_t num_values, bool skip_data) {
}
void ColumnChunkReader::insert_null_values(ColumnPtr& doris_column, size_t num_values) {
+ SCOPED_RAW_TIMER(&_statistics.decode_value_time);
DCHECK_GE(_remaining_num_values, num_values);
CHECK(doris_column->is_nullable());
auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
@@ -206,6 +213,7 @@ void ColumnChunkReader::insert_null_values(ColumnPtr& doris_column, size_t num_v
}
void ColumnChunkReader::insert_null_values(MutableColumnPtr& doris_column, size_t num_values) {
+ SCOPED_RAW_TIMER(&_statistics.decode_value_time);
for (int i = 0; i < num_values; ++i) {
doris_column->insert_default();
}
@@ -227,6 +235,7 @@ Status ColumnChunkReader::decode_values(ColumnPtr& doris_column, DataTypePtr& da
if (UNLIKELY(_remaining_num_values < num_values)) {
return Status::IOError("Decode too many values in current page");
}
+ SCOPED_RAW_TIMER(&_statistics.decode_value_time);
_remaining_num_values -= num_values;
return _page_decoder->decode_values(doris_column, data_type, num_values);
}
@@ -236,6 +245,7 @@ Status ColumnChunkReader::decode_values(MutableColumnPtr& doris_column, DataType
if (UNLIKELY(_remaining_num_values < num_values)) {
return Status::IOError("Decode too many values in current page");
}
+ SCOPED_RAW_TIMER(&_statistics.decode_value_time);
_remaining_num_values -= num_values;
return _page_decoder->decode_values(doris_column, data_type, num_values);
}
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
index 44f5b56ff2..1599762152 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
@@ -29,8 +29,6 @@
#include "parquet_common.h"
#include "schema_desc.h"
#include "util/block_compression.h"
-#include "vec/columns/column_array.h"
-#include "vec/columns/column_nullable.h"
#include "vparquet_page_reader.h"
namespace doris::vectorized {
@@ -57,6 +55,15 @@ namespace doris::vectorized {
*/
class ColumnChunkReader {
public:
+ struct Statistics {
+ int64_t decompress_time = 0;
+ int64_t decompress_cnt = 0;
+ int64_t decode_header_time = 0;
+ int64_t decode_value_time = 0;
+ int64_t decode_dict_time = 0;
+ int64_t decode_level_time = 0;
+ };
+
ColumnChunkReader(BufferedStreamReader* reader, tparquet::ColumnChunk* column_chunk,
FieldSchema* field_schema, cctz::time_zone* ctz);
~ColumnChunkReader() = default;
@@ -96,7 +103,7 @@ public:
// Load page data into the underlying container,
// and initialize the repetition and definition level decoder for current page data.
Status load_page_data();
- Status load_page_date_idempotent() {
+ Status load_page_data_idempotent() {
if (_state == DATA_LOADED) {
return Status::OK();
}
@@ -131,6 +138,11 @@ public:
// Get page decoder
Decoder* get_page_decoder() { return _page_decoder; }
+ Statistics& statistics() {
+ _statistics.decode_header_time = _page_reader->statistics().decode_header_time;
+ return _statistics;
+ }
+
private:
enum ColumnChunkReaderState { NOT_INIT, INITIALIZED, HEADER_PARSED, DATA_LOADED };
@@ -161,6 +173,7 @@ private:
// Map: encoding -> Decoder
// Plain or Dictionary encoding. If the dictionary grows too big, the encoding will fall back to the plain encoding
std::unordered_map<int, std::unique_ptr<Decoder>> _decoders;
+ Statistics _statistics;
};
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
index 3074705ffa..1bcb640865 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -176,7 +176,7 @@ Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr
*read_rows = 0;
} else {
// load page data to decode or skip values
- RETURN_IF_ERROR(_chunk_reader->load_page_date_idempotent());
+ RETURN_IF_ERROR(_chunk_reader->load_page_data_idempotent());
size_t has_read = 0;
for (auto& range : read_ranges) {
// generate the skipped values
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
index b245ff6aa4..0a5d51e2e0 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
@@ -21,13 +21,9 @@
#include "schema_desc.h"
#include "vparquet_column_chunk_reader.h"
-#include "vparquet_reader.h"
namespace doris::vectorized {
-struct RowRange;
-class ParquetReadColumn;
-
class ParquetColumnMetadata {
public:
ParquetColumnMetadata(int64_t chunk_start_offset, int64_t chunk_length,
@@ -49,6 +45,52 @@ private:
class ParquetColumnReader {
public:
+ struct Statistics {
+ Statistics()
+ : read_time(0),
+ read_calls(0),
+ read_bytes(0),
+ decompress_time(0),
+ decompress_cnt(0),
+ decode_header_time(0),
+ decode_value_time(0),
+ decode_dict_time(0),
+ decode_level_time(0) {}
+
+ Statistics(BufferedStreamReader::Statistics& fs, ColumnChunkReader::Statistics& cs)
+ : read_time(fs.read_time),
+ read_calls(fs.read_calls),
+ read_bytes(fs.read_bytes),
+ decompress_time(cs.decompress_time),
+ decompress_cnt(cs.decompress_cnt),
+ decode_header_time(cs.decode_header_time),
+ decode_value_time(cs.decode_value_time),
+ decode_dict_time(cs.decode_dict_time),
+ decode_level_time(cs.decode_level_time) {}
+
+ int64_t read_time;
+ int64_t read_calls;
+ int64_t read_bytes;
+ int64_t decompress_time;
+ int64_t decompress_cnt;
+ int64_t decode_header_time;
+ int64_t decode_value_time;
+ int64_t decode_dict_time;
+ int64_t decode_level_time;
+
+ void merge(Statistics& statistics) {
+ read_time += statistics.read_time;
+ read_calls += statistics.read_calls;
+ read_bytes += statistics.read_bytes;
+ decompress_time += statistics.decompress_time;
+ decompress_cnt += statistics.decompress_cnt;
+ decode_header_time += statistics.decode_header_time;
+ decode_value_time += statistics.decode_value_time;
+ decode_dict_time += statistics.decode_dict_time;
+ decode_level_time += statistics.decode_level_time;
+ }
+ };
+
ParquetColumnReader(cctz::time_zone* ctz) : _ctz(ctz) {};
virtual ~ParquetColumnReader() {
if (_stream_reader != nullptr) {
@@ -64,6 +106,9 @@ public:
size_t max_buf_size);
void init_column_metadata(const tparquet::ColumnChunk& chunk);
void add_offset_index(tparquet::OffsetIndex* offset_index) { _offset_index = offset_index; }
+ Statistics statistics() {
+ return Statistics(_stream_reader->statistics(), _chunk_reader->statistics());
+ }
virtual void close() = 0;
protected:
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index 0168a97a43..ddcc6494d0 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -38,14 +38,6 @@ RowGroupReader::~RowGroupReader() {
Status RowGroupReader::init(const FieldDescriptor& schema, std::vector<RowRange>& row_ranges,
std::unordered_map<int, tparquet::OffsetIndex>& col_offsets) {
- VLOG_DEBUG << "Row group id: " << _row_group_id;
- RETURN_IF_ERROR(_init_column_readers(schema, row_ranges, col_offsets));
- return Status::OK();
-}
-
-Status RowGroupReader::_init_column_readers(
- const FieldDescriptor& schema, std::vector<RowRange>& row_ranges,
- std::unordered_map<int, tparquet::OffsetIndex>& col_offsets) {
const size_t MAX_GROUP_BUF_SIZE = config::parquet_rowgroup_max_buffer_mb << 20;
const size_t MAX_COLUMN_BUF_SIZE = config::parquet_column_max_buffer_mb << 20;
size_t max_buf_size = std::min(MAX_COLUMN_BUF_SIZE, MAX_GROUP_BUF_SIZE / _read_columns.size());
@@ -60,7 +52,7 @@ Status RowGroupReader::_init_column_readers(
reader->add_offset_index(&oi);
}
if (reader == nullptr) {
- VLOG_DEBUG << "Init row group reader failed";
+ VLOG_DEBUG << "Init row group(" << _row_group_id << ") reader failed";
return Status::Corruption("Init row group reader failed");
}
_column_readers[read_col._file_slot_name] = std::move(reader);
@@ -100,4 +92,13 @@ Status RowGroupReader::next_batch(Block* block, size_t batch_size, bool* _batch_
return Status::OK();
}
+ParquetColumnReader::Statistics RowGroupReader::statistics() {
+ ParquetColumnReader::Statistics st;
+ for (auto& reader : _column_readers) {
+ auto ost = reader.second->statistics();
+ st.merge(ost);
+ }
+ return st;
+}
+
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
index 2e72d42805..27daffe6f7 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -17,17 +17,11 @@
#pragma once
#include <common/status.h>
-#include "exprs/expr_context.h"
#include "io/file_reader.h"
#include "vec/core/block.h"
#include "vparquet_column_reader.h"
-#include "vparquet_file_metadata.h"
-#include "vparquet_reader.h"
namespace doris::vectorized {
-class ParquetReadColumn;
-class ParquetColumnReader;
-struct RowRange;
class RowGroupReader {
public:
@@ -39,9 +33,7 @@ public:
std::unordered_map<int, tparquet::OffsetIndex>& col_offsets);
Status next_batch(Block* block, size_t batch_size, bool* _batch_eof);
-private:
- Status _init_column_readers(const FieldDescriptor& schema, std::vector<RowRange>& row_ranges,
- std::unordered_map<int, tparquet::OffsetIndex>& col_offsets);
+ ParquetColumnReader::Statistics statistics();
private:
doris::FileReader* _file_reader;
diff --git a/be/src/vec/exec/format/parquet/vparquet_page_index.h b/be/src/vec/exec/format/parquet/vparquet_page_index.h
index 2f4b0974b8..cfbe97ded4 100644
--- a/be/src/vec/exec/format/parquet/vparquet_page_index.h
+++ b/be/src/vec/exec/format/parquet/vparquet_page_index.h
@@ -19,11 +19,10 @@
#include <common/status.h>
#include <gen_cpp/parquet_types.h>
-#include "vparquet_reader.h"
+#include "exec/olap_common.h"
+#include "parquet_common.h"
namespace doris::vectorized {
-class ParquetReader;
-struct RowRange;
class PageIndex {
public:
diff --git a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
index baa88036c2..00e2ef0926 100644
--- a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
@@ -48,6 +48,7 @@ Status PageReader::next_page_header() {
header_size = std::min(header_size, max_size);
RETURN_IF_ERROR(_reader->read_bytes(&page_header_buf, _offset, header_size));
real_header_size = header_size;
+ SCOPED_RAW_TIMER(&_statistics.decode_header_time);
auto st =
deserialize_thrift_msg(page_header_buf, &real_header_size, true, &_cur_page_header);
if (st.ok()) {
diff --git a/be/src/vec/exec/format/parquet/vparquet_page_reader.h b/be/src/vec/exec/format/parquet/vparquet_page_reader.h
index 0d83c81650..5563f97409 100644
--- a/be/src/vec/exec/format/parquet/vparquet_page_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_page_reader.h
@@ -28,6 +28,10 @@ namespace doris::vectorized {
*/
class PageReader {
public:
+ struct Statistics {
+ int64_t decode_header_time;
+ };
+
PageReader(BufferedStreamReader* reader, uint64_t offset, uint64_t length);
~PageReader() = default;
@@ -41,6 +45,8 @@ public:
Status get_page_data(Slice& slice);
+ Statistics& statistics() { return _statistics; }
+
void seek_to_page(int64_t page_header_offset) {
_offset = page_header_offset;
_next_header_offset = page_header_offset;
@@ -52,6 +58,7 @@ private:
BufferedStreamReader* _reader;
tparquet::PageHeader _cur_page_header;
+ Statistics _statistics;
PageReaderState _state = INITIALIZED;
uint64_t _offset = 0;
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 85e19425c0..6b8a01c83c 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -24,47 +24,79 @@
#include "parquet_thrift_util.h"
namespace doris::vectorized {
-ParquetReader::ParquetReader(RuntimeProfile* profile, FileReader* file_reader,
- const TFileScanRangeParams& params, const TFileRangeDesc& range,
+
+ParquetReader::ParquetReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
+ const TFileRangeDesc& range,
const std::vector<std::string>& column_names, size_t batch_size,
cctz::time_zone* ctz)
: _profile(profile),
- _file_reader(file_reader),
- // _scan_params(params),
- // _scan_range(range),
+ _scan_params(params),
+ _scan_range(range),
_batch_size(batch_size),
_range_start_offset(range.start_offset),
_range_size(range.size),
_ctz(ctz),
_column_names(column_names) {
- if (profile != nullptr) {
- _filtered_row_groups = ADD_COUNTER(profile, "ParquetFilteredGroups", TUnit::UNIT);
- _to_read_row_groups = ADD_COUNTER(profile, "ParquetReadGroups", TUnit::UNIT);
- _filtered_group_rows = ADD_COUNTER(profile, "ParquetFilteredRowsByGroup", TUnit::UNIT);
- _filtered_page_rows = ADD_COUNTER(profile, "ParquetFilteredRowsByPage", TUnit::UNIT);
- _filtered_bytes = ADD_COUNTER(profile, "ParquetFilteredBytes", TUnit::BYTES);
- _to_read_bytes = ADD_COUNTER(profile, "ParquetReadBytes", TUnit::BYTES);
- }
+ _init_profile();
}
ParquetReader::~ParquetReader() {
close();
}
+void ParquetReader::_init_profile() {
+ if (_profile != nullptr) {
+ _parquet_profile.filtered_row_groups =
+ ADD_COUNTER(_profile, "ParquetFilteredGroups", TUnit::UNIT);
+ _parquet_profile.to_read_row_groups =
+ ADD_COUNTER(_profile, "ParquetReadGroups", TUnit::UNIT);
+ _parquet_profile.filtered_group_rows =
+ ADD_COUNTER(_profile, "ParquetFilteredRowsByGroup", TUnit::UNIT);
+ _parquet_profile.filtered_page_rows =
+ ADD_COUNTER(_profile, "ParquetFilteredRowsByPage", TUnit::UNIT);
+ _parquet_profile.filtered_bytes =
+ ADD_COUNTER(_profile, "ParquetFilteredBytes", TUnit::BYTES);
+ _parquet_profile.to_read_bytes = ADD_COUNTER(_profile, "ParquetReadBytes", TUnit::BYTES);
+ _parquet_profile.column_read_time = ADD_TIMER(_profile, "ParquetColumnReadTime");
+ _parquet_profile.parse_meta_time = ADD_TIMER(_profile, "ParquetParseMetaTime");
+
+ _parquet_profile.file_read_time = ADD_TIMER(_profile, "FileReadTime");
+ _parquet_profile.file_read_calls = ADD_COUNTER(_profile, "FileReadCalls", TUnit::UNIT);
+ _parquet_profile.file_read_bytes = ADD_COUNTER(_profile, "FileReadBytes", TUnit::BYTES);
+ _parquet_profile.decompress_time = ADD_TIMER(_profile, "ParquetDecompressTime");
+ _parquet_profile.decompress_cnt =
+ ADD_COUNTER(_profile, "ParquetDecompressCount", TUnit::UNIT);
+ _parquet_profile.decode_header_time = ADD_TIMER(_profile, "ParquetDecodeHeaderTime");
+ _parquet_profile.decode_value_time = ADD_TIMER(_profile, "ParquetDecodeValueTime");
+ _parquet_profile.decode_dict_time = ADD_TIMER(_profile, "ParquetDecodeDictTime");
+ _parquet_profile.decode_level_time = ADD_TIMER(_profile, "ParquetDecodeLevelTime");
+ }
+}
+
void ParquetReader::close() {
if (!_closed) {
- if (_file_reader != nullptr) {
- _file_reader->close();
- delete _file_reader;
- }
-
if (_profile != nullptr) {
- COUNTER_UPDATE(_filtered_row_groups, _statistics.filtered_row_groups);
- COUNTER_UPDATE(_to_read_row_groups, _statistics.read_row_groups);
- COUNTER_UPDATE(_filtered_group_rows, _statistics.filtered_group_rows);
- COUNTER_UPDATE(_filtered_page_rows, _statistics.filtered_page_rows);
- COUNTER_UPDATE(_filtered_bytes, _statistics.filtered_bytes);
- COUNTER_UPDATE(_to_read_bytes, _statistics.read_bytes);
+ COUNTER_UPDATE(_parquet_profile.filtered_row_groups, _statistics.filtered_row_groups);
+ COUNTER_UPDATE(_parquet_profile.to_read_row_groups, _statistics.read_row_groups);
+ COUNTER_UPDATE(_parquet_profile.filtered_group_rows, _statistics.filtered_group_rows);
+ COUNTER_UPDATE(_parquet_profile.filtered_page_rows, _statistics.filtered_page_rows);
+ COUNTER_UPDATE(_parquet_profile.filtered_bytes, _statistics.filtered_bytes);
+ COUNTER_UPDATE(_parquet_profile.to_read_bytes, _statistics.read_bytes);
+ COUNTER_UPDATE(_parquet_profile.column_read_time, _statistics.column_read_time);
+ COUNTER_UPDATE(_parquet_profile.parse_meta_time, _statistics.parse_meta_time);
+
+ COUNTER_UPDATE(_parquet_profile.file_read_time, _column_statistics.read_time);
+ COUNTER_UPDATE(_parquet_profile.file_read_calls, _column_statistics.read_calls);
+ COUNTER_UPDATE(_parquet_profile.file_read_bytes, _column_statistics.read_bytes);
+ COUNTER_UPDATE(_parquet_profile.decompress_time, _column_statistics.decompress_time);
+ COUNTER_UPDATE(_parquet_profile.decompress_cnt, _column_statistics.decompress_cnt);
+ COUNTER_UPDATE(_parquet_profile.decode_header_time,
+ _column_statistics.decode_header_time);
+ COUNTER_UPDATE(_parquet_profile.decode_value_time,
+ _column_statistics.decode_value_time);
+ COUNTER_UPDATE(_parquet_profile.decode_dict_time, _column_statistics.decode_dict_time);
+ COUNTER_UPDATE(_parquet_profile.decode_level_time,
+ _column_statistics.decode_level_time);
}
_closed = true;
}
@@ -72,8 +104,16 @@ void ParquetReader::close() {
Status ParquetReader::init_reader(
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
- CHECK(_file_reader != nullptr);
- RETURN_IF_ERROR(parse_thrift_footer(_file_reader, _file_metadata));
+ SCOPED_RAW_TIMER(&_statistics.parse_meta_time);
+ if (_file_reader == nullptr) {
+ RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _scan_params, _scan_range,
+ _file_reader, 0));
+ }
+ RETURN_IF_ERROR(_file_reader->open());
+ if (_file_reader->size() == 0) {
+ return Status::EndOfFile("Empty Parquet File");
+ }
+ RETURN_IF_ERROR(parse_thrift_footer(_file_reader.get(), _file_metadata));
_t_metadata = &_file_metadata->to_thrift();
_total_groups = _t_metadata->row_groups.size();
if (_total_groups == 0) {
@@ -145,8 +185,13 @@ Status ParquetReader::get_next_block(Block* block, bool* eof) {
return Status::OK();
}
bool _batch_eof = false;
- RETURN_IF_ERROR(_current_group_reader->next_batch(block, _batch_size, &_batch_eof));
+ {
+ SCOPED_RAW_TIMER(&_statistics.column_read_time);
+ RETURN_IF_ERROR(_current_group_reader->next_batch(block, _batch_size, &_batch_eof));
+ }
if (_batch_eof) {
+ auto column_st = _current_group_reader->statistics();
+ _column_statistics.merge(column_st);
if (!_next_row_group_reader()) {
*eof = true;
}
@@ -169,8 +214,8 @@ Status ParquetReader::_init_row_group_readers() {
for (auto row_group_id : _read_row_groups) {
auto& row_group = _t_metadata->row_groups[row_group_id];
std::shared_ptr<RowGroupReader> row_group_reader;
- row_group_reader.reset(
- new RowGroupReader(_file_reader, _read_columns, row_group_id, row_group, _ctz));
+ row_group_reader.reset(new RowGroupReader(_file_reader.get(), _read_columns, row_group_id,
+ row_group, _ctz));
std::vector<RowRange> candidate_row_ranges;
RETURN_IF_ERROR(_process_page_index(row_group, candidate_row_ranges));
if (candidate_row_ranges.empty()) {
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h
index 9eea2ddb61..ab44c31517 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -29,55 +29,34 @@
#include "io/file_reader.h"
#include "vec/core/block.h"
#include "vec/exec/format/generic_reader.h"
+#include "vparquet_column_reader.h"
#include "vparquet_file_metadata.h"
#include "vparquet_group_reader.h"
#include "vparquet_page_index.h"
namespace doris::vectorized {
-struct ParquetStatistics {
- int32_t filtered_row_groups = 0;
- int32_t read_row_groups = 0;
- int64_t filtered_group_rows = 0;
- int64_t filtered_page_rows = 0;
- int64_t read_rows = 0;
- int64_t filtered_bytes = 0;
- int64_t read_bytes = 0;
-};
-
-class RowGroupReader;
-class PageIndex;
-
-struct RowRange {
- RowRange() {}
- RowRange(int64_t first, int64_t last) : first_row(first), last_row(last) {}
- int64_t first_row;
- int64_t last_row;
-};
-
-class ParquetReadColumn {
-public:
- ParquetReadColumn(int parquet_col_id, const std::string& file_slot_name)
- : _parquet_col_id(parquet_col_id), _file_slot_name(file_slot_name) {};
- ~ParquetReadColumn() = default;
-
-private:
- friend class ParquetReader;
- friend class RowGroupReader;
- int _parquet_col_id;
- const std::string& _file_slot_name;
-};
-
class ParquetReader : public GenericReader {
public:
- ParquetReader(RuntimeProfile* profile, FileReader* file_reader,
- const TFileScanRangeParams& params, const TFileRangeDesc& range,
- const std::vector<std::string>& column_names, size_t batch_size,
- cctz::time_zone* ctz);
+ struct Statistics {
+ int32_t filtered_row_groups = 0;
+ int32_t read_row_groups = 0;
+ int64_t filtered_group_rows = 0;
+ int64_t filtered_page_rows = 0;
+ int64_t read_rows = 0;
+ int64_t filtered_bytes = 0;
+ int64_t read_bytes = 0;
+ int64_t column_read_time = 0;
+ int64_t parse_meta_time = 0;
+ };
+
+ ParquetReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
+ const TFileRangeDesc& range, const std::vector<std::string>& column_names,
+ size_t batch_size, cctz::time_zone* ctz);
virtual ~ParquetReader();
// for test
- void set_file_reader(FileReader* file_reader) { _file_reader = file_reader; }
+ void set_file_reader(FileReader* file_reader) { _file_reader.reset(file_reader); }
Status init_reader(
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
@@ -92,9 +71,31 @@ public:
Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
std::unordered_set<std::string>* missing_cols) override;
- ParquetStatistics& statistics() { return _statistics; }
+ Statistics& statistics() { return _statistics; }
private:
+ struct ParquetProfile {
+ RuntimeProfile::Counter* filtered_row_groups;
+ RuntimeProfile::Counter* to_read_row_groups;
+ RuntimeProfile::Counter* filtered_group_rows;
+ RuntimeProfile::Counter* filtered_page_rows;
+ RuntimeProfile::Counter* filtered_bytes;
+ RuntimeProfile::Counter* to_read_bytes;
+ RuntimeProfile::Counter* column_read_time;
+ RuntimeProfile::Counter* parse_meta_time;
+
+ RuntimeProfile::Counter* file_read_time;
+ RuntimeProfile::Counter* file_read_calls;
+ RuntimeProfile::Counter* file_read_bytes;
+ RuntimeProfile::Counter* decompress_time;
+ RuntimeProfile::Counter* decompress_cnt;
+ RuntimeProfile::Counter* decode_header_time;
+ RuntimeProfile::Counter* decode_value_time;
+ RuntimeProfile::Counter* decode_dict_time;
+ RuntimeProfile::Counter* decode_level_time;
+ };
+
+ void _init_profile();
bool _next_row_group_reader();
Status _init_read_columns();
Status _init_row_group_readers();
@@ -117,10 +118,9 @@ private:
private:
RuntimeProfile* _profile;
- // file reader is passed from file scanner, and owned by this parquet reader.
- FileReader* _file_reader = nullptr;
- // const TFileScanRangeParams& _scan_params;
- // const TFileRangeDesc& _scan_range;
+ const TFileScanRangeParams& _scan_params;
+ const TFileRangeDesc& _scan_range;
+ std::unique_ptr<FileReader> _file_reader = nullptr;
std::shared_ptr<FileMetaData> _file_metadata;
const tparquet::FileMetaData* _t_metadata;
@@ -141,15 +141,9 @@ private:
const std::vector<std::string> _column_names;
std::vector<std::string> _missing_cols;
- ParquetStatistics _statistics;
+ Statistics _statistics;
+ ParquetColumnReader::Statistics _column_statistics;
+ ParquetProfile _parquet_profile;
bool _closed = false;
-
- // parquet profile
- RuntimeProfile::Counter* _filtered_row_groups;
- RuntimeProfile::Counter* _to_read_row_groups;
- RuntimeProfile::Counter* _filtered_group_rows;
- RuntimeProfile::Counter* _filtered_page_rows;
- RuntimeProfile::Counter* _filtered_bytes;
- RuntimeProfile::Counter* _to_read_bytes;
};
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index 34b67dd1e7..f6f8127146 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -454,13 +454,17 @@ Status VFileScanner::_get_next_reader() {
const TFileRangeDesc& range = _ranges[_next_range++];
// 1. create file reader
+ // TODO: Each format requires its own FileReader to achieve a special access mode,
+ // so create the FileReader within the format reader itself.
std::unique_ptr<FileReader> file_reader;
- RETURN_IF_ERROR(FileFactory::create_file_reader(_state->exec_env(), _profile, _params,
- range, file_reader));
- RETURN_IF_ERROR(file_reader->open());
- if (file_reader->size() == 0) {
- file_reader->close();
- continue;
+ if (_params.format_type != TFileFormatType::FORMAT_PARQUET) {
+ RETURN_IF_ERROR(FileFactory::create_file_reader(_state->exec_env(), _profile, _params,
+ range, file_reader));
+ RETURN_IF_ERROR(file_reader->open());
+ if (file_reader->size() == 0) {
+ file_reader->close();
+ continue;
+ }
}
// 2. create reader for specific format
@@ -468,10 +472,9 @@ Status VFileScanner::_get_next_reader() {
Status init_status;
switch (_params.format_type) {
case TFileFormatType::FORMAT_PARQUET: {
- _cur_reader.reset(
- new ParquetReader(_profile, file_reader.release(), _params, range,
- _file_col_names, _state->query_options().batch_size,
- const_cast<cctz::time_zone*>(&_state->timezone_obj())));
+ _cur_reader.reset(new ParquetReader(
+ _profile, _params, range, _file_col_names, _state->query_options().batch_size,
+ const_cast<cctz::time_zone*>(&_state->timezone_obj())));
init_status =
((ParquetReader*)(_cur_reader.get()))->init_reader(_colname_to_value_range);
break;
diff --git a/be/test/vec/exec/parquet/parquet_reader_test.cpp b/be/test/vec/exec/parquet/parquet_reader_test.cpp
index 42b15196b7..68a2043d66 100644
--- a/be/test/vec/exec/parquet/parquet_reader_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_reader_test.cpp
@@ -91,46 +91,45 @@ TEST_F(ParquetReaderTest, normal) {
auto slot_descs = desc_tbl->get_tuple_descriptor(0)->slots();
LocalFileReader* reader =
new LocalFileReader("./be/test/exec/test_data/parquet_scanner/type-decoder.parquet", 0);
- reader->open();
cctz::time_zone ctz;
TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, ctz);
- // auto tuple_desc = desc_tbl->get_tuple_descriptor(0);
+ auto tuple_desc = desc_tbl->get_tuple_descriptor(0);
std::vector<std::string> column_names;
for (int i = 0; i < slot_descs.size(); i++) {
column_names.push_back(slot_descs[i]->col_name());
}
- // TFileScanRangeParams scan_params;
+ TFileScanRangeParams scan_params;
TFileRangeDesc scan_range;
{
scan_range.start_offset = 0;
scan_range.size = 1000;
}
- // auto p_reader =
- // new ParquetReader(nullptr, reader, scan_params, scan_range, column_names, 992, &ctz);
+ auto p_reader = new ParquetReader(nullptr, scan_params, scan_range, column_names, 992, &ctz);
+ p_reader->set_file_reader(reader);
RuntimeState runtime_state((TQueryGlobals()));
runtime_state.set_desc_tbl(desc_tbl);
runtime_state.init_instance_mem_tracker();
- // std::vector<ExprContext*> conjunct_ctxs = std::vector<ExprContext*>();
- // p_reader->init_reader(conjunct_ctxs);
- // Block* block = new Block();
- // for (const auto& slot_desc : tuple_desc->slots()) {
- // auto data_type =
- // vectorized::DataTypeFactory::instance().create_data_type(slot_desc->type(), true);
- // MutableColumnPtr data_column = data_type->create_column();
- // block->insert(
- // ColumnWithTypeAndName(std::move(data_column), data_type, slot_desc->col_name()));
- // }
- // bool eof = false;
- // p_reader->get_next_block(block, &eof);
- // for (auto& col : block->get_columns_with_type_and_name()) {
- // ASSERT_EQ(col.column->size(), 10);
- // }
- // EXPECT_TRUE(eof);
- // delete block;
- // delete p_reader;
- delete reader;
+ std::unordered_map<std::string, ColumnValueRangeType> colname_to_value_range;
+ p_reader->init_reader(&colname_to_value_range);
+ Block* block = new Block();
+ for (const auto& slot_desc : tuple_desc->slots()) {
+ auto data_type =
+ vectorized::DataTypeFactory::instance().create_data_type(slot_desc->type(), true);
+ MutableColumnPtr data_column = data_type->create_column();
+ block->insert(
+ ColumnWithTypeAndName(std::move(data_column), data_type, slot_desc->col_name()));
+ }
+ bool eof = false;
+ p_reader->get_next_block(block, &eof);
+ for (auto& col : block->get_columns_with_type_and_name()) {
+ ASSERT_EQ(col.column->size(), 10);
+ }
+ EXPECT_TRUE(eof);
+ delete block;
+ delete p_reader;
}
+
} // namespace vectorized
} // namespace doris
diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
index 4272954214..c18d3099d7 100644
--- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
@@ -36,6 +36,7 @@
#include "vec/exec/format/parquet/vparquet_column_chunk_reader.h"
#include "vec/exec/format/parquet/vparquet_column_reader.h"
#include "vec/exec/format/parquet/vparquet_file_metadata.h"
+#include "vec/exec/format/parquet/vparquet_group_reader.h"
namespace doris {
namespace vectorized {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org