You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/10/02 07:11:54 UTC

[doris] branch master updated: [feature-wip](parquet-reader) add detail profile for parquet reader (#13095)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 026ffaf10d [feature-wip](parquet-reader) add detail profile for parquet reader (#13095)
026ffaf10d is described below

commit 026ffaf10db3069270568c051eff0474ed8d4b0c
Author: Ashin Gau <As...@users.noreply.github.com>
AuthorDate: Sun Oct 2 15:11:48 2022 +0800

    [feature-wip](parquet-reader) add detail profile for parquet reader (#13095)
    
    Add more detailed profile metrics for ParquetReader:
    ParquetColumnReadTime: the total time of reading parquet columns
    ParquetDecodeDictTime: time to parse dictionary page
    ParquetDecodeHeaderTime: time to parse page header
    ParquetDecodeLevelTime: time to parse page's definition/repetition level
    ParquetDecodeValueTime: time to decode page data into doris column
    ParquetDecompressCount: counter of decompressing page data
    ParquetDecompressTime: time to decompress page data
    ParquetParseMetaTime: time to parse parquet metadata
---
 be/src/io/buffered_reader.cpp                      |   6 ++
 be/src/io/buffered_reader.h                        |  10 ++
 be/src/io/hdfs_file_reader.cpp                     |  21 +++--
 be/src/vec/exec/format/parquet/parquet_common.h    |  16 ++++
 .../parquet/vparquet_column_chunk_reader.cpp       |  10 ++
 .../format/parquet/vparquet_column_chunk_reader.h  |  19 +++-
 .../exec/format/parquet/vparquet_column_reader.cpp |   2 +-
 .../exec/format/parquet/vparquet_column_reader.h   |  53 ++++++++++-
 .../exec/format/parquet/vparquet_group_reader.cpp  |  19 ++--
 .../exec/format/parquet/vparquet_group_reader.h    |  10 +-
 .../vec/exec/format/parquet/vparquet_page_index.h  |   5 +-
 .../exec/format/parquet/vparquet_page_reader.cpp   |   1 +
 .../vec/exec/format/parquet/vparquet_page_reader.h |   7 ++
 be/src/vec/exec/format/parquet/vparquet_reader.cpp | 103 +++++++++++++++------
 be/src/vec/exec/format/parquet/vparquet_reader.h   |  98 +++++++++-----------
 be/src/vec/exec/scan/vfile_scanner.cpp             |  23 +++--
 be/test/vec/exec/parquet/parquet_reader_test.cpp   |  47 +++++-----
 be/test/vec/exec/parquet/parquet_thrift_test.cpp   |   1 +
 18 files changed, 301 insertions(+), 150 deletions(-)

diff --git a/be/src/io/buffered_reader.cpp b/be/src/io/buffered_reader.cpp
index c98b365235..021f0b9d23 100644
--- a/be/src/io/buffered_reader.cpp
+++ b/be/src/io/buffered_reader.cpp
@@ -221,15 +221,21 @@ Status BufferedFileStreamReader::read_bytes(const uint8_t** buf, uint64_t offset
     int64_t buf_remaining = _buf_end_offset - _buf_start_offset;
     int64_t to_read = std::min(_buf_size - buf_remaining, _file_end_offset - _buf_end_offset);
     int64_t has_read = 0;
+    SCOPED_RAW_TIMER(&_statistics.read_time);
     while (has_read < to_read) {
         int64_t loop_read = 0;
         RETURN_IF_ERROR(_file->readat(_buf_end_offset + has_read, to_read - has_read, &loop_read,
                                       _buf.get() + buf_remaining + has_read));
+        _statistics.read_calls++;
+        if (loop_read <= 0) {
+            break;
+        }
         has_read += loop_read;
     }
     if (has_read != to_read) {
         return Status::Corruption("Try to read {} bytes, but received {} bytes", to_read, has_read);
     }
+    _statistics.read_bytes += to_read;
     _buf_end_offset += to_read;
     *buf = _buf.get();
     return Status::OK();
diff --git a/be/src/io/buffered_reader.h b/be/src/io/buffered_reader.h
index 97ec01cc7d..abcf24916e 100644
--- a/be/src/io/buffered_reader.h
+++ b/be/src/io/buffered_reader.h
@@ -87,6 +87,12 @@ private:
  */
 class BufferedStreamReader {
 public:
+    struct Statistics {
+        int64_t read_time = 0;
+        int64_t read_calls = 0;
+        int64_t read_bytes = 0;
+    };
+
     /**
      * Return the address of underlying buffer that locates the start of data between [offset, offset + bytes_to_read)
      * @param buf the buffer address to save the start address of data
@@ -98,7 +104,11 @@ public:
      * Save the data address to slice.data, and the slice.size is the bytes to read.
      */
     virtual Status read_bytes(Slice& slice, uint64_t offset) = 0;
+    Statistics& statistics() { return _statistics; }
     virtual ~BufferedStreamReader() = default;
+
+protected:
+    Statistics _statistics;
 };
 
 class BufferedFileStreamReader : public BufferedStreamReader {
diff --git a/be/src/io/hdfs_file_reader.cpp b/be/src/io/hdfs_file_reader.cpp
index 37b2d73bba..de5d7ee8c7 100644
--- a/be/src/io/hdfs_file_reader.cpp
+++ b/be/src/io/hdfs_file_reader.cpp
@@ -144,13 +144,22 @@ Status HdfsFileReader::readat(int64_t position, int64_t nbytes, int64_t* bytes_r
         seek(position);
     }
 
-    *bytes_read = hdfsRead(_hdfs_fs, _hdfs_file, out, nbytes);
-    if (*bytes_read < 0) {
-        return Status::InternalError(
-                "Read hdfs file failed. (BE: {}) namenode:{}, path:{}, err: {}",
-                BackendOptions::get_localhost(), _namenode, _path, hdfsGetLastError());
+    int64_t has_read = 0;
+    char* cast_out = reinterpret_cast<char*>(out);
+    while (has_read < nbytes) {
+        int64_t loop_read = hdfsRead(_hdfs_fs, _hdfs_file, cast_out + has_read, nbytes - has_read);
+        if (loop_read < 0) {
+            return Status::InternalError(
+                    "Read hdfs file failed. (BE: {}) namenode:{}, path:{}, err: {}",
+                    BackendOptions::get_localhost(), _namenode, _path, hdfsGetLastError());
+        }
+        if (loop_read == 0) {
+            break;
+        }
+        has_read += loop_read;
     }
-    _current_offset += *bytes_read; // save offset with file
+    *bytes_read = has_read;
+    _current_offset += has_read; // save offset with file
     return Status::OK();
 }
 
diff --git a/be/src/vec/exec/format/parquet/parquet_common.h b/be/src/vec/exec/format/parquet/parquet_common.h
index a56fdb6476..e08027a137 100644
--- a/be/src/vec/exec/format/parquet/parquet_common.h
+++ b/be/src/vec/exec/format/parquet/parquet_common.h
@@ -38,6 +38,22 @@ namespace doris::vectorized {
 
 using level_t = int16_t;
 
+struct RowRange {
+    RowRange() {}
+    RowRange(int64_t first, int64_t last) : first_row(first), last_row(last) {}
+
+    int64_t first_row;
+    int64_t last_row;
+};
+
+struct ParquetReadColumn {
+    ParquetReadColumn(int parquet_col_id, const std::string& file_slot_name)
+            : _parquet_col_id(parquet_col_id), _file_slot_name(file_slot_name) {};
+
+    int _parquet_col_id;
+    const std::string& _file_slot_name;
+};
+
 struct ParquetInt96 {
     uint64_t lo; // time of nanoseconds in a day
     uint32_t hi; // days from julian epoch
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
index 193200f28d..fc8b8cfdee 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
@@ -86,6 +86,8 @@ Status ColumnChunkReader::load_page_data() {
         // check decompressed buffer size
         _reserve_decompress_buf(uncompressed_size);
         _page_data = Slice(_decompress_buf.get(), uncompressed_size);
+        SCOPED_RAW_TIMER(&_statistics.decompress_time);
+        _statistics.decompress_cnt++;
         RETURN_IF_ERROR(_block_compress_codec->decompress(compressed_data, &_page_data));
     } else {
         RETURN_IF_ERROR(_page_reader->get_page_data(_page_data));
@@ -93,11 +95,13 @@ Status ColumnChunkReader::load_page_data() {
 
     // Initialize repetition level and definition level. Skip when level = 0, which means required field.
     if (_max_rep_level > 0) {
+        SCOPED_RAW_TIMER(&_statistics.decode_level_time);
         RETURN_IF_ERROR(_rep_level_decoder.init(&_page_data,
                                                 header.data_page_header.repetition_level_encoding,
                                                 _max_rep_level, _remaining_num_values));
     }
     if (_max_def_level > 0) {
+        SCOPED_RAW_TIMER(&_statistics.decode_level_time);
         RETURN_IF_ERROR(_def_level_decoder.init(&_page_data,
                                                 header.data_page_header.definition_level_encoding,
                                                 _max_def_level, _remaining_num_values));
@@ -132,6 +136,7 @@ Status ColumnChunkReader::load_page_data() {
 Status ColumnChunkReader::_decode_dict_page() {
     const tparquet::PageHeader& header = *_page_reader->get_page_header();
     DCHECK_EQ(tparquet::PageType::DICTIONARY_PAGE, header.type);
+    SCOPED_RAW_TIMER(&_statistics.decode_dict_time);
 
     // Using the PLAIN_DICTIONARY enum value is deprecated in the Parquet 2.0 specification.
     // Prefer using RLE_DICTIONARY in a data page and PLAIN in a dictionary page for Parquet 2.0+ files.
@@ -187,6 +192,7 @@ Status ColumnChunkReader::skip_values(size_t num_values, bool skip_data) {
     }
     _remaining_num_values -= num_values;
     if (skip_data) {
+        SCOPED_RAW_TIMER(&_statistics.decode_value_time);
         return _page_decoder->skip_values(num_values);
     } else {
         return Status::OK();
@@ -194,6 +200,7 @@ Status ColumnChunkReader::skip_values(size_t num_values, bool skip_data) {
 }
 
 void ColumnChunkReader::insert_null_values(ColumnPtr& doris_column, size_t num_values) {
+    SCOPED_RAW_TIMER(&_statistics.decode_value_time);
     DCHECK_GE(_remaining_num_values, num_values);
     CHECK(doris_column->is_nullable());
     auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
@@ -206,6 +213,7 @@ void ColumnChunkReader::insert_null_values(ColumnPtr& doris_column, size_t num_v
 }
 
 void ColumnChunkReader::insert_null_values(MutableColumnPtr& doris_column, size_t num_values) {
+    SCOPED_RAW_TIMER(&_statistics.decode_value_time);
     for (int i = 0; i < num_values; ++i) {
         doris_column->insert_default();
     }
@@ -227,6 +235,7 @@ Status ColumnChunkReader::decode_values(ColumnPtr& doris_column, DataTypePtr& da
     if (UNLIKELY(_remaining_num_values < num_values)) {
         return Status::IOError("Decode too many values in current page");
     }
+    SCOPED_RAW_TIMER(&_statistics.decode_value_time);
     _remaining_num_values -= num_values;
     return _page_decoder->decode_values(doris_column, data_type, num_values);
 }
@@ -236,6 +245,7 @@ Status ColumnChunkReader::decode_values(MutableColumnPtr& doris_column, DataType
     if (UNLIKELY(_remaining_num_values < num_values)) {
         return Status::IOError("Decode too many values in current page");
     }
+    SCOPED_RAW_TIMER(&_statistics.decode_value_time);
     _remaining_num_values -= num_values;
     return _page_decoder->decode_values(doris_column, data_type, num_values);
 }
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
index 44f5b56ff2..1599762152 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
@@ -29,8 +29,6 @@
 #include "parquet_common.h"
 #include "schema_desc.h"
 #include "util/block_compression.h"
-#include "vec/columns/column_array.h"
-#include "vec/columns/column_nullable.h"
 #include "vparquet_page_reader.h"
 
 namespace doris::vectorized {
@@ -57,6 +55,15 @@ namespace doris::vectorized {
  */
 class ColumnChunkReader {
 public:
+    struct Statistics {
+        int64_t decompress_time = 0;
+        int64_t decompress_cnt = 0;
+        int64_t decode_header_time = 0;
+        int64_t decode_value_time = 0;
+        int64_t decode_dict_time = 0;
+        int64_t decode_level_time = 0;
+    };
+
     ColumnChunkReader(BufferedStreamReader* reader, tparquet::ColumnChunk* column_chunk,
                       FieldSchema* field_schema, cctz::time_zone* ctz);
     ~ColumnChunkReader() = default;
@@ -96,7 +103,7 @@ public:
     // Load page data into the underlying container,
     // and initialize the repetition and definition level decoder for current page data.
     Status load_page_data();
-    Status load_page_date_idempotent() {
+    Status load_page_data_idempotent() {
         if (_state == DATA_LOADED) {
             return Status::OK();
         }
@@ -131,6 +138,11 @@ public:
     // Get page decoder
     Decoder* get_page_decoder() { return _page_decoder; }
 
+    Statistics& statistics() {
+        _statistics.decode_header_time = _page_reader->statistics().decode_header_time;
+        return _statistics;
+    }
+
 private:
     enum ColumnChunkReaderState { NOT_INIT, INITIALIZED, HEADER_PARSED, DATA_LOADED };
 
@@ -161,6 +173,7 @@ private:
     // Map: encoding -> Decoder
     // Plain or Dictionary encoding. If the dictionary grows too big, the encoding will fall back to the plain encoding
     std::unordered_map<int, std::unique_ptr<Decoder>> _decoders;
+    Statistics _statistics;
 };
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
index 3074705ffa..1bcb640865 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -176,7 +176,7 @@ Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr
         *read_rows = 0;
     } else {
         // load page data to decode or skip values
-        RETURN_IF_ERROR(_chunk_reader->load_page_date_idempotent());
+        RETURN_IF_ERROR(_chunk_reader->load_page_data_idempotent());
         size_t has_read = 0;
         for (auto& range : read_ranges) {
             // generate the skipped values
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
index b245ff6aa4..0a5d51e2e0 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
@@ -21,13 +21,9 @@
 
 #include "schema_desc.h"
 #include "vparquet_column_chunk_reader.h"
-#include "vparquet_reader.h"
 
 namespace doris::vectorized {
 
-struct RowRange;
-class ParquetReadColumn;
-
 class ParquetColumnMetadata {
 public:
     ParquetColumnMetadata(int64_t chunk_start_offset, int64_t chunk_length,
@@ -49,6 +45,52 @@ private:
 
 class ParquetColumnReader {
 public:
+    struct Statistics {
+        Statistics()
+                : read_time(0),
+                  read_calls(0),
+                  read_bytes(0),
+                  decompress_time(0),
+                  decompress_cnt(0),
+                  decode_header_time(0),
+                  decode_value_time(0),
+                  decode_dict_time(0),
+                  decode_level_time(0) {}
+
+        Statistics(BufferedStreamReader::Statistics& fs, ColumnChunkReader::Statistics& cs)
+                : read_time(fs.read_time),
+                  read_calls(fs.read_calls),
+                  read_bytes(fs.read_bytes),
+                  decompress_time(cs.decompress_time),
+                  decompress_cnt(cs.decompress_cnt),
+                  decode_header_time(cs.decode_header_time),
+                  decode_value_time(cs.decode_value_time),
+                  decode_dict_time(cs.decode_dict_time),
+                  decode_level_time(cs.decode_level_time) {}
+
+        int64_t read_time;
+        int64_t read_calls;
+        int64_t read_bytes;
+        int64_t decompress_time;
+        int64_t decompress_cnt;
+        int64_t decode_header_time;
+        int64_t decode_value_time;
+        int64_t decode_dict_time;
+        int64_t decode_level_time;
+
+        void merge(Statistics& statistics) {
+            read_time += statistics.read_time;
+            read_calls += statistics.read_calls;
+            read_bytes += statistics.read_bytes;
+            decompress_time += statistics.decompress_time;
+            decompress_cnt += statistics.decompress_cnt;
+            decode_header_time += statistics.decode_header_time;
+            decode_value_time += statistics.decode_value_time;
+            decode_dict_time += statistics.decode_dict_time;
+            decode_level_time += statistics.decode_level_time;
+        }
+    };
+
     ParquetColumnReader(cctz::time_zone* ctz) : _ctz(ctz) {};
     virtual ~ParquetColumnReader() {
         if (_stream_reader != nullptr) {
@@ -64,6 +106,9 @@ public:
                          size_t max_buf_size);
     void init_column_metadata(const tparquet::ColumnChunk& chunk);
     void add_offset_index(tparquet::OffsetIndex* offset_index) { _offset_index = offset_index; }
+    Statistics statistics() {
+        return Statistics(_stream_reader->statistics(), _chunk_reader->statistics());
+    }
     virtual void close() = 0;
 
 protected:
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index 0168a97a43..ddcc6494d0 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -38,14 +38,6 @@ RowGroupReader::~RowGroupReader() {
 
 Status RowGroupReader::init(const FieldDescriptor& schema, std::vector<RowRange>& row_ranges,
                             std::unordered_map<int, tparquet::OffsetIndex>& col_offsets) {
-    VLOG_DEBUG << "Row group id: " << _row_group_id;
-    RETURN_IF_ERROR(_init_column_readers(schema, row_ranges, col_offsets));
-    return Status::OK();
-}
-
-Status RowGroupReader::_init_column_readers(
-        const FieldDescriptor& schema, std::vector<RowRange>& row_ranges,
-        std::unordered_map<int, tparquet::OffsetIndex>& col_offsets) {
     const size_t MAX_GROUP_BUF_SIZE = config::parquet_rowgroup_max_buffer_mb << 20;
     const size_t MAX_COLUMN_BUF_SIZE = config::parquet_column_max_buffer_mb << 20;
     size_t max_buf_size = std::min(MAX_COLUMN_BUF_SIZE, MAX_GROUP_BUF_SIZE / _read_columns.size());
@@ -60,7 +52,7 @@ Status RowGroupReader::_init_column_readers(
             reader->add_offset_index(&oi);
         }
         if (reader == nullptr) {
-            VLOG_DEBUG << "Init row group reader failed";
+            VLOG_DEBUG << "Init row group(" << _row_group_id << ") reader failed";
             return Status::Corruption("Init row group reader failed");
         }
         _column_readers[read_col._file_slot_name] = std::move(reader);
@@ -100,4 +92,13 @@ Status RowGroupReader::next_batch(Block* block, size_t batch_size, bool* _batch_
     return Status::OK();
 }
 
+ParquetColumnReader::Statistics RowGroupReader::statistics() {
+    ParquetColumnReader::Statistics st;
+    for (auto& reader : _column_readers) {
+        auto ost = reader.second->statistics();
+        st.merge(ost);
+    }
+    return st;
+}
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
index 2e72d42805..27daffe6f7 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -17,17 +17,11 @@
 #pragma once
 #include <common/status.h>
 
-#include "exprs/expr_context.h"
 #include "io/file_reader.h"
 #include "vec/core/block.h"
 #include "vparquet_column_reader.h"
-#include "vparquet_file_metadata.h"
-#include "vparquet_reader.h"
 
 namespace doris::vectorized {
-class ParquetReadColumn;
-class ParquetColumnReader;
-struct RowRange;
 
 class RowGroupReader {
 public:
@@ -39,9 +33,7 @@ public:
                 std::unordered_map<int, tparquet::OffsetIndex>& col_offsets);
     Status next_batch(Block* block, size_t batch_size, bool* _batch_eof);
 
-private:
-    Status _init_column_readers(const FieldDescriptor& schema, std::vector<RowRange>& row_ranges,
-                                std::unordered_map<int, tparquet::OffsetIndex>& col_offsets);
+    ParquetColumnReader::Statistics statistics();
 
 private:
     doris::FileReader* _file_reader;
diff --git a/be/src/vec/exec/format/parquet/vparquet_page_index.h b/be/src/vec/exec/format/parquet/vparquet_page_index.h
index 2f4b0974b8..cfbe97ded4 100644
--- a/be/src/vec/exec/format/parquet/vparquet_page_index.h
+++ b/be/src/vec/exec/format/parquet/vparquet_page_index.h
@@ -19,11 +19,10 @@
 #include <common/status.h>
 #include <gen_cpp/parquet_types.h>
 
-#include "vparquet_reader.h"
+#include "exec/olap_common.h"
+#include "parquet_common.h"
 
 namespace doris::vectorized {
-class ParquetReader;
-struct RowRange;
 
 class PageIndex {
 public:
diff --git a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
index baa88036c2..00e2ef0926 100644
--- a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
@@ -48,6 +48,7 @@ Status PageReader::next_page_header() {
         header_size = std::min(header_size, max_size);
         RETURN_IF_ERROR(_reader->read_bytes(&page_header_buf, _offset, header_size));
         real_header_size = header_size;
+        SCOPED_RAW_TIMER(&_statistics.decode_header_time);
         auto st =
                 deserialize_thrift_msg(page_header_buf, &real_header_size, true, &_cur_page_header);
         if (st.ok()) {
diff --git a/be/src/vec/exec/format/parquet/vparquet_page_reader.h b/be/src/vec/exec/format/parquet/vparquet_page_reader.h
index 0d83c81650..5563f97409 100644
--- a/be/src/vec/exec/format/parquet/vparquet_page_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_page_reader.h
@@ -28,6 +28,10 @@ namespace doris::vectorized {
  */
 class PageReader {
 public:
+    struct Statistics {
+        int64_t decode_header_time;
+    };
+
     PageReader(BufferedStreamReader* reader, uint64_t offset, uint64_t length);
     ~PageReader() = default;
 
@@ -41,6 +45,8 @@ public:
 
     Status get_page_data(Slice& slice);
 
+    Statistics& statistics() { return _statistics; }
+
     void seek_to_page(int64_t page_header_offset) {
         _offset = page_header_offset;
         _next_header_offset = page_header_offset;
@@ -52,6 +58,7 @@ private:
 
     BufferedStreamReader* _reader;
     tparquet::PageHeader _cur_page_header;
+    Statistics _statistics;
     PageReaderState _state = INITIALIZED;
 
     uint64_t _offset = 0;
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 85e19425c0..6b8a01c83c 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -24,47 +24,79 @@
 #include "parquet_thrift_util.h"
 
 namespace doris::vectorized {
-ParquetReader::ParquetReader(RuntimeProfile* profile, FileReader* file_reader,
-                             const TFileScanRangeParams& params, const TFileRangeDesc& range,
+
+ParquetReader::ParquetReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
+                             const TFileRangeDesc& range,
                              const std::vector<std::string>& column_names, size_t batch_size,
                              cctz::time_zone* ctz)
         : _profile(profile),
-          _file_reader(file_reader),
-          //  _scan_params(params),
-          //  _scan_range(range),
+          _scan_params(params),
+          _scan_range(range),
           _batch_size(batch_size),
           _range_start_offset(range.start_offset),
           _range_size(range.size),
           _ctz(ctz),
           _column_names(column_names) {
-    if (profile != nullptr) {
-        _filtered_row_groups = ADD_COUNTER(profile, "ParquetFilteredGroups", TUnit::UNIT);
-        _to_read_row_groups = ADD_COUNTER(profile, "ParquetReadGroups", TUnit::UNIT);
-        _filtered_group_rows = ADD_COUNTER(profile, "ParquetFilteredRowsByGroup", TUnit::UNIT);
-        _filtered_page_rows = ADD_COUNTER(profile, "ParquetFilteredRowsByPage", TUnit::UNIT);
-        _filtered_bytes = ADD_COUNTER(profile, "ParquetFilteredBytes", TUnit::BYTES);
-        _to_read_bytes = ADD_COUNTER(profile, "ParquetReadBytes", TUnit::BYTES);
-    }
+    _init_profile();
 }
 
 ParquetReader::~ParquetReader() {
     close();
 }
 
+void ParquetReader::_init_profile() {
+    if (_profile != nullptr) {
+        _parquet_profile.filtered_row_groups =
+                ADD_COUNTER(_profile, "ParquetFilteredGroups", TUnit::UNIT);
+        _parquet_profile.to_read_row_groups =
+                ADD_COUNTER(_profile, "ParquetReadGroups", TUnit::UNIT);
+        _parquet_profile.filtered_group_rows =
+                ADD_COUNTER(_profile, "ParquetFilteredRowsByGroup", TUnit::UNIT);
+        _parquet_profile.filtered_page_rows =
+                ADD_COUNTER(_profile, "ParquetFilteredRowsByPage", TUnit::UNIT);
+        _parquet_profile.filtered_bytes =
+                ADD_COUNTER(_profile, "ParquetFilteredBytes", TUnit::BYTES);
+        _parquet_profile.to_read_bytes = ADD_COUNTER(_profile, "ParquetReadBytes", TUnit::BYTES);
+        _parquet_profile.column_read_time = ADD_TIMER(_profile, "ParquetColumnReadTime");
+        _parquet_profile.parse_meta_time = ADD_TIMER(_profile, "ParquetParseMetaTime");
+
+        _parquet_profile.file_read_time = ADD_TIMER(_profile, "FileReadTime");
+        _parquet_profile.file_read_calls = ADD_COUNTER(_profile, "FileReadCalls", TUnit::UNIT);
+        _parquet_profile.file_read_bytes = ADD_COUNTER(_profile, "FileReadBytes", TUnit::BYTES);
+        _parquet_profile.decompress_time = ADD_TIMER(_profile, "ParquetDecompressTime");
+        _parquet_profile.decompress_cnt =
+                ADD_COUNTER(_profile, "ParquetDecompressCount", TUnit::UNIT);
+        _parquet_profile.decode_header_time = ADD_TIMER(_profile, "ParquetDecodeHeaderTime");
+        _parquet_profile.decode_value_time = ADD_TIMER(_profile, "ParquetDecodeValueTime");
+        _parquet_profile.decode_dict_time = ADD_TIMER(_profile, "ParquetDecodeDictTime");
+        _parquet_profile.decode_level_time = ADD_TIMER(_profile, "ParquetDecodeLevelTime");
+    }
+}
+
 void ParquetReader::close() {
     if (!_closed) {
-        if (_file_reader != nullptr) {
-            _file_reader->close();
-            delete _file_reader;
-        }
-
         if (_profile != nullptr) {
-            COUNTER_UPDATE(_filtered_row_groups, _statistics.filtered_row_groups);
-            COUNTER_UPDATE(_to_read_row_groups, _statistics.read_row_groups);
-            COUNTER_UPDATE(_filtered_group_rows, _statistics.filtered_group_rows);
-            COUNTER_UPDATE(_filtered_page_rows, _statistics.filtered_page_rows);
-            COUNTER_UPDATE(_filtered_bytes, _statistics.filtered_bytes);
-            COUNTER_UPDATE(_to_read_bytes, _statistics.read_bytes);
+            COUNTER_UPDATE(_parquet_profile.filtered_row_groups, _statistics.filtered_row_groups);
+            COUNTER_UPDATE(_parquet_profile.to_read_row_groups, _statistics.read_row_groups);
+            COUNTER_UPDATE(_parquet_profile.filtered_group_rows, _statistics.filtered_group_rows);
+            COUNTER_UPDATE(_parquet_profile.filtered_page_rows, _statistics.filtered_page_rows);
+            COUNTER_UPDATE(_parquet_profile.filtered_bytes, _statistics.filtered_bytes);
+            COUNTER_UPDATE(_parquet_profile.to_read_bytes, _statistics.read_bytes);
+            COUNTER_UPDATE(_parquet_profile.column_read_time, _statistics.column_read_time);
+            COUNTER_UPDATE(_parquet_profile.parse_meta_time, _statistics.parse_meta_time);
+
+            COUNTER_UPDATE(_parquet_profile.file_read_time, _column_statistics.read_time);
+            COUNTER_UPDATE(_parquet_profile.file_read_calls, _column_statistics.read_calls);
+            COUNTER_UPDATE(_parquet_profile.file_read_bytes, _column_statistics.read_bytes);
+            COUNTER_UPDATE(_parquet_profile.decompress_time, _column_statistics.decompress_time);
+            COUNTER_UPDATE(_parquet_profile.decompress_cnt, _column_statistics.decompress_cnt);
+            COUNTER_UPDATE(_parquet_profile.decode_header_time,
+                           _column_statistics.decode_header_time);
+            COUNTER_UPDATE(_parquet_profile.decode_value_time,
+                           _column_statistics.decode_value_time);
+            COUNTER_UPDATE(_parquet_profile.decode_dict_time, _column_statistics.decode_dict_time);
+            COUNTER_UPDATE(_parquet_profile.decode_level_time,
+                           _column_statistics.decode_level_time);
         }
         _closed = true;
     }
@@ -72,8 +104,16 @@ void ParquetReader::close() {
 
 Status ParquetReader::init_reader(
         std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
-    CHECK(_file_reader != nullptr);
-    RETURN_IF_ERROR(parse_thrift_footer(_file_reader, _file_metadata));
+    SCOPED_RAW_TIMER(&_statistics.parse_meta_time);
+    if (_file_reader == nullptr) {
+        RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _scan_params, _scan_range,
+                                                        _file_reader, 0));
+    }
+    RETURN_IF_ERROR(_file_reader->open());
+    if (_file_reader->size() == 0) {
+        return Status::EndOfFile("Empty Parquet File");
+    }
+    RETURN_IF_ERROR(parse_thrift_footer(_file_reader.get(), _file_metadata));
     _t_metadata = &_file_metadata->to_thrift();
     _total_groups = _t_metadata->row_groups.size();
     if (_total_groups == 0) {
@@ -145,8 +185,13 @@ Status ParquetReader::get_next_block(Block* block, bool* eof) {
         return Status::OK();
     }
     bool _batch_eof = false;
-    RETURN_IF_ERROR(_current_group_reader->next_batch(block, _batch_size, &_batch_eof));
+    {
+        SCOPED_RAW_TIMER(&_statistics.column_read_time);
+        RETURN_IF_ERROR(_current_group_reader->next_batch(block, _batch_size, &_batch_eof));
+    }
     if (_batch_eof) {
+        auto column_st = _current_group_reader->statistics();
+        _column_statistics.merge(column_st);
         if (!_next_row_group_reader()) {
             *eof = true;
         }
@@ -169,8 +214,8 @@ Status ParquetReader::_init_row_group_readers() {
     for (auto row_group_id : _read_row_groups) {
         auto& row_group = _t_metadata->row_groups[row_group_id];
         std::shared_ptr<RowGroupReader> row_group_reader;
-        row_group_reader.reset(
-                new RowGroupReader(_file_reader, _read_columns, row_group_id, row_group, _ctz));
+        row_group_reader.reset(new RowGroupReader(_file_reader.get(), _read_columns, row_group_id,
+                                                  row_group, _ctz));
         std::vector<RowRange> candidate_row_ranges;
         RETURN_IF_ERROR(_process_page_index(row_group, candidate_row_ranges));
         if (candidate_row_ranges.empty()) {
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h
index 9eea2ddb61..ab44c31517 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -29,55 +29,34 @@
 #include "io/file_reader.h"
 #include "vec/core/block.h"
 #include "vec/exec/format/generic_reader.h"
+#include "vparquet_column_reader.h"
 #include "vparquet_file_metadata.h"
 #include "vparquet_group_reader.h"
 #include "vparquet_page_index.h"
 
 namespace doris::vectorized {
 
-struct ParquetStatistics {
-    int32_t filtered_row_groups = 0;
-    int32_t read_row_groups = 0;
-    int64_t filtered_group_rows = 0;
-    int64_t filtered_page_rows = 0;
-    int64_t read_rows = 0;
-    int64_t filtered_bytes = 0;
-    int64_t read_bytes = 0;
-};
-
-class RowGroupReader;
-class PageIndex;
-
-struct RowRange {
-    RowRange() {}
-    RowRange(int64_t first, int64_t last) : first_row(first), last_row(last) {}
-    int64_t first_row;
-    int64_t last_row;
-};
-
-class ParquetReadColumn {
-public:
-    ParquetReadColumn(int parquet_col_id, const std::string& file_slot_name)
-            : _parquet_col_id(parquet_col_id), _file_slot_name(file_slot_name) {};
-    ~ParquetReadColumn() = default;
-
-private:
-    friend class ParquetReader;
-    friend class RowGroupReader;
-    int _parquet_col_id;
-    const std::string& _file_slot_name;
-};
-
 class ParquetReader : public GenericReader {
 public:
-    ParquetReader(RuntimeProfile* profile, FileReader* file_reader,
-                  const TFileScanRangeParams& params, const TFileRangeDesc& range,
-                  const std::vector<std::string>& column_names, size_t batch_size,
-                  cctz::time_zone* ctz);
+    struct Statistics {
+        int32_t filtered_row_groups = 0;
+        int32_t read_row_groups = 0;
+        int64_t filtered_group_rows = 0;
+        int64_t filtered_page_rows = 0;
+        int64_t read_rows = 0;
+        int64_t filtered_bytes = 0;
+        int64_t read_bytes = 0;
+        int64_t column_read_time = 0;
+        int64_t parse_meta_time = 0;
+    };
+
+    ParquetReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
+                  const TFileRangeDesc& range, const std::vector<std::string>& column_names,
+                  size_t batch_size, cctz::time_zone* ctz);
 
     virtual ~ParquetReader();
     // for test
-    void set_file_reader(FileReader* file_reader) { _file_reader = file_reader; }
+    void set_file_reader(FileReader* file_reader) { _file_reader.reset(file_reader); }
 
     Status init_reader(
             std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
@@ -92,9 +71,31 @@ public:
     Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
                        std::unordered_set<std::string>* missing_cols) override;
 
-    ParquetStatistics& statistics() { return _statistics; }
+    Statistics& statistics() { return _statistics; }
 
 private:
+    struct ParquetProfile {
+        RuntimeProfile::Counter* filtered_row_groups;
+        RuntimeProfile::Counter* to_read_row_groups;
+        RuntimeProfile::Counter* filtered_group_rows;
+        RuntimeProfile::Counter* filtered_page_rows;
+        RuntimeProfile::Counter* filtered_bytes;
+        RuntimeProfile::Counter* to_read_bytes;
+        RuntimeProfile::Counter* column_read_time;
+        RuntimeProfile::Counter* parse_meta_time;
+
+        RuntimeProfile::Counter* file_read_time;
+        RuntimeProfile::Counter* file_read_calls;
+        RuntimeProfile::Counter* file_read_bytes;
+        RuntimeProfile::Counter* decompress_time;
+        RuntimeProfile::Counter* decompress_cnt;
+        RuntimeProfile::Counter* decode_header_time;
+        RuntimeProfile::Counter* decode_value_time;
+        RuntimeProfile::Counter* decode_dict_time;
+        RuntimeProfile::Counter* decode_level_time;
+    };
+
+    void _init_profile();
     bool _next_row_group_reader();
     Status _init_read_columns();
     Status _init_row_group_readers();
@@ -117,10 +118,9 @@ private:
 
 private:
     RuntimeProfile* _profile;
-    // file reader is passed from file scanner, and owned by this parquet reader.
-    FileReader* _file_reader = nullptr;
-    //    const TFileScanRangeParams& _scan_params;
-    //    const TFileRangeDesc& _scan_range;
+    const TFileScanRangeParams& _scan_params;
+    const TFileRangeDesc& _scan_range;
+    std::unique_ptr<FileReader> _file_reader = nullptr;
 
     std::shared_ptr<FileMetaData> _file_metadata;
     const tparquet::FileMetaData* _t_metadata;
@@ -141,15 +141,9 @@ private:
     const std::vector<std::string> _column_names;
 
     std::vector<std::string> _missing_cols;
-    ParquetStatistics _statistics;
+    Statistics _statistics;
+    ParquetColumnReader::Statistics _column_statistics;
+    ParquetProfile _parquet_profile;
     bool _closed = false;
-
-    // parquet profile
-    RuntimeProfile::Counter* _filtered_row_groups;
-    RuntimeProfile::Counter* _to_read_row_groups;
-    RuntimeProfile::Counter* _filtered_group_rows;
-    RuntimeProfile::Counter* _filtered_page_rows;
-    RuntimeProfile::Counter* _filtered_bytes;
-    RuntimeProfile::Counter* _to_read_bytes;
 };
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index 34b67dd1e7..f6f8127146 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -454,13 +454,17 @@ Status VFileScanner::_get_next_reader() {
         const TFileRangeDesc& range = _ranges[_next_range++];
 
         // 1. create file reader
+        // TODO: Each format requires its own FileReader to achieve a special access mode,
+        //  so the FileReader should be created inside the format reader itself.
         std::unique_ptr<FileReader> file_reader;
-        RETURN_IF_ERROR(FileFactory::create_file_reader(_state->exec_env(), _profile, _params,
-                                                        range, file_reader));
-        RETURN_IF_ERROR(file_reader->open());
-        if (file_reader->size() == 0) {
-            file_reader->close();
-            continue;
+        if (_params.format_type != TFileFormatType::FORMAT_PARQUET) {
+            RETURN_IF_ERROR(FileFactory::create_file_reader(_state->exec_env(), _profile, _params,
+                                                            range, file_reader));
+            RETURN_IF_ERROR(file_reader->open());
+            if (file_reader->size() == 0) {
+                file_reader->close();
+                continue;
+            }
         }
 
         // 2. create reader for specific format
@@ -468,10 +472,9 @@ Status VFileScanner::_get_next_reader() {
         Status init_status;
         switch (_params.format_type) {
         case TFileFormatType::FORMAT_PARQUET: {
-            _cur_reader.reset(
-                    new ParquetReader(_profile, file_reader.release(), _params, range,
-                                      _file_col_names, _state->query_options().batch_size,
-                                      const_cast<cctz::time_zone*>(&_state->timezone_obj())));
+            _cur_reader.reset(new ParquetReader(
+                    _profile, _params, range, _file_col_names, _state->query_options().batch_size,
+                    const_cast<cctz::time_zone*>(&_state->timezone_obj())));
             init_status =
                     ((ParquetReader*)(_cur_reader.get()))->init_reader(_colname_to_value_range);
             break;
diff --git a/be/test/vec/exec/parquet/parquet_reader_test.cpp b/be/test/vec/exec/parquet/parquet_reader_test.cpp
index 42b15196b7..68a2043d66 100644
--- a/be/test/vec/exec/parquet/parquet_reader_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_reader_test.cpp
@@ -91,46 +91,45 @@ TEST_F(ParquetReaderTest, normal) {
     auto slot_descs = desc_tbl->get_tuple_descriptor(0)->slots();
     LocalFileReader* reader =
             new LocalFileReader("./be/test/exec/test_data/parquet_scanner/type-decoder.parquet", 0);
-    reader->open();
 
     cctz::time_zone ctz;
     TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, ctz);
-    //    auto tuple_desc = desc_tbl->get_tuple_descriptor(0);
+    auto tuple_desc = desc_tbl->get_tuple_descriptor(0);
     std::vector<std::string> column_names;
     for (int i = 0; i < slot_descs.size(); i++) {
         column_names.push_back(slot_descs[i]->col_name());
     }
-    //    TFileScanRangeParams scan_params;
+    TFileScanRangeParams scan_params;
     TFileRangeDesc scan_range;
     {
         scan_range.start_offset = 0;
         scan_range.size = 1000;
     }
-    //    auto p_reader =
-    //            new ParquetReader(nullptr, reader, scan_params, scan_range, column_names, 992, &ctz);
+    auto p_reader = new ParquetReader(nullptr, scan_params, scan_range, column_names, 992, &ctz);
+    p_reader->set_file_reader(reader);
     RuntimeState runtime_state((TQueryGlobals()));
     runtime_state.set_desc_tbl(desc_tbl);
     runtime_state.init_instance_mem_tracker();
 
-    //    std::vector<ExprContext*> conjunct_ctxs = std::vector<ExprContext*>();
-    // p_reader->init_reader(conjunct_ctxs);
-    //    Block* block = new Block();
-    //    for (const auto& slot_desc : tuple_desc->slots()) {
-    //        auto data_type =
-    //                vectorized::DataTypeFactory::instance().create_data_type(slot_desc->type(), true);
-    //        MutableColumnPtr data_column = data_type->create_column();
-    //        block->insert(
-    //                ColumnWithTypeAndName(std::move(data_column), data_type, slot_desc->col_name()));
-    //    }
-    //    bool eof = false;
-    //    p_reader->get_next_block(block, &eof);
-    //    for (auto& col : block->get_columns_with_type_and_name()) {
-    //        ASSERT_EQ(col.column->size(), 10);
-    //    }
-    //    EXPECT_TRUE(eof);
-    //    delete block;
-    //    delete p_reader;
-    delete reader;
+    std::unordered_map<std::string, ColumnValueRangeType> colname_to_value_range;
+    p_reader->init_reader(&colname_to_value_range);
+    Block* block = new Block();
+    for (const auto& slot_desc : tuple_desc->slots()) {
+        auto data_type =
+                vectorized::DataTypeFactory::instance().create_data_type(slot_desc->type(), true);
+        MutableColumnPtr data_column = data_type->create_column();
+        block->insert(
+                ColumnWithTypeAndName(std::move(data_column), data_type, slot_desc->col_name()));
+    }
+    bool eof = false;
+    p_reader->get_next_block(block, &eof);
+    for (auto& col : block->get_columns_with_type_and_name()) {
+        ASSERT_EQ(col.column->size(), 10);
+    }
+    EXPECT_TRUE(eof);
+    delete block;
+    delete p_reader;
 }
+
 } // namespace vectorized
 } // namespace doris
diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
index 4272954214..c18d3099d7 100644
--- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
@@ -36,6 +36,7 @@
 #include "vec/exec/format/parquet/vparquet_column_chunk_reader.h"
 #include "vec/exec/format/parquet/vparquet_column_reader.h"
 #include "vec/exec/format/parquet/vparquet_file_metadata.h"
+#include "vec/exec/format/parquet/vparquet_group_reader.h"
 
 namespace doris {
 namespace vectorized {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org