Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/11/07 21:37:46 UTC

[GitHub] [arrow] fatemehp opened a new pull request, #14603: [PARQUET-2210]:[cpp] Skip pages based on header metadata using a callback

fatemehp opened a new pull request, #14603:
URL: https://github.com/apache/arrow/pull/14603

   Currently, the page header metadata is not exposed, so it cannot be used to skip pages. I propose exposing it through a callback that lets the caller decide, based on the metadata, whether to read or skip each page.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] wgtmac commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
wgtmac commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1037929261


##########
cpp/src/parquet/column_reader.cc:
##########
@@ -386,6 +441,12 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
       throw ParquetException("Invalid page header");
     }
 
+    EncodedStatistics page_statistics;

Review Comment:
   How about renaming it to data_page_statistics? It is currently declared outside the data page scope, so we should state clearly that it is meaningful for data pages only.





[GitHub] [arrow] fatemehp commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1049253374


##########
cpp/src/parquet/column_reader.cc:
##########
@@ -337,6 +348,50 @@ void SerializedPageReader::UpdateDecryption(const std::shared_ptr<Decryptor>& de
   }
 }
 
+bool SerializedPageReader::ShouldSkipPage(EncodedStatistics& page_statistics) {
+  const PageType::type page_type = LoadEnumSafe(&current_page_header_.type);
+  if (page_type == PageType::DATA_PAGE) {
+    const format::DataPageHeader& header = current_page_header_.data_page_header;
+    CheckNumValuesInHeader(header.num_values);
+    page_statistics = ExtractStatsFromHeader(header);

Review Comment:
   I made this clear in the comments for setting the callback function.





[GitHub] [arrow] fatemehp commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1049255078


##########
cpp/src/parquet/column_reader.h:
##########
@@ -55,6 +56,29 @@ static constexpr uint32_t kDefaultMaxPageHeaderSize = 16 * 1024 * 1024;
 // 16 KB is the default expected page header size
 static constexpr uint32_t kDefaultPageHeaderSize = 16 * 1024;
 
+// \brief DataPageStats stores encoded statistics and number of values/rows for
+// a page.
+struct PARQUET_EXPORT DataPageStats {
+  DataPageStats(EncodedStatistics* encoded_statistics, int32_t num_values,
+                std::optional<int32_t> num_rows)
+      : encoded_statistics(encoded_statistics),
+        is_stats_set(encoded_statistics->is_set()),
+        num_values(num_values),
+        num_rows(num_rows) {}
+
+  // Encoded statistics extracted from the page header.
+  EncodedStatistics* encoded_statistics;

Review Comment:
   I made it a const.
   
   @emkornfield, I made it clear in the comments for setting the callback function that the caller is responsible for checking that the statistics are correct.





[GitHub] [arrow] pitrou commented on pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
pitrou commented on PR #14603:
URL: https://github.com/apache/arrow/pull/14603#issuecomment-1341422934

   But since you know the column type, you should be able to cast the `Statistics` to the corresponding `TypedStatistics<>`, which gives you the decoded values.




[GitHub] [arrow] fatemehp commented on pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on PR #14603:
URL: https://github.com/apache/arrow/pull/14603#issuecomment-1358043912

   @emkornfield could you take another look?




[GitHub] [arrow] emkornfield commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
emkornfield commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1035217903


##########
cpp/src/parquet/metadata.h:
##########
@@ -182,6 +184,28 @@ class PARQUET_EXPORT ColumnChunkMetaData {
   std::unique_ptr<ColumnChunkMetaDataImpl> impl_;
 };
 
+// \brief DataPageStats stores statistics about a data page including number of
+// values and rows.
+class PARQUET_EXPORT DataPageStats {
+ public:
+  explicit DataPageStats(const EncodedStatistics& encoded_statistics, int32_t num_values,

Review Comment:
   Document that encoded_statistics must outlive this object (or maybe take it as a pointer to make this slightly more explicit).
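
A minimal sketch of the pointer variant, to show how a non-owning member makes the lifetime requirement visible at the call site. `EncodedStatsSketch`, `PageStatsView` and `MaxBelow` are hypothetical stand-ins for illustration, not the types in this PR, and the string comparison assumes encoded values that order lexicographically.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical stand-in for the encoded statistics of one page.
struct EncodedStatsSketch {
  std::string min;
  std::string max;
};

// Non-owning view: the object pointed to by `stats` must outlive this struct.
struct PageStatsView {
  const EncodedStatsSketch* stats;
  int32_t num_values;
};

// Returns true if the page maximum is strictly below `bound`.
bool MaxBelow(const PageStatsView& view, const std::string& bound) {
  assert(view.stats != nullptr);
  return view.stats->max < bound;
}
```

Because the caller has to write `&stats` to build the view, it is harder to pass a temporary by accident than with a `const` reference parameter.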





[GitHub] [arrow] github-actions[bot] commented on pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #14603:
URL: https://github.com/apache/arrow/pull/14603#issuecomment-1307835740

   :warning: Ticket **has no components in JIRA**, make sure you assign one.




[GitHub] [arrow] emkornfield commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
emkornfield commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1030834755


##########
cpp/src/parquet/column_reader.h:
##########
@@ -115,11 +116,26 @@ class PARQUET_EXPORT PageReader {
                                           bool always_compressed = false,
                                           const CryptoContext* ctx = NULLPTR);
 
+  // If skip_page_callback_ is set, NextPage() must use this callback to determine if it

Review Comment:
   ```suggestion
     // If skip_page_callback is present (not null), NextPage() must use this callback to determine if it
   ```
   





[GitHub] [arrow] fatemehp commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1049256845


##########
cpp/src/parquet/file_deserialize_test.cc:
##########
@@ -177,6 +201,217 @@ TEST_F(TestPageSerde, DataPageV1) {
   ASSERT_NO_FATAL_FAILURE(CheckDataPageHeader(data_page_header_, current_page.get()));
 }
 
+// Templated test class to test page filtering for both format::DataPageHeader
+// and format::DataPageHeaderV2.
+template <typename T>
+class PageFilterTest : public TestPageSerde {
+ public:
+  const int kNumPages = 10;
+  void WriteStream();
+
+ protected:
+  std::vector<T> data_page_headers_;
+  int total_rows_ = 0;
+};
+
+template <>
+void PageFilterTest<format::DataPageHeader>::WriteStream() {
+  for (int i = 0; i < kNumPages; ++i) {
+    // Vary the number of rows to produce different headers.
+    int32_t num_rows = i + 100;
+    total_rows_ += num_rows;
+    int data_size = i + 1024;
+    this->data_page_header_.__set_num_values(num_rows);
+    this->data_page_header_.statistics.__set_min_value("A");
+    this->data_page_header_.statistics.__set_max_value("Z");
+    this->data_page_header_.statistics.__set_null_count(0);

Review Comment:
   Done.





[GitHub] [arrow] pitrou commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
pitrou commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1048484566


##########
cpp/src/parquet/statistics.h:
##########
@@ -216,6 +216,13 @@ class PARQUET_EXPORT Statistics {
       bool has_distinct_count,
       ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
 
+  // Helper function to convert EncodedStatistics to Statistics.
+  // Note that num_values will be set to -1 because number of non-null values in
+  // the column is not available in EncodedStatistics.
+  static std::shared_ptr<Statistics> Make(
+      const ColumnDescriptor* descr, const EncodedStatistics* encoded_statistics,
+      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());

Review Comment:
   Let's allow passing `num_values` optionally?
   ```suggestion
     static std::shared_ptr<Statistics> Make(
         const ColumnDescriptor* descr, const EncodedStatistics* encoded_statistics,
         int64_t num_values = -1, ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
   ```



##########
cpp/src/parquet/file_deserialize_test.cc:
##########
@@ -177,6 +201,217 @@ TEST_F(TestPageSerde, DataPageV1) {
   ASSERT_NO_FATAL_FAILURE(CheckDataPageHeader(data_page_header_, current_page.get()));
 }
 
+// Templated test class to test page filtering for both format::DataPageHeader
+// and format::DataPageHeaderV2.
+template <typename T>
+class PageFilterTest : public TestPageSerde {
+ public:
+  const int kNumPages = 10;
+  void WriteStream();
+
+ protected:
+  std::vector<T> data_page_headers_;
+  int total_rows_ = 0;
+};
+
+template <>
+void PageFilterTest<format::DataPageHeader>::WriteStream() {
+  for (int i = 0; i < kNumPages; ++i) {
+    // Vary the number of rows to produce different headers.
+    int32_t num_rows = i + 100;
+    total_rows_ += num_rows;
+    int data_size = i + 1024;
+    this->data_page_header_.__set_num_values(num_rows);
+    this->data_page_header_.statistics.__set_min_value("A");
+    this->data_page_header_.statistics.__set_max_value("Z");
+    this->data_page_header_.statistics.__set_null_count(0);

Review Comment:
   Might be nice to vary the statistics a bit from page to page?



##########
cpp/src/parquet/column_reader.h:
##########
@@ -115,11 +116,30 @@ class PARQUET_EXPORT PageReader {
                                           bool always_compressed = false,
                                           const CryptoContext* ctx = NULLPTR);
 
+  // If skip_page_callback_ is present (not null), NextPage() will call the
+  // callback function exactly once per page in the order the pages appear in
+  // the column. If the callback function returns true the page will be
+  // skipped. The callback will be called only if the page type is DATA_PAGE or

Review Comment:
   Ok, but this should then skip the page if the returned value is false, not true.



##########
cpp/src/parquet/column_reader.h:
##########
@@ -55,6 +56,29 @@ static constexpr uint32_t kDefaultMaxPageHeaderSize = 16 * 1024 * 1024;
 // 16 KB is the default expected page header size
 static constexpr uint32_t kDefaultPageHeaderSize = 16 * 1024;
 
+// \brief DataPageStats stores encoded statistics and number of values/rows for
+// a page.
+struct PARQUET_EXPORT DataPageStats {
+  DataPageStats(EncodedStatistics* encoded_statistics, int32_t num_values,
+                std::optional<int32_t> num_rows)
+      : encoded_statistics(encoded_statistics),
+        is_stats_set(encoded_statistics->is_set()),
+        num_values(num_values),
+        num_rows(num_rows) {}
+
+  // Encoded statistics extracted from the page header.
+  EncodedStatistics* encoded_statistics;

Review Comment:
   Also, this should perhaps be `const`?



##########
cpp/src/parquet/metadata.h:
##########
@@ -20,13 +20,15 @@
 #include <cstdint>
 #include <map>
 #include <memory>
+#include <optional>
 #include <string>
 #include <utility>
 #include <vector>
 
 #include "parquet/platform.h"
 #include "parquet/properties.h"
 #include "parquet/schema.h"
+#include "parquet/statistics.h"

Review Comment:
   You said "done" but didn't address this?



##########
cpp/src/parquet/metadata.cc:
##########
@@ -19,6 +19,8 @@
 
 #include <algorithm>
 #include <cinttypes>
+#include <csignal>
+#include <iostream>

Review Comment:
   Why are `csignal` and `iostream` needed here?



##########
cpp/src/parquet/file_deserialize_test.cc:
##########
@@ -177,6 +201,217 @@ TEST_F(TestPageSerde, DataPageV1) {
   ASSERT_NO_FATAL_FAILURE(CheckDataPageHeader(data_page_header_, current_page.get()));
 }
 
+// Templated test class to test page filtering for both format::DataPageHeader
+// and format::DataPageHeaderV2.
+template <typename T>
+class PageFilterTest : public TestPageSerde {
+ public:
+  const int kNumPages = 10;
+  void WriteStream();
+
+ protected:
+  std::vector<T> data_page_headers_;
+  int total_rows_ = 0;
+};
+
+template <>
+void PageFilterTest<format::DataPageHeader>::WriteStream() {
+  for (int i = 0; i < kNumPages; ++i) {
+    // Vary the number of rows to produce different headers.
+    int32_t num_rows = i + 100;
+    total_rows_ += num_rows;
+    int data_size = i + 1024;
+    this->data_page_header_.__set_num_values(num_rows);
+    this->data_page_header_.statistics.__set_min_value("A");
+    this->data_page_header_.statistics.__set_max_value("Z");
+    this->data_page_header_.statistics.__set_null_count(0);
+    this->data_page_header_.__isset.statistics = true;
+    ASSERT_NO_FATAL_FAILURE(
+        this->WriteDataPageHeader(/*max_serialized_len=*/1024, data_size, data_size));
+    data_page_headers_.push_back(this->data_page_header_);
+    // Also write data, to make sure we skip the data correctly.
+    std::vector<uint8_t> faux_data(data_size);
+    ASSERT_OK(this->out_stream_->Write(faux_data.data(), data_size));
+  }
+  this->EndStream();
+}
+
+template <>
+void PageFilterTest<format::DataPageHeaderV2>::WriteStream() {
+  for (int i = 0; i < kNumPages; ++i) {
+    // Vary the number of rows to produce different headers.
+    int32_t num_rows = i + 100;
+    total_rows_ += num_rows;
+    int data_size = i + 1024;
+    this->data_page_header_v2_.__set_num_values(num_rows);
+    this->data_page_header_v2_.__set_num_rows(num_rows);
+    this->data_page_header_v2_.statistics.__set_min_value("A");
+    this->data_page_header_v2_.statistics.__set_max_value("Z");
+    this->data_page_header_v2_.statistics.__set_null_count(0);
+    this->data_page_header_v2_.__isset.statistics = true;
+    ASSERT_NO_FATAL_FAILURE(
+        this->WriteDataPageHeaderV2(/*max_serialized_len=*/1024, data_size, data_size));
+    data_page_headers_.push_back(this->data_page_header_v2_);
+    // Also write data, to make sure we skip the data correctly.
+    std::vector<uint8_t> faux_data(data_size);
+    ASSERT_OK(this->out_stream_->Write(faux_data.data(), data_size));
+  }
+  this->EndStream();
+}
+
+using DataPageHeaderTypes =
+    ::testing::Types<format::DataPageHeader, format::DataPageHeaderV2>;
+TYPED_TEST_SUITE(PageFilterTest, DataPageHeaderTypes);
+
+// Creates a number of pages and skips some of them with the page filter callback.
+TYPED_TEST(PageFilterTest, TestPageFilterCallback) {
+  this->WriteStream();
+
+  {  // Read all pages.
+    auto stream = std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
+    this->page_reader_ =
+        PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+
+    // This callback will always return false.
+    auto read_all_pages = [](const DataPageStats& stats) -> bool { return false; };
+
+    this->page_reader_->set_data_page_filter(read_all_pages);
+    for (int i = 0; i < this->kNumPages; ++i) {
+      std::shared_ptr<Page> current_page = this->page_reader_->NextPage();
+      ASSERT_NE(current_page, nullptr);
+      ASSERT_NO_FATAL_FAILURE(
+          CheckDataPageHeader(this->data_page_headers_[i], current_page.get()));
+    }
+    ASSERT_EQ(this->page_reader_->NextPage(), nullptr);
+  }
+  {  // Skip all pages.
+    auto stream = std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
+    this->page_reader_ =
+        PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+
+    auto skip_all_pages = [](const DataPageStats& stats) -> bool { return true; };
+
+    this->page_reader_->set_data_page_filter(skip_all_pages);
+    std::shared_ptr<Page> current_page = this->page_reader_->NextPage();
+    ASSERT_EQ(this->page_reader_->NextPage(), nullptr);
+  }
+
+  {  // Skip every other page.
+    auto stream = std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
+    this->page_reader_ =
+        PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+
+    // Skip pages with even number of values.
+    auto skip_even_pages = [](const DataPageStats& stats) -> bool {
+      if (stats.num_values % 2 == 0) return true;

Review Comment:
   Can you also test something about the page statistics in `stats`?
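
One possible shape for such a check, as a sketch only: a filter that looks at the min/max statistics rather than at `num_values`. `PageStatsSketch`, `PageFilterSketch` and `SkipPagesWithout` are hypothetical stand-ins, not the PR's types. The sketch assumes that returning true means "skip", as in the test above, and that the encoded min/max compare lexicographically (true for byte-array-like values such as the "A"/"Z" used in the test; numeric columns would need decoding first).

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <optional>
#include <string>
#include <utility>

// Hypothetical stand-in mirroring the fields discussed in this review.
struct PageStatsSketch {
  bool is_stats_set;
  std::string min;  // encoded minimum
  std::string max;  // encoded maximum
  int32_t num_values;
  std::optional<int32_t> num_rows;
};

using PageFilterSketch = std::function<bool(const PageStatsSketch&)>;

// Builds a filter that returns true (skip) when the page's [min, max] range
// cannot contain `target`. Pages without statistics are always read.
PageFilterSketch SkipPagesWithout(std::string target) {
  return [target = std::move(target)](const PageStatsSketch& stats) {
    if (!stats.is_stats_set) return false;
    return target < stats.min || target > stats.max;
  };
}
```

Varying min/max from page to page in the test data would then let a filter like this select a different subset of pages than the `num_values` check does.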





[GitHub] [arrow] github-actions[bot] commented on pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #14603:
URL: https://github.com/apache/arrow/pull/14603#issuecomment-1307835757

   :warning: Ticket **has not been started in JIRA**, please click 'Start Progress'.




[GitHub] [arrow] emkornfield commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
emkornfield commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1030833614


##########
cpp/src/parquet/column_reader.h:
##########
@@ -115,11 +116,26 @@ class PARQUET_EXPORT PageReader {
                                           bool always_compressed = false,
                                           const CryptoContext* ctx = NULLPTR);
 
+  // If skip_page_callback_ is set, NextPage() must use this callback to determine if it
+  // should return or skip and move to the next page. If the callback function returns

Review Comment:
   We might want to clarify the contract here further:
   1. Will the callback be invoked for pages in the order they appear in the column?
   2. Will the callback be called at most once per page?
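
To make the two questions concrete, here is a small sketch of a reader loop that satisfies both: it walks the headers in column order and consults the callback once per data page. `PageKind`, `PageHeader`, `SkipCallback` and `PagesToRead` are hypothetical names for illustration, not the `SerializedPageReader` implementation, and the sketch assumes that a true return value means "skip".

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <vector>

// Hypothetical stand-ins for illustration; not the Parquet C++ types.
enum class PageKind { kDictionary, kData };

struct PageHeader {
  PageKind kind;
  int32_t num_values;
};

// Returns true if the page should be skipped.
using SkipCallback = std::function<bool(const PageHeader&)>;

// Walks the headers in column order and returns the indices of the pages that
// would be handed to the caller. The callback is consulted once per data page
// and never for dictionary pages.
std::vector<size_t> PagesToRead(const std::vector<PageHeader>& headers,
                                const SkipCallback& skip) {
  std::vector<size_t> kept;
  for (size_t i = 0; i < headers.size(); ++i) {
    const PageHeader& header = headers[i];
    if (header.kind == PageKind::kData && skip && skip(header)) {
      continue;  // Skipped: the payload would not be decompressed or decoded.
    }
    kept.push_back(i);
  }
  return kept;
}
```

A callback that records what it sees can be used in a test to assert both the order and the call count.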





[GitHub] [arrow] emkornfield commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
emkornfield commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1030832418


##########
cpp/src/parquet/column_reader.cc:
##########
@@ -386,6 +386,57 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
       throw ParquetException("Invalid page header");
     }
 
+    // Do some checks before trying to decrypt and/or decompress the page.

Review Comment:
   if we are refactoring, would it make sense to move some of the logic to a helper method to shorten this method?





[GitHub] [arrow] emkornfield commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
emkornfield commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1042527726


##########
cpp/src/parquet/metadata.h:
##########
@@ -182,6 +184,28 @@ class PARQUET_EXPORT ColumnChunkMetaData {
   std::unique_ptr<ColumnChunkMetaDataImpl> impl_;
 };
 
+// \brief DataPageStats is a proxy around format::DataPageHeader and

Review Comment:
   I think there is an API question of how we can consolidate page skipping for inlined statistics and those from the index. To avoid churn, we can always move this around later if need be.





[GitHub] [arrow] pitrou commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
pitrou commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1035982919


##########
cpp/src/parquet/metadata.cc:
##########
@@ -19,10 +19,13 @@
 
 #include <algorithm>
 #include <cinttypes>
+#include <csignal>
+#include <iostream>
 #include <ostream>
 #include <string>
 #include <string_view>
 #include <utility>
+#include <variant>

Review Comment:
   Why are these new inclusions suddenly required?



##########
cpp/src/parquet/column_reader.h:
##########
@@ -115,11 +116,30 @@ class PARQUET_EXPORT PageReader {
                                           bool always_compressed = false,
                                           const CryptoContext* ctx = NULLPTR);
 
+  // If skip_page_callback_ is present (not null), NextPage() will call the
+  // callback function exactly once per page in the order the pages appear in
+  // the column. If the callback function returns true the page will be
+  // skipped. The callback will be called only if the page type is DATA_PAGE or

Review Comment:
   I think this would read better if it was advertised as a page filter (returning true to include a page and false to exclude it).
   Calling it a "skip page callback" may give the impression that it will only be called by the `Skip` APIs.



##########
cpp/src/parquet/metadata.h:
##########
@@ -20,13 +20,15 @@
 #include <cstdint>
 #include <map>
 #include <memory>
+#include <optional>
 #include <string>
 #include <utility>
 #include <vector>
 
 #include "parquet/platform.h"
 #include "parquet/properties.h"
 #include "parquet/schema.h"
+#include "parquet/statistics.h"

Review Comment:
   We should try to be parsimonious when adding inclusions in header files, as they increase compile times. This one doesn't seem necessary as `class EncodedStatistics` is forward-declared below.



##########
cpp/src/parquet/metadata.h:
##########
@@ -182,6 +184,28 @@ class PARQUET_EXPORT ColumnChunkMetaData {
   std::unique_ptr<ColumnChunkMetaDataImpl> impl_;
 };
 
+// \brief DataPageStats is a proxy around format::DataPageHeader and
+// format::DataPageHeaderV2.
+class PARQUET_EXPORT DataPageStats {

Review Comment:
   I'm not sure why this needs to manually define accessors and constructor instead of being a plain struct?
   ```c++
   struct DataPageStats {
     EncodedStatistics* encoded_statistics;
     int32_t num_values;
     std::optional<int32_t> num_rows;
   };
   ```



##########
cpp/src/parquet/column_reader.h:
##########
@@ -115,11 +116,30 @@ class PARQUET_EXPORT PageReader {
                                           bool always_compressed = false,
                                           const CryptoContext* ctx = NULLPTR);
 
+  // If skip_page_callback_ is present (not null), NextPage() will call the
+  // callback function exactly once per page in the order the pages appear in
+  // the column. If the callback function returns true the page will be
+  // skipped. The callback will be called only if the page type is DATA_PAGE or
+  // DATA_PAGE_V2. Dictionary pages will not be skipped.
+  // This setter must be called at most once to set the callback.
+  // \note API EXPERIMENTAL
+  void set_skip_page_callback(
+      std::function<bool(const DataPageStats&)> skip_page_callback) {
+    if (skip_page_callback_) {
+      throw ParquetException("set_skip_page_callback was called more than once");

Review Comment:
   Why would this be forbidden?



##########
cpp/src/parquet/column_reader.h:
##########
@@ -115,11 +116,30 @@ class PARQUET_EXPORT PageReader {
                                           bool always_compressed = false,
                                           const CryptoContext* ctx = NULLPTR);
 
+  // If skip_page_callback_ is present (not null), NextPage() will call the
+  // callback function exactly once per page in the order the pages appear in
+  // the column. If the callback function returns true the page will be
+  // skipped. The callback will be called only if the page type is DATA_PAGE or
+  // DATA_PAGE_V2. Dictionary pages will not be skipped.
+  // This setter must be called at most once to set the callback.
+  // \note API EXPERIMENTAL
+  void set_skip_page_callback(
+      std::function<bool(const DataPageStats&)> skip_page_callback) {

Review Comment:
   Can we define a type name for this? For example:
   ```c++
     using DataPageFilter = std::function<bool(const DataPageStats&)>;
   ```
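
A rough sketch of how such an alias could read at the use site, with a setter that simply overwrites any previous filter instead of throwing. Only the alias shape comes from the suggestion above; `StatsSketch`, `FilterSketch` and `ReaderSketch` are hypothetical stand-ins, and "true means skip" is an assumption taken from the doc comment under review.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <utility>

// Hypothetical stand-in for the per-page statistics passed to the filter.
struct StatsSketch {
  int32_t num_values;
};

// One name for the callback type keeps setter and member declarations short.
using FilterSketch = std::function<bool(const StatsSketch&)>;

class ReaderSketch {
 public:
  // Replacing an existing filter is allowed; an empty filter disables filtering.
  void set_filter(FilterSketch filter) { filter_ = std::move(filter); }

  // Returns true if a page with the given stats would be skipped.
  bool ShouldSkip(const StatsSketch& stats) const {
    return filter_ && filter_(stats);
  }

 private:
  FilterSketch filter_;
};
```

With the alias in place, the setter, the member and any helper that builds filters all share one spelling of the signature.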



##########
cpp/src/parquet/metadata.h:
##########
@@ -182,6 +184,28 @@ class PARQUET_EXPORT ColumnChunkMetaData {
   std::unique_ptr<ColumnChunkMetaDataImpl> impl_;
 };
 
+// \brief DataPageStats is a proxy around format::DataPageHeader and

Review Comment:
   Why define this class here if it's only referenced in `column_reader.h`?





[GitHub] [arrow] emkornfield commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
emkornfield commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1063682791


##########
cpp/src/parquet/file_deserialize_test.cc:
##########
@@ -177,6 +201,217 @@ TEST_F(TestPageSerde, DataPageV1) {
   ASSERT_NO_FATAL_FAILURE(CheckDataPageHeader(data_page_header_, current_page.get()));
 }
 
+// Templated test class to test page filtering for both format::DataPageHeader
+// and format::DataPageHeaderV2.
+template <typename T>
+class PageFilterTest : public TestPageSerde {
+ public:
+  const int kNumPages = 10;
+  void WriteStream();
+
+ protected:
+  std::vector<T> data_page_headers_;
+  int total_rows_ = 0;
+};
+
+template <>
+void PageFilterTest<format::DataPageHeader>::WriteStream() {
+  for (int i = 0; i < kNumPages; ++i) {
+    // Vary the number of rows to produce different headers.
+    int32_t num_rows = i + 100;
+    total_rows_ += num_rows;
+    int data_size = i + 1024;
+    this->data_page_header_.__set_num_values(num_rows);
+    this->data_page_header_.statistics.__set_min_value("A");
+    this->data_page_header_.statistics.__set_max_value("Z");
+    this->data_page_header_.statistics.__set_null_count(0);
+    this->data_page_header_.__isset.statistics = true;
+    ASSERT_NO_FATAL_FAILURE(
+        this->WriteDataPageHeader(/*max_serialized_len=*/1024, data_size, data_size));
+    data_page_headers_.push_back(this->data_page_header_);
+    // Also write data, to make sure we skip the data correctly.
+    std::vector<uint8_t> faux_data(data_size);
+    ASSERT_OK(this->out_stream_->Write(faux_data.data(), data_size));
+  }
+  this->EndStream();
+}
+
+template <>
+void PageFilterTest<format::DataPageHeaderV2>::WriteStream() {
+  for (int i = 0; i < kNumPages; ++i) {
+    // Vary the number of rows to produce different headers.
+    int32_t num_rows = i + 100;
+    total_rows_ += num_rows;
+    int data_size = i + 1024;
+    this->data_page_header_v2_.__set_num_values(num_rows);
+    this->data_page_header_v2_.__set_num_rows(num_rows);
+    this->data_page_header_v2_.statistics.__set_min_value("A");
+    this->data_page_header_v2_.statistics.__set_max_value("Z");
+    this->data_page_header_v2_.statistics.__set_null_count(0);
+    this->data_page_header_v2_.__isset.statistics = true;
+    ASSERT_NO_FATAL_FAILURE(
+        this->WriteDataPageHeaderV2(/*max_serialized_len=*/1024, data_size, data_size));
+    data_page_headers_.push_back(this->data_page_header_v2_);
+    // Also write data, to make sure we skip the data correctly.
+    std::vector<uint8_t> faux_data(data_size);
+    ASSERT_OK(this->out_stream_->Write(faux_data.data(), data_size));
+  }
+  this->EndStream();
+}
+
+using DataPageHeaderTypes =
+    ::testing::Types<format::DataPageHeader, format::DataPageHeaderV2>;
+TYPED_TEST_SUITE(PageFilterTest, DataPageHeaderTypes);
+
+// Creates a number of pages and skips some of them with the page filter callback.
+TYPED_TEST(PageFilterTest, TestPageFilterCallback) {
+  this->WriteStream();
+
+  {  // Read all pages.
+    auto stream = std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
+    this->page_reader_ =
+        PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+
+    // This callback will always return false.
+    auto read_all_pages = [](const DataPageStats& stats) -> bool { return false; };
+
+    this->page_reader_->set_data_page_filter(read_all_pages);
+    for (int i = 0; i < this->kNumPages; ++i) {
+      std::shared_ptr<Page> current_page = this->page_reader_->NextPage();
+      ASSERT_NE(current_page, nullptr);
+      ASSERT_NO_FATAL_FAILURE(
+          CheckDataPageHeader(this->data_page_headers_[i], current_page.get()));
+    }
+    ASSERT_EQ(this->page_reader_->NextPage(), nullptr);
+  }
+  {  // Skip all pages.
+    auto stream = std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
+    this->page_reader_ =
+        PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+
+    auto skip_all_pages = [](const DataPageStats& stats) -> bool { return true; };
+
+    this->page_reader_->set_data_page_filter(skip_all_pages);
+    std::shared_ptr<Page> current_page = this->page_reader_->NextPage();
+    ASSERT_EQ(this->page_reader_->NextPage(), nullptr);
+  }
+
+  {  // Skip every other page.
+    auto stream = std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
+    this->page_reader_ =
+        PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+
+    // Skip pages with even number of values.
+    auto skip_even_pages = [](const DataPageStats& stats) -> bool {
+      if (stats.num_values % 2 == 0) return true;
+      return false;
+    };
+
+    this->page_reader_->set_data_page_filter(skip_even_pages);
+
+    for (int i = 0; i < this->kNumPages; ++i) {
+      // Only pages with odd number of values are read.
+      if (i % 2 != 0) {
+        std::shared_ptr<Page> current_page = this->page_reader_->NextPage();
+        ASSERT_NE(current_page, nullptr);
+        ASSERT_NO_FATAL_FAILURE(
+            CheckDataPageHeader(this->data_page_headers_[i], current_page.get()));
+      }
+    }
+    // We should have exhausted reading the pages by reading the odd pages only.
+    ASSERT_EQ(this->page_reader_->NextPage(), nullptr);
+  }
+}
+
+// Set the page filter more than once. The new filter should be effective
+// on the next NextPage() call.
+TYPED_TEST(PageFilterTest, TestChangingPageFilter) {
+  this->WriteStream();
+
+  auto stream = std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
+  this->page_reader_ =
+      PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+
+  // This callback will always return false.
+  auto read_all_pages = [](const DataPageStats& stats) -> bool { return false; };
+  this->page_reader_->set_data_page_filter(read_all_pages);
+  std::shared_ptr<Page> current_page = this->page_reader_->NextPage();
+  ASSERT_NE(current_page, nullptr);
+  ASSERT_NO_FATAL_FAILURE(
+      CheckDataPageHeader(this->data_page_headers_[0], current_page.get()));
+
+  // This callback will skip all pages.
+  auto skip_all_pages = [](const DataPageStats& stats) -> bool { return true; };
+  this->page_reader_->set_data_page_filter(skip_all_pages);
+  ASSERT_EQ(this->page_reader_->NextPage(), nullptr);
+}
+
+// Test that we do not skip dictionary pages.
+TEST_F(TestPageSerde, DoesNotFilterDictionaryPages) {
+  int data_size = 1024;
+  std::vector<uint8_t> faux_data(data_size);
+
+  ASSERT_NO_FATAL_FAILURE(
+      WriteDataPageHeader(/*max_serialized_len=*/1024, data_size, data_size));
+  ASSERT_OK(out_stream_->Write(faux_data.data(), data_size));
+
+  ASSERT_NO_FATAL_FAILURE(WriteDictionaryPageHeader(data_size, data_size));
+  ASSERT_OK(out_stream_->Write(faux_data.data(), data_size));
+
+  ASSERT_NO_FATAL_FAILURE(
+      WriteDataPageHeader(/*max_serialized_len=*/1024, data_size, data_size));
+  ASSERT_OK(out_stream_->Write(faux_data.data(), data_size));
+  EndStream();
+
+  // Try to read it back while asking for all data pages to be skipped.
+  auto stream = std::make_shared<::arrow::io::BufferReader>(out_buffer_);
+  page_reader_ = PageReader::Open(stream, /*num_rows=*/100, Compression::UNCOMPRESSED);
+
+  auto skip_all_pages = [](const DataPageStats& stats) -> bool { return true; };
+
+  page_reader_->set_data_page_filter(skip_all_pages);

Review Comment:
   I guess this could be done by writing a golden pseudo-random file and then specifying a filter that should filter out some of the pages.





[GitHub] [arrow] fatemehp commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1065131471


##########
cpp/src/parquet/file_deserialize_test.cc:
##########
@@ -177,6 +201,217 @@ TEST_F(TestPageSerde, DataPageV1) {
   ASSERT_NO_FATAL_FAILURE(CheckDataPageHeader(data_page_header_, current_page.get()));
 }
 
+// Templated test class to test page filtering for both format::DataPageHeader
+// and format::DataPageHeaderV2.
+template <typename T>
+class PageFilterTest : public TestPageSerde {
+ public:
+  const int kNumPages = 10;
+  void WriteStream();
+
+ protected:
+  std::vector<T> data_page_headers_;
+  int total_rows_ = 0;
+};
+
+template <>
+void PageFilterTest<format::DataPageHeader>::WriteStream() {
+  for (int i = 0; i < kNumPages; ++i) {
+    // Vary the number of rows to produce different headers.
+    int32_t num_rows = i + 100;
+    total_rows_ += num_rows;
+    int data_size = i + 1024;
+    this->data_page_header_.__set_num_values(num_rows);
+    this->data_page_header_.statistics.__set_min_value("A");
+    this->data_page_header_.statistics.__set_max_value("Z");
+    this->data_page_header_.statistics.__set_null_count(0);
+    this->data_page_header_.__isset.statistics = true;
+    ASSERT_NO_FATAL_FAILURE(
+        this->WriteDataPageHeader(/*max_serialized_len=*/1024, data_size, data_size));
+    data_page_headers_.push_back(this->data_page_header_);
+    // Also write data, to make sure we skip the data correctly.
+    std::vector<uint8_t> faux_data(data_size);
+    ASSERT_OK(this->out_stream_->Write(faux_data.data(), data_size));
+  }
+  this->EndStream();
+}
+
+template <>
+void PageFilterTest<format::DataPageHeaderV2>::WriteStream() {
+  for (int i = 0; i < kNumPages; ++i) {
+    // Vary the number of rows to produce different headers.
+    int32_t num_rows = i + 100;
+    total_rows_ += num_rows;
+    int data_size = i + 1024;
+    this->data_page_header_v2_.__set_num_values(num_rows);
+    this->data_page_header_v2_.__set_num_rows(num_rows);
+    this->data_page_header_v2_.statistics.__set_min_value("A");
+    this->data_page_header_v2_.statistics.__set_max_value("Z");
+    this->data_page_header_v2_.statistics.__set_null_count(0);
+    this->data_page_header_v2_.__isset.statistics = true;
+    ASSERT_NO_FATAL_FAILURE(
+        this->WriteDataPageHeaderV2(/*max_serialized_len=*/1024, data_size, data_size));
+    data_page_headers_.push_back(this->data_page_header_v2_);
+    // Also write data, to make sure we skip the data correctly.
+    std::vector<uint8_t> faux_data(data_size);
+    ASSERT_OK(this->out_stream_->Write(faux_data.data(), data_size));
+  }
+  this->EndStream();
+}
+
+using DataPageHeaderTypes =
+    ::testing::Types<format::DataPageHeader, format::DataPageHeaderV2>;
+TYPED_TEST_SUITE(PageFilterTest, DataPageHeaderTypes);
+
+// Creates a number of pages and skips some of them with the page filter callback.
+TYPED_TEST(PageFilterTest, TestPageFilterCallback) {
+  this->WriteStream();
+
+  {  // Read all pages.
+    auto stream = std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
+    this->page_reader_ =
+        PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+
+    // This callback will always return false.
+    auto read_all_pages = [](const DataPageStats& stats) -> bool { return false; };
+
+    this->page_reader_->set_data_page_filter(read_all_pages);
+    for (int i = 0; i < this->kNumPages; ++i) {
+      std::shared_ptr<Page> current_page = this->page_reader_->NextPage();
+      ASSERT_NE(current_page, nullptr);
+      ASSERT_NO_FATAL_FAILURE(
+          CheckDataPageHeader(this->data_page_headers_[i], current_page.get()));
+    }
+    ASSERT_EQ(this->page_reader_->NextPage(), nullptr);
+  }
+  {  // Skip all pages.
+    auto stream = std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
+    this->page_reader_ =
+        PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+
+    auto skip_all_pages = [](const DataPageStats& stats) -> bool { return true; };
+
+    this->page_reader_->set_data_page_filter(skip_all_pages);
+    std::shared_ptr<Page> current_page = this->page_reader_->NextPage();
+    ASSERT_EQ(this->page_reader_->NextPage(), nullptr);
+  }
+
+  {  // Skip every other page.
+    auto stream = std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
+    this->page_reader_ =
+        PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+
+    // Skip pages with even number of values.
+    auto skip_even_pages = [](const DataPageStats& stats) -> bool {
+      if (stats.num_values % 2 == 0) return true;
+      return false;
+    };
+
+    this->page_reader_->set_data_page_filter(skip_even_pages);
+
+    for (int i = 0; i < this->kNumPages; ++i) {
+      // Only pages with odd number of values are read.
+      if (i % 2 != 0) {
+        std::shared_ptr<Page> current_page = this->page_reader_->NextPage();
+        ASSERT_NE(current_page, nullptr);
+        ASSERT_NO_FATAL_FAILURE(
+            CheckDataPageHeader(this->data_page_headers_[i], current_page.get()));
+      }
+    }
+    // We should have exhausted reading the pages by reading the odd pages only.
+    ASSERT_EQ(this->page_reader_->NextPage(), nullptr);
+  }
+}
+
+// Set the page filter more than once. The new filter should be effective
+// on the next NextPage() call.
+TYPED_TEST(PageFilterTest, TestChangingPageFilter) {
+  this->WriteStream();
+
+  auto stream = std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
+  this->page_reader_ =
+      PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+
+  // This callback will always return false.
+  auto read_all_pages = [](const DataPageStats& stats) -> bool { return false; };
+  this->page_reader_->set_data_page_filter(read_all_pages);
+  std::shared_ptr<Page> current_page = this->page_reader_->NextPage();
+  ASSERT_NE(current_page, nullptr);
+  ASSERT_NO_FATAL_FAILURE(
+      CheckDataPageHeader(this->data_page_headers_[0], current_page.get()));
+
+  // This callback will skip all pages.
+  auto skip_all_pages = [](const DataPageStats& stats) -> bool { return true; };
+  this->page_reader_->set_data_page_filter(skip_all_pages);
+  ASSERT_EQ(this->page_reader_->NextPage(), nullptr);
+}
+
+// Test that we do not skip dictionary pages.
+TEST_F(TestPageSerde, DoesNotFilterDictionaryPages) {
+  int data_size = 1024;
+  std::vector<uint8_t> faux_data(data_size);
+
+  ASSERT_NO_FATAL_FAILURE(
+      WriteDataPageHeader(/*max_serialized_len=*/1024, data_size, data_size));
+  ASSERT_OK(out_stream_->Write(faux_data.data(), data_size));
+
+  ASSERT_NO_FATAL_FAILURE(WriteDictionaryPageHeader(data_size, data_size));
+  ASSERT_OK(out_stream_->Write(faux_data.data(), data_size));
+
+  ASSERT_NO_FATAL_FAILURE(
+      WriteDataPageHeader(/*max_serialized_len=*/1024, data_size, data_size));
+  ASSERT_OK(out_stream_->Write(faux_data.data(), data_size));
+  EndStream();
+
+  // Try to read it back while asking for all data pages to be skipped.
+  auto stream = std::make_shared<::arrow::io::BufferReader>(out_buffer_);
+  page_reader_ = PageReader::Open(stream, /*num_rows=*/100, Compression::UNCOMPRESSED);
+
+  auto skip_all_pages = [](const DataPageStats& stats) -> bool { return true; };
+
+  page_reader_->set_data_page_filter(skip_all_pages);

Review Comment:
   ACKed.





[GitHub] [arrow] emkornfield commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
emkornfield commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1063681191


##########
cpp/src/parquet/file_deserialize_test.cc:
##########
@@ -177,6 +201,305 @@ TEST_F(TestPageSerde, DataPageV1) {
   ASSERT_NO_FATAL_FAILURE(CheckDataPageHeader(data_page_header_, current_page.get()));
 }
 
+// Templated test class to test page filtering for both format::DataPageHeader
+// and format::DataPageHeaderV2.
+template <typename T>
+class PageFilterTest : public TestPageSerde {
+ public:
+  const int kNumPages = 10;
+  void WriteStream();
+  void WritePageWithoutStats();
+  void CheckNumRows(std::optional<int32_t> num_rows, const T& header);
+
+ protected:
+  std::vector<T> data_page_headers_;
+  int total_rows_ = 0;
+};
+
+template <>
+void PageFilterTest<format::DataPageHeader>::WriteStream() {
+  for (int i = 0; i < kNumPages; ++i) {
+    // Vary the number of rows to produce different headers.
+    int32_t num_rows = i + 100;
+    total_rows_ += num_rows;
+    int data_size = i + 1024;
+    this->data_page_header_.__set_num_values(num_rows);
+    this->data_page_header_.statistics.__set_min_value("A" + std::to_string(i));
+    this->data_page_header_.statistics.__set_max_value("Z" + std::to_string(i));
+    this->data_page_header_.statistics.__set_null_count(0);
+    this->data_page_header_.statistics.__set_distinct_count(num_rows);
+    this->data_page_header_.__isset.statistics = true;
+    ASSERT_NO_FATAL_FAILURE(
+        this->WriteDataPageHeader(/*max_serialized_len=*/1024, data_size, data_size));
+    data_page_headers_.push_back(this->data_page_header_);
+    // Also write data, to make sure we skip the data correctly.
+    std::vector<uint8_t> faux_data(data_size);
+    ASSERT_OK(this->out_stream_->Write(faux_data.data(), data_size));
+  }
+  this->EndStream();
+}
+
+template <>
+void PageFilterTest<format::DataPageHeaderV2>::WriteStream() {
+  for (int i = 0; i < kNumPages; ++i) {
+    // Vary the number of rows to produce different headers.
+    int32_t num_rows = i + 100;
+    total_rows_ += num_rows;
+    int data_size = i + 1024;
+    this->data_page_header_v2_.__set_num_values(num_rows);
+    this->data_page_header_v2_.__set_num_rows(num_rows);
+    this->data_page_header_v2_.statistics.__set_min_value("A" + std::to_string(i));
+    this->data_page_header_v2_.statistics.__set_max_value("Z" + std::to_string(i));
+    this->data_page_header_v2_.statistics.__set_null_count(0);
+    this->data_page_header_v2_.statistics.__set_distinct_count(num_rows);
+    this->data_page_header_v2_.__isset.statistics = true;
+    ASSERT_NO_FATAL_FAILURE(
+        this->WriteDataPageHeaderV2(/*max_serialized_len=*/1024, data_size, data_size));
+    data_page_headers_.push_back(this->data_page_header_v2_);
+    // Also write data, to make sure we skip the data correctly.
+    std::vector<uint8_t> faux_data(data_size);
+    ASSERT_OK(this->out_stream_->Write(faux_data.data(), data_size));
+  }
+  this->EndStream();
+}
+
+template <>
+void PageFilterTest<format::DataPageHeader>::WritePageWithoutStats() {
+  int32_t num_rows = 100;
+  total_rows_ += num_rows;
+  int data_size = 1024;
+  this->data_page_header_.__set_num_values(num_rows);
+  ASSERT_NO_FATAL_FAILURE(
+      this->WriteDataPageHeader(/*max_serialized_len=*/1024, data_size, data_size));
+  data_page_headers_.push_back(this->data_page_header_);
+  std::vector<uint8_t> faux_data(data_size);
+  ASSERT_OK(this->out_stream_->Write(faux_data.data(), data_size));
+  this->EndStream();
+}
+
+template <>
+void PageFilterTest<format::DataPageHeaderV2>::WritePageWithoutStats() {
+  int32_t num_rows = 100;
+  total_rows_ += num_rows;
+  int data_size = 1024;
+  this->data_page_header_v2_.__set_num_values(num_rows);
+  this->data_page_header_v2_.__set_num_rows(num_rows);
+  ASSERT_NO_FATAL_FAILURE(
+      this->WriteDataPageHeaderV2(/*max_serialized_len=*/1024, data_size, data_size));
+  data_page_headers_.push_back(this->data_page_header_v2_);
+  std::vector<uint8_t> faux_data(data_size);
+  ASSERT_OK(this->out_stream_->Write(faux_data.data(), data_size));
+  this->EndStream();
+}
+
+template <>
+void PageFilterTest<format::DataPageHeader>::CheckNumRows(
+    std::optional<int32_t> num_rows, const format::DataPageHeader& header) {
+  ASSERT_EQ(num_rows, std::nullopt);
+}
+
+template <>
+void PageFilterTest<format::DataPageHeaderV2>::CheckNumRows(
+    std::optional<int32_t> num_rows, const format::DataPageHeaderV2& header) {
+  ASSERT_EQ(*num_rows, header.num_rows);
+}
+
+using DataPageHeaderTypes =
+    ::testing::Types<format::DataPageHeader, format::DataPageHeaderV2>;
+TYPED_TEST_SUITE(PageFilterTest, DataPageHeaderTypes);
+
+// Test that the returned encoded_statistics is nullptr when there are no
+// statistics in the page header.
+TYPED_TEST(PageFilterTest, TestPageWithoutStatistics) {
+  this->WritePageWithoutStats();
+
+  auto stream = std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
+  this->page_reader_ =
+      PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+
+  int num_pages = 0;
+  bool is_stats_null = false;
+  auto read_all_pages = [&](const DataPageStats& stats) -> bool {
+    is_stats_null = stats.encoded_statistics == nullptr;
+    ++num_pages;
+    return false;
+  };
+
+  this->page_reader_->set_data_page_filter(read_all_pages);
+  std::shared_ptr<Page> current_page = this->page_reader_->NextPage();
+  ASSERT_EQ(num_pages, 1);
+  ASSERT_EQ(is_stats_null, true);
+  ASSERT_EQ(this->page_reader_->NextPage(), nullptr);
+}
+
+// Creates a number of pages and skips some of them with the page filter callback.
+TYPED_TEST(PageFilterTest, TestPageFilterCallback) {
+  this->WriteStream();
+
+  {  // Read all pages.
+     // Also check that the encoded statistics passed to the callback function
+     // are right.
+    auto stream = std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
+    this->page_reader_ =
+        PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+
+    std::vector<EncodedStatistics> read_stats;
+    std::vector<int64_t> read_num_values;
+    std::vector<std::optional<int32_t>> read_num_rows;
+    auto read_all_pages = [&](const DataPageStats& stats) -> bool {
+      DCHECK_NE(stats.encoded_statistics, nullptr);

Review Comment:
   ASSERT_NE?
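
   For illustration: GoogleTest's fatal `ASSERT_*` macros can only be used in functions that return `void`, so they would not compile directly inside a filter callback that returns `bool`. One possible pattern, sketched here without GoogleTest and with hypothetical stand-in names (`DataPageStatsSketch`, `CountPagesMissingStats`), is to record what the callback observed and check it after the read loop.

```cpp
#include <cassert>
#include <functional>
#include <vector>

// Hypothetical stand-in for the stats struct passed to the page filter.
struct DataPageStatsSketch {
  const void* encoded_statistics;  // nullptr when the page header has no stats
};

// Runs a "read all pages" callback over the given pages and reports how many
// of them arrived without statistics. In a real test, the returned count
// would be checked with ASSERT_EQ after the loop, outside the callback.
int CountPagesMissingStats(const std::vector<DataPageStatsSketch>& pages) {
  int missing = 0;
  std::function<bool(const DataPageStatsSketch&)> read_all_pages =
      [&missing](const DataPageStatsSketch& stats) -> bool {
        if (stats.encoded_statistics == nullptr) ++missing;
        return false;  // never skip
      };
  for (const auto& page : pages) {
    read_all_pages(page);
  }
  return missing;
}
```

   Unlike a debug-only check, a count verified after the loop is also exercised in release builds.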





[GitHub] [arrow] fatemehp commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1049256686


##########
cpp/src/parquet/file_deserialize_test.cc:
##########
@@ -177,6 +201,217 @@ TEST_F(TestPageSerde, DataPageV1) {
   ASSERT_NO_FATAL_FAILURE(CheckDataPageHeader(data_page_header_, current_page.get()));
 }
 
+// Templated test class to test page filtering for both format::DataPageHeader
+// and format::DataPageHeaderV2.
+template <typename T>
+class PageFilterTest : public TestPageSerde {
+ public:
+  const int kNumPages = 10;
+  void WriteStream();
+
+ protected:
+  std::vector<T> data_page_headers_;
+  int total_rows_ = 0;
+};
+
+template <>
+void PageFilterTest<format::DataPageHeader>::WriteStream() {
+  for (int i = 0; i < kNumPages; ++i) {
+    // Vary the number of rows to produce different headers.
+    int32_t num_rows = i + 100;
+    total_rows_ += num_rows;
+    int data_size = i + 1024;
+    this->data_page_header_.__set_num_values(num_rows);
+    this->data_page_header_.statistics.__set_min_value("A");
+    this->data_page_header_.statistics.__set_max_value("Z");
+    this->data_page_header_.statistics.__set_null_count(0);
+    this->data_page_header_.__isset.statistics = true;
+    ASSERT_NO_FATAL_FAILURE(
+        this->WriteDataPageHeader(/*max_serialized_len=*/1024, data_size, data_size));
+    data_page_headers_.push_back(this->data_page_header_);
+    // Also write data, to make sure we skip the data correctly.
+    std::vector<uint8_t> faux_data(data_size);
+    ASSERT_OK(this->out_stream_->Write(faux_data.data(), data_size));
+  }
+  this->EndStream();
+}
+
+template <>
+void PageFilterTest<format::DataPageHeaderV2>::WriteStream() {
+  for (int i = 0; i < kNumPages; ++i) {
+    // Vary the number of rows to produce different headers.
+    int32_t num_rows = i + 100;
+    total_rows_ += num_rows;
+    int data_size = i + 1024;
+    this->data_page_header_v2_.__set_num_values(num_rows);
+    this->data_page_header_v2_.__set_num_rows(num_rows);
+    this->data_page_header_v2_.statistics.__set_min_value("A");
+    this->data_page_header_v2_.statistics.__set_max_value("Z");
+    this->data_page_header_v2_.statistics.__set_null_count(0);
+    this->data_page_header_v2_.__isset.statistics = true;
+    ASSERT_NO_FATAL_FAILURE(
+        this->WriteDataPageHeaderV2(/*max_serialized_len=*/1024, data_size, data_size));
+    data_page_headers_.push_back(this->data_page_header_v2_);
+    // Also write data, to make sure we skip the data correctly.
+    std::vector<uint8_t> faux_data(data_size);
+    ASSERT_OK(this->out_stream_->Write(faux_data.data(), data_size));
+  }
+  this->EndStream();
+}
+
+using DataPageHeaderTypes =
+    ::testing::Types<format::DataPageHeader, format::DataPageHeaderV2>;
+TYPED_TEST_SUITE(PageFilterTest, DataPageHeaderTypes);
+
+// Creates a number of pages and skips some of them with the page filter callback.
+TYPED_TEST(PageFilterTest, TestPageFilterCallback) {
+  this->WriteStream();
+
+  {  // Read all pages.
+    auto stream = std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
+    this->page_reader_ =
+        PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+
+    // This callback will always return false.
+    auto read_all_pages = [](const DataPageStats& stats) -> bool { return false; };
+
+    this->page_reader_->set_data_page_filter(read_all_pages);
+    for (int i = 0; i < this->kNumPages; ++i) {
+      std::shared_ptr<Page> current_page = this->page_reader_->NextPage();
+      ASSERT_NE(current_page, nullptr);
+      ASSERT_NO_FATAL_FAILURE(
+          CheckDataPageHeader(this->data_page_headers_[i], current_page.get()));
+    }
+    ASSERT_EQ(this->page_reader_->NextPage(), nullptr);
+  }
+  {  // Skip all pages.
+    auto stream = std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
+    this->page_reader_ =
+        PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+
+    auto skip_all_pages = [](const DataPageStats& stats) -> bool { return true; };
+
+    this->page_reader_->set_data_page_filter(skip_all_pages);
+    std::shared_ptr<Page> current_page = this->page_reader_->NextPage();
+    ASSERT_EQ(this->page_reader_->NextPage(), nullptr);
+  }
+
+  {  // Skip every other page.
+    auto stream = std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
+    this->page_reader_ =
+        PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+
+    // Skip pages with even number of values.
+    auto skip_even_pages = [](const DataPageStats& stats) -> bool {
+      if (stats.num_values % 2 == 0) return true;

Review Comment:
   Done, I added checks for stats.





[GitHub] [arrow] pitrou commented on pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
pitrou commented on PR #14603:
URL: https://github.com/apache/arrow/pull/14603#issuecomment-1343382022

   Something like `static std::shared_ptr<Statistics> Make(const ColumnDescriptor*, const EncodedStatistics*)` would be more convenient.
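
   A rough sketch of what such a convenience factory could look like, under simplifying assumptions: the types `EncodedStatsSketch` and `Int32StatsSketch` and the helper `DecodeInt32LE` are hypothetical, and the sketch hardcodes a 4-byte little-endian INT32 instead of consulting a column descriptor to pick the physical type.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <memory>
#include <optional>
#include <string>

// Hypothetical, simplified stand-in for the encoded page statistics.
struct EncodedStatsSketch {
  std::string min;  // plain-encoded bytes
  std::string max;  // plain-encoded bytes
  bool has_min = false;
  bool has_max = false;
};

// Decodes a 4-byte little-endian value; returns nullopt on a size mismatch.
inline std::optional<int32_t> DecodeInt32LE(const std::string& bytes) {
  if (bytes.size() != sizeof(int32_t)) return std::nullopt;
  uint32_t raw = 0;
  for (int i = 0; i < 4; ++i) {
    raw |= static_cast<uint32_t>(static_cast<unsigned char>(bytes[i])) << (8 * i);
  }
  int32_t value;
  std::memcpy(&value, &raw, sizeof(value));
  return value;
}

// Hypothetical typed statistics with a Make() factory, so that callers do not
// have to repeat the decoding logic themselves.
struct Int32StatsSketch {
  std::optional<int32_t> min;
  std::optional<int32_t> max;

  static std::shared_ptr<Int32StatsSketch> Make(const EncodedStatsSketch* encoded) {
    if (encoded == nullptr) return nullptr;  // no stats in the page header
    auto stats = std::make_shared<Int32StatsSketch>();
    if (encoded->has_min) stats->min = DecodeInt32LE(encoded->min);
    if (encoded->has_max) stats->max = DecodeInt32LE(encoded->max);
    return stats;
  }
};
```

   A caller could then write `auto stats = Int32StatsSketch::Make(encoded);` inside the filter callback and compare typed values instead of raw bytes.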
   




[GitHub] [arrow] fatemehp commented on pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on PR #14603:
URL: https://github.com/apache/arrow/pull/14603#issuecomment-1347445888

   > That doesn't prevent us from exposing a convenience API to get a `Statistics` from `EncodedStatistics`
   
   @pitrou, I added a helper function, can you take a look? Is this what you were thinking?




[GitHub] [arrow] emkornfield commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
emkornfield commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1048052988


##########
cpp/src/parquet/column_reader.h:
##########
@@ -55,6 +56,29 @@ static constexpr uint32_t kDefaultMaxPageHeaderSize = 16 * 1024 * 1024;
 // 16 KB is the default expected page header size
 static constexpr uint32_t kDefaultPageHeaderSize = 16 * 1024;
 
+// \brief DataPageStats stores encoded statistics and number of values/rows for
+// a page.
+struct PARQUET_EXPORT DataPageStats {
+  DataPageStats(EncodedStatistics* encoded_statistics, int32_t num_values,
+                std::optional<int32_t> num_rows)
+      : encoded_statistics(encoded_statistics),
+        is_stats_set(encoded_statistics->is_set()),
+        num_values(num_values),
+        num_rows(num_rows) {}
+
+  // Encoded statistics extracted from the page header.
+  EncodedStatistics* encoded_statistics;

Review Comment:
   Sorry if I missed it, but if we aren't applying business logic to determine when statistics might not be valid, we should clearly document this, along with how users are expected to verify that they are valid.
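
   To make the concern concrete, here is a minimal sketch of a filter that skips a page only when the statistics it relies on are actually present. The structs and `MakeSkipBelow` are hypothetical stand-ins, and the byte-wise comparison of the encoded maximum is an assumption that only holds for column types whose sort order matches unsigned byte order.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <optional>
#include <string>
#include <utility>

// Hypothetical, simplified stand-ins for the types discussed in this PR.
struct EncodedStatsSketch {
  std::string max;  // encoded maximum value of the page
  bool has_max = false;
};

struct DataPageStatsSketch {
  const EncodedStatsSketch* encoded_statistics;  // nullptr if header has no stats
  int32_t num_values;
  std::optional<int32_t> num_rows;
};

// Builds a filter that returns true (skip) only when the page maximum is
// known and sorts below `threshold`. Missing stats always mean "read it".
std::function<bool(const DataPageStatsSketch&)> MakeSkipBelow(std::string threshold) {
  return [threshold = std::move(threshold)](const DataPageStatsSketch& stats) {
    if (stats.encoded_statistics == nullptr) return false;  // no stats at all
    if (!stats.encoded_statistics->has_max) return false;   // max not recorded
    return stats.encoded_statistics->max < threshold;
  };
}
```

   Defaulting to "read the page" whenever a statistic is absent keeps the filter conservative: a missing or unset field can cost extra I/O but cannot drop rows.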





[GitHub] [arrow] pitrou commented on pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
pitrou commented on PR #14603:
URL: https://github.com/apache/arrow/pull/14603#issuecomment-1342866775

   > The caller will have access to the descriptor and can make the Statistics out of encoded statistics.
   
   Can we add a convenience API for that?




[GitHub] [arrow] emkornfield commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
emkornfield commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1035216467


##########
cpp/src/parquet/column_reader.cc:
##########
@@ -486,9 +525,8 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
           header.repetition_levels_byte_length, uncompressed_len, is_compressed,
           page_statistics);
     } else {
-      // We don't know what this page type is. We're allowed to skip non-data
-      // pages.
-      continue;
+      // We have already skipped non-data pages in ShouldSkipPage().

Review Comment:
   nit: consider moving this detail into the exception message; if there is a bug, this will help people narrow it down more quickly.
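
   As a sketch of that idea, the invariant from the code comment can be carried in the error text itself. The helper names below are hypothetical, and `std::runtime_error` stands in for whatever exception type the reader actually throws.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>

// Builds an error message that states the violated invariant, so a failure
// points directly at the place where the assumption was supposed to hold.
std::string UnexpectedPageMessage(int page_type) {
  return "Internal error: unexpected page type " + std::to_string(page_type) +
         "; non-data pages should already have been skipped in ShouldSkipPage()";
}

// Hypothetical stand-in for the unreachable branch in NextPage().
[[noreturn]] void ThrowUnexpectedPage(int page_type) {
  throw std::runtime_error(UnexpectedPageMessage(page_type));
}
```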





[GitHub] [arrow] emkornfield commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
emkornfield commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1030835177


##########
cpp/src/parquet/column_reader.h:
##########
@@ -115,11 +116,26 @@ class PARQUET_EXPORT PageReader {
                                           bool always_compressed = false,
                                           const CryptoContext* ctx = NULLPTR);
 
+  // If skip_page_callback_ is set, NextPage() must use this callback to determine if it
+  // should return or skip and move to the next page. If the callback function returns
+  // true the page must be skipped. The callback will be called only if the page
+  // type is DATA_PAGE or DATA_PAGE_V2. Dictionary pages must be read
+  // regardless.
+  // \note API EXPERIMENTAL
+  void set_skip_page_callback(
+      std::function<bool(const DataPageStats&)> skip_page_callback) {
+    skip_page_callback_ = skip_page_callback;

Review Comment:
   nit: std::move ?
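
A minimal sketch of what the nit asks for, assuming a setter that takes the `std::function` by value. `PageStatsSketch` and `ReaderSketch` are hypothetical stand-ins for the types in the diff, not the real classes.

```cpp
#include <functional>
#include <utility>

// Hypothetical stand-in for the stats type passed to the callback.
struct PageStatsSketch {
  int num_values = 0;
};

// Hypothetical reader; only the setter pattern matters here.
class ReaderSketch {
 public:
  using SkipCallback = std::function<bool(const PageStatsSketch&)>;

  // The parameter is already a private copy of the caller's argument, so
  // moving it into the member typically avoids copying the callable again.
  void set_skip_page_callback(SkipCallback skip_page_callback) {
    skip_page_callback_ = std::move(skip_page_callback);
  }

  // Returns false when no callback has been set.
  bool ShouldSkip(const PageStatsSketch& stats) const {
    return skip_page_callback_ && skip_page_callback_(stats);
  }

 private:
  SkipCallback skip_page_callback_;
};
```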





[GitHub] [arrow] fatemehp commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1035363944


##########
cpp/src/parquet/column_reader.cc:
##########
@@ -337,6 +348,50 @@ void SerializedPageReader::UpdateDecryption(const std::shared_ptr<Decryptor>& de
   }
 }
 
+bool SerializedPageReader::ShouldSkipPage(EncodedStatistics& page_statistics) {
+  const PageType::type page_type = LoadEnumSafe(&current_page_header_.type);
+  if (page_type == PageType::DATA_PAGE) {
+    const format::DataPageHeader& header = current_page_header_.data_page_header;
+    CheckNumValuesInHeader(header.num_values);
+    page_statistics = ExtractStatsFromHeader(header);

Review Comment:
   ColumnChunkMetaData has is_stats_set(), which should be used to check whether the statistics are valid and, if they are, to set a callback function.





[GitHub] [arrow] fatemehp commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1035378763


##########
cpp/src/parquet/column_reader.cc:
##########
@@ -263,6 +269,11 @@ class SerializedPageReader : public PageReader {
                                              int compressed_len, int uncompressed_len,
                                              int levels_byte_len = 0);
 
+  // Returns true for non-data pages, and if we should skip based on
+  // skip_page_callback_. Performs basic checks on values in the page header.
+  // Fills in page_statistics.
+  bool ShouldSkipPage(EncodedStatistics& page_statistics);

Review Comment:
   Done



##########
cpp/src/parquet/column_reader.cc:
##########
@@ -486,9 +525,8 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
           header.repetition_levels_byte_length, uncompressed_len, is_compressed,
           page_statistics);
     } else {
-      // We don't know what this page type is. We're allowed to skip non-data
-      // pages.
-      continue;
+      // We have already skipped non-data pages in ShouldSkipPage().

Review Comment:
   Done





[GitHub] [arrow] wgtmac commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
wgtmac commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1017306358


##########
cpp/src/parquet/column_reader.h:
##########
@@ -28,6 +28,7 @@
 #include "parquet/properties.h"
 #include "parquet/schema.h"
 #include "parquet/types.h"
+#include "parquet/thrift_internal.h"  // IWYU pragma: keep

Review Comment:
   This might fail the CI check because it exposes Thrift symbols.



##########
cpp/src/parquet/column_reader.cc:
##########
@@ -253,6 +253,13 @@ class SerializedPageReader : public PageReader {
 
   void set_max_page_header_size(uint32_t size) override { max_page_header_size_ = size; }
 
+  bool set_skip_page_callback(
+      std::function<bool(const format::PageHeader&)> skip_page_callback) override {
+    skip_page_callback_ = skip_page_callback;
+    has_skip_page_callback_ = true;

Review Comment:
   We can avoid the has_skip_page_callback_ flag by simply checking `if (skip_page_callback_)`
   
   https://en.cppreference.com/w/cpp/utility/functional/function/operator_bool
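
As a small illustration of the point above, here is a sketch in which the `std::function` member acts as its own "is set" flag. `FlaglessReaderSketch` is a hypothetical class, and the `int` argument stands in for the page header or stats.

```cpp
#include <functional>
#include <utility>

// Hypothetical reader with no separate has_callback_ member.
class FlaglessReaderSketch {
 public:
  void set_callback(std::function<bool(int)> callback) {
    callback_ = std::move(callback);
  }

  // An empty std::function converts to false in a boolean context, so the
  // member itself tells us whether a callback was installed.
  bool ShouldSkip(int num_values) const {
    if (callback_) return callback_(num_values);
    return false;
  }

 private:
  std::function<bool(int)> callback_;
};
```

Assigning `nullptr` resets the member to the empty state, so the same check also covers "callback removed".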



##########
cpp/src/parquet/column_reader.cc:
##########
@@ -386,6 +397,16 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
       throw ParquetException("Invalid page header");
     }
 
+   // Once we have the header, we will call the skip_page_call_back_ to
+   // determine if we should be skipping this page. If yes, we will advance the
+   // stream to the next page.
+   if(has_skip_page_callback_) {

Review Comment:
   What we have done is to let the PageReader be aware of the **Offset Index** belonging to the pages of the RowGroup. In this way, it can be smart enough to move among pages and handle the skip logic. The prerequisite is https://issues.apache.org/jira/browse/ARROW-10158. If that sounds good, I can pick up ARROW-10158 to contribute our implementation. WDYT? @fatemehp @emkornfield @pitrou
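
For readers unfamiliar with the Offset Index idea, a rough sketch follows. It is not the implementation referred to in this comment. It assumes each page has a location entry with a file offset, a compressed size, and the index of its first row (the fields of a Parquet `PageLocation`), and that the total row count of the row group is known. `PageLocationSketch` and `PagesOverlappingRows` are hypothetical names.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Mirrors the three fields of a page location entry.
struct PageLocationSketch {
  int64_t offset;
  int32_t compressed_page_size;
  int64_t first_row_index;
};

// Returns the indices of the pages whose rows overlap [row_begin, row_end).
// Page i covers rows from its first_row_index up to the next page's
// first_row_index, or up to total_rows for the last page.
std::vector<std::size_t> PagesOverlappingRows(
    const std::vector<PageLocationSketch>& pages, int64_t total_rows,
    int64_t row_begin, int64_t row_end) {
  std::vector<std::size_t> selected;
  for (std::size_t i = 0; i < pages.size(); ++i) {
    const int64_t page_begin = pages[i].first_row_index;
    const int64_t page_end =
        (i + 1 < pages.size()) ? pages[i + 1].first_row_index : total_rows;
    if (page_begin < row_end && row_begin < page_end) selected.push_back(i);
  }
  return selected;
}
```

With such a list, a reader could seek directly to the `offset` of each selected page instead of reading every page header in sequence.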





[GitHub] [arrow] fatemehp commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1026994392


##########
cpp/src/parquet/metadata.h:
##########
@@ -182,6 +182,28 @@ class PARQUET_EXPORT ColumnChunkMetaData {
   std::unique_ptr<ColumnChunkMetaDataImpl> impl_;
 };
 
+/// \brief DataPageStats is a proxy around stats in format::PageHeader.
+class PARQUET_EXPORT DataPageStats {
+ public:
+  static std::unique_ptr<DataPageStats> Make(const void* page_header);
+
+  ~DataPageStats();
+
+  bool Equals(const DataPageStats& other) const;
+
+  int32_t num_values() const;

Review Comment:
   I changed this part to use encoded stats. Please take a look.





[GitHub] [arrow] fatemehp commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1026994254


##########
cpp/src/parquet/column_reader.h:
##########
@@ -115,6 +116,17 @@ class PARQUET_EXPORT PageReader {
                                           bool always_compressed = false,
                                           const CryptoContext* ctx = NULLPTR);
 
+  // @returns: true if the skip callback was successfully set. Returns false
+  // if the callback was not set or not supported.
+  // If supported, NextPage() will use this callback to determine if it should

Review Comment:
   Done.





[GitHub] [arrow] fatemehp commented on pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on PR #14603:
URL: https://github.com/apache/arrow/pull/14603#issuecomment-1347584529

   @emkornfield, @pitrou, @wgtmac, @westonpace I attempted to address your comments. Please take a look.




[GitHub] [arrow] pitrou commented on pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
pitrou commented on PR #14603:
URL: https://github.com/apache/arrow/pull/14603#issuecomment-1346117599

   > After looking into it more, Encoded statistics is not directly convertible to statistics because it is missing the number of not-null values.
   
   That doesn't prevent us from exposing a convenience API to get a `Statistics` from `EncodedStatistics`.
   




[GitHub] [arrow] fatemehp commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1049255563


##########
cpp/src/parquet/column_reader.h:
##########
@@ -55,6 +56,29 @@ static constexpr uint32_t kDefaultMaxPageHeaderSize = 16 * 1024 * 1024;
 // 16 KB is the default expected page header size
 static constexpr uint32_t kDefaultPageHeaderSize = 16 * 1024;
 
+// \brief DataPageStats stores encoded statistics and number of values/rows for
+// a page.
+struct PARQUET_EXPORT DataPageStats {
+  DataPageStats(EncodedStatistics* encoded_statistics, int32_t num_values,
+                std::optional<int32_t> num_rows)
+      : encoded_statistics(encoded_statistics),
+        is_stats_set(encoded_statistics->is_set()),
+        num_values(num_values),
+        num_rows(num_rows) {}
+
+  // Encoded statistics extracted from the page header.
+  EncodedStatistics* encoded_statistics;
+  // False if there were no encoded statistics in the page header.
+  bool is_stats_set;

Review Comment:
   Good point. I removed is_stats_set, and added a comment that encoded_statistics would be nullptr in that case. Also, added a test for it.
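
A filter written against this contract has to handle the null case before touching the statistics. The sketch below uses hypothetical mirror types (`EncodedStatsSketch`, `DataPageStatsSketch`) rather than the real Parquet classes, and it assumes that a plain bytewise string comparison is a valid ordering for the column, which is not true for every physical type.

```cpp
#include <cstdint>
#include <optional>
#include <string>

// Hypothetical mirrors of the types discussed in this thread.
struct EncodedStatsSketch {
  std::string min;
  std::string max;
};

struct DataPageStatsSketch {
  const EncodedStatsSketch* encoded_statistics;  // nullptr: header had no stats
  int32_t num_values;
  std::optional<int32_t> num_rows;
};

// Returns true when every value in the page is known to be below lower_bound.
bool SkipIfBelow(const DataPageStatsSketch& stats, const std::string& lower_bound) {
  // Without statistics nothing can be proven about the page, so read it.
  if (stats.encoded_statistics == nullptr) return false;
  return stats.encoded_statistics->max < lower_bound;
}
```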



##########
cpp/src/parquet/column_reader.h:
##########
@@ -115,11 +141,27 @@ class PARQUET_EXPORT PageReader {
                                           bool always_compressed = false,
                                           const CryptoContext* ctx = NULLPTR);
 
+  // If data_page_filter_ is present (not null), NextPage() will call the

Review Comment:
   Done





[GitHub] [arrow] emkornfield commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
emkornfield commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1048051616


##########
cpp/src/parquet/column_reader.h:
##########
@@ -55,6 +56,29 @@ static constexpr uint32_t kDefaultMaxPageHeaderSize = 16 * 1024 * 1024;
 // 16 KB is the default expected page header size
 static constexpr uint32_t kDefaultPageHeaderSize = 16 * 1024;
 
+// \brief DataPageStats stores encoded statistics and number of values/rows for
+// a page.
+struct PARQUET_EXPORT DataPageStats {
+  DataPageStats(EncodedStatistics* encoded_statistics, int32_t num_values,
+                std::optional<int32_t> num_rows)
+      : encoded_statistics(encoded_statistics),
+        is_stats_set(encoded_statistics->is_set()),
+        num_values(num_values),
+        num_rows(num_rows) {}
+
+  // Encoded statistics extracted from the page header.
+  EncodedStatistics* encoded_statistics;

Review Comment:
   Let's clarify when this might be null.





[GitHub] [arrow] emkornfield merged pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
emkornfield merged PR #14603:
URL: https://github.com/apache/arrow/pull/14603




[GitHub] [arrow] fatemehp commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1065130195


##########
cpp/src/parquet/file_deserialize_test.cc:
##########
@@ -177,6 +201,305 @@ TEST_F(TestPageSerde, DataPageV1) {
   ASSERT_NO_FATAL_FAILURE(CheckDataPageHeader(data_page_header_, current_page.get()));
 }
 
+// Templated test class to test page filtering for both format::DataPageHeader
+// and format::DataPageHeaderV2.
+template <typename T>
+class PageFilterTest : public TestPageSerde {
+ public:
+  const int kNumPages = 10;
+  void WriteStream();
+  void WritePageWithoutStats();
+  void CheckNumRows(std::optional<int32_t> num_rows, const T& header);
+
+ protected:
+  std::vector<T> data_page_headers_;
+  int total_rows_ = 0;
+};
+
+template <>
+void PageFilterTest<format::DataPageHeader>::WriteStream() {
+  for (int i = 0; i < kNumPages; ++i) {
+    // Vary the number of rows to produce different headers.
+    int32_t num_rows = i + 100;
+    total_rows_ += num_rows;
+    int data_size = i + 1024;
+    this->data_page_header_.__set_num_values(num_rows);
+    this->data_page_header_.statistics.__set_min_value("A" + std::to_string(i));
+    this->data_page_header_.statistics.__set_max_value("Z" + std::to_string(i));
+    this->data_page_header_.statistics.__set_null_count(0);
+    this->data_page_header_.statistics.__set_distinct_count(num_rows);
+    this->data_page_header_.__isset.statistics = true;
+    ASSERT_NO_FATAL_FAILURE(
+        this->WriteDataPageHeader(/*max_serialized_len=*/1024, data_size, data_size));
+    data_page_headers_.push_back(this->data_page_header_);
+    // Also write data, to make sure we skip the data correctly.
+    std::vector<uint8_t> faux_data(data_size);
+    ASSERT_OK(this->out_stream_->Write(faux_data.data(), data_size));
+  }
+  this->EndStream();
+}
+
+template <>
+void PageFilterTest<format::DataPageHeaderV2>::WriteStream() {
+  for (int i = 0; i < kNumPages; ++i) {
+    // Vary the number of rows to produce different headers.
+    int32_t num_rows = i + 100;
+    total_rows_ += num_rows;
+    int data_size = i + 1024;
+    this->data_page_header_v2_.__set_num_values(num_rows);
+    this->data_page_header_v2_.__set_num_rows(num_rows);
+    this->data_page_header_v2_.statistics.__set_min_value("A" + std::to_string(i));
+    this->data_page_header_v2_.statistics.__set_max_value("Z" + std::to_string(i));
+    this->data_page_header_v2_.statistics.__set_null_count(0);
+    this->data_page_header_v2_.statistics.__set_distinct_count(num_rows);
+    this->data_page_header_v2_.__isset.statistics = true;
+    ASSERT_NO_FATAL_FAILURE(
+        this->WriteDataPageHeaderV2(/*max_serialized_len=*/1024, data_size, data_size));
+    data_page_headers_.push_back(this->data_page_header_v2_);
+    // Also write data, to make sure we skip the data correctly.
+    std::vector<uint8_t> faux_data(data_size);
+    ASSERT_OK(this->out_stream_->Write(faux_data.data(), data_size));
+  }
+  this->EndStream();
+}
+
+template <>
+void PageFilterTest<format::DataPageHeader>::WritePageWithoutStats() {
+  int32_t num_rows = 100;
+  total_rows_ += num_rows;
+  int data_size = 1024;
+  this->data_page_header_.__set_num_values(num_rows);
+  ASSERT_NO_FATAL_FAILURE(
+      this->WriteDataPageHeader(/*max_serialized_len=*/1024, data_size, data_size));
+  data_page_headers_.push_back(this->data_page_header_);
+  std::vector<uint8_t> faux_data(data_size);
+  ASSERT_OK(this->out_stream_->Write(faux_data.data(), data_size));
+  this->EndStream();
+}
+
+template <>
+void PageFilterTest<format::DataPageHeaderV2>::WritePageWithoutStats() {
+  int32_t num_rows = 100;
+  total_rows_ += num_rows;
+  int data_size = 1024;
+  this->data_page_header_v2_.__set_num_values(num_rows);
+  this->data_page_header_v2_.__set_num_rows(num_rows);
+  ASSERT_NO_FATAL_FAILURE(
+      this->WriteDataPageHeaderV2(/*max_serialized_len=*/1024, data_size, data_size));
+  data_page_headers_.push_back(this->data_page_header_v2_);
+  std::vector<uint8_t> faux_data(data_size);
+  ASSERT_OK(this->out_stream_->Write(faux_data.data(), data_size));
+  this->EndStream();
+}
+
+template <>
+void PageFilterTest<format::DataPageHeader>::CheckNumRows(
+    std::optional<int32_t> num_rows, const format::DataPageHeader& header) {
+  ASSERT_EQ(num_rows, std::nullopt);
+}
+
+template <>
+void PageFilterTest<format::DataPageHeaderV2>::CheckNumRows(
+    std::optional<int32_t> num_rows, const format::DataPageHeaderV2& header) {
+  ASSERT_EQ(*num_rows, header.num_rows);
+}
+
+using DataPageHeaderTypes =
+    ::testing::Types<format::DataPageHeader, format::DataPageHeaderV2>;
+TYPED_TEST_SUITE(PageFilterTest, DataPageHeaderTypes);
+
+// Test that the returned encoded_statistics is nullptr when there are no
+// statistics in the page header.
+TYPED_TEST(PageFilterTest, TestPageWithoutStatistics) {
+  this->WritePageWithoutStats();
+
+  auto stream = std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
+  this->page_reader_ =
+      PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+
+  int num_pages = 0;
+  bool is_stats_null = false;
+  auto read_all_pages = [&](const DataPageStats& stats) -> bool {
+    is_stats_null = stats.encoded_statistics == nullptr;
+    ++num_pages;
+    return false;
+  };
+
+  this->page_reader_->set_data_page_filter(read_all_pages);
+  std::shared_ptr<Page> current_page = this->page_reader_->NextPage();
+  ASSERT_EQ(num_pages, 1);
+  ASSERT_EQ(is_stats_null, true);
+  ASSERT_EQ(this->page_reader_->NextPage(), nullptr);
+}
+
+// Creates a number of pages and skips some of them with the page filter callback.
+TYPED_TEST(PageFilterTest, TestPageFilterCallback) {
+  this->WriteStream();
+
+  {  // Read all pages.
+     // Also check that the encoded statistics passed to the callback function
+     // are right.
+    auto stream = std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
+    this->page_reader_ =
+        PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+
+    std::vector<EncodedStatistics> read_stats;
+    std::vector<int64_t> read_num_values;
+    std::vector<std::optional<int32_t>> read_num_rows;
+    auto read_all_pages = [&](const DataPageStats& stats) -> bool {
+      DCHECK_NE(stats.encoded_statistics, nullptr);

Review Comment:
   ASSERT_* macros cannot be used inside the lambda: they require the enclosing function to return void, and this lambda returns bool.
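
A common workaround, sketched here without GoogleTest so that it stands alone, is to have the lambda record what it observes and to check the record after the reader has run. `CountReadPages`, `FilterObservation`, and `RunFilter` are hypothetical helpers, and the `int` argument stands in for the per-page stats.

```cpp
#include <functional>
#include <vector>

// Stand-in for a reader that calls a bool-returning filter once per page and
// counts the pages that were not skipped.
int CountReadPages(const std::vector<int>& page_num_values,
                   const std::function<bool(int)>& skip) {
  int read = 0;
  for (int num_values : page_num_values) {
    if (!skip(num_values)) ++read;
  }
  return read;
}

struct FilterObservation {
  int calls = 0;
  bool saw_invalid = false;
};

// The filter only records; the caller asserts on the returned observation
// from a void-returning test body.
FilterObservation RunFilter(const std::vector<int>& page_num_values) {
  FilterObservation observation;
  auto filter = [&observation](int num_values) -> bool {
    ++observation.calls;
    if (num_values < 0) observation.saw_invalid = true;
    return false;  // never skip in this sketch
  };
  CountReadPages(page_num_values, filter);
  return observation;
}
```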





[GitHub] [arrow] fatemehp commented on pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on PR #14603:
URL: https://github.com/apache/arrow/pull/14603#issuecomment-1344674452

   > Actually, nevermind. As long as we have the number of rows (which I think requires v2?) we can just have our callback record the skipped ranges (e.g. skipped from 50-60). That should be sufficient.
   
   Yes, we can filter based on this callback for v1 non-repeated fields, for which the number of values equals the number of rows, and v2 fields.
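
The bookkeeping described here could look roughly like the sketch below. `PageCountsSketch` and `SkippedRowRanges` are hypothetical names. The row count of a page is taken from `num_rows` when present (V2), or from `num_values` for a V1 page of a non-repeated field. For a V1 page of a repeated field the count is unknown, so the sketch stops skipping from that point on.

```cpp
#include <cstdint>
#include <optional>
#include <utility>
#include <vector>

struct PageCountsSketch {
  int32_t num_values;
  std::optional<int32_t> num_rows;  // std::nullopt for V1 data pages
};

class SkippedRowRanges {
 public:
  explicit SkippedRowRanges(bool field_is_repeated)
      : field_is_repeated_(field_is_repeated) {}

  // Call once per data page, in order. Returns the final skip decision and
  // records [begin, end) row ranges for the pages that are skipped.
  bool OnPage(const PageCountsSketch& page, bool want_skip) {
    const std::optional<int64_t> rows = RowCount(page);
    if (!rows.has_value()) positions_known_ = false;
    if (!positions_known_) return false;  // row positions are unknown
    const int64_t begin = next_row_;
    next_row_ += *rows;
    if (want_skip) ranges_.emplace_back(begin, next_row_);
    return want_skip;
  }

  const std::vector<std::pair<int64_t, int64_t>>& ranges() const { return ranges_; }

 private:
  std::optional<int64_t> RowCount(const PageCountsSketch& page) const {
    if (page.num_rows.has_value()) return *page.num_rows;
    if (!field_is_repeated_) return page.num_values;
    return std::nullopt;
  }

  bool field_is_repeated_;
  bool positions_known_ = true;
  int64_t next_row_ = 0;
  std::vector<std::pair<int64_t, int64_t>> ranges_;
};
```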




[GitHub] [arrow] fatemehp commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1041328869


##########
cpp/src/parquet/metadata.h:
##########
@@ -182,6 +184,28 @@ class PARQUET_EXPORT ColumnChunkMetaData {
   std::unique_ptr<ColumnChunkMetaDataImpl> impl_;
 };
 
+// \brief DataPageStats is a proxy around format::DataPageHeader and
+// format::DataPageHeaderV2.
+class PARQUET_EXPORT DataPageStats {

Review Comment:
   Done.



##########
cpp/src/parquet/metadata.cc:
##########
@@ -19,10 +19,13 @@
 
 #include <algorithm>
 #include <cinttypes>
+#include <csignal>
+#include <iostream>
 #include <ostream>
 #include <string>
 #include <string_view>
 #include <utility>
+#include <variant>

Review Comment:
   Done. I removed the code that used this include but forgot to remove the include.



##########
cpp/src/parquet/column_reader.h:
##########
@@ -115,11 +116,30 @@ class PARQUET_EXPORT PageReader {
                                           bool always_compressed = false,
                                           const CryptoContext* ctx = NULLPTR);
 
+  // If skip_page_callback_ is present (not null), NextPage() will call the
+  // callback function exactly once per page in the order the pages appear in
+  // the column. If the callback function returns true the page will be
+  // skipped. The callback will be called only if the page type is DATA_PAGE or
+  // DATA_PAGE_V2. Dictionary pages will not be skipped.
+  // This setter must be called at most once to set the callback.
+  // \note API EXPERIMENTAL
+  void set_skip_page_callback(
+      std::function<bool(const DataPageStats&)> skip_page_callback) {

Review Comment:
   Done.
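
The contract in the comment above (the callback runs once per data page, in order, and dictionary pages are never skipped) can be sketched with a simplified loop. `PageTypeSketch`, `PageHeaderSketch`, and `PagesReturned` are hypothetical, and the real reader also advances the input stream past the body of a skipped page.

```cpp
#include <cstddef>
#include <cstdint>
#include <functional>
#include <vector>

enum class PageTypeSketch { kDictionary, kData };

struct PageHeaderSketch {
  PageTypeSketch type;
  int32_t num_values;
};

// Returns the indices of the pages a reader would hand back to the caller.
std::vector<std::size_t> PagesReturned(
    const std::vector<PageHeaderSketch>& headers,
    const std::function<bool(const PageHeaderSketch&)>& data_page_filter) {
  std::vector<std::size_t> returned;
  for (std::size_t i = 0; i < headers.size(); ++i) {
    const PageHeaderSketch& header = headers[i];
    // Only data pages are offered to the filter; an empty filter skips nothing.
    if (header.type == PageTypeSketch::kData && data_page_filter &&
        data_page_filter(header)) {
      continue;  // a real reader would advance past the page body here
    }
    returned.push_back(i);
  }
  return returned;
}
```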



##########
cpp/src/parquet/column_reader.h:
##########
@@ -115,11 +116,30 @@ class PARQUET_EXPORT PageReader {
                                           bool always_compressed = false,
                                           const CryptoContext* ctx = NULLPTR);
 
+  // If skip_page_callback_ is present (not null), NextPage() will call the
+  // callback function exactly once per page in the order the pages appear in
+  // the column. If the callback function returns true the page will be
+  // skipped. The callback will be called only if the page type is DATA_PAGE or
+  // DATA_PAGE_V2. Dictionary pages will not be skipped.
+  // This setter must be called at most once to set the callback.
+  // \note API EXPERIMENTAL
+  void set_skip_page_callback(
+      std::function<bool(const DataPageStats&)> skip_page_callback) {
+    if (skip_page_callback_) {
+      throw ParquetException("set_skip_page_callback was called more than once");

Review Comment:
   I was mostly wondering why we would allow this. By forbidding it, we have one less scenario to think about in the code.
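
A sketch of the set-once behaviour under discussion, using a hypothetical `SetOnceReaderSketch` and `std::logic_error` in place of `ParquetException` so that it compiles on its own. In this sketch, passing an empty `std::function` the first time leaves the member empty, so a later call would still be accepted.

```cpp
#include <functional>
#include <stdexcept>
#include <utility>

class SetOnceReaderSketch {
 public:
  using Filter = std::function<bool(int)>;

  // Rejects a second callback so the reading code never has to reason about
  // the filter changing in the middle of a column chunk.
  void set_skip_page_callback(Filter filter) {
    if (skip_page_callback_) {
      throw std::logic_error("set_skip_page_callback was called more than once");
    }
    skip_page_callback_ = std::move(filter);
  }

  bool has_callback() const { return static_cast<bool>(skip_page_callback_); }

 private:
  Filter skip_page_callback_;
};
```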



##########
cpp/src/parquet/column_reader.cc:
##########
@@ -386,6 +441,12 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
       throw ParquetException("Invalid page header");
     }
 
+    EncodedStatistics page_statistics;

Review Comment:
   Done.



##########
cpp/src/parquet/metadata.h:
##########
@@ -20,13 +20,15 @@
 #include <cstdint>
 #include <map>
 #include <memory>
+#include <optional>
 #include <string>
 #include <utility>
 #include <vector>
 
 #include "parquet/platform.h"
 #include "parquet/properties.h"
 #include "parquet/schema.h"
+#include "parquet/statistics.h"

Review Comment:
   Done.



##########
cpp/src/parquet/column_reader.h:
##########
@@ -115,11 +116,30 @@ class PARQUET_EXPORT PageReader {
                                           bool always_compressed = false,
                                           const CryptoContext* ctx = NULLPTR);
 
+  // If skip_page_callback_ is present (not null), NextPage() will call the
+  // callback function exactly once per page in the order the pages appear in
+  // the column. If the callback function returns true the page will be
+  // skipped. The callback will be called only if the page type is DATA_PAGE or

Review Comment:
   Done.



##########
cpp/src/parquet/metadata.h:
##########
@@ -182,6 +184,28 @@ class PARQUET_EXPORT ColumnChunkMetaData {
   std::unique_ptr<ColumnChunkMetaDataImpl> impl_;
 };
 
+// \brief DataPageStats is a proxy around format::DataPageHeader and

Review Comment:
   Done.





[GitHub] [arrow] fatemehp commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1035207103


##########
cpp/src/parquet/column_reader.cc:
##########
@@ -386,6 +386,57 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
       throw ParquetException("Invalid page header");
     }
 
+    // Do some checks before trying to decrypt and/or decompress the page.
+    // Also skip the page if skip_page_callback_ is set and returns true.
+    const PageType::type page_type = LoadEnumSafe(&current_page_header_.type);
+    EncodedStatistics page_statistics;
+    if (page_type == PageType::DATA_PAGE) {
+      const format::DataPageHeader& header = current_page_header_.data_page_header;
+      if (header.num_values < 0) {
+        throw ParquetException("Invalid page header (negative number of values)");
+      }
+      page_statistics = ExtractStatsFromHeader(header);
+      seen_num_values_ += header.num_values;
+      if (skip_page_callback_) {
+        DataPageStats data_page_stats(page_statistics, header.num_values,
+                                      /*num_rows=*/std::nullopt);
+        if (skip_page_callback_(data_page_stats)) {
+          PARQUET_THROW_NOT_OK(stream_->Advance(compressed_len));
+          continue;
+        }
+      }
+    } else if (page_type == PageType::DATA_PAGE_V2) {
+      const format::DataPageHeaderV2& header = current_page_header_.data_page_header_v2;
+      if (header.num_values < 0) {
+        throw ParquetException("Invalid page header (negative number of values)");
+      }
+      if (header.definition_levels_byte_length < 0 ||
+          header.repetition_levels_byte_length < 0) {
+        throw ParquetException("Invalid page header (negative levels byte length)");
+      }
+      page_statistics = ExtractStatsFromHeader(header);
+      seen_num_values_ += header.num_values;
+      if (skip_page_callback_) {
+        DataPageStats data_page_stats(page_statistics, header.num_values,
+                                      header.num_rows);
+        if (skip_page_callback_(data_page_stats)) {
+          PARQUET_THROW_NOT_OK(stream_->Advance(compressed_len));
+          continue;
+        }
+      }
+    } else if (page_type == PageType::DICTIONARY_PAGE) {
+      const format::DictionaryPageHeader& dict_header =
+          current_page_header_.dictionary_page_header;
+      if (dict_header.num_values < 0) {

Review Comment:
   Done



##########
cpp/src/parquet/column_reader.cc:
##########
@@ -386,6 +386,57 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
       throw ParquetException("Invalid page header");
     }
 
+    // Do some checks before trying to decrypt and/or decompress the page.
+    // Also skip the page if skip_page_callback_ is set and returns true.
+    const PageType::type page_type = LoadEnumSafe(&current_page_header_.type);
+    EncodedStatistics page_statistics;
+    if (page_type == PageType::DATA_PAGE) {
+      const format::DataPageHeader& header = current_page_header_.data_page_header;
+      if (header.num_values < 0) {
+        throw ParquetException("Invalid page header (negative number of values)");
+      }
+      page_statistics = ExtractStatsFromHeader(header);
+      seen_num_values_ += header.num_values;
+      if (skip_page_callback_) {
+        DataPageStats data_page_stats(page_statistics, header.num_values,
+                                      /*num_rows=*/std::nullopt);
+        if (skip_page_callback_(data_page_stats)) {
+          PARQUET_THROW_NOT_OK(stream_->Advance(compressed_len));
+          continue;
+        }
+      }
+    } else if (page_type == PageType::DATA_PAGE_V2) {
+      const format::DataPageHeaderV2& header = current_page_header_.data_page_header_v2;
+      if (header.num_values < 0) {

Review Comment:
   Done



##########
cpp/src/parquet/column_reader.cc:
##########
@@ -386,6 +386,57 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
       throw ParquetException("Invalid page header");
     }
 
+    // Do some checks before trying to decrypt and/or decompress the page.
+    // Also skip the page if skip_page_callback_ is set and returns true.
+    const PageType::type page_type = LoadEnumSafe(&current_page_header_.type);
+    EncodedStatistics page_statistics;
+    if (page_type == PageType::DATA_PAGE) {
+      const format::DataPageHeader& header = current_page_header_.data_page_header;
+      if (header.num_values < 0) {
+        throw ParquetException("Invalid page header (negative number of values)");
+      }
+      page_statistics = ExtractStatsFromHeader(header);
+      seen_num_values_ += header.num_values;
+      if (skip_page_callback_) {
+        DataPageStats data_page_stats(page_statistics, header.num_values,
+                                      /*num_rows=*/std::nullopt);
+        if (skip_page_callback_(data_page_stats)) {
+          PARQUET_THROW_NOT_OK(stream_->Advance(compressed_len));
+          continue;
+        }
+      }
+    } else if (page_type == PageType::DATA_PAGE_V2) {
+      const format::DataPageHeaderV2& header = current_page_header_.data_page_header_v2;
+      if (header.num_values < 0) {
+        throw ParquetException("Invalid page header (negative number of values)");
+      }
+      if (header.definition_levels_byte_length < 0 ||
+          header.repetition_levels_byte_length < 0) {
+        throw ParquetException("Invalid page header (negative levels byte length)");
+      }
+      page_statistics = ExtractStatsFromHeader(header);
+      seen_num_values_ += header.num_values;
+      if (skip_page_callback_) {
+        DataPageStats data_page_stats(page_statistics, header.num_values,
+                                      header.num_rows);

Review Comment:
   Done





[GitHub] [arrow] emkornfield commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
emkornfield commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1063679459


##########
cpp/src/parquet/column_reader.h:
##########
@@ -55,6 +56,27 @@ static constexpr uint32_t kDefaultMaxPageHeaderSize = 16 * 1024 * 1024;
 // 16 KB is the default expected page header size
 static constexpr uint32_t kDefaultPageHeaderSize = 16 * 1024;
 
+// \brief DataPageStats stores encoded statistics and number of values/rows for
+// a page.
+struct PARQUET_EXPORT DataPageStats {
+  DataPageStats(const EncodedStatistics* encoded_statistics, int32_t num_values,
+                std::optional<int32_t> num_rows)
+      : encoded_statistics(encoded_statistics),
+        num_values(num_values),
+        num_rows(num_rows) {}
+
+  // Encoded statistics extracted from the page header.
+  // Nullptr if there are no statistics in the page header.
+  const EncodedStatistics* encoded_statistics;
+  // Number of values stored in the page. Filled for both V1 and V2 data pages.
+  // For repeated fields, this can be greater than number of rows. For
+  // non-repeated fields, this will be the same as the number of rows.
+  int32_t num_values;
+  // Number of rows stored in the page. std::nullopt for V1 data pages since

Review Comment:
   Maybe rephrase to say that this might not be available for V1 data pages. It still seems possible to set it if page indexes are written for V1?





[GitHub] [arrow] emkornfield commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
emkornfield commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1063684080


##########
cpp/src/parquet/file_deserialize_test.cc:
##########
@@ -177,6 +201,217 @@ TEST_F(TestPageSerde, DataPageV1) {
   ASSERT_NO_FATAL_FAILURE(CheckDataPageHeader(data_page_header_, current_page.get()));
 }
 
+// Templated test class to test page filtering for both format::DataPageHeader
+// and format::DataPageHeaderV2.
+template <typename T>
+class PageFilterTest : public TestPageSerde {
+ public:
+  const int kNumPages = 10;
+  void WriteStream();
+
+ protected:
+  std::vector<T> data_page_headers_;
+  int total_rows_ = 0;
+};
+
+template <>
+void PageFilterTest<format::DataPageHeader>::WriteStream() {
+  for (int i = 0; i < kNumPages; ++i) {
+    // Vary the number of rows to produce different headers.
+    int32_t num_rows = i + 100;
+    total_rows_ += num_rows;
+    int data_size = i + 1024;
+    this->data_page_header_.__set_num_values(num_rows);
+    this->data_page_header_.statistics.__set_min_value("A");
+    this->data_page_header_.statistics.__set_max_value("Z");
+    this->data_page_header_.statistics.__set_null_count(0);
+    this->data_page_header_.__isset.statistics = true;
+    ASSERT_NO_FATAL_FAILURE(
+        this->WriteDataPageHeader(/*max_serialized_len=*/1024, data_size, data_size));
+    data_page_headers_.push_back(this->data_page_header_);
+    // Also write data, to make sure we skip the data correctly.
+    std::vector<uint8_t> faux_data(data_size);
+    ASSERT_OK(this->out_stream_->Write(faux_data.data(), data_size));
+  }
+  this->EndStream();
+}
+
+template <>
+void PageFilterTest<format::DataPageHeaderV2>::WriteStream() {
+  for (int i = 0; i < kNumPages; ++i) {
+    // Vary the number of rows to produce different headers.
+    int32_t num_rows = i + 100;
+    total_rows_ += num_rows;
+    int data_size = i + 1024;
+    this->data_page_header_v2_.__set_num_values(num_rows);
+    this->data_page_header_v2_.__set_num_rows(num_rows);
+    this->data_page_header_v2_.statistics.__set_min_value("A");
+    this->data_page_header_v2_.statistics.__set_max_value("Z");
+    this->data_page_header_v2_.statistics.__set_null_count(0);
+    this->data_page_header_v2_.__isset.statistics = true;
+    ASSERT_NO_FATAL_FAILURE(
+        this->WriteDataPageHeaderV2(/*max_serialized_len=*/1024, data_size, data_size));
+    data_page_headers_.push_back(this->data_page_header_v2_);
+    // Also write data, to make sure we skip the data correctly.
+    std::vector<uint8_t> faux_data(data_size);
+    ASSERT_OK(this->out_stream_->Write(faux_data.data(), data_size));
+  }
+  this->EndStream();
+}
+
+using DataPageHeaderTypes =
+    ::testing::Types<format::DataPageHeader, format::DataPageHeaderV2>;
+TYPED_TEST_SUITE(PageFilterTest, DataPageHeaderTypes);
+
+// Creates a number of pages and skips some of them with the page filter callback.
+TYPED_TEST(PageFilterTest, TestPageFilterCallback) {
+  this->WriteStream();
+
+  {  // Read all pages.
+    auto stream = std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
+    this->page_reader_ =
+        PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+
+    // This callback will always return false.
+    auto read_all_pages = [](const DataPageStats& stats) -> bool { return false; };
+
+    this->page_reader_->set_data_page_filter(read_all_pages);
+    for (int i = 0; i < this->kNumPages; ++i) {
+      std::shared_ptr<Page> current_page = this->page_reader_->NextPage();
+      ASSERT_NE(current_page, nullptr);
+      ASSERT_NO_FATAL_FAILURE(
+          CheckDataPageHeader(this->data_page_headers_[i], current_page.get()));
+    }
+    ASSERT_EQ(this->page_reader_->NextPage(), nullptr);
+  }
+  {  // Skip all pages.
+    auto stream = std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
+    this->page_reader_ =
+        PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+
+    auto skip_all_pages = [](const DataPageStats& stats) -> bool { return true; };
+
+    this->page_reader_->set_data_page_filter(skip_all_pages);
+    std::shared_ptr<Page> current_page = this->page_reader_->NextPage();
+    ASSERT_EQ(this->page_reader_->NextPage(), nullptr);
+  }
+
+  {  // Skip every other page.
+    auto stream = std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
+    this->page_reader_ =
+        PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+
+    // Skip pages with even number of values.
+    auto skip_even_pages = [](const DataPageStats& stats) -> bool {
+      if (stats.num_values % 2 == 0) return true;
+      return false;
+    };
+
+    this->page_reader_->set_data_page_filter(skip_even_pages);
+
+    for (int i = 0; i < this->kNumPages; ++i) {
+      // Only pages with odd number of values are read.
+      if (i % 2 != 0) {
+        std::shared_ptr<Page> current_page = this->page_reader_->NextPage();
+        ASSERT_NE(current_page, nullptr);
+        ASSERT_NO_FATAL_FAILURE(
+            CheckDataPageHeader(this->data_page_headers_[i], current_page.get()));
+      }
+    }
+    // We should have exhausted reading the pages by reading the odd pages only.
+    ASSERT_EQ(this->page_reader_->NextPage(), nullptr);
+  }
+}
+
+// Set the page filter more than once. The new filter should be effective
+// on the next NextPage() call.
+TYPED_TEST(PageFilterTest, TestChangingPageFilter) {
+  this->WriteStream();
+
+  auto stream = std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
+  this->page_reader_ =
+      PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+
+  // This callback will always return false.
+  auto read_all_pages = [](const DataPageStats& stats) -> bool { return false; };
+  this->page_reader_->set_data_page_filter(read_all_pages);
+  std::shared_ptr<Page> current_page = this->page_reader_->NextPage();
+  ASSERT_NE(current_page, nullptr);
+  ASSERT_NO_FATAL_FAILURE(
+      CheckDataPageHeader(this->data_page_headers_[0], current_page.get()));
+
+  // This callback will skip all pages.
+  auto skip_all_pages = [](const DataPageStats& stats) -> bool { return true; };
+  this->page_reader_->set_data_page_filter(skip_all_pages);
+  ASSERT_EQ(this->page_reader_->NextPage(), nullptr);
+}
+
+// Test that we do not skip dictionary pages.
+TEST_F(TestPageSerde, DoesNotFilterDictionaryPages) {
+  int data_size = 1024;
+  std::vector<uint8_t> faux_data(data_size);
+
+  ASSERT_NO_FATAL_FAILURE(
+      WriteDataPageHeader(/*max_serialized_len=*/1024, data_size, data_size));
+  ASSERT_OK(out_stream_->Write(faux_data.data(), data_size));
+
+  ASSERT_NO_FATAL_FAILURE(WriteDictionaryPageHeader(data_size, data_size));
+  ASSERT_OK(out_stream_->Write(faux_data.data(), data_size));
+
+  ASSERT_NO_FATAL_FAILURE(
+      WriteDataPageHeader(/*max_serialized_len=*/1024, data_size, data_size));
+  ASSERT_OK(out_stream_->Write(faux_data.data(), data_size));
+  EndStream();
+
+  // Try to read it back while asking for all data pages to be skipped.
+  auto stream = std::make_shared<::arrow::io::BufferReader>(out_buffer_);
+  page_reader_ = PageReader::Open(stream, /*num_rows=*/100, Compression::UNCOMPRESSED);
+
+  auto skip_all_pages = [](const DataPageStats& stats) -> bool { return true; };
+
+  page_reader_->set_data_page_filter(skip_all_pages);

Review Comment:
   I think it would probably be OK to do this as a follow-up, though.





[GitHub] [arrow] westonpace commented on pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
westonpace commented on PR #14603:
URL: https://github.com/apache/arrow/pull/14603#issuecomment-1343838780

   > We could end up with something like 70 rows for column0 (because we skipped 3 pages) and 100 rows for column1 and column2. However, we would not know which rows to drop from column1 and column2 (the first 30? the last 30? somewhere in the middle?)
   
   Actually, never mind. As long as we have the number of rows (which I think requires v2?) we can just have our callback record the skipped ranges (e.g. skipped from 50-60). That should be sufficient.
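
   A minimal sketch of that idea, assuming the callback sees a row count for every page. The DataPageStats below is a reduced, hypothetical stand-in for the struct in this PR, and SkippedRangeRecorder is an illustrative name, not part of the proposed API. The recorder wraps an arbitrary skip predicate and remembers which half-open row ranges were skipped:

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical stand-in for the DataPageStats struct from this PR, reduced to
// the two fields the sketch needs.
struct DataPageStats {
  int32_t num_values;
  std::optional<int32_t> num_rows;  // Expected to be set for V2 data pages.
};

// Wraps a skip predicate and records the row ranges [begin, end) it skipped.
class SkippedRangeRecorder {
 public:
  explicit SkippedRangeRecorder(std::function<bool(const DataPageStats&)> should_skip)
      : should_skip_(std::move(should_skip)) {}

  // Same shape as the page filter callback: returns true to skip the page.
  bool operator()(const DataPageStats& stats) {
    // Without a row count the row position cannot be tracked.
    assert(stats.num_rows.has_value());
    const int64_t begin = next_row_;
    next_row_ += *stats.num_rows;
    const bool skip = should_skip_(stats);
    if (skip) {
      // Extend the previous range when the new one starts where it ended.
      if (!skipped_.empty() && skipped_.back().second == begin) {
        skipped_.back().second = next_row_;
      } else {
        skipped_.emplace_back(begin, next_row_);
      }
    }
    return skip;
  }

  const std::vector<std::pair<int64_t, int64_t>>& skipped() const { return skipped_; }

 private:
  std::function<bool(const DataPageStats&)> should_skip_;
  int64_t next_row_ = 0;
  std::vector<std::pair<int64_t, int64_t>> skipped_;
};
```

   Because std::function copies its target, the recorder would have to be handed over as std::ref(recorder) so that the recorded ranges stay visible to the caller, who could then drop the same ranges from the other columns.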




[GitHub] [arrow] wgtmac commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
wgtmac commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1044237366


##########
cpp/src/parquet/column_reader.h:
##########
@@ -55,6 +56,21 @@ static constexpr uint32_t kDefaultMaxPageHeaderSize = 16 * 1024 * 1024;
 // 16 KB is the default expected page header size
 static constexpr uint32_t kDefaultPageHeaderSize = 16 * 1024;
 
+// \brief DataPageStats is a proxy around format::DataPageHeader and

Review Comment:
   The comment seems inaccurate since it does not have access to all fields in the thrift page header.



##########
cpp/src/parquet/file_deserialize_test.cc:
##########
@@ -177,6 +201,217 @@ TEST_F(TestPageSerde, DataPageV1) {
   ASSERT_NO_FATAL_FAILURE(CheckDataPageHeader(data_page_header_, current_page.get()));
 }
 
+// Templated test class to test page filtering for both format::DataPageHeader
+// and format::DataPageHeaderV2.
+template <typename T>
+class PageFilterTest : public TestPageSerde {
+ public:
+  const int kNumPages = 10;
+  void WriteStream();
+
+ protected:
+  std::vector<T> data_page_headers_;
+  int total_rows_ = 0;
+};
+
+template <>
+void PageFilterTest<format::DataPageHeader>::WriteStream() {
+  for (int i = 0; i < kNumPages; ++i) {
+    // Vary the number of rows to produce different headers.
+    int32_t num_rows = i + 100;
+    total_rows_ += num_rows;
+    int data_size = i + 1024;
+    this->data_page_header_.__set_num_values(num_rows);
+    this->data_page_header_.statistics.__set_min_value("A");
+    this->data_page_header_.statistics.__set_max_value("Z");
+    this->data_page_header_.statistics.__set_null_count(0);
+    this->data_page_header_.__isset.statistics = true;
+    ASSERT_NO_FATAL_FAILURE(
+        this->WriteDataPageHeader(/*max_serialized_len=*/1024, data_size, data_size));
+    data_page_headers_.push_back(this->data_page_header_);
+    // Also write data, to make sure we skip the data correctly.
+    std::vector<uint8_t> faux_data(data_size);
+    ASSERT_OK(this->out_stream_->Write(faux_data.data(), data_size));
+  }
+  this->EndStream();
+}
+
+template <>
+void PageFilterTest<format::DataPageHeaderV2>::WriteStream() {
+  for (int i = 0; i < kNumPages; ++i) {
+    // Vary the number of rows to produce different headers.
+    int32_t num_rows = i + 100;
+    total_rows_ += num_rows;
+    int data_size = i + 1024;
+    this->data_page_header_v2_.__set_num_values(num_rows);
+    this->data_page_header_v2_.__set_num_rows(num_rows);
+    this->data_page_header_v2_.statistics.__set_min_value("A");
+    this->data_page_header_v2_.statistics.__set_max_value("Z");
+    this->data_page_header_v2_.statistics.__set_null_count(0);
+    this->data_page_header_v2_.__isset.statistics = true;
+    ASSERT_NO_FATAL_FAILURE(
+        this->WriteDataPageHeaderV2(/*max_serialized_len=*/1024, data_size, data_size));
+    data_page_headers_.push_back(this->data_page_header_v2_);
+    // Also write data, to make sure we skip the data correctly.
+    std::vector<uint8_t> faux_data(data_size);
+    ASSERT_OK(this->out_stream_->Write(faux_data.data(), data_size));
+  }
+  this->EndStream();
+}
+
+using DataPageHeaderTypes =
+    ::testing::Types<format::DataPageHeader, format::DataPageHeaderV2>;
+TYPED_TEST_SUITE(PageFilterTest, DataPageHeaderTypes);
+
+// Creates a number of pages and skips some of them with the page filter callback.
+TYPED_TEST(PageFilterTest, TestPageFilterCallback) {
+  this->WriteStream();
+
+  {  // Read all pages.
+    auto stream = std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
+    this->page_reader_ =
+        PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+
+    // This callback will always return false.
+    auto read_all_pages = [](const DataPageStats& stats) -> bool { return false; };
+
+    this->page_reader_->set_data_page_filter(read_all_pages);
+    for (int i = 0; i < this->kNumPages; ++i) {
+      std::shared_ptr<Page> current_page = this->page_reader_->NextPage();
+      ASSERT_NE(current_page, nullptr);
+      ASSERT_NO_FATAL_FAILURE(
+          CheckDataPageHeader(this->data_page_headers_[i], current_page.get()));
+    }
+    ASSERT_EQ(this->page_reader_->NextPage(), nullptr);
+  }
+  {  // Skip all pages.
+    auto stream = std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
+    this->page_reader_ =
+        PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+
+    auto skip_all_pages = [](const DataPageStats& stats) -> bool { return true; };
+
+    this->page_reader_->set_data_page_filter(skip_all_pages);
+    std::shared_ptr<Page> current_page = this->page_reader_->NextPage();
+    ASSERT_EQ(this->page_reader_->NextPage(), nullptr);
+  }
+
+  {  // Skip every other page.
+    auto stream = std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
+    this->page_reader_ =
+        PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+
+    // Skip pages with even number of values.
+    auto skip_even_pages = [](const DataPageStats& stats) -> bool {
+      if (stats.num_values % 2 == 0) return true;
+      return false;
+    };
+
+    this->page_reader_->set_data_page_filter(skip_even_pages);
+
+    for (int i = 0; i < this->kNumPages; ++i) {
+      // Only pages with odd number of values are read.
+      if (i % 2 != 0) {
+        std::shared_ptr<Page> current_page = this->page_reader_->NextPage();
+        ASSERT_NE(current_page, nullptr);
+        ASSERT_NO_FATAL_FAILURE(
+            CheckDataPageHeader(this->data_page_headers_[i], current_page.get()));
+      }
+    }
+    // We should have exhausted reading the pages by reading the odd pages only.
+    ASSERT_EQ(this->page_reader_->NextPage(), nullptr);
+  }
+}
+
+// Set the page filter more than once. The new filter should be effective
+// on the next NextPage() call.
+TYPED_TEST(PageFilterTest, TestChangingPageFilter) {
+  this->WriteStream();
+
+  auto stream = std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
+  this->page_reader_ =
+      PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+
+  // This callback will always return false.
+  auto read_all_pages = [](const DataPageStats& stats) -> bool { return false; };
+  this->page_reader_->set_data_page_filter(read_all_pages);
+  std::shared_ptr<Page> current_page = this->page_reader_->NextPage();
+  ASSERT_NE(current_page, nullptr);
+  ASSERT_NO_FATAL_FAILURE(
+      CheckDataPageHeader(this->data_page_headers_[0], current_page.get()));
+
+  // This callback will skip all pages.
+  auto skip_all_pages = [](const DataPageStats& stats) -> bool { return true; };
+  this->page_reader_->set_data_page_filter(skip_all_pages);
+  ASSERT_EQ(this->page_reader_->NextPage(), nullptr);
+}
+
+// Test that we do not skip dictionary pages.
+TEST_F(TestPageSerde, DoesNotFilterDictionaryPages) {
+  int data_size = 1024;
+  std::vector<uint8_t> faux_data(data_size);
+
+  ASSERT_NO_FATAL_FAILURE(
+      WriteDataPageHeader(/*max_serialized_len=*/1024, data_size, data_size));
+  ASSERT_OK(out_stream_->Write(faux_data.data(), data_size));
+
+  ASSERT_NO_FATAL_FAILURE(WriteDictionaryPageHeader(data_size, data_size));
+  ASSERT_OK(out_stream_->Write(faux_data.data(), data_size));
+
+  ASSERT_NO_FATAL_FAILURE(
+      WriteDataPageHeader(/*max_serialized_len=*/1024, data_size, data_size));
+  ASSERT_OK(out_stream_->Write(faux_data.data(), data_size));
+  EndStream();
+
+  // Try to read it back while asking for all data pages to be skipped.
+  auto stream = std::make_shared<::arrow::io::BufferReader>(out_buffer_);
+  page_reader_ = PageReader::Open(stream, /*num_rows=*/100, Compression::UNCOMPRESSED);
+
+  auto skip_all_pages = [](const DataPageStats& stats) -> bool { return true; };
+
+  page_reader_->set_data_page_filter(skip_all_pages);

Review Comment:
   Do you plan to add an e2e case to implement a callback based on the stats?



##########
cpp/src/parquet/column_reader.h:
##########
@@ -55,6 +56,21 @@ static constexpr uint32_t kDefaultMaxPageHeaderSize = 16 * 1024 * 1024;
 // 16 KB is the default expected page header size
 static constexpr uint32_t kDefaultPageHeaderSize = 16 * 1024;
 
+// \brief DataPageStats is a proxy around format::DataPageHeader and
+// format::DataPageHeaderV2.
+class PARQUET_EXPORT DataPageStats {

Review Comment:
   Simply use a struct and remove the ctor?





[GitHub] [arrow] fatemehp commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1049255792


##########
cpp/src/parquet/statistics.h:
##########
@@ -216,6 +216,13 @@ class PARQUET_EXPORT Statistics {
       bool has_distinct_count,
       ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
 
+  // Helper function to convert EncodedStatistics to Statistics.
+  // Note that num_values will be set to -1 because number of non-null values in
+  // the column is not available in EncodedStatistics.
+  static std::shared_ptr<Statistics> Make(
+      const ColumnDescriptor* descr, const EncodedStatistics* encoded_statistics,
+      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());

Review Comment:
   Done.





[GitHub] [arrow] pitrou commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
pitrou commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1017817922


##########
cpp/src/parquet/column_reader.h:
##########
@@ -115,6 +116,17 @@ class PARQUET_EXPORT PageReader {
                                           bool always_compressed = false,
                                           const CryptoContext* ctx = NULLPTR);
 
+  // @returns: true if the skip callback was successfully set. Returns false
+  // if the callback was not set or not supported.
+  // If supported, NextPage() will use this callback to determine if it should

Review Comment:
   Are there contexts where this wouldn't be supported? Why?





[GitHub] [arrow] fatemehp commented on pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on PR #14603:
URL: https://github.com/apache/arrow/pull/14603#issuecomment-1309424951

   This pull request intends to support skipping pages using the metadata. This is especially useful when page indexes are NOT present. Even so, we still need to think through the case where the page index is present.
   
   Here is what I think. When we have a page index, we will probably have a separate API to skip to a page at a particular offset. If a callback is defined, it would take effect via the NextPage calls. So we skip to a page at an offset, and when we call NextPage, it will return the next eligible page.
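
   To make the intended interaction concrete, here is a toy model. Everything in it is hypothetical: ToyPage, ToyPageReader and SkipToOffset are illustrative names, and no such offset-based API exists in this PR. It only shows the ordering described above, where an offset jump positions the reader and the filter is still applied by NextPage:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <optional>
#include <utility>
#include <vector>

// Toy page carrying only what the sketch needs. Not a Parquet type.
struct ToyPage {
  int64_t offset;      // Byte offset of the page within the column chunk.
  int32_t num_values;  // Number of values stored in the page.
};

class ToyPageReader {
 public:
  explicit ToyPageReader(std::vector<ToyPage> pages) : pages_(std::move(pages)) {}

  // The filter returns true for pages that should be skipped.
  void set_data_page_filter(std::function<bool(const ToyPage&)> filter) {
    filter_ = std::move(filter);
  }

  // Hypothetical page-index entry point: move forward to the first page that
  // starts at or after `offset`. It never moves backwards.
  void SkipToOffset(int64_t offset) {
    assert(offset >= 0);
    while (pos_ < pages_.size() && pages_[pos_].offset < offset) ++pos_;
  }

  // Returns the next page the filter does not reject, or nullopt at the end.
  std::optional<ToyPage> NextPage() {
    while (pos_ < pages_.size()) {
      const ToyPage& page = pages_[pos_++];
      if (filter_ && filter_(page)) continue;  // Filter still applies after a jump.
      return page;
    }
    return std::nullopt;
  }

 private:
  std::vector<ToyPage> pages_;
  std::size_t pos_ = 0;
  std::function<bool(const ToyPage&)> filter_;
};
```

   In this model the two mechanisms compose without special cases: the jump only changes where iteration resumes, and every page reached afterwards goes through the same filter check.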




[GitHub] [arrow] fatemehp commented on pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on PR #14603:
URL: https://github.com/apache/arrow/pull/14603#issuecomment-1312244737

   @pitrou, @emkornfield I have updated this pull request and added more tests. Please take a look.




[GitHub] [arrow] pitrou commented on pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
pitrou commented on PR #14603:
URL: https://github.com/apache/arrow/pull/14603#issuecomment-1341019657

   @westonpace @jonkeane This seems closely related to [ARROW-13998](https://issues.apache.org/jira/browse/ARROW-13998) ("[C++] Add page skipping to parquet reading"), could you take a cursory look at the proposed functionality?




[GitHub] [arrow] pitrou commented on pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
pitrou commented on PR #14603:
URL: https://github.com/apache/arrow/pull/14603#issuecomment-1341385835

   > @pitrou could you elaborate on your comment about EncodedStatistics vs. Statistics? I used EncodedStatistics because that was the easiest form that was available from the page header.
   
   Right, but that means the person implementing the callback must do the decoding themselves?
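
   For illustration, this is roughly what that decoding looks like for an INT32 column, where the Parquet format stores min/max as 4-byte little-endian PLAIN values. EncodedMinMax is a hypothetical stand-in for the relevant part of EncodedStatistics, and the function names are made up for this sketch. Other physical types would need their own decoders:

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <optional>
#include <string>

// Hypothetical stand-in for the min/max part of parquet::EncodedStatistics.
struct EncodedMinMax {
  bool has_min = false;
  bool has_max = false;
  std::string min;  // PLAIN-encoded bytes.
  std::string max;  // PLAIN-encoded bytes.
};

// Decodes a PLAIN-encoded INT32 (4 bytes, little-endian) on any host byte order.
std::optional<int32_t> DecodePlainInt32(const std::string& encoded) {
  if (encoded.size() != sizeof(int32_t)) return std::nullopt;
  uint32_t bits = 0;
  for (int i = 0; i < 4; ++i) {
    bits |= static_cast<uint32_t>(static_cast<unsigned char>(encoded[i])) << (8 * i);
  }
  int32_t value;
  std::memcpy(&value, &bits, sizeof(value));
  return value;
}

// Returns true when the page's [min, max] cannot overlap [lower, upper].
// Missing or undecodable statistics mean the page is read, never skipped.
bool ShouldSkipPage(const EncodedMinMax& stats, int32_t lower, int32_t upper) {
  assert(lower <= upper);
  if (!stats.has_min || !stats.has_max) return false;
  const std::optional<int32_t> min = DecodePlainInt32(stats.min);
  const std::optional<int32_t> max = DecodePlainInt32(stats.max);
  if (!min || !max) return false;
  return *max < lower || *min > upper;
}
```

   Every callback author would have to repeat this per physical type, which is the cost being pointed out here. A decoded Statistics object would move that work into the library.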




[GitHub] [arrow] fatemehp commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1046384193


##########
cpp/src/parquet/column_reader.h:
##########
@@ -55,6 +56,21 @@ static constexpr uint32_t kDefaultMaxPageHeaderSize = 16 * 1024 * 1024;
 // 16 KB is the default expected page header size
 static constexpr uint32_t kDefaultPageHeaderSize = 16 * 1024;
 
+// \brief DataPageStats is a proxy around format::DataPageHeader and
+// format::DataPageHeaderV2.
+class PARQUET_EXPORT DataPageStats {

Review Comment:
   I made it a struct, but kept the constructor because it makes creating an instance more convenient.





[GitHub] [arrow] fatemehp commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1025882397


##########
cpp/src/parquet/column_reader.cc:
##########
@@ -386,6 +395,28 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
       throw ParquetException("Invalid page header");
     }
 
+    const PageType::type page_type = LoadEnumSafe(&current_page_header_.type);
+    const bool is_data_page =
+        page_type == PageType::DATA_PAGE || page_type == PageType::DATA_PAGE_V2;
+
+    // Once we have the header, we will call the skip_page_call_back_ to
+    // determine if we should be skipping this page. If yes, we will advance the
+    // stream to the next page.
+    if (skip_page_callback_ && is_data_page) {
+      std::variant<format::DataPageHeader, format::DataPageHeaderV2> data_page_header;
+      if (page_type == PageType::DATA_PAGE) {
+        data_page_header = current_page_header_.data_page_header;
+      } else {
+        data_page_header = current_page_header_.data_page_header_v2;
+      }
+      std::unique_ptr<DataPageStats> data_page_stats =
+          DataPageStats::Make(&data_page_header);
+      if (skip_page_callback_(data_page_stats.get())) {
+        PARQUET_THROW_NOT_OK(stream_->Advance(compressed_len));
+        return NextPage();

Review Comment:
   Done.





[GitHub] [arrow] fatemehp commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1035206696


##########
cpp/src/parquet/metadata.h:
##########
@@ -182,6 +184,32 @@ class PARQUET_EXPORT ColumnChunkMetaData {
   std::unique_ptr<ColumnChunkMetaDataImpl> impl_;
 };
 
+// \brief DataPageStats stores statistics about a data page including number of
+// values and rows.
+class PARQUET_EXPORT DataPageStats {
+ public:
+  explicit DataPageStats(EncodedStatistics encoded_statistics, int32_t num_values,
+                             std::optional<int32_t> num_rows)
+      : encoded_statistics_(std::move(encoded_statistics)),
+        num_values_(num_values),
+        num_rows_(num_rows) {}
+
+  const EncodedStatistics& statistics() { return encoded_statistics_;}
+
+  int32_t num_values() const {
+    return num_values_;
+  }
+
+  std::optional<int32_t> num_rows() const {
+    return num_rows_;
+  }
+
+ private:
+  const EncodedStatistics encoded_statistics_;

Review Comment:
   Done.



##########
cpp/src/parquet/metadata.cc:
##########
@@ -38,6 +43,8 @@
 
 namespace parquet {
 
+typedef std::variant<format::DataPageHeader, format::DataPageHeaderV2> DataPageHeader;

Review Comment:
   Done



##########
cpp/src/parquet/column_reader.h:
##########
@@ -115,11 +116,26 @@ class PARQUET_EXPORT PageReader {
                                           bool always_compressed = false,
                                           const CryptoContext* ctx = NULLPTR);
 
+  // If skip_page_callback_ is set, NextPage() must use this callback to determine if it
+  // should return or skip and move to the next page. If the callback function returns
+  // true the page must be skipped. The callback will be called only if the page
+  // type is DATA_PAGE or DATA_PAGE_V2. Dictionary pages must be read
+  // regardless.
+  // \note API EXPERIMENTAL
+  void set_skip_page_callback(
+      std::function<bool(const DataPageStats&)> skip_page_callback) {
+    skip_page_callback_ = skip_page_callback;

Review Comment:
   Done





[GitHub] [arrow] fatemehp commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1030944057


##########
cpp/src/parquet/column_reader.cc:
##########
@@ -253,6 +253,13 @@ class SerializedPageReader : public PageReader {
 
   void set_max_page_header_size(uint32_t size) override { max_page_header_size_ = size; }
 
+  bool set_skip_page_callback(
+      std::function<bool(const format::PageHeader&)> skip_page_callback) override {
+    skip_page_callback_ = skip_page_callback;
+    has_skip_page_callback_ = true;

Review Comment:
   Done



##########
cpp/src/parquet/column_reader.h:
##########
@@ -28,6 +28,7 @@
 #include "parquet/properties.h"
 #include "parquet/schema.h"
 #include "parquet/types.h"
+#include "parquet/thrift_internal.h"  // IWYU pragma: keep

Review Comment:
   Done.





[GitHub] [arrow] wgtmac commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
wgtmac commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1018579993


##########
cpp/src/parquet/column_reader.cc:
##########
@@ -386,6 +397,16 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
       throw ParquetException("Invalid page header");
     }
 
+   // Once we have the header, we will call the skip_page_call_back_ to
+   // determine if we should be skipping this page. If yes, we will advance the
+   // stream to the next page.
+   if(has_skip_page_callback_) {

Review Comment:
   > > What we have done is to let the PageReader be aware of Offset Index belong to the pages of the RowGroup
   > 
   > Do you have example code here for what you mean?
   > 
   > > I can pick up [ARROW-10158](https://issues.apache.org/jira/browse/ARROW-10158) to contribute our implementation.
   > 
   > Is the implementation compatible with the callback approach? If you are willing to contribute it, it seems like it would be valuable.
   
   I agree with what @fatemehp has said. My proposal is orthogonal to (and compatible with) the current patch. I can contribute the Page Index implementation and then use the Offset Index to skip pages when it is available.





[GitHub] [arrow] emkornfield commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
emkornfield commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1020618816


##########
cpp/src/parquet/metadata.h:
##########
@@ -182,6 +182,28 @@ class PARQUET_EXPORT ColumnChunkMetaData {
   std::unique_ptr<ColumnChunkMetaDataImpl> impl_;
 };
 
+/// \brief DataPageStats is a proxy around stats in format::PageHeader.
+class PARQUET_EXPORT DataPageStats {
+ public:
+  static std::unique_ptr<DataPageStats> Make(const void* page_header);
+
+  ~DataPageStats();
+
+  bool Equals(const DataPageStats& other) const;
+
+  int32_t num_values() const;
+  int32_t num_rows() const;

Review Comment:
   How is the consumer supposed to know if this is exact?





[GitHub] [arrow] pitrou commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
pitrou commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1042527520


##########
cpp/src/parquet/column_reader.h:
##########
@@ -115,11 +116,30 @@ class PARQUET_EXPORT PageReader {
                                           bool always_compressed = false,
                                           const CryptoContext* ctx = NULLPTR);
 
+  // If skip_page_callback_ is present (not null), NextPage() will call the
+  // callback function exactly once per page in the order the pages appear in
+  // the column. If the callback function returns true the page will be
+  // skipped. The callback will be called only if the page type is DATA_PAGE or
+  // DATA_PAGE_V2. Dictionary pages will not be skipped.
+  // This setter must be called at most once to set the callback.
+  // \note API EXPERIMENTAL
+  void set_skip_page_callback(
+      std::function<bool(const DataPageStats&)> skip_page_callback) {
+    if (skip_page_callback_) {
+      throw ParquetException("set_skip_page_callback was called more than once");

Review Comment:
   Yes!





[GitHub] [arrow] westonpace commented on pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
westonpace commented on PR #14603:
URL: https://github.com/apache/arrow/pull/14603#issuecomment-1344740899

   > for which the number of values equals the number of rows
   
   This is great and valuable information that might not be obvious to a Parquet novice. Can you add this fact to the comments for this addition somewhere?




[GitHub] [arrow] fatemehp commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1049253901


##########
cpp/src/parquet/metadata.h:
##########
@@ -20,13 +20,15 @@
 #include <cstdint>
 #include <map>
 #include <memory>
+#include <optional>
 #include <string>
 #include <utility>
 #include <vector>
 
 #include "parquet/platform.h"
 #include "parquet/properties.h"
 #include "parquet/schema.h"
+#include "parquet/statistics.h"

Review Comment:
   Oops, not sure what happened there. It should be removed now.





[GitHub] [arrow] fatemehp commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1046383745


##########
cpp/src/parquet/column_reader.h:
##########
@@ -55,6 +56,21 @@ static constexpr uint32_t kDefaultMaxPageHeaderSize = 16 * 1024 * 1024;
 // 16 KB is the default expected page header size
 static constexpr uint32_t kDefaultPageHeaderSize = 16 * 1024;
 
+// \brief DataPageStats is a proxy around format::DataPageHeader and

Review Comment:
   Done.





[GitHub] [arrow] westonpace commented on pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
westonpace commented on PR #14603:
URL: https://github.com/apache/arrow/pull/14603#issuecomment-1377901380

   > Apparently you requested some changes? Feel free to merge when you feel everything was addressed.
   
   Same here, I am content




[GitHub] [arrow] emkornfield commented on pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
emkornfield commented on PR #14603:
URL: https://github.com/apache/arrow/pull/14603#issuecomment-1381086143

   Travis CI failures look infrastructure-related. Going to merge.




[GitHub] [arrow] emkornfield commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
emkornfield commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1020617977


##########
cpp/src/parquet/column_reader.h:
##########
@@ -115,6 +116,17 @@ class PARQUET_EXPORT PageReader {
                                           bool always_compressed = false,
                                           const CryptoContext* ctx = NULLPTR);
 
+  // @returns: true if the skip callback was successfully set. Returns false
+  // if the callback was not set or not supported.
+  // If supported, NextPage() will use this callback to determine if it should

Review Comment:
   If we don't have a failure case in mind here, then let's make the method void (YAGNI). If we think there is a possible error, then Status should be returned. A similar question: does this need to be virtual?





[GitHub] [arrow] emkornfield commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
emkornfield commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1020618734


##########
cpp/src/parquet/metadata.h:
##########
@@ -182,6 +182,28 @@ class PARQUET_EXPORT ColumnChunkMetaData {
   std::unique_ptr<ColumnChunkMetaDataImpl> impl_;
 };
 
+/// \brief DataPageStats is a proxy around stats in format::PageHeader.
+class PARQUET_EXPORT DataPageStats {
+ public:
+  static std::unique_ptr<DataPageStats> Make(const void* page_header);
+
+  ~DataPageStats();
+
+  bool Equals(const DataPageStats& other) const;
+
+  int32_t num_values() const;

Review Comment:
   There is already a notion of statistics (see statistics.h). Is there a reason not to use that as a member here?





[GitHub] [arrow] fatemehp commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1018341625


##########
cpp/src/parquet/column_reader.cc:
##########
@@ -386,6 +397,16 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
       throw ParquetException("Invalid page header");
     }
 
+   // Once we have the header, we will call the skip_page_call_back_ to
+   // determine if we should be skipping this page. If yes, we will advance the
+   // stream to the next page.
+   if(has_skip_page_callback_) {

Review Comment:
   Thanks @wgtmac. With this pull request, we would like to specifically support the case where the Page Index is NOT available. We still want to be able to skip pages based on the metadata in that case.





[GitHub] [arrow] fatemehp commented on pull request #14603: [PARQUET-2210]:[cpp] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on PR #14603:
URL: https://github.com/apache/arrow/pull/14603#issuecomment-1306231073

   @emkornfield, @pitrou could you take a look at this? Thanks!




[GitHub] [arrow] emkornfield commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
emkornfield commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1030838076


##########
cpp/src/parquet/metadata.h:
##########
@@ -182,6 +184,32 @@ class PARQUET_EXPORT ColumnChunkMetaData {
   std::unique_ptr<ColumnChunkMetaDataImpl> impl_;
 };
 
+// \brief DataPageStats stores statistics about a data page including number of
+// values and rows.
+class PARQUET_EXPORT DataPageStats {
+ public:
+  explicit DataPageStats(EncodedStatistics encoded_statistics, int32_t num_values,
+                             std::optional<int32_t> num_rows)
+      : encoded_statistics_(std::move(encoded_statistics)),
+        num_values_(num_values),
+        num_rows_(num_rows) {}
+
+  const EncodedStatistics& statistics() { return encoded_statistics_;}
+
+  int32_t num_values() const {
+    return num_values_;
+  }
+
+  std::optional<int32_t> num_rows() const {
+    return num_rows_;
+  }
+
+ private:
+  const EncodedStatistics encoded_statistics_;

Review Comment:
   This is safe, but it seems possible to avoid the copy here, since the stats need to be used later anyway?





[GitHub] [arrow] fatemehp commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1049249236


##########
cpp/src/parquet/column_reader.h:
##########
@@ -115,11 +116,30 @@ class PARQUET_EXPORT PageReader {
                                           bool always_compressed = false,
                                           const CryptoContext* ctx = NULLPTR);
 
+  // If skip_page_callback_ is present (not null), NextPage() will call the
+  // callback function exactly once per page in the order the pages appear in
+  // the column. If the callback function returns true the page will be
+  // skipped. The callback will be called only if the page type is DATA_PAGE or

Review Comment:
   I feel like since it is a filter, it should return true if it is filtering? But I am not opposed to the other way around, especially if there is a convention for how it should be.
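
For readers following the convention being discussed, here is a minimal sketch of the "return true to skip" semantics described in the doc comment above. It is not the PR's implementation: `PageType`, `PageStatsSketch`, `PageSketch`, `SkipPageCallback`, and `PagesToRead` are hypothetical stand-ins so the example compiles without Arrow. It only illustrates that the callback is consulted once per data page, in order, and that dictionary pages are never offered to it.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <optional>
#include <vector>

// Hypothetical stand-ins for illustration only; not the real parquet-cpp types.
enum class PageType { DICTIONARY_PAGE, DATA_PAGE, DATA_PAGE_V2 };

struct PageStatsSketch {
  int32_t num_values;
  std::optional<int32_t> num_rows;  // only a V2 header would carry this
};

struct PageSketch {
  PageType type;
  PageStatsSketch stats;
};

// Returns true when the page should be skipped (the convention in the diff).
using SkipPageCallback = std::function<bool(const PageStatsSketch&)>;

// Returns the indices of the pages a reader would hand back to its caller.
std::vector<std::size_t> PagesToRead(const std::vector<PageSketch>& pages,
                                     const SkipPageCallback& skip_page_callback) {
  std::vector<std::size_t> kept;
  for (std::size_t i = 0; i < pages.size(); ++i) {
    const PageSketch& page = pages[i];
    assert(page.stats.num_values >= 0);  // basic header sanity check
    const bool is_data_page =
        page.type == PageType::DATA_PAGE || page.type == PageType::DATA_PAGE_V2;
    // The callback is only consulted for data pages, and only if it is set.
    if (is_data_page && skip_page_callback && skip_page_callback(page.stats)) {
      continue;  // callback returned true: skip this page
    }
    kept.push_back(i);
  }
  return kept;
}
```

With the opposite convention (return true to keep), only the `continue` condition would be negated; the rest of the loop stays the same.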





[GitHub] [arrow] emkornfield commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
emkornfield commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1035213163


##########
cpp/src/parquet/column_reader.cc:
##########
@@ -263,6 +269,11 @@ class SerializedPageReader : public PageReader {
                                              int compressed_len, int uncompressed_len,
                                              int levels_byte_len = 0);
 
+  // Returns true for non-data pages, and if we should skip based on
+  // skip_page_callback_. Performs basic checks on values in the page header.
+  // Fills in page_statistics.
+  bool ShouldSkipPage(EncodedStatistics& page_statistics);

Review Comment:
   nit: Style in Arrow is to still use pointers for output parameters
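
A small sketch of the pointer-for-output-parameter style mentioned in this nit, using hypothetical stand-in types (`PageHeaderSketch`, `PageStatsSketch`, `ExtractPageStats`) rather than the real `ShouldSkipPage` signature. The point is only that inputs are taken by const reference and outputs by pointer, so the call site shows `&stats` and makes the mutation visible.

```cpp
#include <cassert>
#include <optional>
#include <string>

// Hypothetical, simplified stand-ins for a page header and its statistics.
struct PageHeaderSketch {
  bool is_data_page = false;
  std::optional<std::string> max_bytes;  // absent if the writer stored no stats
};

struct PageStatsSketch {
  bool has_max = false;
  std::string max_bytes;
};

// Input by const reference, output by pointer.
// Returns true if `out` was filled, false for non-data pages.
bool ExtractPageStats(const PageHeaderSketch& header, PageStatsSketch* out) {
  assert(out != nullptr);  // the output pointer must be valid
  if (!header.is_data_page) return false;
  out->has_max = header.max_bytes.has_value();
  out->max_bytes = header.max_bytes.value_or("");
  return true;
}
```

A caller would write `ExtractPageStats(header, &stats)`, which reads differently from a call that only takes inputs.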





[GitHub] [arrow] fatemehp commented on pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on PR #14603:
URL: https://github.com/apache/arrow/pull/14603#issuecomment-1358042954

   @pitrou could you take another look?




[GitHub] [arrow] emkornfield commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
emkornfield commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1025828815


##########
cpp/src/parquet/metadata.h:
##########
@@ -182,6 +182,28 @@ class PARQUET_EXPORT ColumnChunkMetaData {
   std::unique_ptr<ColumnChunkMetaDataImpl> impl_;
 };
 
+/// \brief DataPageStats is a proxy around stats in format::PageHeader.
+class PARQUET_EXPORT DataPageStats {
+ public:
+  static std::unique_ptr<DataPageStats> Make(const void* page_header);
+
+  ~DataPageStats();
+
+  bool Equals(const DataPageStats& other) const;
+
+  int32_t num_values() const;

Review Comment:
   EncodedStatistics looks almost identical to this though? For normal statistics, it looks like the descriptor is only needed to construct a comparator. I wonder if there is some refactoring that could separate the two? It seems like having a third type of statistics would add confusion, so I want to make sure there is a strong reason to add it.





[GitHub] [arrow] emkornfield commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
emkornfield commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1025829317


##########
cpp/src/parquet/metadata.h:
##########
@@ -182,6 +182,28 @@ class PARQUET_EXPORT ColumnChunkMetaData {
   std::unique_ptr<ColumnChunkMetaDataImpl> impl_;
 };
 
+/// \brief DataPageStats is a proxy around stats in format::PageHeader.
+class PARQUET_EXPORT DataPageStats {
+ public:
+  static std::unique_ptr<DataPageStats> Make(const void* page_header);
+
+  ~DataPageStats();
+
+  bool Equals(const DataPageStats& other) const;
+
+  int32_t num_values() const;
+  int32_t num_rows() const;

Review Comment:
   Sorry, this throws an exception if it isn't a V2 data page, and the user has no way of knowing that.





[GitHub] [arrow] fatemehp commented on pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on PR #14603:
URL: https://github.com/apache/arrow/pull/14603#issuecomment-1341486243

   Good point. However, in the page reader we do not have the column descriptor, which is required to make the Statistics. The caller will have access to the descriptor and can make the Statistics out of the encoded statistics.




[GitHub] [arrow] fatemehp commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1026994492


##########
cpp/src/parquet/metadata.h:
##########
@@ -182,6 +182,28 @@ class PARQUET_EXPORT ColumnChunkMetaData {
   std::unique_ptr<ColumnChunkMetaDataImpl> impl_;
 };
 
+/// \brief DataPageStats is a proxy around stats in format::PageHeader.
+class PARQUET_EXPORT DataPageStats {
+ public:
+  static std::unique_ptr<DataPageStats> Make(const void* page_header);
+
+  ~DataPageStats();
+
+  bool Equals(const DataPageStats& other) const;
+
+  int32_t num_values() const;
+  int32_t num_rows() const;

Review Comment:
   num_rows returns an optional now.
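
As a rough illustration of how a consumer might handle the optional row count: the helper below, `RowsInPage`, is hypothetical and not part of the PR. It assumes, as discussed elsewhere in this thread, that V1 data page headers carry no row count and that for non-repeated columns the number of values equals the number of rows. The `column_is_repeated` flag is an assumption standing in for "max repetition level > 0".

```cpp
#include <cassert>
#include <cstdint>
#include <optional>

// Hypothetical helper: best-effort row count for a page, from header fields only.
std::optional<int32_t> RowsInPage(int32_t num_values,
                                  std::optional<int32_t> num_rows,
                                  bool column_is_repeated) {
  assert(num_values >= 0);
  if (num_rows.has_value()) {
    return num_rows;  // V2 header: the row count is stated explicitly
  }
  if (!column_is_repeated) {
    return num_values;  // one value (possibly null) per row
  }
  return std::nullopt;  // V1 header on a repeated column: not knowable here
}
```

Returning `std::nullopt` in the last case lets the caller see that the count is unknown instead of receiving a value that silently means something else.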





[GitHub] [arrow] fatemehp commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1025883534


##########
cpp/src/parquet/metadata.h:
##########
@@ -182,6 +182,28 @@ class PARQUET_EXPORT ColumnChunkMetaData {
   std::unique_ptr<ColumnChunkMetaDataImpl> impl_;
 };
 
+/// \brief DataPageStats is a proxy around stats in format::PageHeader.
+class PARQUET_EXPORT DataPageStats {
+ public:
+  static std::unique_ptr<DataPageStats> Make(const void* page_header);
+
+  ~DataPageStats();
+
+  bool Equals(const DataPageStats& other) const;
+
+  int32_t num_values() const;

Review Comment:
   Yes, we could use EncodedStatistics, though it is missing num_rows and num_values. Should we refactor EncodedStatistics to meet our needs?





[GitHub] [arrow] fatemehp commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1025775059


##########
cpp/src/parquet/metadata.h:
##########
@@ -182,6 +182,28 @@ class PARQUET_EXPORT ColumnChunkMetaData {
   std::unique_ptr<ColumnChunkMetaDataImpl> impl_;
 };
 
+/// \brief DataPageStats is a proxy around stats in format::PageHeader.
+class PARQUET_EXPORT DataPageStats {
+ public:
+  static std::unique_ptr<DataPageStats> Make(const void* page_header);
+
+  ~DataPageStats();
+
+  bool Equals(const DataPageStats& other) const;
+
+  int32_t num_values() const;

Review Comment:
   The Statistics class has more functionality and more dependencies than we need. It is meant for accumulating values and computing the min/max. Also, we need a column descriptor to make it.





[GitHub] [arrow] fatemehp commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1025775729


##########
cpp/src/parquet/metadata.h:
##########
@@ -182,6 +182,28 @@ class PARQUET_EXPORT ColumnChunkMetaData {
   std::unique_ptr<ColumnChunkMetaDataImpl> impl_;
 };
 
+/// \brief DataPageStats is a proxy around stats in format::PageHeader.
+class PARQUET_EXPORT DataPageStats {
+ public:
+  static std::unique_ptr<DataPageStats> Make(const void* page_header);
+
+  ~DataPageStats();
+
+  bool Equals(const DataPageStats& other) const;
+
+  int32_t num_values() const;
+  int32_t num_rows() const;

Review Comment:
   Could you please elaborate?





[GitHub] [arrow] fatemehp commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1065129943


##########
cpp/src/parquet/column_reader.h:
##########
@@ -55,6 +56,27 @@ static constexpr uint32_t kDefaultMaxPageHeaderSize = 16 * 1024 * 1024;
 // 16 KB is the default expected page header size
 static constexpr uint32_t kDefaultPageHeaderSize = 16 * 1024;
 
+// \brief DataPageStats stores encoded statistics and number of values/rows for
+// a page.
+struct PARQUET_EXPORT DataPageStats {
+  DataPageStats(const EncodedStatistics* encoded_statistics, int32_t num_values,
+                std::optional<int32_t> num_rows)
+      : encoded_statistics(encoded_statistics),
+        num_values(num_values),
+        num_rows(num_rows) {}
+
+  // Encoded statistics extracted from the page header.
+  // Nullptr if there are no statistics in the page header.
+  const EncodedStatistics* encoded_statistics;
+  // Number of values stored in the page. Filled for both V1 and V2 data pages.
+  // For repeated fields, this can be greater than number of rows. For
+  // non-repeated fields, this will be the same as the number of rows.
+  int32_t num_values;
+  // Number of rows stored in the page. std::nullopt for V1 data pages since

Review Comment:
   Done.





[GitHub] [arrow] fatemehp commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1065132773


##########
cpp/src/parquet/statistics.h:
##########
@@ -216,6 +216,14 @@ class PARQUET_EXPORT Statistics {
       bool has_distinct_count,
       ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
 
+  // Helper function to convert EncodedStatistics to Statistics.
+  // EncodedStatistics does not contain number of non-null values, and it can be
+  // passed using the num_values parameter.
+  static std::shared_ptr<Statistics> Make(
+      const ColumnDescriptor* descr, const EncodedStatistics* encoded_statistics,
+      int64_t num_values = -1,

Review Comment:
   We decided to add a convenient way to convert an EncodedStatistics to a Statistics. However, the encoded statistics lack the number of non-null values, which is required for making a Statistics, so I added an optional argument for it.
   
   Please see here: https://github.com/apache/arrow/pull/14603#discussion_r1048484566
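
To illustrate why the conversion needs type information from the caller: the min/max in encoded statistics are raw bytes, and they only become comparable once the physical type is known. The sketch below is self-contained and does not use the real `Statistics::Make`. `EncodedStatsSketch`, `EncodeInt32`, `DecodeInt32`, and `CanSkipForGreaterThan` are hypothetical names, and it assumes an INT32 column whose min/max are stored as 4-byte little-endian values.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <optional>
#include <string>

// Hypothetical stand-in for encoded page statistics: raw bytes, no type info.
struct EncodedStatsSketch {
  std::optional<std::string> min_bytes;
  std::optional<std::string> max_bytes;
};

// Encodes an INT32 as 4 little-endian bytes (assumed layout for this sketch).
std::string EncodeInt32(int32_t value) {
  uint32_t bits = 0;
  std::memcpy(&bits, &value, sizeof(bits));
  std::string out(sizeof(bits), '\0');
  for (std::size_t i = 0; i < sizeof(bits); ++i) {
    out[i] = static_cast<char>((bits >> (8 * i)) & 0xFF);
  }
  return out;
}

// Decodes 4 little-endian bytes; nullopt if the bytes are absent or malformed.
std::optional<int32_t> DecodeInt32(const std::optional<std::string>& bytes) {
  if (!bytes.has_value() || bytes->size() != sizeof(int32_t)) return std::nullopt;
  uint32_t bits = 0;
  for (std::size_t i = 0; i < sizeof(bits); ++i) {
    bits |= static_cast<uint32_t>(static_cast<uint8_t>((*bytes)[i])) << (8 * i);
  }
  int32_t value = 0;
  std::memcpy(&value, &bits, sizeof(value));
  return value;
}

// True if no value in the page can satisfy `value > threshold`.
// Pages without a usable max are never skipped.
bool CanSkipForGreaterThan(const EncodedStatsSketch& stats, int32_t threshold) {
  const std::optional<int32_t> max_value = DecodeInt32(stats.max_bytes);
  return max_value.has_value() && *max_value <= threshold;
}

// Usage sketch: a page whose max is 42 cannot contain values greater than 42.
inline void ExampleUsage() {
  EncodedStatsSketch stats{EncodeInt32(-5), EncodeInt32(42)};
  assert(CanSkipForGreaterThan(stats, 42));
  assert(!CanSkipForGreaterThan(stats, 41));
}
```

In the real library the column descriptor plays the role that the hard-coded INT32 assumption plays here, which is why the caller, not the page reader, is the natural place to do the conversion.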





[GitHub] [arrow] github-actions[bot] commented on pull request #14603: [PARQUET-2210]:[cpp] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #14603:
URL: https://github.com/apache/arrow/pull/14603#issuecomment-1306231052

   Thanks for opening a pull request!
   
   If this is not a [minor PR](https://github.com/apache/arrow/blob/master/CONTRIBUTING.md#Minor-Fixes), could you open an issue for this pull request on JIRA? https://issues.apache.org/jira/browse/ARROW
   
   Opening JIRAs ahead of time contributes to the [Openness](http://theapacheway.com/open/#:~:text=Openness%20allows%20new%20users%20the,must%20happen%20in%20the%20open.) of the Apache Arrow project.
   
   Then could you also rename the pull request title in the following format?
   
       ARROW-${JIRA_ID}: [${COMPONENT}] ${SUMMARY}
   
   or
   
       MINOR: [${COMPONENT}] ${SUMMARY}
   
   See also:
   
     * [Other pull requests](https://github.com/apache/arrow/pulls/)
     * [Contribution Guidelines - How to contribute patches](https://arrow.apache.org/docs/developers/contributing.html#how-to-contribute-patches)
   




[GitHub] [arrow] emkornfield commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
emkornfield commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1030829339


##########
cpp/src/parquet/column_reader.cc:
##########
@@ -386,6 +386,57 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
       throw ParquetException("Invalid page header");
     }
 
+    // Do some checks before trying to decrypt and/or decompress the page.
+    // Also skip the page if skip_page_callback_ is set and returns true.
+    const PageType::type page_type = LoadEnumSafe(&current_page_header_.type);
+    EncodedStatistics page_statistics;
+    if (page_type == PageType::DATA_PAGE) {
+      const format::DataPageHeader& header = current_page_header_.data_page_header;
+      if (header.num_values < 0) {
+        throw ParquetException("Invalid page header (negative number of values)");
+      }
+      page_statistics = ExtractStatsFromHeader(header);
+      seen_num_values_ += header.num_values;
+      if (skip_page_callback_) {
+        DataPageStats data_page_stats(page_statistics, header.num_values,
+                                      /*num_rows=*/std::nullopt);
+        if (skip_page_callback_(data_page_stats)) {
+          PARQUET_THROW_NOT_OK(stream_->Advance(compressed_len));
+          continue;
+        }
+      }
+    } else if (page_type == PageType::DATA_PAGE_V2) {
+      const format::DataPageHeaderV2& header = current_page_header_.data_page_header_v2;
+      if (header.num_values < 0) {
+        throw ParquetException("Invalid page header (negative number of values)");
+      }
+      if (header.definition_levels_byte_length < 0 ||
+          header.repetition_levels_byte_length < 0) {
+        throw ParquetException("Invalid page header (negative levels byte length)");
+      }
+      page_statistics = ExtractStatsFromHeader(header);
+      seen_num_values_ += header.num_values;
+      if (skip_page_callback_) {
+        DataPageStats data_page_stats(page_statistics, header.num_values,
+                                      header.num_rows);
+        if (skip_page_callback_(data_page_stats)) {
+          PARQUET_THROW_NOT_OK(stream_->Advance(compressed_len));
+          continue;
+        }
+      }
+    } else if (page_type == PageType::DICTIONARY_PAGE) {
+      const format::DictionaryPageHeader& dict_header =
+          current_page_header_.dictionary_page_header;
+      if (dict_header.num_values < 0) {

Review Comment:
   Since we are refactoring, consider making this check a utility method?
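
A minimal sketch of such a utility, assuming a free function is acceptable. The name `CheckNumValuesInHeader` matches a call that appears in a later revision of this diff, but the body here is a guess, and `std::runtime_error` stands in for `ParquetException` so the snippet compiles on its own.

```cpp
#include <cassert>
#include <cstdint>
#include <stdexcept>

// Rejects page headers that report a negative value count.
// std::runtime_error is a stand-in for ParquetException (assumption).
inline void CheckNumValuesInHeader(int32_t num_values) {
  if (num_values < 0) {
    throw std::runtime_error("Invalid page header (negative number of values)");
  }
}
```

Each of the three branches (DATA_PAGE, DATA_PAGE_V2, DICTIONARY_PAGE) could then call the helper instead of repeating the `if`/`throw` pair.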





[GitHub] [arrow] pitrou commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
pitrou commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1042229280


##########
cpp/src/parquet/column_reader.h:
##########
@@ -115,11 +116,30 @@ class PARQUET_EXPORT PageReader {
                                           bool always_compressed = false,
                                           const CryptoContext* ctx = NULLPTR);
 
+  // If skip_page_callback_ is present (not null), NextPage() will call the
+  // callback function exactly once per page in the order the pages appear in
+  // the column. If the callback function returns true the page will be
+  // skipped. The callback will be called only if the page type is DATA_PAGE or
+  // DATA_PAGE_V2. Dictionary pages will not be skipped.
+  // This setter must be called at most once to set the callback.
+  // \note API EXPERIMENTAL
+  void set_skip_page_callback(
+      std::function<bool(const DataPageStats&)> skip_page_callback) {
+    if (skip_page_callback_) {
+      throw ParquetException("set_skip_page_callback was called more than once");

Review Comment:
   Well, unless we do a similar thing for other configuration properties, this seems a bit gratuitous.





[GitHub] [arrow] fatemehp commented on pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on PR #14603:
URL: https://github.com/apache/arrow/pull/14603#issuecomment-1341384233

   @pitrou could you elaborate on your comment about EncodedStatistics vs. Statistics? I used EncodedStatistics because that was the easiest form that was available from the page header.




[GitHub] [arrow] fatemehp commented on pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on PR #14603:
URL: https://github.com/apache/arrow/pull/14603#issuecomment-1341419900

   Yes, I think so. As far as I can tell, Statistics does not offer more than EncodedStatistics; both of them return min and max as strings.




[GitHub] [arrow] westonpace commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
westonpace commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1043992258


##########
cpp/src/parquet/column_reader.h:
##########
@@ -55,6 +56,21 @@ static constexpr uint32_t kDefaultMaxPageHeaderSize = 16 * 1024 * 1024;
 // 16 KB is the default expected page header size
 static constexpr uint32_t kDefaultPageHeaderSize = 16 * 1024;
 
+// \brief DataPageStats is a proxy around format::DataPageHeader and
+// format::DataPageHeaderV2.
+class PARQUET_EXPORT DataPageStats {
+ public:
+  DataPageStats(EncodedStatistics* encoded_statistics, int32_t num_values,
+                std::optional<int32_t> num_rows)
+      : encoded_statistics(encoded_statistics),
+        num_values(num_values),
+        num_rows(num_rows) {}
+
+  EncodedStatistics* encoded_statistics;
+  const int32_t num_values;
+  const std::optional<int32_t> num_rows;

Review Comment:
   Can we add a comment here explaining why this is optional?  From the code I can see that it is not included if the data page is a v1 data page but that might not be obvious.
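
To illustrate why the row count is optional: only the DATA_PAGE_V2 header carries `num_rows`, so a consumer has to handle its absence for v1 pages. The sketch below uses a hypothetical `PageCounts` struct and a hypothetical `CanSkipWholePage` helper rather than the PR's `DataPageStats`; it is one conservative policy, not the reader's actual behavior.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>

// Hypothetical stand-in for the counts carried by DataPageStats.
struct PageCounts {
  int32_t num_values;
  // Present only for DATA_PAGE_V2; a v1 header has no row count.
  std::optional<int32_t> num_rows;
};

// Decide from the header alone whether a whole page lies inside the next
// `rows_to_skip` rows. Without a row count (v1 pages) the answer is "no",
// because num_values can exceed the number of rows for repeated columns.
inline bool CanSkipWholePage(const PageCounts& page, int64_t rows_to_skip) {
  assert(rows_to_skip >= 0);
  if (!page.num_rows.has_value()) return false;
  return static_cast<int64_t>(*page.num_rows) <= rows_to_skip;
}
```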



##########
cpp/src/parquet/column_reader.h:
##########
@@ -115,11 +133,26 @@ class PARQUET_EXPORT PageReader {
                                           bool always_compressed = false,
                                           const CryptoContext* ctx = NULLPTR);
 
+  // If data_page_filter_ is present (not null), NextPage() will call the
+  // callback function exactly once per page in the order the pages appear in
+  // the column. If the callback function returns true the page will be
+  // skipped. The callback will be called only if the page type is DATA_PAGE or
+  // DATA_PAGE_V2. Dictionary pages will not be skipped.
+  // This setter must be called at most once to set the callback.
+  // \note API EXPERIMENTAL

Review Comment:
   Can we add some description of what happens when the file does not contain page statistics (e.g. because the writer did not write them)?  Will the filter simply not be called?  Or will it be called with some kind of "empty" statistics?
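
Whatever the reader ends up doing when statistics are absent, a caller can write its filter defensively. The sketch below assumes the callback receives some view of the page statistics with presence flags; `PageMinMax`, `PageFilter`, and `MakeRangeFilter` are hypothetical names for illustration, and the values are plain `int64_t` rather than encoded bytes.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>

// Hypothetical view of page statistics with explicit presence flags.
struct PageMinMax {
  bool has_min = false;
  bool has_max = false;
  int64_t min = 0;
  int64_t max = 0;
};

// Returns true when the page should be skipped.
using PageFilter = std::function<bool(const PageMinMax&)>;

// Builds a filter that skips pages whose [min, max] range cannot overlap
// the wanted range [lo, hi]. Pages without min/max are always kept, since
// nothing can be proven about their contents.
inline PageFilter MakeRangeFilter(int64_t lo, int64_t hi) {
  assert(lo <= hi);
  return [lo, hi](const PageMinMax& stats) {
    if (!stats.has_min || !stats.has_max) return false;  // keep the page
    return stats.max < lo || stats.min > hi;
  };
}
```

With this policy, a file written without page statistics is simply read in full: the filter never returns true for its pages.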





[GitHub] [arrow] westonpace commented on pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
westonpace commented on PR #14603:
URL: https://github.com/apache/arrow/pull/14603#issuecomment-1343745564

   Also, whatever we end up doing, we can leave ARROW-13998 open for adding page filtering to `ParquetFileReader`.




[GitHub] [arrow] fatemehp commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1035378900


##########
cpp/src/parquet/metadata.h:
##########
@@ -182,6 +184,28 @@ class PARQUET_EXPORT ColumnChunkMetaData {
   std::unique_ptr<ColumnChunkMetaDataImpl> impl_;
 };
 
+// \brief DataPageStats stores statistics about a data page including number of
+// values and rows.
+class PARQUET_EXPORT DataPageStats {
+ public:
+  explicit DataPageStats(const EncodedStatistics& encoded_statistics, int32_t num_values,

Review Comment:
   Done





[GitHub] [arrow] emkornfield commented on pull request #14603: [PARQUET-2210]:[cpp] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
emkornfield commented on PR #14603:
URL: https://github.com/apache/arrow/pull/14603#issuecomment-1307760526

   I think to avoid exposing thrift headers we need an alternate representation of stats.  Please take a look at how RowGroup stats are exposed and re-use or model the necessary structures off of that.




[GitHub] [arrow] fatemehp commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1025882564


##########
cpp/src/parquet/metadata.h:
##########
@@ -182,6 +182,28 @@ class PARQUET_EXPORT ColumnChunkMetaData {
   std::unique_ptr<ColumnChunkMetaDataImpl> impl_;
 };
 
+/// \brief DataPageStats is a proxy around stats in format::PageHeader.
+class PARQUET_EXPORT DataPageStats {
+ public:
+  static std::unique_ptr<DataPageStats> Make(const void* page_header);
+
+  ~DataPageStats();
+
+  bool Equals(const DataPageStats& other) const;
+
+  int32_t num_values() const;
+  int32_t num_rows() const;

Review Comment:
   I think we should expose is_v2_ as well. I will do that.





[GitHub] [arrow] emkornfield commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
emkornfield commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1026709732


##########
cpp/src/parquet/metadata.h:
##########
@@ -182,6 +182,28 @@ class PARQUET_EXPORT ColumnChunkMetaData {
   std::unique_ptr<ColumnChunkMetaDataImpl> impl_;
 };
 
+/// \brief DataPageStats is a proxy around stats in format::PageHeader.
+class PARQUET_EXPORT DataPageStats {
+ public:
+  static std::unique_ptr<DataPageStats> Make(const void* page_header);
+
+  ~DataPageStats();
+
+  bool Equals(const DataPageStats& other) const;
+
+  int32_t num_values() const;

Review Comment:
   I have a slight preference for composing the encoded stats into this new object, rather than refactoring.  Do you have a preference?





[GitHub] [arrow] emkornfield commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
emkornfield commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1030835002


##########
cpp/src/parquet/column_reader.h:
##########
@@ -115,11 +116,26 @@ class PARQUET_EXPORT PageReader {
                                           bool always_compressed = false,
                                           const CryptoContext* ctx = NULLPTR);
 
+  // If skip_page_callback_ is set, NextPage() must use this callback to determine if it
+  // should return or skip and move to the next page. If the callback function returns

Review Comment:
   3.  set_skip_page_callback is expected to be called at most once?





[GitHub] [arrow] emkornfield commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
emkornfield commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1030836947


##########
cpp/src/parquet/metadata.cc:
##########
@@ -38,6 +43,8 @@
 
 namespace parquet {
 
+typedef std::variant<format::DataPageHeader, format::DataPageHeaderV2> DataPageHeader;

Review Comment:
   unused now?





[GitHub] [arrow] emkornfield commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
emkornfield commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1035219832


##########
cpp/src/parquet/column_reader.cc:
##########
@@ -337,6 +348,50 @@ void SerializedPageReader::UpdateDecryption(const std::shared_ptr<Decryptor>& de
   }
 }
 
+bool SerializedPageReader::ShouldSkipPage(EncodedStatistics& page_statistics) {
+  const PageType::type page_type = LoadEnumSafe(&current_page_header_.type);
+  if (page_type == PageType::DATA_PAGE) {
+    const format::DataPageHeader& header = current_page_header_.data_page_header;
+    CheckNumValuesInHeader(header.num_values);
+    page_statistics = ExtractStatsFromHeader(header);

Review Comment:
   Sorry I can't find the comment, but do we need some way of filtering out known invalid statistics here, or is it already taken care of?





[GitHub] [arrow] fatemehp commented on pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on PR #14603:
URL: https://github.com/apache/arrow/pull/14603#issuecomment-1331201162

   @emkornfield I attempted to address your comments. Please take another look. Thanks!




[GitHub] [arrow] fatemehp commented on pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on PR #14603:
URL: https://github.com/apache/arrow/pull/14603#issuecomment-1347447247

   > This is great, and valuable information that might not be obvious to a parquet novice. Can you add this fact to the comments for this addition somewhere?
   
   Done. Added to the DataPageStats comments.




[GitHub] [arrow] fatemehp commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1046384670


##########
cpp/src/parquet/metadata.h:
##########
@@ -182,6 +182,28 @@ class PARQUET_EXPORT ColumnChunkMetaData {
   std::unique_ptr<ColumnChunkMetaDataImpl> impl_;
 };
 
+/// \brief DataPageStats is a proxy around stats in format::PageHeader.
+class PARQUET_EXPORT DataPageStats {
+ public:
+  static std::unique_ptr<DataPageStats> Make(const void* page_header);
+
+  ~DataPageStats();
+
+  bool Equals(const DataPageStats& other) const;
+
+  int32_t num_values() const;
+  int32_t num_rows() const;

Review Comment:
   > This is great, and valuable information that might not be obvious to a parquet novice. Can you add this fact to the comments for this addition somewhere?
   
   I added this to the comments in the DataPageStats class. Please take a look.





[GitHub] [arrow] emkornfield commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
emkornfield commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1048052298


##########
cpp/src/parquet/column_reader.h:
##########
@@ -115,11 +141,27 @@ class PARQUET_EXPORT PageReader {
                                           bool always_compressed = false,
                                           const CryptoContext* ctx = NULLPTR);
 
+  // If data_page_filter_ is present (not null), NextPage() will call the

Review Comment:
   ```suggestion
     // If data_page_filter is present (not null), NextPage() will call the
   ```





[GitHub] [arrow] ursabot commented on pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
ursabot commented on PR #14603:
URL: https://github.com/apache/arrow/pull/14603#issuecomment-1382710006

   Benchmark runs are scheduled for baseline = 37a79659f82a7dbe0a55858e59fa5eaa380a266f and contender = 97998d835f404dd4876a2691a93b973fc022ffd3. 97998d835f404dd4876a2691a93b973fc022ffd3 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
   Conbench compare runs links:
   [Finished :arrow_down:0.0% :arrow_up:0.0%] [ec2-t3-xlarge-us-east-2](https://conbench.ursa.dev/compare/runs/22bd9ce77c7c43699c408bd2c18c5de5...fd3383ba44854d21b8cb8cf2b0da25a0/)
   [Finished :arrow_down:0.33% :arrow_up:0.21%] [test-mac-arm](https://conbench.ursa.dev/compare/runs/b90c365d6d154909a3a5ec7570fc4852...b2eba52569394239b6111aa37558ccff/)
   [Finished :arrow_down:0.77% :arrow_up:0.0%] [ursa-i9-9960x](https://conbench.ursa.dev/compare/runs/7068ec714a5442e6af4a7b5eeada509e...1aec227b7356471bbf449ab43fee74b6/)
   [Finished :arrow_down:1.06% :arrow_up:0.06%] [ursa-thinkcentre-m75q](https://conbench.ursa.dev/compare/runs/e10267722cc747be9a6b46984d8a03f7...bd67fc2286be486bb55fee7a5a919cd5/)
   Buildkite builds:
   [Finished] [`97998d83` ec2-t3-xlarge-us-east-2](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ec2-t3-xlarge-us-east-2/builds/2172)
   [Finished] [`97998d83` test-mac-arm](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-test-mac-arm/builds/2196)
   [Finished] [`97998d83` ursa-i9-9960x](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-i9-9960x/builds/2168)
   [Finished] [`97998d83` ursa-thinkcentre-m75q](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-thinkcentre-m75q/builds/2189)
   [Finished] [`37a79659` ec2-t3-xlarge-us-east-2](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ec2-t3-xlarge-us-east-2/builds/2171)
   [Finished] [`37a79659` test-mac-arm](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-test-mac-arm/builds/2194)
   [Finished] [`37a79659` ursa-i9-9960x](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-i9-9960x/builds/2167)
   [Finished] [`37a79659` ursa-thinkcentre-m75q](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-thinkcentre-m75q/builds/2188)
   Supported benchmarks:
   ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
   test-mac-arm: Supported benchmark langs: C++, Python, R
   ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
   ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java
   




[GitHub] [arrow] emkornfield commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
emkornfield commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1063683715


##########
cpp/src/parquet/statistics.h:
##########
@@ -216,6 +216,14 @@ class PARQUET_EXPORT Statistics {
       bool has_distinct_count,
       ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
 
+  // Helper function to convert EncodedStatistics to Statistics.
+  // EncodedStatistics does not contain number of non-null values, and it can be
+  // passed using the num_values parameter.
+  static std::shared_ptr<Statistics> Make(
+      const ColumnDescriptor* descr, const EncodedStatistics* encoded_statistics,
+      int64_t num_values = -1,

Review Comment:
   Not sure I understand why this is optional?





[GitHub] [arrow] fatemehp commented on pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on PR #14603:
URL: https://github.com/apache/arrow/pull/14603#issuecomment-1344974085

   After looking into it more, EncodedStatistics is not directly convertible to Statistics because it is missing the number of non-null values. We could add a HasNumValues() function to tell whether we have that, but I find that it makes the Statistics class confusing.
   
   I prefer holding off on this until future pull requests in which we will actually be using such conversion.




[GitHub] [arrow] emkornfield commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
emkornfield commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1018196450


##########
cpp/src/parquet/column_reader.cc:
##########
@@ -386,6 +397,16 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
       throw ParquetException("Invalid page header");
     }
 
+   // Once we have the header, we will call the skip_page_call_back_ to
+   // determine if we should be skipping this page. If yes, we will advance the
+   // stream to the next page.
+   if(has_skip_page_callback_) {

Review Comment:
   > What we have done is to let the PageReader be aware of Offset Index belong to the pages of the RowGroup
   
   Do you have example code here for what you mean?
   
   > I can pick up [ARROW-10158](https://issues.apache.org/jira/browse/ARROW-10158) to contribute our implementation.
   
   Is the implementation compatible with the callback approach?  If you are willing to contribute it, it seems like it would be valuable.
   
   





[GitHub] [arrow] fatemehp commented on pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on PR #14603:
URL: https://github.com/apache/arrow/pull/14603#issuecomment-1343341106

   If I understand correctly, there is already one in the Statistics class:
   
   static std::shared_ptr<Statistics> Make(
         const ColumnDescriptor* descr, const std::string& encoded_min,
         const std::string& encoded_max, int64_t num_values, int64_t null_count,
         int64_t distinct_count, bool has_min_max, bool has_null_count,
         bool has_distinct_count,
         ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());




[GitHub] [arrow] fatemehp commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1042552865


##########
cpp/src/parquet/column_reader.h:
##########
@@ -115,11 +116,30 @@ class PARQUET_EXPORT PageReader {
                                           bool always_compressed = false,
                                           const CryptoContext* ctx = NULLPTR);
 
+  // If skip_page_callback_ is present (not null), NextPage() will call the
+  // callback function exactly once per page in the order the pages appear in
+  // the column. If the callback function returns true the page will be
+  // skipped. The callback will be called only if the page type is DATA_PAGE or
+  // DATA_PAGE_V2. Dictionary pages will not be skipped.
+  // This setter must be called at most once to set the callback.
+  // \note API EXPERIMENTAL
+  void set_skip_page_callback(
+      std::function<bool(const DataPageStats&)> skip_page_callback) {
+    if (skip_page_callback_) {
+      throw ParquetException("set_skip_page_callback was called more than once");

Review Comment:
   Done.





[GitHub] [arrow] github-actions[bot] commented on pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #14603:
URL: https://github.com/apache/arrow/pull/14603#issuecomment-1307835675

   https://issues.apache.org/jira/browse/PARQUET-2210




[GitHub] [arrow] fatemehp commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1018343643


##########
cpp/src/parquet/column_reader.h:
##########
@@ -115,6 +116,17 @@ class PARQUET_EXPORT PageReader {
                                           bool always_compressed = false,
                                           const CryptoContext* ctx = NULLPTR);
 
+  // @returns: true if the skip callback was successfully set. Returns false
+  // if the callback was not set or not supported.
+  // If supported, NextPage() will use this callback to determine if it should

Review Comment:
   I do not have a concrete example. Since this is an abstract class, I tried to make it a more general case.





[GitHub] [arrow] emkornfield commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
emkornfield commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1020620357


##########
cpp/src/parquet/column_reader.cc:
##########
@@ -386,6 +395,28 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
       throw ParquetException("Invalid page header");
     }
 
+    const PageType::type page_type = LoadEnumSafe(&current_page_header_.type);
+    const bool is_data_page =
+        page_type == PageType::DATA_PAGE || page_type == PageType::DATA_PAGE_V2;
+
+    // Once we have the header, we will call the skip_page_call_back_ to
+    // determine if we should be skipping this page. If yes, we will advance the
+    // stream to the next page.
+    if (skip_page_callback_ && is_data_page) {
+      std::variant<format::DataPageHeader, format::DataPageHeaderV2> data_page_header;
+      if (page_type == PageType::DATA_PAGE) {
+        data_page_header = current_page_header_.data_page_header;
+      } else {
+        data_page_header = current_page_header_.data_page_header_v2;
+      }
+      std::unique_ptr<DataPageStats> data_page_stats =
+          DataPageStats::Make(&data_page_header);
+      if (skip_page_callback_(data_page_stats.get())) {
+        PARQUET_THROW_NOT_OK(stream_->Advance(compressed_len));
+        return NextPage();

Review Comment:
   Would it pay to refactor this a little so we can use iteration instead of recursion? I think it would be good to avoid a stack overflow on poorly constructed files where we filter out most pages.
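
For illustration, a minimal sketch of the iterative shape being suggested. `PageHeaderSketch` and `IterativePageReaderSketch` are hypothetical stand-ins over an in-memory list of headers, not the real `SerializedPageReader`; the only point is that a skipped page is consumed with `continue` rather than a recursive `NextPage()` call.

```cpp
#include <cstddef>
#include <cstdint>
#include <functional>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical stand-in for a parsed data page header (not a Parquet type).
struct PageHeaderSketch {
  int32_t num_values;
};

// Hypothetical reader over in-memory headers; it only illustrates the loop.
class IterativePageReaderSketch {
 public:
  explicit IterativePageReaderSketch(std::vector<PageHeaderSketch> pages)
      : pages_(std::move(pages)) {}

  void set_skip_callback(std::function<bool(const PageHeaderSketch&)> callback) {
    skip_ = std::move(callback);
  }

  // Returns the next page that is not skipped, or nullopt at end of stream.
  // Stack depth stays constant however many consecutive pages are skipped.
  std::optional<PageHeaderSketch> NextPage() {
    while (pos_ < pages_.size()) {
      const PageHeaderSketch& header = pages_[pos_++];
      if (skip_ && skip_(header)) {
        continue;  // a real reader would also advance the stream here
      }
      return header;
    }
    return std::nullopt;
  }

 private:
  std::vector<PageHeaderSketch> pages_;
  std::size_t pos_ = 0;
  std::function<bool(const PageHeaderSketch&)> skip_;
};
```

With this shape, a file in which nearly every page is filtered out costs loop iterations, not stack frames.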





[GitHub] [arrow] emkornfield commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
emkornfield commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1025831879


##########
cpp/src/parquet/column_reader.cc:
##########
@@ -386,6 +395,28 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
       throw ParquetException("Invalid page header");
     }
 
+    const PageType::type page_type = LoadEnumSafe(&current_page_header_.type);
+    const bool is_data_page =
+        page_type == PageType::DATA_PAGE || page_type == PageType::DATA_PAGE_V2;
+
+    // Once we have the header, we will call the skip_page_call_back_ to
+    // determine if we should be skipping this page. If yes, we will advance the
+    // stream to the next page.
+    if (skip_page_callback_ && is_data_page) {
+      std::variant<format::DataPageHeader, format::DataPageHeaderV2> data_page_header;
+      if (page_type == PageType::DATA_PAGE) {
+        data_page_header = current_page_header_.data_page_header;
+      } else {
+        data_page_header = current_page_header_.data_page_header_v2;
+      }
+      std::unique_ptr<DataPageStats> data_page_stats =

Review Comment:
   One other thing to consider is whether the statistics are correct: https://github.com/apache/arrow/blob/147b5c922efe19d34ef7e7cda635b7d8a07be2eb/cpp/src/parquet/metadata.cc#L1325





[GitHub] [arrow] fatemehp commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1035206919


##########
cpp/src/parquet/column_reader.h:
##########
@@ -115,11 +116,26 @@ class PARQUET_EXPORT PageReader {
                                           bool always_compressed = false,
                                           const CryptoContext* ctx = NULLPTR);
 
+  // If skip_page_callback_ is set, NextPage() must use this callback to determine if it

Review Comment:
   Done



##########
cpp/src/parquet/column_reader.h:
##########
@@ -115,11 +116,26 @@ class PARQUET_EXPORT PageReader {
                                           bool always_compressed = false,
                                           const CryptoContext* ctx = NULLPTR);
 
+  // If skip_page_callback_ is set, NextPage() must use this callback to determine if it
+  // should return or skip and move to the next page. If the callback function returns

Review Comment:
   Done



##########
cpp/src/parquet/column_reader.cc:
##########
@@ -386,6 +386,57 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
       throw ParquetException("Invalid page header");
     }
 
+    // Do some checks before trying to decrypt and/or decompress the page.

Review Comment:
   Done





[GitHub] [arrow] emkornfield commented on pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
emkornfield commented on PR #14603:
URL: https://github.com/apache/arrow/pull/14603#issuecomment-1331225476

   A few nits but mostly looks good to me.  I still have a concern about applying the same logic to page statistics that we do to row group statistics to make sure we aren't passing through invalid values.




[GitHub] [arrow] pitrou commented on pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
pitrou commented on PR #14603:
URL: https://github.com/apache/arrow/pull/14603#issuecomment-1375861398

   > LGTM from me. @pitrou @westonpace @wgtmac any more concerns?
   
   Apparently you requested some changes? Feel free to merge when you feel everything was addressed.




[GitHub] [arrow] pitrou commented on pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
pitrou commented on PR #14603:
URL: https://github.com/apache/arrow/pull/14603#issuecomment-1341005157

   I have a more general API question: why is this exposing `EncodedStatistics` rather than decoded `Statistics`? There is no easy way to turn the former into the latter, AFAICT.
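
To make the gap concrete, here is a minimal sketch of what a caller holding only encoded bytes would have to do by hand for an INT32 column. It assumes the min/max bytes use the PLAIN representation (four little-endian bytes for INT32); `DecodePlainInt32` is a hypothetical helper, not part of the Arrow or Parquet C++ API.

```cpp
#include <cstdint>
#include <cstring>
#include <optional>
#include <string>

// Hypothetical helper: decodes four little-endian bytes into an int32_t.
// Returns nullopt when the encoded value does not have the expected width.
std::optional<int32_t> DecodePlainInt32(const std::string& encoded) {
  if (encoded.size() != sizeof(int32_t)) return std::nullopt;
  uint32_t bits = 0;
  for (int i = 0; i < 4; ++i) {
    // Assemble byte by byte so the result does not depend on host endianness.
    bits |= static_cast<uint32_t>(static_cast<unsigned char>(encoded[i]))
            << (8 * i);
  }
  int32_t value;
  std::memcpy(&value, &bits, sizeof(value));
  return value;
}
```

Every physical type would need its own decoder along these lines, which is the inconvenience the question points at.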




[GitHub] [arrow] fatemehp commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1042526855


##########
cpp/src/parquet/column_reader.h:
##########
@@ -115,11 +116,30 @@ class PARQUET_EXPORT PageReader {
                                           bool always_compressed = false,
                                           const CryptoContext* ctx = NULLPTR);
 
+  // If skip_page_callback_ is present (not null), NextPage() will call the
+  // callback function exactly once per page in the order the pages appear in
+  // the column. If the callback function returns true the page will be
+  // skipped. The callback will be called only if the page type is DATA_PAGE or
+  // DATA_PAGE_V2. Dictionary pages will not be skipped.
+  // This setter must be called at most once to set the callback.
+  // \note API EXPERIMENTAL
+  void set_skip_page_callback(
+      std::function<bool(const DataPageStats&)> skip_page_callback) {
+    if (skip_page_callback_) {
+      throw ParquetException("set_skip_page_callback was called more than once");

Review Comment:
   @pitrou, so do you suggest allowing it to be called more than once? I am fine with that, just want to be clear.
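
A minimal sketch of the "may be called more than once" alternative, assuming the setter simply overwrites the stored callback. `FilterHolderSketch` is a hypothetical class that keeps only the callback; the callback takes a bare `num_values` here instead of `DataPageStats` to keep the sketch self-contained.

```cpp
#include <cstdint>
#include <functional>
#include <utility>

// Hypothetical holder for a page filter; not the real PageReader.
class FilterHolderSketch {
 public:
  using Filter = std::function<bool(int32_t /*num_values*/)>;

  // Overwrites any previously installed filter instead of throwing.
  // Passing an empty std::function (or nullptr) clears the filter.
  void set_data_page_filter(Filter filter) { filter_ = std::move(filter); }

  // Consulted once per data page; with no filter every page is read.
  bool ShouldSkip(int32_t num_values) const {
    return filter_ ? filter_(num_values) : false;
  }

 private:
  Filter filter_;
};
```

Under this design a new filter takes effect on the next page that is examined, and there is no error path for repeated calls.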





[GitHub] [arrow] fatemehp commented on pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on PR #14603:
URL: https://github.com/apache/arrow/pull/14603#issuecomment-1341371012

   To me, https://issues.apache.org/jira/browse/ARROW-13998 and https://issues.apache.org/jira/browse/PARQUET-2210 look the same, with the latter being a bit more detailed about the approach.




[GitHub] [arrow] fatemehp commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1049244693


##########
cpp/src/parquet/column_reader.cc:
##########
@@ -386,6 +395,28 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
       throw ParquetException("Invalid page header");
     }
 
+    const PageType::type page_type = LoadEnumSafe(&current_page_header_.type);
+    const bool is_data_page =
+        page_type == PageType::DATA_PAGE || page_type == PageType::DATA_PAGE_V2;
+
+    // Once we have the header, we will call the skip_page_call_back_ to
+    // determine if we should be skipping this page. If yes, we will advance the
+    // stream to the next page.
+    if (skip_page_callback_ && is_data_page) {
+      std::variant<format::DataPageHeader, format::DataPageHeaderV2> data_page_header;
+      if (page_type == PageType::DATA_PAGE) {
+        data_page_header = current_page_header_.data_page_header;
+      } else {
+        data_page_header = current_page_header_.data_page_header_v2;
+      }
+      std::unique_ptr<DataPageStats> data_page_stats =

Review Comment:
   I think the caller should check that the stats are valid before setting a callback function.





[GitHub] [arrow] fatemehp commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1049244905


##########
cpp/src/parquet/column_reader.cc:
##########
@@ -386,6 +395,28 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
       throw ParquetException("Invalid page header");
     }
 
+    const PageType::type page_type = LoadEnumSafe(&current_page_header_.type);
+    const bool is_data_page =
+        page_type == PageType::DATA_PAGE || page_type == PageType::DATA_PAGE_V2;
+
+    // Once we have the header, we will call the skip_page_call_back_ to
+    // determine if we should be skipping this page. If yes, we will advance the
+    // stream to the next page.
+    if (skip_page_callback_ && is_data_page) {
+      std::variant<format::DataPageHeader, format::DataPageHeaderV2> data_page_header;
+      if (page_type == PageType::DATA_PAGE) {
+        data_page_header = current_page_header_.data_page_header;
+      } else {
+        data_page_header = current_page_header_.data_page_header_v2;
+      }
+      std::unique_ptr<DataPageStats> data_page_stats =

Review Comment:
   I made that clear in the comments for setting the callback function.





[GitHub] [arrow] fatemehp commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1049250913


##########
cpp/src/parquet/column_reader.h:
##########
@@ -115,11 +133,26 @@ class PARQUET_EXPORT PageReader {
                                           bool always_compressed = false,
                                           const CryptoContext* ctx = NULLPTR);
 
+  // If data_page_filter_ is present (not null), NextPage() will call the
+  // callback function exactly once per page in the order the pages appear in
+  // the column. If the callback function returns true the page will be
+  // skipped. The callback will be called only if the page type is DATA_PAGE or
+  // DATA_PAGE_V2. Dictionary pages will not be skipped.
+  // This setter must be called at most once to set the callback.
+  // \note API EXPERIMENTAL

Review Comment:
   OK, I removed is_stats_set; encoded_statistics will be nullptr if there are no statistics in the header. I added that to the comments.
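
A minimal sketch of a filter written against that contract: it must treat a null statistics pointer as "cannot decide" and read the page. `EncodedStatsSketch` and `DataPageStatsSketch` are hypothetical stand-ins shaped like the structs discussed in this thread, and `SkipIfMaxBelow` is an illustrative predicate using plain byte-wise string comparison.

```cpp
#include <cstdint>
#include <optional>
#include <string>

// Hypothetical stand-in for encoded page statistics.
struct EncodedStatsSketch {
  std::string min;
  std::string max;
  bool has_min = false;
  bool has_max = false;
};

// Hypothetical stand-in for the struct handed to the filter callback.
struct DataPageStatsSketch {
  const EncodedStatsSketch* encoded_statistics;  // null if the header has none
  int32_t num_values;
  std::optional<int32_t> num_rows;  // only available for DATA_PAGE_V2
};

// Returns true (skip the page) only when statistics are present and the
// encoded max compares below `lower_bound`. Missing statistics never skip.
bool SkipIfMaxBelow(const DataPageStatsSketch& stats,
                    const std::string& lower_bound) {
  if (stats.encoded_statistics == nullptr) return false;  // no stats: read it
  const EncodedStatsSketch& s = *stats.encoded_statistics;
  if (!s.has_max) return false;
  return s.max < lower_bound;
}
```

Defaulting to "read" whenever information is missing keeps the filter conservative: it can only lose performance, not rows.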





[GitHub] [arrow] fatemehp commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1049255910


##########
cpp/src/parquet/metadata.cc:
##########
@@ -19,6 +19,8 @@
 
 #include <algorithm>
 #include <cinttypes>
+#include <csignal>
+#include <iostream>

Review Comment:
   Removed.





[GitHub] [arrow] emkornfield commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
emkornfield commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1048051811


##########
cpp/src/parquet/column_reader.h:
##########
@@ -55,6 +56,29 @@ static constexpr uint32_t kDefaultMaxPageHeaderSize = 16 * 1024 * 1024;
 // 16 KB is the default expected page header size
 static constexpr uint32_t kDefaultPageHeaderSize = 16 * 1024;
 
+// \brief DataPageStats stores encoded statistics and number of values/rows for
+// a page.
+struct PARQUET_EXPORT DataPageStats {
+  DataPageStats(EncodedStatistics* encoded_statistics, int32_t num_values,
+                std::optional<int32_t> num_rows)
+      : encoded_statistics(encoded_statistics),
+        is_stats_set(encoded_statistics->is_set()),
+        num_values(num_values),
+        num_rows(num_rows) {}
+
+  // Encoded statistics extracted from the page header.
+  EncodedStatistics* encoded_statistics;
+  // False if there were no encoded statistics in the page header.
+  bool is_stats_set;

Review Comment:
   Would encoded_statistics just be null here? Are both needed?





[GitHub] [arrow] emkornfield commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
emkornfield commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1030827605


##########
cpp/src/parquet/column_reader.cc:
##########
@@ -386,6 +386,57 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
       throw ParquetException("Invalid page header");
     }
 
+    // Do some checks before trying to decrypt and/or decompress the page.
+    // Also skip the page if skip_page_callback_ is set and returns true.
+    const PageType::type page_type = LoadEnumSafe(&current_page_header_.type);
+    EncodedStatistics page_statistics;
+    if (page_type == PageType::DATA_PAGE) {
+      const format::DataPageHeader& header = current_page_header_.data_page_header;
+      if (header.num_values < 0) {
+        throw ParquetException("Invalid page header (negative number of values)");
+      }
+      page_statistics = ExtractStatsFromHeader(header);
+      seen_num_values_ += header.num_values;
+      if (skip_page_callback_) {
+        DataPageStats data_page_stats(page_statistics, header.num_values,
+                                      /*num_rows=*/std::nullopt);
+        if (skip_page_callback_(data_page_stats)) {
+          PARQUET_THROW_NOT_OK(stream_->Advance(compressed_len));
+          continue;
+        }
+      }
+    } else if (page_type == PageType::DATA_PAGE_V2) {
+      const format::DataPageHeaderV2& header = current_page_header_.data_page_header_v2;
+      if (header.num_values < 0) {
+        throw ParquetException("Invalid page header (negative number of values)");
+      }
+      if (header.definition_levels_byte_length < 0 ||
+          header.repetition_levels_byte_length < 0) {
+        throw ParquetException("Invalid page header (negative levels byte length)");
+      }
+      page_statistics = ExtractStatsFromHeader(header);
+      seen_num_values_ += header.num_values;
+      if (skip_page_callback_) {
+        DataPageStats data_page_stats(page_statistics, header.num_values,
+                                      header.num_rows);

Review Comment:
   Should num_rows be validated?
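
A minimal sketch of what such a check could look like next to the existing negative-value checks. `DataPageHeaderV2Sketch` and `ValidateHeaderV2` are hypothetical; the sketch throws `std::invalid_argument` where the real code uses `ParquetException`, and the `num_rows` message text is an assumption.

```cpp
#include <cstdint>
#include <stdexcept>

// Hypothetical stand-in holding the DataPageHeaderV2 fields used here.
struct DataPageHeaderV2Sketch {
  int32_t num_values;
  int32_t num_rows;
  int32_t definition_levels_byte_length;
  int32_t repetition_levels_byte_length;
};

// Rejects headers with negative counts or lengths before they reach a filter.
void ValidateHeaderV2(const DataPageHeaderV2Sketch& header) {
  if (header.num_values < 0) {
    throw std::invalid_argument(
        "Invalid page header (negative number of values)");
  }
  if (header.num_rows < 0) {
    // Assumed message; mirrors the wording of the neighbouring checks.
    throw std::invalid_argument("Invalid page header (negative number of rows)");
  }
  if (header.definition_levels_byte_length < 0 ||
      header.repetition_levels_byte_length < 0) {
    throw std::invalid_argument(
        "Invalid page header (negative levels byte length)");
  }
}
```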





[GitHub] [arrow] emkornfield commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
emkornfield commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1030828466


##########
cpp/src/parquet/column_reader.cc:
##########
@@ -386,6 +386,57 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
       throw ParquetException("Invalid page header");
     }
 
+    // Do some checks before trying to decrypt and/or decompress the page.
+    // Also skip the page if skip_page_callback_ is set and returns true.
+    const PageType::type page_type = LoadEnumSafe(&current_page_header_.type);
+    EncodedStatistics page_statistics;
+    if (page_type == PageType::DATA_PAGE) {
+      const format::DataPageHeader& header = current_page_header_.data_page_header;
+      if (header.num_values < 0) {
+        throw ParquetException("Invalid page header (negative number of values)");
+      }
+      page_statistics = ExtractStatsFromHeader(header);
+      seen_num_values_ += header.num_values;
+      if (skip_page_callback_) {
+        DataPageStats data_page_stats(page_statistics, header.num_values,
+                                      /*num_rows=*/std::nullopt);
+        if (skip_page_callback_(data_page_stats)) {
+          PARQUET_THROW_NOT_OK(stream_->Advance(compressed_len));
+          continue;
+        }
+      }
+    } else if (page_type == PageType::DATA_PAGE_V2) {
+      const format::DataPageHeaderV2& header = current_page_header_.data_page_header_v2;
+      if (header.num_values < 0) {

Review Comment:
   does num_rows have to be validated as well?





[GitHub] [arrow] emkornfield commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
emkornfield commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1030831187


##########
cpp/src/parquet/column_reader.cc:
##########
@@ -386,6 +386,57 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
       throw ParquetException("Invalid page header");
     }
 
+    // Do some checks before trying to decrypt and/or decompress the page.
+    // Also skip the page if skip_page_callback_ is set and returns true.
+    const PageType::type page_type = LoadEnumSafe(&current_page_header_.type);
+    EncodedStatistics page_statistics;
+    if (page_type == PageType::DATA_PAGE) {
+      const format::DataPageHeader& header = current_page_header_.data_page_header;
+      if (header.num_values < 0) {
+        throw ParquetException("Invalid page header (negative number of values)");
+      }
+      page_statistics = ExtractStatsFromHeader(header);

Review Comment:
   Oh, I see it is needed for whatever type of page is current.





[GitHub] [arrow] emkornfield commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
emkornfield commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1030826876


##########
cpp/src/parquet/column_reader.cc:
##########
@@ -386,6 +386,57 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
       throw ParquetException("Invalid page header");
     }
 
+    // Do some checks before trying to decrypt and/or decompress the page.
+    // Also skip the page if skip_page_callback_ is set and returns true.
+    const PageType::type page_type = LoadEnumSafe(&current_page_header_.type);
+    EncodedStatistics page_statistics;
+    if (page_type == PageType::DATA_PAGE) {
+      const format::DataPageHeader& header = current_page_header_.data_page_header;
+      if (header.num_values < 0) {
+        throw ParquetException("Invalid page header (negative number of values)");
+      }
+      page_statistics = ExtractStatsFromHeader(header);

Review Comment:
   Maybe just move this to the usage spots. At the very least, we should probably do the conversion only if skip_page_callback_ is present?
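
A minimal sketch of the lazy variant: the statistics conversion runs only when a callback is installed. All names (`RawStatsSketch`, `ConvertedStatsSketch`, `Convert`, `ShouldSkip`) are hypothetical, and the global counter exists only to make the number of conversions observable.

```cpp
#include <functional>
#include <string>

// Hypothetical stand-ins for the Thrift statistics and the converted form.
struct RawStatsSketch {
  std::string min_value;
  std::string max_value;
};
struct ConvertedStatsSketch {
  std::string min;
  std::string max;
};

// Counts conversions so the sketch can show when the work happens.
int g_conversions = 0;

ConvertedStatsSketch Convert(const RawStatsSketch& raw) {
  ++g_conversions;
  return ConvertedStatsSketch{raw.min_value, raw.max_value};
}

// Converts the statistics only if a filter is installed; otherwise the page
// is read and no conversion cost is paid.
bool ShouldSkip(const RawStatsSketch& raw,
                const std::function<bool(const ConvertedStatsSketch&)>& filter) {
  if (!filter) return false;
  return filter(Convert(raw));
}
```

Readers that never install a filter then pay nothing extra per page.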





[GitHub] [arrow] fatemehp commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1046443578


##########
cpp/src/parquet/file_deserialize_test.cc:
##########
@@ -177,6 +201,217 @@ TEST_F(TestPageSerde, DataPageV1) {
   ASSERT_NO_FATAL_FAILURE(CheckDataPageHeader(data_page_header_, current_page.get()));
 }
 
+// Templated test class to test page filtering for both format::DataPageHeader
+// and format::DataPageHeaderV2.
+template <typename T>
+class PageFilterTest : public TestPageSerde {
+ public:
+  const int kNumPages = 10;
+  void WriteStream();
+
+ protected:
+  std::vector<T> data_page_headers_;
+  int total_rows_ = 0;
+};
+
+template <>
+void PageFilterTest<format::DataPageHeader>::WriteStream() {
+  for (int i = 0; i < kNumPages; ++i) {
+    // Vary the number of rows to produce different headers.
+    int32_t num_rows = i + 100;
+    total_rows_ += num_rows;
+    int data_size = i + 1024;
+    this->data_page_header_.__set_num_values(num_rows);
+    this->data_page_header_.statistics.__set_min_value("A");
+    this->data_page_header_.statistics.__set_max_value("Z");
+    this->data_page_header_.statistics.__set_null_count(0);
+    this->data_page_header_.__isset.statistics = true;
+    ASSERT_NO_FATAL_FAILURE(
+        this->WriteDataPageHeader(/*max_serialized_len=*/1024, data_size, data_size));
+    data_page_headers_.push_back(this->data_page_header_);
+    // Also write data, to make sure we skip the data correctly.
+    std::vector<uint8_t> faux_data(data_size);
+    ASSERT_OK(this->out_stream_->Write(faux_data.data(), data_size));
+  }
+  this->EndStream();
+}
+
+template <>
+void PageFilterTest<format::DataPageHeaderV2>::WriteStream() {
+  for (int i = 0; i < kNumPages; ++i) {
+    // Vary the number of rows to produce different headers.
+    int32_t num_rows = i + 100;
+    total_rows_ += num_rows;
+    int data_size = i + 1024;
+    this->data_page_header_v2_.__set_num_values(num_rows);
+    this->data_page_header_v2_.__set_num_rows(num_rows);
+    this->data_page_header_v2_.statistics.__set_min_value("A");
+    this->data_page_header_v2_.statistics.__set_max_value("Z");
+    this->data_page_header_v2_.statistics.__set_null_count(0);
+    this->data_page_header_v2_.__isset.statistics = true;
+    ASSERT_NO_FATAL_FAILURE(
+        this->WriteDataPageHeaderV2(/*max_serialized_len=*/1024, data_size, data_size));
+    data_page_headers_.push_back(this->data_page_header_v2_);
+    // Also write data, to make sure we skip the data correctly.
+    std::vector<uint8_t> faux_data(data_size);
+    ASSERT_OK(this->out_stream_->Write(faux_data.data(), data_size));
+  }
+  this->EndStream();
+}
+
+using DataPageHeaderTypes =
+    ::testing::Types<format::DataPageHeader, format::DataPageHeaderV2>;
+TYPED_TEST_SUITE(PageFilterTest, DataPageHeaderTypes);
+
+// Creates a number of pages and skips some of them with the page filter callback.
+TYPED_TEST(PageFilterTest, TestPageFilterCallback) {
+  this->WriteStream();
+
+  {  // Read all pages.
+    auto stream = std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
+    this->page_reader_ =
+        PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+
+    // This callback will always return false.
+    auto read_all_pages = [](const DataPageStats& stats) -> bool { return false; };
+
+    this->page_reader_->set_data_page_filter(read_all_pages);
+    for (int i = 0; i < this->kNumPages; ++i) {
+      std::shared_ptr<Page> current_page = this->page_reader_->NextPage();
+      ASSERT_NE(current_page, nullptr);
+      ASSERT_NO_FATAL_FAILURE(
+          CheckDataPageHeader(this->data_page_headers_[i], current_page.get()));
+    }
+    ASSERT_EQ(this->page_reader_->NextPage(), nullptr);
+  }
+  {  // Skip all pages.
+    auto stream = std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
+    this->page_reader_ =
+        PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+
+    auto skip_all_pages = [](const DataPageStats& stats) -> bool { return true; };
+
+    this->page_reader_->set_data_page_filter(skip_all_pages);
+    std::shared_ptr<Page> current_page = this->page_reader_->NextPage();
+    ASSERT_EQ(this->page_reader_->NextPage(), nullptr);
+  }
+
+  {  // Skip every other page.
+    auto stream = std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
+    this->page_reader_ =
+        PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+
+    // Skip pages with even number of values.
+    auto skip_even_pages = [](const DataPageStats& stats) -> bool {
+      if (stats.num_values % 2 == 0) return true;
+      return false;
+    };
+
+    this->page_reader_->set_data_page_filter(skip_even_pages);
+
+    for (int i = 0; i < this->kNumPages; ++i) {
+      // Only pages with odd number of values are read.
+      if (i % 2 != 0) {
+        std::shared_ptr<Page> current_page = this->page_reader_->NextPage();
+        ASSERT_NE(current_page, nullptr);
+        ASSERT_NO_FATAL_FAILURE(
+            CheckDataPageHeader(this->data_page_headers_[i], current_page.get()));
+      }
+    }
+    // We should have exhausted reading the pages by reading the odd pages only.
+    ASSERT_EQ(this->page_reader_->NextPage(), nullptr);
+  }
+}
+
+// Set the page filter more than once. The new filter should be effective
+// on the next NextPage() call.
+TYPED_TEST(PageFilterTest, TestChangingPageFilter) {
+  this->WriteStream();
+
+  auto stream = std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
+  this->page_reader_ =
+      PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+
+  // This callback will always return false.
+  auto read_all_pages = [](const DataPageStats& stats) -> bool { return false; };
+  this->page_reader_->set_data_page_filter(read_all_pages);
+  std::shared_ptr<Page> current_page = this->page_reader_->NextPage();
+  ASSERT_NE(current_page, nullptr);
+  ASSERT_NO_FATAL_FAILURE(
+      CheckDataPageHeader(this->data_page_headers_[0], current_page.get()));
+
+  // This callback will skip all pages.
+  auto skip_all_pages = [](const DataPageStats& stats) -> bool { return true; };
+  this->page_reader_->set_data_page_filter(skip_all_pages);
+  ASSERT_EQ(this->page_reader_->NextPage(), nullptr);
+}
+
+// Test that we do not skip dictionary pages.
+TEST_F(TestPageSerde, DoesNotFilterDictionaryPages) {
+  int data_size = 1024;
+  std::vector<uint8_t> faux_data(data_size);
+
+  ASSERT_NO_FATAL_FAILURE(
+      WriteDataPageHeader(/*max_serialized_len=*/1024, data_size, data_size));
+  ASSERT_OK(out_stream_->Write(faux_data.data(), data_size));
+
+  ASSERT_NO_FATAL_FAILURE(WriteDictionaryPageHeader(data_size, data_size));
+  ASSERT_OK(out_stream_->Write(faux_data.data(), data_size));
+
+  ASSERT_NO_FATAL_FAILURE(
+      WriteDataPageHeader(/*max_serialized_len=*/1024, data_size, data_size));
+  ASSERT_OK(out_stream_->Write(faux_data.data(), data_size));
+  EndStream();
+
+  // Try to read it back while asking for all data pages to be skipped.
+  auto stream = std::make_shared<::arrow::io::BufferReader>(out_buffer_);
+  page_reader_ = PageReader::Open(stream, /*num_rows=*/100, Compression::UNCOMPRESSED);
+
+  auto skip_all_pages = [](const DataPageStats& stats) -> bool { return true; };
+
+  page_reader_->set_data_page_filter(skip_all_pages);

Review Comment:
   Could you please elaborate?
   
   I considered adding a test in column_reader_test; however, that test works with a MockPageReader, so I am not sure how helpful a test using the mock reader would be.
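
For readers following the tests quoted above, the skipping behavior they exercise can be sketched with a toy reader. This is only an illustration under stated assumptions: `ToyPage`, `ToyPageStats`, and `ToyPageReader` are hypothetical stand-ins, not the parquet-cpp classes, and the real `SerializedPageReader` also parses headers and skips the page bytes in the stream.

```cpp
#include <cstddef>
#include <cstdint>
#include <functional>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical stand-ins for illustration only; NOT the parquet-cpp types.
enum class ToyPageType { kData, kDictionary };

struct ToyPage {
  ToyPageType type;
  int32_t num_values;
};

struct ToyPageStats {
  int32_t num_values;
};

using ToyPageFilter = std::function<bool(const ToyPageStats&)>;

class ToyPageReader {
 public:
  explicit ToyPageReader(std::vector<ToyPage> pages) : pages_(std::move(pages)) {}

  // The filter takes effect on the next NextPage() call.
  void set_data_page_filter(ToyPageFilter filter) { filter_ = std::move(filter); }

  // Returns the next page that is not skipped, or std::nullopt at end of stream.
  std::optional<ToyPage> NextPage() {
    while (pos_ < pages_.size()) {
      const ToyPage& page = pages_[pos_++];
      // Only data pages are offered to the filter; dictionary pages always pass.
      if (page.type == ToyPageType::kData && filter_ &&
          filter_(ToyPageStats{page.num_values})) {
        continue;  // The filter returned true, so this page is skipped.
      }
      return page;
    }
    return std::nullopt;
  }

 private:
  std::vector<ToyPage> pages_;
  std::size_t pos_ = 0;
  ToyPageFilter filter_;
};
```

With a filter that always returns true, this toy reader yields only the dictionary page and then reports end of stream, which mirrors what DoesNotFilterDictionaryPages sets up.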





[GitHub] [arrow] fatemehp commented on a diff in pull request #14603: PARQUET-2210: [C++][Parquet] Skip pages based on header metadata using a callback

Posted by GitBox <gi...@apache.org>.
fatemehp commented on code in PR #14603:
URL: https://github.com/apache/arrow/pull/14603#discussion_r1046383355


##########
cpp/src/parquet/column_reader.h:
##########
@@ -115,11 +133,26 @@ class PARQUET_EXPORT PageReader {
                                           bool always_compressed = false,
                                           const CryptoContext* ctx = NULLPTR);
 
+  // If data_page_filter_ is present (not null), NextPage() will call the
+  // callback function exactly once per page in the order the pages appear in
+  // the column. If the callback function returns true the page will be
+  // skipped. The callback will be called only if the page type is DATA_PAGE or
+  // DATA_PAGE_V2. Dictionary pages will not be skipped.
+  // This setter must be called at most once to set the callback.
+  // \note API EXPERIMENTAL

Review Comment:
   Good point. I added an is_stats_set field that should be checked before using the filled encoded_stats.
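
A filter callback written against such a flag might look like the sketch below. It is hedged heavily: `SketchEncodedStats` and `SketchPageStats` are hypothetical stand-ins, and where `is_stats_set` actually lives in the PR is an assumption here. The sketch also assumes that `num_values` counts null values, so a page is all-null when `null_count` equals `num_values`.

```cpp
#include <cstdint>

// Hypothetical stand-ins for illustration only; NOT the parquet-cpp types.
struct SketchEncodedStats {
  int64_t null_count = 0;  // Meaningful only when statistics were written.
};

struct SketchPageStats {
  bool is_stats_set = false;  // Assumed flag: true if the header had statistics.
  SketchEncodedStats encoded_stats;
  int32_t num_values = 0;  // Assumed to include null values.
};

// Returns true (skip) only when statistics are present and every value is null.
bool SkipAllNullPage(const SketchPageStats& stats) {
  if (!stats.is_stats_set) {
    return false;  // No statistics available: read the page to be safe.
  }
  return stats.encoded_stats.null_count == stats.num_values;
}
```

Checking the flag first keeps the callback conservative: a page whose header carries no statistics is read rather than skipped based on default-initialized values.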



##########
cpp/src/parquet/column_reader.h:
##########
@@ -55,6 +56,21 @@ static constexpr uint32_t kDefaultMaxPageHeaderSize = 16 * 1024 * 1024;
 // 16 KB is the default expected page header size
 static constexpr uint32_t kDefaultPageHeaderSize = 16 * 1024;
 
+// \brief DataPageStats is a proxy around format::DataPageHeader and
+// format::DataPageHeaderV2.
+class PARQUET_EXPORT DataPageStats {
+ public:
+  DataPageStats(EncodedStatistics* encoded_statistics, int32_t num_values,
+                std::optional<int32_t> num_rows)
+      : encoded_statistics(encoded_statistics),
+        num_values(num_values),
+        num_rows(num_rows) {}
+
+  EncodedStatistics* encoded_statistics;
+  const int32_t num_values;
+  const std::optional<int32_t> num_rows;

Review Comment:
   done.


