You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/07/08 15:11:33 UTC

[doris] branch master updated: [feature-wip] support parquet predicate push down (#10512)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c358a43f35 [feature-wip] support parquet predicate push down (#10512)
c358a43f35 is described below

commit c358a43f3571059c9b641f3851ec84d8390fa95f
Author: slothever <18...@users.noreply.github.com>
AuthorDate: Fri Jul 8 23:11:25 2022 +0800

    [feature-wip] support parquet predicate push down (#10512)
---
 be/src/common/config.h                         |   1 +
 be/src/exec/CMakeLists.txt                     |   1 +
 be/src/exec/arrow/arrow_reader.cpp             |   1 +
 be/src/exec/arrow/arrow_reader.h               |  15 +-
 be/src/exec/arrow/orc_reader.cpp               |   4 +-
 be/src/exec/arrow/orc_reader.h                 |   4 +-
 be/src/exec/arrow/parquet_reader.cpp           |  93 ++--
 be/src/exec/arrow/parquet_reader.h             |  14 +-
 be/src/exec/arrow/parquet_row_group_reader.cpp | 568 +++++++++++++++++++++++++
 be/src/exec/arrow/parquet_row_group_reader.h   |  89 ++++
 be/src/exec/base_scanner.cpp                   |   6 +
 be/src/exec/base_scanner.h                     |   8 +
 be/src/exec/broker_scan_node.cpp               |   1 +
 be/src/exec/parquet_scanner.cpp                |  11 +-
 be/src/exprs/expr_context.h                    |   1 +
 be/src/vec/exec/file_arrow_scanner.cpp         |   7 +-
 be/src/vec/exec/file_scan_node.cpp             |   1 +
 be/src/vec/exec/file_scanner.cpp               |   6 +
 be/src/vec/exec/file_scanner.h                 |   8 +
 be/src/vec/exec/varrow_scanner.cpp             |  22 +-
 be/src/vec/exec/varrow_scanner.h               |  10 +
 be/src/vec/exec/vbroker_scan_node.cpp          |   1 +
 be/src/vec/exec/vparquet_scanner.cpp           |   2 +-
 be/test/exec/parquet_scanner_test.cpp          |  64 ++-
 24 files changed, 889 insertions(+), 49 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 2f5ee5cb71..ca0c66654d 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -752,6 +752,7 @@ CONF_Int32(object_pool_buffer_size, "100");
 
 // ParquetReaderWrap prefetch buffer size
 CONF_Int32(parquet_reader_max_buffer_size, "50");
+CONF_Bool(parquet_predicate_push_down, "false");
 
 // When the rows number reached this limit, will check the filter rate the of bloomfilter
 // if it is lower than a specific threshold, the predicate will be disabled.
diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index cc94f5fe9a..9a25789dd9 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -25,6 +25,7 @@ set(EXEC_FILES
     arrow/arrow_reader.cpp
     arrow/orc_reader.cpp
     arrow/parquet_reader.cpp
+    arrow/parquet_row_group_reader.cpp
     analytic_eval_node.cpp
     blocking_join_node.cpp
     broker_scan_node.cpp
diff --git a/be/src/exec/arrow/arrow_reader.cpp b/be/src/exec/arrow/arrow_reader.cpp
index 94289a990b..9d20697148 100644
--- a/be/src/exec/arrow/arrow_reader.cpp
+++ b/be/src/exec/arrow/arrow_reader.cpp
@@ -43,6 +43,7 @@ ArrowReaderWrap::ArrowReaderWrap(FileReader* file_reader, int64_t batch_size,
     _rb_reader = nullptr;
     _total_groups = 0;
     _current_group = 0;
+    _statistics = std::make_shared<Statistics>();
 }
 
 ArrowReaderWrap::~ArrowReaderWrap() {
diff --git a/be/src/exec/arrow/arrow_reader.h b/be/src/exec/arrow/arrow_reader.h
index 5a1e00f022..ad888061be 100644
--- a/be/src/exec/arrow/arrow_reader.h
+++ b/be/src/exec/arrow/arrow_reader.h
@@ -33,6 +33,7 @@
 #include <string>
 
 #include "common/status.h"
+#include "exprs/expr_context.h"
 #include "gen_cpp/PaloBrokerService_types.h"
 #include "gen_cpp/PlanNodes_types.h"
 #include "gen_cpp/Types_types.h"
@@ -48,6 +49,14 @@ class SlotDescriptor;
 class MemPool;
 class FileReader;
 
+struct Statistics {
+    int32_t filtered_row_groups = 0;
+    int32_t total_groups = 0;
+    int64_t filtered_rows = 0;
+    int64_t total_rows = 0;
+    int64_t filtered_total_bytes = 0;
+};
+
 class ArrowFile : public arrow::io::RandomAccessFile {
 public:
     ArrowFile(FileReader* file);
@@ -72,7 +81,9 @@ public:
     ArrowReaderWrap(FileReader* file_reader, int64_t batch_size, int32_t num_of_columns_from_file);
     virtual ~ArrowReaderWrap();
 
-    virtual Status init_reader(const std::vector<SlotDescriptor*>& tuple_slot_descs,
+    virtual Status init_reader(const TupleDescriptor* tuple_desc,
+                               const std::vector<SlotDescriptor*>& tuple_slot_descs,
+                               const std::vector<ExprContext*>& conjunct_ctxs,
                                const std::string& timezone) = 0;
     // for row
     virtual Status read(Tuple* tuple, const std::vector<SlotDescriptor*>& tuple_slot_descs,
@@ -81,6 +92,7 @@ public:
     }
     // for vec
     virtual Status next_batch(std::shared_ptr<arrow::RecordBatch>* batch, bool* eof) = 0;
+    std::shared_ptr<Statistics>& statistics() { return _statistics; }
     void close();
     virtual Status size(int64_t* size) { return Status::NotSupported("Not Implemented size"); }
 
@@ -96,6 +108,7 @@ protected:
     int _current_group;                     // current group(stripe)
     std::map<std::string, int> _map_column; // column-name <---> column-index
     std::vector<int> _include_column_ids;   // columns that need to get from file
+    std::shared_ptr<Statistics> _statistics;
 };
 
 } // namespace doris
diff --git a/be/src/exec/arrow/orc_reader.cpp b/be/src/exec/arrow/orc_reader.cpp
index 536b852ad6..f94d24610c 100644
--- a/be/src/exec/arrow/orc_reader.cpp
+++ b/be/src/exec/arrow/orc_reader.cpp
@@ -34,7 +34,9 @@ ORCReaderWrap::ORCReaderWrap(FileReader* file_reader, int64_t batch_size,
     _cur_file_eof = false;
 }
 
-Status ORCReaderWrap::init_reader(const std::vector<SlotDescriptor*>& tuple_slot_descs,
+Status ORCReaderWrap::init_reader(const TupleDescriptor* tuple_desc,
+                                  const std::vector<SlotDescriptor*>& tuple_slot_descs,
+                                  const std::vector<ExprContext*>& conjunct_ctxs,
                                   const std::string& timezone) {
     // Open ORC file reader
     auto maybe_reader =
diff --git a/be/src/exec/arrow/orc_reader.h b/be/src/exec/arrow/orc_reader.h
index dd7853efe7..3b46cc6073 100644
--- a/be/src/exec/arrow/orc_reader.h
+++ b/be/src/exec/arrow/orc_reader.h
@@ -35,7 +35,9 @@ public:
     ORCReaderWrap(FileReader* file_reader, int64_t batch_size, int32_t num_of_columns_from_file);
     ~ORCReaderWrap() override = default;
 
-    Status init_reader(const std::vector<SlotDescriptor*>& tuple_slot_descs,
+    Status init_reader(const TupleDescriptor* tuple_desc,
+                       const std::vector<SlotDescriptor*>& tuple_slot_descs,
+                       const std::vector<ExprContext*>& conjunct_ctxs,
                        const std::string& timezone) override;
     Status next_batch(std::shared_ptr<arrow::RecordBatch>* batch, bool* eof) override;
 
diff --git a/be/src/exec/arrow/parquet_reader.cpp b/be/src/exec/arrow/parquet_reader.cpp
index 6885ebb781..f24768369e 100644
--- a/be/src/exec/arrow/parquet_reader.cpp
+++ b/be/src/exec/arrow/parquet_reader.cpp
@@ -36,13 +36,13 @@
 namespace doris {
 
 // Broker
-
-ParquetReaderWrap::ParquetReaderWrap(FileReader* file_reader, int64_t batch_size,
-                                     int32_t num_of_columns_from_file)
+ParquetReaderWrap::ParquetReaderWrap(RuntimeProfile* profile, FileReader* file_reader,
+                                     int64_t batch_size, int32_t num_of_columns_from_file)
         : ArrowReaderWrap(file_reader, batch_size, num_of_columns_from_file),
           _rows_of_group(0),
           _current_line_of_group(0),
-          _current_line_of_batch(0) {}
+          _current_line_of_batch(0),
+          _profile(profile) {}
 
 ParquetReaderWrap::~ParquetReaderWrap() {
     _closed = true;
@@ -52,7 +52,9 @@ ParquetReaderWrap::~ParquetReaderWrap() {
     }
 }
 
-Status ParquetReaderWrap::init_reader(const std::vector<SlotDescriptor*>& tuple_slot_descs,
+Status ParquetReaderWrap::init_reader(const TupleDescriptor* tuple_desc,
+                                      const std::vector<SlotDescriptor*>& tuple_slot_descs,
+                                      const std::vector<ExprContext*>& conjunct_ctxs,
                                       const std::string& timezone) {
     try {
         parquet::ArrowReaderProperties arrow_reader_properties =
@@ -99,22 +101,12 @@ Status ParquetReaderWrap::init_reader(const std::vector<SlotDescriptor*>& tuple_
         _timezone = timezone;
 
         RETURN_IF_ERROR(column_indices(tuple_slot_descs));
-
-        _thread = std::thread(&ParquetReaderWrap::prefetch_batch, this);
-
-        // read batch
-        RETURN_IF_ERROR(read_next_batch());
-        _current_line_of_batch = 0;
-        //save column type
-        std::shared_ptr<arrow::Schema> field_schema = _batch->schema();
-        for (int i = 0; i < _include_column_ids.size(); i++) {
-            std::shared_ptr<arrow::Field> field = field_schema->field(i);
-            if (!field) {
-                LOG(WARNING) << "Get field schema failed. Column order:" << i;
-                return Status::InternalError(_status.ToString());
-            }
-            _parquet_column_type.emplace_back(field->type()->id());
+        if (config::parquet_predicate_push_down) {
+            _row_group_reader.reset(
+                    new RowGroupReader(_profile, conjunct_ctxs, _file_metadata, this));
+            _row_group_reader->init_filter_groups(tuple_desc, _map_column, _include_column_ids);
         }
+        _thread = std::thread(&ParquetReaderWrap::prefetch_batch, this);
         return Status::OK();
     } catch (parquet::ParquetException& e) {
         std::stringstream str_error;
@@ -188,19 +180,25 @@ Status ParquetReaderWrap::read_record_batch(bool* eof) {
 }
 
 Status ParquetReaderWrap::next_batch(std::shared_ptr<arrow::RecordBatch>* batch, bool* eof) {
-    if (_batch->num_rows() == 0 || _current_line_of_batch != 0 || _current_line_of_group != 0) {
-        RETURN_IF_ERROR(read_record_batch(eof));
+    std::unique_lock<std::mutex> lock(_mtx);
+    while (!_closed && _queue.empty()) {
+        if (_batch_eof) {
+            _include_column_ids.clear();
+            *eof = true;
+            _batch_eof = false;
+            return Status::OK();
+        }
+        _queue_reader_cond.wait_for(lock, std::chrono::seconds(1));
     }
-    *batch = get_batch();
+    if (UNLIKELY(_closed)) {
+        return Status::InternalError(_status.message());
+    }
+    *batch = _queue.front();
+    _queue.pop_front();
+    _queue_writer_cond.notify_one();
     return Status::OK();
 }
 
-const std::shared_ptr<arrow::RecordBatch>& ParquetReaderWrap::get_batch() {
-    _current_line_of_batch += _batch->num_rows();
-    _current_line_of_group += _batch->num_rows();
-    return _batch;
-}
-
 Status ParquetReaderWrap::handle_timestamp(const std::shared_ptr<arrow::TimestampArray>& ts_array,
                                            uint8_t* buf, int32_t* wbytes) {
     const auto type = std::static_pointer_cast<arrow::TimestampType>(ts_array->type());
@@ -240,8 +238,32 @@ Status ParquetReaderWrap::handle_timestamp(const std::shared_ptr<arrow::Timestam
     return Status::OK();
 }
 
+Status ParquetReaderWrap::init_parquet_type() {
+    // read batch
+    RETURN_IF_ERROR(read_next_batch());
+    _current_line_of_batch = 0;
+    if (_batch == nullptr) {
+        return Status::OK();
+    }
+    //save column type
+    std::shared_ptr<arrow::Schema> field_schema = _batch->schema();
+    for (int i = 0; i < _include_column_ids.size(); i++) {
+        std::shared_ptr<arrow::Field> field = field_schema->field(i);
+        if (!field) {
+            LOG(WARNING) << "Get field schema failed. Column order:" << i;
+            return Status::InternalError(_status.ToString());
+        }
+        _parquet_column_type.emplace_back(field->type()->id());
+    }
+    return Status::OK();
+}
+
 Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>& tuple_slot_descs,
                                MemPool* mem_pool, bool* eof) {
+    if (_batch == nullptr) {
+        _current_line_of_group += _rows_of_group;
+        return read_record_batch(eof);
+    }
     uint8_t tmp_buf[128] = {0};
     int32_t wbytes = 0;
     const uint8_t* value = nullptr;
@@ -535,8 +557,18 @@ void ParquetReaderWrap::prefetch_batch() {
     int total_groups = _total_groups;
     while (true) {
         if (_closed || current_group >= total_groups) {
+            _batch_eof = true;
+            _queue_reader_cond.notify_one();
             return;
         }
+        if (config::parquet_predicate_push_down) {
+            auto filter_group_set = _row_group_reader->filter_groups();
+            if (filter_group_set.end() != filter_group_set.find(current_group)) {
+                // find filter group, skip
+                current_group++;
+                continue;
+            }
+        }
         _status = _reader->GetRecordBatchReader({current_group}, _include_column_ids, &_rb_reader);
         if (!_status.ok()) {
             _closed = true;
@@ -556,6 +588,9 @@ void ParquetReaderWrap::prefetch_batch() {
 Status ParquetReaderWrap::read_next_batch() {
     std::unique_lock<std::mutex> lock(_mtx);
     while (!_closed && _queue.empty()) {
+        if (_batch_eof) {
+            return Status::OK();
+        }
         _queue_reader_cond.wait_for(lock, std::chrono::seconds(1));
     }
 
diff --git a/be/src/exec/arrow/parquet_reader.h b/be/src/exec/arrow/parquet_reader.h
index 870c560fe4..8d9a420f55 100644
--- a/be/src/exec/arrow/parquet_reader.h
+++ b/be/src/exec/arrow/parquet_reader.h
@@ -41,6 +41,7 @@
 #include "common/config.h"
 #include "common/status.h"
 #include "exec/arrow/arrow_reader.h"
+#include "exec/arrow/parquet_row_group_reader.h"
 #include "gen_cpp/PaloBrokerService_types.h"
 #include "gen_cpp/PlanNodes_types.h"
 #include "gen_cpp/Types_types.h"
@@ -55,12 +56,13 @@ class Tuple;
 class SlotDescriptor;
 class MemPool;
 class FileReader;
+class RowGroupReader;
 
 // Reader of parquet file
 class ParquetReaderWrap final : public ArrowReaderWrap {
 public:
     // batch_size is not use here
-    ParquetReaderWrap(FileReader* file_reader, int64_t batch_size,
+    ParquetReaderWrap(RuntimeProfile* profile, FileReader* file_reader, int64_t batch_size,
                       int32_t num_of_columns_from_file);
     ~ParquetReaderWrap() override;
 
@@ -68,8 +70,11 @@ public:
     Status read(Tuple* tuple, const std::vector<SlotDescriptor*>& tuple_slot_descs,
                 MemPool* mem_pool, bool* eof) override;
     Status size(int64_t* size) override;
-    Status init_reader(const std::vector<SlotDescriptor*>& tuple_slot_descs,
+    Status init_reader(const TupleDescriptor* tuple_desc,
+                       const std::vector<SlotDescriptor*>& tuple_slot_descs,
+                       const std::vector<ExprContext*>& conjunct_ctxs,
                        const std::string& timezone) override;
+    Status init_parquet_type();
     Status next_batch(std::shared_ptr<arrow::RecordBatch>* batch, bool* eof) override;
 
 private:
@@ -77,7 +82,6 @@ private:
                    int32_t len);
     Status set_field_null(Tuple* tuple, const SlotDescriptor* slot_desc);
     Status read_record_batch(bool* eof);
-    const std::shared_ptr<arrow::RecordBatch>& get_batch();
     Status handle_timestamp(const std::shared_ptr<arrow::TimestampArray>& ts_array, uint8_t* buf,
                             int32_t* wbtyes);
 
@@ -95,16 +99,18 @@ private:
     int _rows_of_group; // rows in a group.
     int _current_line_of_group;
     int _current_line_of_batch;
-
+    RuntimeProfile* _profile;
     std::string _timezone;
 
 private:
     std::atomic<bool> _closed = false;
+    std::atomic<bool> _batch_eof = false;
     arrow::Status _status;
     std::mutex _mtx;
     std::condition_variable _queue_reader_cond;
     std::condition_variable _queue_writer_cond;
     std::list<std::shared_ptr<arrow::RecordBatch>> _queue;
+    std::unique_ptr<doris::RowGroupReader> _row_group_reader;
     const size_t _max_queue_size = config::parquet_reader_max_buffer_size;
     std::thread _thread;
 };
diff --git a/be/src/exec/arrow/parquet_row_group_reader.cpp b/be/src/exec/arrow/parquet_row_group_reader.cpp
new file mode 100644
index 0000000000..48e382e88c
--- /dev/null
+++ b/be/src/exec/arrow/parquet_row_group_reader.cpp
@@ -0,0 +1,568 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/arrow/parquet_row_group_reader.h"
+
+#include <exprs/expr_context.h>
+#include <exprs/in_predicate.h>
+#include <parquet/encoding.h>
+
+#include <cstring>
+
+#define _PLAIN_DECODE(T, value, min_bytes, max_bytes, out_value, out_min, out_max) \
+    const T out_min = reinterpret_cast<const T*>(min_bytes)[0];                    \
+    const T out_max = reinterpret_cast<const T*>(max_bytes)[0];                    \
+    T out_value = *((T*)value);
+
+#define _PLAIN_DECODE_SINGLE(T, value, bytes, conjunct_value, out) \
+    const T out = reinterpret_cast<const T*>(bytes)[0];            \
+    T conjunct_value = *((T*)value);
+
+#define _FILTER_GROUP_BY_EQ_PRED(conjunct_value, min, max) \
+    if (conjunct_value < min || conjunct_value > max) {    \
+        return true;                                       \
+    }
+
+#define _FILTER_GROUP_BY_GT_PRED(conjunct_value, max) \
+    if (max <= conjunct_value) {                      \
+        return true;                                  \
+    }
+
+#define _FILTER_GROUP_BY_GE_PRED(conjunct_value, max) \
+    if (max < conjunct_value) {                       \
+        return true;                                  \
+    }
+
+#define _FILTER_GROUP_BY_LT_PRED(conjunct_value, min) \
+    if (min >= conjunct_value) {                      \
+        return true;                                  \
+    }
+
+#define _FILTER_GROUP_BY_LE_PRED(conjunct_value, min) \
+    if (min > conjunct_value) {                       \
+        return true;                                  \
+    }
+
+#define _FILTER_GROUP_BY_IN(T, in_pred_values, min_bytes, max_bytes) \
+    std::vector<T> in_values;                                        \
+    for (auto val : in_pred_values) {                                \
+        T value = reinterpret_cast<T*>(val)[0];                      \
+        in_values.emplace_back(value);                               \
+    }                                                                \
+    if (in_values.empty()) {                                         \
+        return false;                                                \
+    }                                                                \
+    std::sort(in_values.begin(), in_values.end());                   \
+    T in_min = in_values.front();                                    \
+    T in_max = in_values.back();                                     \
+    const T group_min = reinterpret_cast<const T*>(min_bytes)[0];    \
+    const T group_max = reinterpret_cast<const T*>(max_bytes)[0];    \
+    if (in_max < group_min || in_min > group_max) {                  \
+        return true;                                                 \
+    }
+
+namespace doris {
+
+RowGroupReader::RowGroupReader(RuntimeProfile* profile,
+                               const std::vector<ExprContext*>& conjunct_ctxs,
+                               std::shared_ptr<parquet::FileMetaData>& file_metadata,
+                               ParquetReaderWrap* parent)
+        : _conjunct_ctxs(conjunct_ctxs),
+          _file_metadata(file_metadata),
+          _profile(profile),
+          _parent(parent) {}
+
+RowGroupReader::~RowGroupReader() {
+    _slot_conjuncts.clear();
+    _filter_group.clear();
+}
+
+Status RowGroupReader::init_filter_groups(const TupleDescriptor* tuple_desc,
+                                          const std::map<std::string, int>& map_column,
+                                          const std::vector<int>& include_column_ids) {
+    std::unordered_set<int> parquet_column_ids(include_column_ids.begin(),
+                                               include_column_ids.end());
+    _init_conjuncts(tuple_desc, map_column, parquet_column_ids);
+    int total_group = _file_metadata->num_row_groups();
+    _parent->statistics()->total_groups = total_group;
+    _parent->statistics()->total_rows = _file_metadata->num_rows();
+
+    int32_t filtered_num_row_groups = 0;
+    int64_t filtered_num_rows = 0;
+    int64_t filtered_total_byte_size = 0;
+    bool update_statistics = false;
+    for (int row_group_id = 0; row_group_id < total_group; row_group_id++) {
+        auto row_group_meta = _file_metadata->RowGroup(row_group_id);
+        for (SlotId slot_id = 0; slot_id < tuple_desc->slots().size(); slot_id++) {
+            const std::string& col_name = tuple_desc->slots()[slot_id]->col_name();
+            auto col_iter = map_column.find(col_name);
+            if (col_iter == map_column.end()) {
+                continue;
+            }
+            int parquet_col_id = col_iter->second;
+            if (parquet_column_ids.end() == parquet_column_ids.find(parquet_col_id)) {
+                // Column not exist in parquet file
+                continue;
+            }
+            auto slot_iter = _slot_conjuncts.find(slot_id);
+            if (slot_iter == _slot_conjuncts.end()) {
+                continue;
+            }
+            auto statistic = row_group_meta->ColumnChunk(parquet_col_id)->statistics();
+            if (!statistic->HasMinMax()) {
+                continue;
+            }
+            // Min-max of statistic is plain-encoded value
+            const std::string& min = statistic->EncodeMin();
+            const std::string& max = statistic->EncodeMax();
+
+            bool group_need_filter = _determine_filter_row_group(slot_iter->second, min, max);
+            if (group_need_filter) {
+                update_statistics = true;
+                filtered_num_row_groups++;
+                filtered_num_rows += row_group_meta->num_rows();
+                filtered_total_byte_size += row_group_meta->total_byte_size();
+                VLOG_DEBUG << "Filter row group id: " << row_group_id;
+                _filter_group.emplace(row_group_id);
+                break;
+            }
+        }
+    }
+    if (update_statistics) {
+        _parent->statistics()->filtered_row_groups = filtered_num_row_groups;
+        _parent->statistics()->filtered_rows = filtered_num_rows;
+        _parent->statistics()->filtered_total_bytes = filtered_total_byte_size;
+        VLOG_DEBUG << "Parquet file: " << _file_metadata->schema()->name()
+                   << ", Num of read row group: " << total_group
+                   << ", and num of skip row group: " << filtered_num_row_groups;
+    }
+    return Status::OK();
+}
+
+void RowGroupReader::_init_conjuncts(const TupleDescriptor* tuple_desc,
+                                     const std::map<std::string, int>& map_column,
+                                     const std::unordered_set<int>& include_column_ids) {
+    if (tuple_desc->slots().empty()) {
+        return;
+    }
+    for (int i = 0; i < tuple_desc->slots().size(); i++) {
+        auto col_iter = map_column.find(tuple_desc->slots()[i]->col_name());
+        if (col_iter == map_column.end()) {
+            continue;
+        }
+        int parquet_col_id = col_iter->second;
+        if (include_column_ids.end() == include_column_ids.find(parquet_col_id)) {
+            continue;
+        }
+        for (int conj_idx = 0; conj_idx < _conjunct_ctxs.size(); conj_idx++) {
+            Expr* conjunct = _conjunct_ctxs[conj_idx]->root();
+            if (conjunct->get_num_children() == 0) {
+                continue;
+            }
+            Expr* raw_slot = conjunct->get_child(0);
+            if (TExprNodeType::SLOT_REF != raw_slot->node_type()) {
+                continue;
+            }
+            SlotRef* slot_ref = (SlotRef*)raw_slot;
+            SlotId conjunct_slot_id = slot_ref->slot_id();
+            if (conjunct_slot_id == tuple_desc->slots()[i]->id()) {
+                // Get conjuncts by conjunct_slot_id
+                auto iter = _slot_conjuncts.find(conjunct_slot_id);
+                if (_slot_conjuncts.end() == iter) {
+                    std::vector<ExprContext*> conjuncts;
+                    conjuncts.emplace_back(_conjunct_ctxs[conj_idx]);
+                    _slot_conjuncts.emplace(std::make_pair(conjunct_slot_id, conjuncts));
+                } else {
+                    std::vector<ExprContext*> conjuncts = iter->second;
+                    conjuncts.emplace_back(_conjunct_ctxs[conj_idx]);
+                }
+            }
+        }
+    }
+}
+
+bool RowGroupReader::_determine_filter_row_group(const std::vector<ExprContext*>& conjuncts,
+                                                 const std::string& encoded_min,
+                                                 const std::string& encoded_max) {
+    const char* min_bytes = encoded_min.data();
+    const char* max_bytes = encoded_max.data();
+    bool need_filter = false;
+    for (int i = 0; i < conjuncts.size(); i++) {
+        Expr* conjunct = conjuncts[i]->root();
+        if (TExprNodeType::BINARY_PRED == conjunct->node_type()) {
+            _eval_binary_predicate(conjuncts[i], min_bytes, max_bytes, need_filter);
+        } else if (TExprNodeType::IN_PRED == conjunct->node_type()) {
+            _eval_in_predicate(conjuncts[i], min_bytes, max_bytes, need_filter);
+        }
+    }
+    return need_filter;
+}
+
+void RowGroupReader::_eval_binary_predicate(ExprContext* ctx, const char* min_bytes,
+                                            const char* max_bytes, bool& need_filter) {
+    Expr* conjunct = ctx->root();
+    Expr* expr = conjunct->get_child(1);
+    if (expr == nullptr) {
+        return;
+    }
+    // supported conjunct example: slot_ref < 123, slot_ref > func(123), ..
+    auto conjunct_type = expr->type().type;
+    void* conjunct_value = ctx->get_value(expr, nullptr);
+    switch (conjunct->op()) {
+    case TExprOpcode::EQ:
+        need_filter = _eval_eq(conjunct_type, conjunct_value, min_bytes, max_bytes);
+        break;
+    case TExprOpcode::NE:
+        break;
+    case TExprOpcode::GT:
+        need_filter = _eval_gt(conjunct_type, conjunct_value, max_bytes);
+        break;
+    case TExprOpcode::GE:
+        need_filter = _eval_ge(conjunct_type, conjunct_value, max_bytes);
+        break;
+    case TExprOpcode::LT:
+        need_filter = _eval_lt(conjunct_type, conjunct_value, min_bytes);
+        break;
+    case TExprOpcode::LE:
+        need_filter = _eval_le(conjunct_type, conjunct_value, min_bytes);
+        break;
+    default:
+        break;
+    }
+}
+
+void RowGroupReader::_eval_in_predicate(ExprContext* ctx, const char* min_bytes,
+                                        const char* max_bytes, bool& need_filter) {
+    Expr* conjunct = ctx->root();
+    std::vector<void*> in_pred_values;
+    const InPredicate* pred = static_cast<const InPredicate*>(conjunct);
+    HybridSetBase::IteratorBase* iter = pred->hybrid_set()->begin();
+    // TODO: process expr: in(func(123),123)
+    while (iter->has_next()) {
+        if (nullptr == iter->get_value()) {
+            return;
+        }
+        in_pred_values.emplace_back(const_cast<void*>(iter->get_value()));
+        iter->next();
+    }
+    auto conjunct_type = conjunct->get_child(1)->type().type;
+    switch (conjunct->op()) {
+    case TExprOpcode::FILTER_IN:
+        need_filter = _eval_in_val(conjunct_type, in_pred_values, min_bytes, max_bytes);
+        break;
+    //  case TExprOpcode::FILTER_NOT_IN:
+    default:
+        need_filter = false;
+    }
+}
+
+bool RowGroupReader::_eval_in_val(PrimitiveType conjunct_type, std::vector<void*> in_pred_values,
+                                  const char* min_bytes, const char* max_bytes) {
+    switch (conjunct_type) {
+    case TYPE_TINYINT: {
+        _FILTER_GROUP_BY_IN(int8_t, in_pred_values, min_bytes, max_bytes)
+        break;
+    }
+    case TYPE_SMALLINT: {
+        _FILTER_GROUP_BY_IN(int16_t, in_pred_values, min_bytes, max_bytes)
+        break;
+    }
+    case TYPE_INT: {
+        _FILTER_GROUP_BY_IN(int32_t, in_pred_values, min_bytes, max_bytes)
+        break;
+    }
+    case TYPE_BIGINT: {
+        _FILTER_GROUP_BY_IN(int64_t, in_pred_values, min_bytes, max_bytes)
+        break;
+    }
+    case TYPE_STRING:
+    case TYPE_VARCHAR:
+    case TYPE_CHAR:
+    case TYPE_DATE:
+    case TYPE_DATETIME: {
+        std::vector<const char*> in_values;
+        for (auto val : in_pred_values) {
+            const char* value = ((std::string*)val)->c_str();
+            in_values.emplace_back(value);
+        }
+        if (in_values.empty()) {
+            return false;
+        }
+        std::sort(in_values.begin(), in_values.end());
+        const char* in_min = in_values.front();
+        const char* in_max = in_values.back();
+        if (strcmp(in_max, min_bytes) < 0 || strcmp(in_min, max_bytes) > 0) {
+            return true;
+        }
+        break;
+    }
+    default:
+        return false;
+    }
+    return false;
+}
+
+bool RowGroupReader::_eval_eq(PrimitiveType conjunct_type, void* value, const char* min_bytes,
+                              const char* max_bytes) {
+    switch (conjunct_type) {
+    case TYPE_TINYINT: {
+        _PLAIN_DECODE(int16_t, value, min_bytes, max_bytes, conjunct_value, min, max)
+        _FILTER_GROUP_BY_EQ_PRED(conjunct_value, min, max)
+        break;
+    }
+    case TYPE_SMALLINT: {
+        _PLAIN_DECODE(int16_t, value, min_bytes, max_bytes, conjunct_value, min, max)
+        _FILTER_GROUP_BY_EQ_PRED(conjunct_value, min, max)
+        break;
+    }
+    case TYPE_INT: {
+        _PLAIN_DECODE(int32_t, value, min_bytes, max_bytes, conjunct_value, min, max)
+        _FILTER_GROUP_BY_EQ_PRED(conjunct_value, min, max)
+        break;
+    }
+    case TYPE_BIGINT: {
+        _PLAIN_DECODE(int64_t, value, min_bytes, max_bytes, conjunct_value, min, max)
+        _FILTER_GROUP_BY_EQ_PRED(conjunct_value, min, max)
+        break;
+    }
+    case TYPE_DOUBLE: {
+        _PLAIN_DECODE(double, value, min_bytes, max_bytes, conjunct_value, min, max)
+        _FILTER_GROUP_BY_EQ_PRED(conjunct_value, min, max)
+        break;
+    }
+    case TYPE_FLOAT: {
+        _PLAIN_DECODE(float, value, min_bytes, max_bytes, conjunct_value, min, max)
+        _FILTER_GROUP_BY_EQ_PRED(conjunct_value, min, max)
+        break;
+    }
+    case TYPE_STRING:
+    case TYPE_VARCHAR:
+    case TYPE_CHAR:
+    case TYPE_DATE:
+    case TYPE_DATETIME: {
+        const char* conjunct_value = ((std::string*)value)->c_str();
+        if (strcmp(conjunct_value, min_bytes) < 0 || strcmp(conjunct_value, max_bytes) > 0) {
+            return true;
+        }
+        break;
+    }
+    default:
+        return false;
+    }
+    return false;
+}
+
+bool RowGroupReader::_eval_gt(PrimitiveType conjunct_type, void* value, const char* max_bytes) {
+    switch (conjunct_type) {
+    case TYPE_TINYINT: {
+        _PLAIN_DECODE_SINGLE(int8_t, value, max_bytes, conjunct_value, max)
+        _FILTER_GROUP_BY_GT_PRED(conjunct_value, max)
+        break;
+    }
+    case TYPE_SMALLINT: {
+        _PLAIN_DECODE_SINGLE(int16_t, value, max_bytes, conjunct_value, max)
+        _FILTER_GROUP_BY_GT_PRED(conjunct_value, max)
+        break;
+    }
+    case TYPE_INT: {
+        _PLAIN_DECODE_SINGLE(int32_t, value, max_bytes, conjunct_value, max)
+        _FILTER_GROUP_BY_GT_PRED(conjunct_value, max)
+        break;
+    }
+    case TYPE_BIGINT: {
+        _PLAIN_DECODE_SINGLE(int64_t, value, max_bytes, conjunct_value, max)
+        _FILTER_GROUP_BY_GT_PRED(conjunct_value, max)
+        break;
+    }
+    case TYPE_DOUBLE: {
+        _PLAIN_DECODE_SINGLE(double, value, max_bytes, conjunct_value, max)
+        _FILTER_GROUP_BY_GT_PRED(conjunct_value, max)
+        break;
+    }
+    case TYPE_FLOAT: {
+        _PLAIN_DECODE_SINGLE(float, value, max_bytes, conjunct_value, max)
+        _FILTER_GROUP_BY_GT_PRED(conjunct_value, max)
+        break;
+    }
+    case TYPE_STRING:
+    case TYPE_VARCHAR:
+    case TYPE_CHAR:
+    case TYPE_DATE:
+    case TYPE_DATETIME: {
+        //            case TYPE_TIME:
+        const char* conjunct_value = ((std::string*)value)->c_str();
+        if (strcmp(max_bytes, conjunct_value) <= 0) {
+            return true;
+        }
+        break;
+    }
+    default:
+        return false;
+    }
+    return false;
+}
+
+bool RowGroupReader::_eval_ge(PrimitiveType conjunct_type, void* value, const char* max_bytes) {
+    switch (conjunct_type) {
+    case TYPE_TINYINT: {
+        _PLAIN_DECODE_SINGLE(int8_t, value, max_bytes, conjunct_value, max)
+        _FILTER_GROUP_BY_GE_PRED(conjunct_value, max)
+        break;
+    }
+    case TYPE_SMALLINT: {
+        _PLAIN_DECODE_SINGLE(int16_t, value, max_bytes, conjunct_value, max)
+        _FILTER_GROUP_BY_GE_PRED(conjunct_value, max)
+        break;
+    }
+    case TYPE_INT: {
+        _PLAIN_DECODE_SINGLE(int32_t, value, max_bytes, conjunct_value, max)
+        _FILTER_GROUP_BY_GE_PRED(conjunct_value, max)
+        break;
+    }
+    case TYPE_BIGINT: {
+        _PLAIN_DECODE_SINGLE(int64_t, value, max_bytes, conjunct_value, max)
+        _FILTER_GROUP_BY_GE_PRED(conjunct_value, max)
+        break;
+    }
+    case TYPE_DOUBLE: {
+        _PLAIN_DECODE_SINGLE(double, value, max_bytes, conjunct_value, max)
+        _FILTER_GROUP_BY_GE_PRED(conjunct_value, max)
+        break;
+    }
+    case TYPE_FLOAT: {
+        _PLAIN_DECODE_SINGLE(float, value, max_bytes, conjunct_value, max)
+        _FILTER_GROUP_BY_GE_PRED(conjunct_value, max)
+        break;
+    }
+    case TYPE_STRING:
+    case TYPE_VARCHAR:
+    case TYPE_CHAR:
+    case TYPE_DATE:
+    case TYPE_DATETIME: {
+        //            case TYPE_TIME:
+        const char* conjunct_value = ((std::string*)value)->c_str();
+        if (strcmp(max_bytes, conjunct_value) < 0) {
+            return true;
+        }
+        break;
+    }
+    default:
+        return false;
+    }
+    return false;
+}
+
+bool RowGroupReader::_eval_lt(PrimitiveType conjunct_type, void* value, const char* min_bytes) {
+    switch (conjunct_type) {
+    case TYPE_TINYINT: {
+        _PLAIN_DECODE_SINGLE(int8_t, value, min_bytes, conjunct_value, min)
+        _FILTER_GROUP_BY_LT_PRED(conjunct_value, min)
+        break;
+    }
+    case TYPE_SMALLINT: {
+        _PLAIN_DECODE_SINGLE(int16_t, value, min_bytes, conjunct_value, min)
+        _FILTER_GROUP_BY_LT_PRED(conjunct_value, min)
+        break;
+    }
+    case TYPE_INT: {
+        _PLAIN_DECODE_SINGLE(int32_t, value, min_bytes, conjunct_value, min)
+        _FILTER_GROUP_BY_LT_PRED(conjunct_value, min)
+        break;
+    }
+    case TYPE_BIGINT: {
+        _PLAIN_DECODE_SINGLE(int64_t, value, min_bytes, conjunct_value, min)
+        _FILTER_GROUP_BY_LT_PRED(conjunct_value, min)
+        break;
+    }
+    case TYPE_DOUBLE: {
+        _PLAIN_DECODE_SINGLE(double, value, min_bytes, conjunct_value, min)
+        _FILTER_GROUP_BY_LT_PRED(conjunct_value, min)
+        break;
+    }
+    case TYPE_FLOAT: {
+        _PLAIN_DECODE_SINGLE(float, value, min_bytes, conjunct_value, min)
+        _FILTER_GROUP_BY_LT_PRED(conjunct_value, min)
+        break;
+    }
+    case TYPE_STRING:
+    case TYPE_VARCHAR:
+    case TYPE_CHAR:
+    case TYPE_DATE:
+    case TYPE_DATETIME: {
+        //            case TYPE_TIME:
+        const char* conjunct_value = ((std::string*)value)->c_str();
+        if (strcmp(min_bytes, conjunct_value) >= 0) {
+            return true;
+        }
+        break;
+    }
+    default:
+        return false;
+    }
+    return false;
+}
+
+bool RowGroupReader::_eval_le(PrimitiveType conjunct_type, void* value, const char* min_bytes) {
+    switch (conjunct_type) {
+    case TYPE_TINYINT: {
+        _PLAIN_DECODE_SINGLE(int8_t, value, min_bytes, conjunct_value, min)
+        _FILTER_GROUP_BY_LE_PRED(conjunct_value, min)
+        break;
+    }
+    case TYPE_SMALLINT: {
+        _PLAIN_DECODE_SINGLE(int16_t, value, min_bytes, conjunct_value, min)
+        _FILTER_GROUP_BY_LE_PRED(conjunct_value, min)
+        break;
+    }
+    case TYPE_INT: {
+        _PLAIN_DECODE_SINGLE(int32_t, value, min_bytes, conjunct_value, min)
+        _FILTER_GROUP_BY_LE_PRED(conjunct_value, min)
+        break;
+    }
+    case TYPE_BIGINT: {
+        _PLAIN_DECODE_SINGLE(int64_t, value, min_bytes, conjunct_value, min)
+        _FILTER_GROUP_BY_LE_PRED(conjunct_value, min)
+        break;
+    }
+    case TYPE_DOUBLE: {
+        _PLAIN_DECODE_SINGLE(double, value, min_bytes, conjunct_value, min)
+        _FILTER_GROUP_BY_LE_PRED(conjunct_value, min)
+        break;
+    }
+    case TYPE_FLOAT: {
+        _PLAIN_DECODE_SINGLE(float, value, min_bytes, conjunct_value, min)
+        _FILTER_GROUP_BY_LE_PRED(conjunct_value, min)
+        break;
+    }
+    case TYPE_STRING:
+    case TYPE_VARCHAR:
+    case TYPE_CHAR:
+    case TYPE_DATE:
+    case TYPE_DATETIME: {
+        //            case TYPE_TIME:
+        const char* conjunct_value = ((std::string*)value)->c_str();
+        if (strcmp(min_bytes, conjunct_value) > 0) {
+            return true;
+        }
+        break;
+    }
+    default:
+        return false;
+    }
+    return false;
+}
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/exec/arrow/parquet_row_group_reader.h b/be/src/exec/arrow/parquet_row_group_reader.h
new file mode 100644
index 0000000000..d29a6c2e05
--- /dev/null
+++ b/be/src/exec/arrow/parquet_row_group_reader.h
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <arrow/type_fwd.h>
+#include <exprs/expr.h>
+#include <parquet/arrow/reader.h>
+#include <parquet/encoding.h>
+#include <parquet/file_reader.h>
+#include <parquet/metadata.h>
+#include <parquet/statistics.h>
+#include <parquet/types.h>
+
+#include <unordered_set>
+
+#include "common/status.h"
+#include "exec/arrow/parquet_reader.h"
+
+namespace doris {
+
+class ParquetReaderWrap;
+
+class RowGroupReader {
+public:
+    RowGroupReader(RuntimeProfile* profile, const std::vector<ExprContext*>& conjunct_ctxs,
+                   std::shared_ptr<parquet::FileMetaData>& file_metadata,
+                   ParquetReaderWrap* parent);
+    ~RowGroupReader();
+
+    Status init_filter_groups(const TupleDescriptor* tuple_desc,
+                              const std::map<std::string, int>& map_column,
+                              const std::vector<int>& include_column_ids);
+
+    std::unordered_set<int> filter_groups() { return _filter_group; };
+
+private:
+    void _init_conjuncts(const TupleDescriptor* tuple_desc,
+                         const std::map<std::string, int>& _map_column,
+                         const std::unordered_set<int>& include_column_ids);
+
+    bool _determine_filter_row_group(const std::vector<ExprContext*>& conjuncts,
+                                     const std::string& encoded_min,
+                                     const std::string& encoded_max);
+
+    void _eval_binary_predicate(ExprContext* ctx, const char* min_bytes, const char* max_bytes,
+                                bool& need_filter);
+
+    void _eval_in_predicate(ExprContext* ctx, const char* min_bytes, const char* max_bytes,
+                            bool& need_filter);
+
+    bool _eval_in_val(PrimitiveType conjunct_type, std::vector<void*> in_pred_values,
+                      const char* min_bytes, const char* max_bytes);
+
+    bool _eval_eq(PrimitiveType conjunct_type, void* value, const char* min_bytes,
+                  const char* max_bytes);
+
+    bool _eval_gt(PrimitiveType conjunct_type, void* value, const char* max_bytes);
+
+    bool _eval_ge(PrimitiveType conjunct_type, void* value, const char* max_bytes);
+
+    bool _eval_lt(PrimitiveType conjunct_type, void* value, const char* min_bytes);
+
+    bool _eval_le(PrimitiveType conjunct_type, void* value, const char* min_bytes);
+
+private:
+    std::map<int, std::vector<ExprContext*>> _slot_conjuncts;
+    std::unordered_set<int> _filter_group;
+
+    std::vector<ExprContext*> _conjunct_ctxs;
+    std::shared_ptr<parquet::FileMetaData> _file_metadata;
+    RuntimeProfile* _profile;
+    ParquetReaderWrap* _parent;
+};
+} // namespace doris
diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp
index 3ec5aba1cd..ef85e56dd3 100644
--- a/be/src/exec/base_scanner.cpp
+++ b/be/src/exec/base_scanner.cpp
@@ -95,6 +95,12 @@ Status BaseScanner::open() {
     return Status::OK();
 }
 
+void BaseScanner::reg_conjunct_ctxs(const TupleId& tupleId,
+                                    const std::vector<ExprContext*>& conjunct_ctxs) {
+    _conjunct_ctxs = conjunct_ctxs;
+    _tupleId = tupleId;
+}
+
 Status BaseScanner::init_expr_ctxes() {
     // Construct _src_slot_descs
     const TupleDescriptor* src_tuple_desc =
diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h
index fe3e088d4e..7efdee0f2f 100644
--- a/be/src/exec/base_scanner.h
+++ b/be/src/exec/base_scanner.h
@@ -64,6 +64,10 @@ public:
         }
     }
 
+    // Register conjuncts for push down
+    virtual void reg_conjunct_ctxs(const TupleId& tupleId,
+                                   const std::vector<ExprContext*>& conjunct_ctxs);
+
     virtual Status init_expr_ctxes();
     // Open this scanner, will initialize information need to
     virtual Status open();
@@ -142,6 +146,10 @@ protected:
     vectorized::Block _src_block;
     int _num_of_columns_from_file;
 
+    // slot_ids for parquet predicate push down are in tuple desc
+    TupleId _tupleId;
+    std::vector<ExprContext*> _conjunct_ctxs;
+
 private:
     Status _filter_src_block();
     void _fill_columns_from_path();
diff --git a/be/src/exec/broker_scan_node.cpp b/be/src/exec/broker_scan_node.cpp
index 532a9d7d06..49fed4c3ce 100644
--- a/be/src/exec/broker_scan_node.cpp
+++ b/be/src/exec/broker_scan_node.cpp
@@ -263,6 +263,7 @@ std::unique_ptr<BaseScanner> BrokerScanNode::create_scanner(const TBrokerScanRan
                                      _pre_filter_texprs, counter);
         }
     }
+    scan->reg_conjunct_ctxs(_tuple_id, _conjunct_ctxs);
     std::unique_ptr<BaseScanner> scanner(scan);
     return scanner;
 }
diff --git a/be/src/exec/parquet_scanner.cpp b/be/src/exec/parquet_scanner.cpp
index 776115427e..2e9ea1f3eb 100644
--- a/be/src/exec/parquet_scanner.cpp
+++ b/be/src/exec/parquet_scanner.cpp
@@ -106,11 +106,11 @@ Status ParquetScanner::open_next_reader() {
         if (range.__isset.num_of_columns_from_file) {
             num_of_columns_from_file = range.num_of_columns_from_file;
         }
-        _cur_file_reader = new ParquetReaderWrap(file_reader.release(), _state->batch_size(),
-                                                 num_of_columns_from_file);
-
-        Status status = _cur_file_reader->init_reader(_src_slot_descs, _state->timezone());
-
+        _cur_file_reader = new ParquetReaderWrap(_profile, file_reader.release(),
+                                                 _state->batch_size(), num_of_columns_from_file);
+        auto tuple_desc = _state->desc_tbl().get_tuple_descriptor(_tupleId);
+        Status status = _cur_file_reader->init_reader(tuple_desc, _src_slot_descs, _conjunct_ctxs,
+                                                      _state->timezone());
         if (status.is_end_of_file()) {
             continue;
         } else {
@@ -118,6 +118,7 @@ Status ParquetScanner::open_next_reader() {
                 return Status::InternalError("file: {}, error:{}", range.path,
                                              status.get_error_msg());
             } else {
+                RETURN_IF_ERROR(_cur_file_reader->init_parquet_type());
                 return status;
             }
         }
diff --git a/be/src/exprs/expr_context.h b/be/src/exprs/expr_context.h
index 7fcb277bb0..6543ff1b9c 100644
--- a/be/src/exprs/expr_context.h
+++ b/be/src/exprs/expr_context.h
@@ -164,6 +164,7 @@ private:
     friend class BloomFilterPredicate;
     friend class OlapScanNode;
     friend class EsPredicate;
+    friend class RowGroupReader;
     friend class vectorized::VOlapScanNode;
 
     /// FunctionContexts for each registered expression. The FunctionContexts are created
diff --git a/be/src/vec/exec/file_arrow_scanner.cpp b/be/src/vec/exec/file_arrow_scanner.cpp
index 57defdb8ef..9adad2d71a 100644
--- a/be/src/vec/exec/file_arrow_scanner.cpp
+++ b/be/src/vec/exec/file_arrow_scanner.cpp
@@ -70,8 +70,9 @@ Status FileArrowScanner::_open_next_reader() {
         _cur_file_reader = _new_arrow_reader(file_reader.release(), _state->batch_size(),
                                              num_of_columns_from_file);
 
-        Status status = _cur_file_reader->init_reader(_file_slot_descs, _state->timezone());
-
+        auto tuple_desc = _state->desc_tbl().get_tuple_descriptor(_tupleId);
+        Status status = _cur_file_reader->init_reader(tuple_desc, _file_slot_descs, _conjunct_ctxs,
+                                                      _state->timezone());
         if (status.is_end_of_file()) {
             continue;
         } else {
@@ -207,7 +208,7 @@ VFileParquetScanner::VFileParquetScanner(RuntimeState* state, RuntimeProfile* pr
 
 ArrowReaderWrap* VFileParquetScanner::_new_arrow_reader(FileReader* file_reader, int64_t batch_size,
                                                         int32_t num_of_columns_from_file) {
-    return new ParquetReaderWrap(file_reader, batch_size, num_of_columns_from_file);
+    return new ParquetReaderWrap(_profile, file_reader, batch_size, num_of_columns_from_file);
 }
 
 VFileORCScanner::VFileORCScanner(RuntimeState* state, RuntimeProfile* profile,
diff --git a/be/src/vec/exec/file_scan_node.cpp b/be/src/vec/exec/file_scan_node.cpp
index 4105f8dc01..741b66dd81 100644
--- a/be/src/vec/exec/file_scan_node.cpp
+++ b/be/src/vec/exec/file_scan_node.cpp
@@ -315,6 +315,7 @@ std::unique_ptr<FileScanner> FileScanNode::create_scanner(const TFileScanRange&
         scan = new FileTextScanner(_runtime_state, runtime_profile(), scan_range.params,
                                    scan_range.ranges, _pre_filter_texprs, counter);
     }
+    scan->reg_conjunct_ctxs(_tuple_id, _conjunct_ctxs);
     std::unique_ptr<FileScanner> scanner(scan);
     return scanner;
 }
diff --git a/be/src/vec/exec/file_scanner.cpp b/be/src/vec/exec/file_scanner.cpp
index d9780f8dbe..3168e220d5 100644
--- a/be/src/vec/exec/file_scanner.cpp
+++ b/be/src/vec/exec/file_scanner.cpp
@@ -69,6 +69,12 @@ Status FileScanner::open() {
     return Status::OK();
 }
 
+void FileScanner::reg_conjunct_ctxs(const TupleId& tupleId,
+                                    const std::vector<ExprContext*>& conjunct_ctxs) {
+    _conjunct_ctxs = conjunct_ctxs;
+    _tupleId = tupleId;
+}
+
 Status FileScanner::_init_expr_ctxes() {
     const TupleDescriptor* src_tuple_desc =
             _state->desc_tbl().get_tuple_descriptor(_params.src_tuple_id);
diff --git a/be/src/vec/exec/file_scanner.h b/be/src/vec/exec/file_scanner.h
index cb21b876d5..abdc2eaa4e 100644
--- a/be/src/vec/exec/file_scanner.h
+++ b/be/src/vec/exec/file_scanner.h
@@ -34,6 +34,9 @@ public:
 
     virtual ~FileScanner() = default;
 
+    virtual void reg_conjunct_ctxs(const TupleId& tupleId,
+                                   const std::vector<ExprContext*>& conjunct_ctxs);
+
     // Open this scanner, will initialize information need to
     virtual Status open();
 
@@ -85,6 +88,11 @@ protected:
     std::unique_ptr<vectorized::VExprContext*> _vpre_filter_ctx_ptr;
     int _num_of_columns_from_file;
 
+    // File formats based push down predicate
+    std::vector<ExprContext*> _conjunct_ctxs;
+    // slot_ids for parquet predicate push down are in tuple desc
+    TupleId _tupleId;
+
 private:
     Status _init_expr_ctxes();
     Status _filter_block(vectorized::Block* output_block);
diff --git a/be/src/vec/exec/varrow_scanner.cpp b/be/src/vec/exec/varrow_scanner.cpp
index 05329f1d04..9d2bfb8a52 100644
--- a/be/src/vec/exec/varrow_scanner.cpp
+++ b/be/src/vec/exec/varrow_scanner.cpp
@@ -37,7 +37,13 @@ VArrowScanner::VArrowScanner(RuntimeState* state, RuntimeProfile* profile,
           _cur_file_reader(nullptr),
           _cur_file_eof(false),
           _batch(nullptr),
-          _arrow_batch_cur_idx(0) {}
+          _arrow_batch_cur_idx(0) {
+    _filtered_row_groups_counter = ADD_COUNTER(_profile, "FileFilteredRowGroups", TUnit::UNIT);
+    _filtered_rows_counter = ADD_COUNTER(_profile, "FileFilteredRows", TUnit::UNIT);
+    _filtered_bytes_counter = ADD_COUNTER(_profile, "FileFilteredBytes", TUnit::BYTES);
+    _total_rows_counter = ADD_COUNTER(_profile, "FileTotalRows", TUnit::UNIT);
+    _total_groups_counter = ADD_COUNTER(_profile, "FileTotalRowGroups", TUnit::UNIT);
+}
 
 VArrowScanner::~VArrowScanner() {
     close();
@@ -72,8 +78,9 @@ Status VArrowScanner::_open_next_reader() {
         }
         _cur_file_reader = _new_arrow_reader(file_reader.release(), _state->batch_size(),
                                              num_of_columns_from_file);
-
-        Status status = _cur_file_reader->init_reader(_src_slot_descs, _state->timezone());
+        auto tuple_desc = _state->desc_tbl().get_tuple_descriptor(_tupleId);
+        Status status = _cur_file_reader->init_reader(tuple_desc, _src_slot_descs, _conjunct_ctxs,
+                                                      _state->timezone());
 
         if (status.is_end_of_file()) {
             continue;
@@ -82,12 +89,21 @@ Status VArrowScanner::_open_next_reader() {
                 return Status::InternalError(" file: {} error:{}", range.path,
                                              status.get_error_msg());
             } else {
+                update_profile(_cur_file_reader->statistics());
                 return status;
             }
         }
     }
 }
 
+void VArrowScanner::update_profile(std::shared_ptr<Statistics>& statistics) {
+    COUNTER_UPDATE(_total_groups_counter, statistics->total_groups);
+    COUNTER_UPDATE(_filtered_row_groups_counter, statistics->filtered_row_groups);
+    COUNTER_UPDATE(_total_rows_counter, statistics->total_rows);
+    COUNTER_UPDATE(_filtered_rows_counter, statistics->filtered_rows);
+    COUNTER_UPDATE(_filtered_bytes_counter, statistics->filtered_total_bytes);
+}
+
 Status VArrowScanner::open() {
     RETURN_IF_ERROR(BaseScanner::open());
     if (_ranges.empty()) {
diff --git a/be/src/vec/exec/varrow_scanner.h b/be/src/vec/exec/varrow_scanner.h
index 176ddb58f2..109ce53177 100644
--- a/be/src/vec/exec/varrow_scanner.h
+++ b/be/src/vec/exec/varrow_scanner.h
@@ -29,6 +29,7 @@
 #include <vector>
 
 #include "common/status.h"
+#include "exec/arrow/parquet_reader.h"
 #include "exec/base_scanner.h"
 #include "gen_cpp/Types_types.h"
 #include "runtime/mem_pool.h"
@@ -56,6 +57,9 @@ public:
 
     virtual Status get_next(Block* block, bool* eof) override;
 
+    // Update file predicate filter profile
+    void update_profile(std::shared_ptr<Statistics>& statistics);
+
     virtual void close() override;
 
 protected:
@@ -77,6 +81,12 @@ private:
     bool _cur_file_eof; // is read over?
     std::shared_ptr<arrow::RecordBatch> _batch;
     size_t _arrow_batch_cur_idx;
+
+    RuntimeProfile::Counter* _filtered_row_groups_counter;
+    RuntimeProfile::Counter* _filtered_rows_counter;
+    RuntimeProfile::Counter* _filtered_bytes_counter;
+    RuntimeProfile::Counter* _total_rows_counter;
+    RuntimeProfile::Counter* _total_groups_counter;
 };
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/vbroker_scan_node.cpp b/be/src/vec/exec/vbroker_scan_node.cpp
index 6f63324e6d..dc738c5a09 100644
--- a/be/src/vec/exec/vbroker_scan_node.cpp
+++ b/be/src/vec/exec/vbroker_scan_node.cpp
@@ -321,6 +321,7 @@ std::unique_ptr<BaseScanner> VBrokerScanNode::create_scanner(const TBrokerScanRa
                                               scan_range.ranges, scan_range.broker_addresses,
                                               _pre_filter_texprs, counter);
     }
+    scan->reg_conjunct_ctxs(_tuple_id, _conjunct_ctxs);
     std::unique_ptr<BaseScanner> scanner(scan);
     return scanner;
 }
diff --git a/be/src/vec/exec/vparquet_scanner.cpp b/be/src/vec/exec/vparquet_scanner.cpp
index cb59ae60bc..319d8a3ccb 100644
--- a/be/src/vec/exec/vparquet_scanner.cpp
+++ b/be/src/vec/exec/vparquet_scanner.cpp
@@ -32,7 +32,7 @@ VParquetScanner::VParquetScanner(RuntimeState* state, RuntimeProfile* profile,
 
 ArrowReaderWrap* VParquetScanner::_new_arrow_reader(FileReader* file_reader, int64_t batch_size,
                                                     int32_t num_of_columns_from_file) {
-    return new ParquetReaderWrap(file_reader, batch_size, num_of_columns_from_file);
+    return new ParquetReaderWrap(_profile, file_reader, batch_size, num_of_columns_from_file);
 }
 
 } // namespace doris::vectorized
diff --git a/be/test/exec/parquet_scanner_test.cpp b/be/test/exec/parquet_scanner_test.cpp
index cbd673d4dc..5b1020224e 100644
--- a/be/test/exec/parquet_scanner_test.cpp
+++ b/be/test/exec/parquet_scanner_test.cpp
@@ -58,6 +58,7 @@ private:
     int create_dst_tuple(TDescriptorTable& t_desc_table, int next_slot_id);
     void create_expr_info();
     void init_desc_table();
+    void init_filter_expr();
     RuntimeState _runtime_state;
     ObjectPool _obj_pool;
     std::map<std::string, SlotDescriptor*> _slots_map;
@@ -406,10 +407,70 @@ void ParquetScannerTest::create_expr_info() {
     _params.__set_src_tuple_id(TUPLE_ID_SRC);
 }
 
+void ParquetScannerTest::init_filter_expr() {
+    TTypeDesc bool_type;
+    {
+        TTypeNode node;
+        node.__set_type(TTypeNodeType::SCALAR);
+        TScalarType scalar_type;
+        scalar_type.__set_type(TPrimitiveType::BOOLEAN);
+        scalar_type.__set_len(5000);
+        node.__set_scalar_type(scalar_type);
+        bool_type.types.push_back(node);
+    }
+    TTypeDesc int_type;
+    {
+        TTypeNode node;
+        node.__set_type(TTypeNodeType::SCALAR);
+        TScalarType scalar_type;
+        scalar_type.__set_type(TPrimitiveType::BIGINT);
+        node.__set_scalar_type(scalar_type);
+        int_type.types.push_back(node);
+    }
+
+    // create predicate
+    ::doris::TExpr expr;
+
+    // create predicate elements: LeftExpr op RightExpr
+    // expr: log_time > 1
+    ::doris::TExprNode op;
+    op.node_type = TExprNodeType::BINARY_PRED;
+    op.opcode = TExprOpcode::GT;
+    op.type = bool_type;
+    op.num_children = 2;
+    op.child_type = TPrimitiveType::BIGINT;
+    op.__isset.opcode = true;
+    expr.nodes.push_back(op);
+
+    // log_time
+    ::doris::TExprNode slot_ref;
+    slot_ref.node_type = TExprNodeType::SLOT_REF;
+    slot_ref.type = int_type;
+    slot_ref.slot_ref.slot_id = 1;
+    slot_ref.slot_ref.tuple_id = 0;
+    slot_ref.num_children = 0;
+    slot_ref.__isset.slot_ref = true;
+    expr.nodes.push_back(slot_ref);
+
+    ::doris::TExprNode int_expr;
+    int_expr.node_type = TExprNodeType::INT_LITERAL;
+    int_expr.type = int_type;
+    int_expr.int_literal.value = 1;
+    int_expr.num_children = 0;
+    int_expr.__isset.int_literal = true;
+
+    expr.nodes.push_back(int_expr);
+
+    std::vector<::doris::TExpr> conjuncts;
+    conjuncts.push_back(expr);
+    // push down conjuncts;
+    _tnode.__set_conjuncts(conjuncts);
+}
+
 void ParquetScannerTest::init() {
     create_expr_info();
     init_desc_table();
-
+    init_filter_expr();
     // Node Id
     _tnode.node_id = 0;
     _tnode.node_type = TPlanNodeType::SCHEMA_SCAN_NODE;
@@ -419,6 +480,7 @@ void ParquetScannerTest::init() {
     _tnode.nullable_tuples.push_back(false);
     _tnode.broker_scan_node.tuple_id = 0;
     _tnode.__isset.broker_scan_node = true;
+    _tnode.__isset.conjuncts = true;
 }
 
 TEST_F(ParquetScannerTest, normal) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org