You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ap...@apache.org on 2023/01/12 12:01:27 UTC

[arrow] branch master updated: GH-15074: [Parquet][C++] change 16-bit page_ordinal to 32-bit (#15182)

This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 48da0dfb6c GH-15074: [Parquet][C++] change 16-bit page_ordinal to 32-bit (#15182)
48da0dfb6c is described below

commit 48da0dfb6c0425646f6043afc41a2515e93a4ffb
Author: mwish <15...@qq.com>
AuthorDate: Thu Jan 12 20:01:19 2023 +0800

    GH-15074: [Parquet][C++] change 16-bit page_ordinal to 32-bit (#15182)
    
    As mentioned in https://github.com/apache/arrow/issues/15074, `int16_t page_ordinal` may overflow, so we need to change it to 32-bit.
    
    * [x] Implement the logic
    * [x] Testing
      * [x] Upload a file with more than `INT16_MAX` pages in a single column chunk to parquet-testing.
    * Closes: #15074
    
    Lead-authored-by: mwish <ma...@gmail.com>
    Co-authored-by: Antoine Pitrou <an...@python.org>
    Co-authored-by: mwish <15...@qq.com>
    Co-authored-by: Antoine Pitrou <pi...@free.fr>
    Signed-off-by: Antoine Pitrou <an...@python.org>
---
 cpp/src/parquet/column_reader.cc                   | 15 ++++----
 cpp/src/parquet/column_writer.cc                   |  6 +--
 cpp/src/parquet/encryption/encryption_internal.cc  | 24 ++++++++----
 cpp/src/parquet/encryption/encryption_internal.h   |  4 +-
 .../encryption/encryption_internal_nossl.cc        |  4 +-
 cpp/src/parquet/reader_test.cc                     | 44 ++++++++++++++++++++++
 cpp/submodules/parquet-testing                     |  2 +-
 7 files changed, 76 insertions(+), 23 deletions(-)

diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc
index 9246096c95..f881651737 100644
--- a/cpp/src/parquet/column_reader.cc
+++ b/cpp/src/parquet/column_reader.cc
@@ -261,7 +261,7 @@ class SerializedPageReader : public PageReader {
 
  private:
   void UpdateDecryption(const std::shared_ptr<Decryptor>& decryptor, int8_t module_type,
-                        const std::string& page_aad);
+                        std::string* page_aad);
 
   void InitDecryption();
 
@@ -291,7 +291,7 @@ class SerializedPageReader : public PageReader {
 
   // The ordinal fields in the context below are used for AAD suffix calculation.
   CryptoContext crypto_ctx_;
-  int16_t page_ordinal_;  // page ordinal does not count the dictionary page
+  int32_t page_ordinal_;  // page ordinal does not count the dictionary page
 
   // Maximum allowed page size
   uint32_t max_page_header_size_;
@@ -329,8 +329,7 @@ void SerializedPageReader::InitDecryption() {
 }
 
 void SerializedPageReader::UpdateDecryption(const std::shared_ptr<Decryptor>& decryptor,
-                                            int8_t module_type,
-                                            const std::string& page_aad) {
+                                            int8_t module_type, std::string* page_aad) {
   DCHECK(decryptor != nullptr);
   if (crypto_ctx_.start_decrypt_with_dictionary_page) {
     std::string aad = encryption::CreateModuleAad(
@@ -338,8 +337,8 @@ void SerializedPageReader::UpdateDecryption(const std::shared_ptr<Decryptor>& de
         crypto_ctx_.column_ordinal, kNonPageOrdinal);
     decryptor->UpdateAad(aad);
   } else {
-    encryption::QuickUpdatePageAad(page_aad, page_ordinal_);
-    decryptor->UpdateAad(page_aad);
+    encryption::QuickUpdatePageAad(page_ordinal_, page_aad);
+    decryptor->UpdateAad(*page_aad);
   }
 }
 
@@ -366,7 +365,7 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
       try {
         if (crypto_ctx_.meta_decryptor != nullptr) {
           UpdateDecryption(crypto_ctx_.meta_decryptor, encryption::kDictionaryPageHeader,
-                           data_page_header_aad_);
+                           &data_page_header_aad_);
         }
         deserializer.DeserializeMessage(reinterpret_cast<const uint8_t*>(view.data()),
                                         &header_size, &current_page_header_,
@@ -394,7 +393,7 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
 
     if (crypto_ctx_.data_decryptor != nullptr) {
       UpdateDecryption(crypto_ctx_.data_decryptor, encryption::kDictionaryPage,
-                       data_page_aad_);
+                       &data_page_aad_);
     }
 
     // Read the compressed data page.
diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc
index 7b5ca8810b..1fc13aa3c1 100644
--- a/cpp/src/parquet/column_writer.cc
+++ b/cpp/src/parquet/column_writer.cc
@@ -482,12 +482,12 @@ class SerializedPageWriter : public PageWriter {
         break;
       }
       case encryption::kDataPage: {
-        encryption::QuickUpdatePageAad(data_page_aad_, page_ordinal_);
+        encryption::QuickUpdatePageAad(page_ordinal_, &data_page_aad_);
         data_encryptor_->UpdateAad(data_page_aad_);
         break;
       }
       case encryption::kDataPageHeader: {
-        encryption::QuickUpdatePageAad(data_page_header_aad_, page_ordinal_);
+        encryption::QuickUpdatePageAad(page_ordinal_, &data_page_header_aad_);
         meta_encryptor_->UpdateAad(data_page_header_aad_);
         break;
       }
@@ -516,7 +516,7 @@ class SerializedPageWriter : public PageWriter {
   int64_t data_page_offset_;
   int64_t total_uncompressed_size_;
   int64_t total_compressed_size_;
-  int16_t page_ordinal_;
+  int32_t page_ordinal_;
   int16_t row_group_ordinal_;
   int16_t column_ordinal_;
 
diff --git a/cpp/src/parquet/encryption/encryption_internal.cc b/cpp/src/parquet/encryption/encryption_internal.cc
index fee3434d09..1c4d3d8dc4 100644
--- a/cpp/src/parquet/encryption/encryption_internal.cc
+++ b/cpp/src/parquet/encryption/encryption_internal.cc
@@ -597,9 +597,19 @@ static std::string ShortToBytesLe(int16_t input) {
   return std::string(reinterpret_cast<char const*>(output), 2);
 }
 
+static void CheckPageOrdinal(int32_t page_ordinal) {
+  if (ARROW_PREDICT_FALSE(page_ordinal > std::numeric_limits<int16_t>::max())) {
+    throw ParquetException("Encrypted Parquet files can't have more than " +
+                           std::to_string(std::numeric_limits<int16_t>::max()) +
+                           " pages per chunk: got " + std::to_string(page_ordinal));
+  }
+}
+
 std::string CreateModuleAad(const std::string& file_aad, int8_t module_type,
                             int16_t row_group_ordinal, int16_t column_ordinal,
-                            int16_t page_ordinal) {
+                            int32_t page_ordinal) {
+  CheckPageOrdinal(page_ordinal);
+  const int16_t page_ordinal_short = static_cast<int16_t>(page_ordinal);
   int8_t type_ordinal_bytes[1];
   type_ordinal_bytes[0] = module_type;
   std::string type_ordinal_bytes_str(reinterpret_cast<char const*>(type_ordinal_bytes),
@@ -616,7 +626,7 @@ std::string CreateModuleAad(const std::string& file_aad, int8_t module_type,
         << column_ordinal_bytes;
     return out.str();
   }
-  std::string page_ordinal_bytes = ShortToBytesLe(page_ordinal);
+  std::string page_ordinal_bytes = ShortToBytesLe(page_ordinal_short);
   std::ostringstream out;
   out << file_aad << type_ordinal_bytes_str << row_group_ordinal_bytes
       << column_ordinal_bytes << page_ordinal_bytes;
@@ -630,11 +640,11 @@ std::string CreateFooterAad(const std::string& aad_prefix_bytes) {
 
 // Update last two bytes with new page ordinal (instead of creating new page AAD
 // from scratch)
-void QuickUpdatePageAad(const std::string& AAD, int16_t new_page_ordinal) {
-  std::string page_ordinal_bytes = ShortToBytesLe(new_page_ordinal);
-  int length = static_cast<int>(AAD.size());
-  std::memcpy(reinterpret_cast<int16_t*>(const_cast<char*>(AAD.c_str() + length - 2)),
-              reinterpret_cast<const int16_t*>(page_ordinal_bytes.c_str()), 2);
+void QuickUpdatePageAad(int32_t new_page_ordinal, std::string* AAD) {
+  CheckPageOrdinal(new_page_ordinal);
+  const std::string page_ordinal_bytes =
+      ShortToBytesLe(static_cast<int16_t>(new_page_ordinal));
+  std::memcpy(AAD->data() + AAD->length() - 2, page_ordinal_bytes.data(), 2);
 }
 
 void RandBytes(unsigned char* buf, int num) { RAND_bytes(buf, num); }
diff --git a/cpp/src/parquet/encryption/encryption_internal.h b/cpp/src/parquet/encryption/encryption_internal.h
index ede338182f..24093c68be 100644
--- a/cpp/src/parquet/encryption/encryption_internal.h
+++ b/cpp/src/parquet/encryption/encryption_internal.h
@@ -119,12 +119,12 @@ class AesDecryptor {
 
 std::string CreateModuleAad(const std::string& file_aad, int8_t module_type,
                             int16_t row_group_ordinal, int16_t column_ordinal,
-                            int16_t page_ordinal);
+                            int32_t page_ordinal);
 
 std::string CreateFooterAad(const std::string& aad_prefix_bytes);
 
 // Update last two bytes of page (or page header) module AAD
-void QuickUpdatePageAad(const std::string& AAD, int16_t new_page_ordinal);
+void QuickUpdatePageAad(int32_t new_page_ordinal, std::string* AAD);
 
 // Wraps OpenSSL RAND_bytes function
 void RandBytes(unsigned char* buf, int num);
diff --git a/cpp/src/parquet/encryption/encryption_internal_nossl.cc b/cpp/src/parquet/encryption/encryption_internal_nossl.cc
index 540ee16f78..bb203f0fd8 100644
--- a/cpp/src/parquet/encryption/encryption_internal_nossl.cc
+++ b/cpp/src/parquet/encryption/encryption_internal_nossl.cc
@@ -99,7 +99,7 @@ int AesDecryptor::CiphertextSizeDelta() {
 
 std::string CreateModuleAad(const std::string& file_aad, int8_t module_type,
                             int16_t row_group_ordinal, int16_t column_ordinal,
-                            int16_t page_ordinal) {
+                            int32_t page_ordinal) {
   ThrowOpenSSLRequiredException();
   return "";
 }
@@ -109,7 +109,7 @@ std::string CreateFooterAad(const std::string& aad_prefix_bytes) {
   return "";
 }
 
-void QuickUpdatePageAad(const std::string& AAD, int16_t new_page_ordinal) {
+void QuickUpdatePageAad(int32_t new_page_ordinal, std::string* AAD) {
   ThrowOpenSSLRequiredException();
 }
 
diff --git a/cpp/src/parquet/reader_test.cc b/cpp/src/parquet/reader_test.cc
index 03e6922d13..e17f7a91f9 100644
--- a/cpp/src/parquet/reader_test.cc
+++ b/cpp/src/parquet/reader_test.cc
@@ -86,6 +86,10 @@ std::string lz4_raw_compressed_larger() {
   return data_file("lz4_raw_compressed_larger.parquet");
 }
 
+std::string overflow_i16_page_oridinal() {
+  return data_file("overflow_i16_page_cnt.parquet");
+}
+
 // TODO: Assert on definition and repetition levels
 template <typename DType, typename ValueType>
 void AssertColumnValues(std::shared_ptr<TypedColumnReader<DType>> col, int64_t batch_size,
@@ -1015,4 +1019,44 @@ INSTANTIATE_TEST_SUITE_P(Lz4CodecTests, TestCodec, ::testing::ValuesIn(test_code
                          testing::PrintToStringParamName());
 #endif  // ARROW_WITH_LZ4
 
+// Test reading a data file whose ColumnChunk contains more than
+// INT16_MAX pages (GH-15074).
+TEST(TestFileReader, TestOverflowInt16PageOrdinal) {
+  ReaderProperties reader_props;
+  auto file_reader = ParquetFileReader::OpenFile(overflow_i16_page_oridinal(),
+                                                 /*memory_map=*/false, reader_props);
+  auto metadata_ptr = file_reader->metadata();
+  EXPECT_EQ(1, metadata_ptr->num_row_groups());
+  EXPECT_EQ(1, metadata_ptr->num_columns());
+  auto row_group = file_reader->RowGroup(0);
+
+  {
+    auto column_reader =
+        std::dynamic_pointer_cast<TypedColumnReader<BooleanType>>(row_group->Column(0));
+    EXPECT_NE(nullptr, column_reader);
+    constexpr int kBatchLength = 1024;
+    std::array<bool, kBatchLength> boolean_values{};
+    int64_t total_values = 0;
+    int64_t values_read = 0;
+    do {
+      values_read = 0;
+      column_reader->ReadBatch(kBatchLength, nullptr, nullptr, boolean_values.data(),
+                               &values_read);
+      total_values += values_read;
+      for (int i = 0; i < values_read; ++i) {
+        EXPECT_FALSE(boolean_values[i]);
+      }
+    } while (values_read != 0);
+    EXPECT_EQ(40000, total_values);
+  }
+  {
+    auto page_reader = row_group->GetColumnPageReader(0);
+    int32_t page_ordinal = 0;
+    while (page_reader->NextPage() != nullptr) {
+      ++page_ordinal;
+    }
+    EXPECT_EQ(40000, page_ordinal);
+  }
+}
+
 }  // namespace parquet
diff --git a/cpp/submodules/parquet-testing b/cpp/submodules/parquet-testing
index 8a3d3fd5ff..5b82793ef7 160000
--- a/cpp/submodules/parquet-testing
+++ b/cpp/submodules/parquet-testing
@@ -1 +1 @@
-Subproject commit 8a3d3fd5ff7691ee07ca9802df66290a3106e4b7
+Subproject commit 5b82793ef7196f7b3583e85669ced211cd8b5ff2