You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ap...@apache.org on 2023/01/12 12:01:27 UTC
[arrow] branch master updated: GH-15074: [Parquet][C++] change 16-bit page_ordinal to 32-bit (#15182)
This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 48da0dfb6c GH-15074: [Parquet][C++] change 16-bit page_ordinal to 32-bit (#15182)
48da0dfb6c is described below
commit 48da0dfb6c0425646f6043afc41a2515e93a4ffb
Author: mwish <15...@qq.com>
AuthorDate: Thu Jan 12 20:01:19 2023 +0800
GH-15074: [Parquet][C++] change 16-bit page_ordinal to 32-bit (#15182)
As mentioned in https://github.com/apache/arrow/issues/15074, `int16_t page_ordinal` may overflow, so we need to change it to 32-bit.
* [x] Implement the logic
* [x] Testing
* [x] Upload a file with more than `INT16_MAX` pages in a column chunk to parquet-testing.
* Closes: #15074
Lead-authored-by: mwish <ma...@gmail.com>
Co-authored-by: Antoine Pitrou <an...@python.org>
Co-authored-by: mwish <15...@qq.com>
Co-authored-by: Antoine Pitrou <pi...@free.fr>
Signed-off-by: Antoine Pitrou <an...@python.org>
---
cpp/src/parquet/column_reader.cc | 15 ++++----
cpp/src/parquet/column_writer.cc | 6 +--
cpp/src/parquet/encryption/encryption_internal.cc | 24 ++++++++----
cpp/src/parquet/encryption/encryption_internal.h | 4 +-
.../encryption/encryption_internal_nossl.cc | 4 +-
cpp/src/parquet/reader_test.cc | 44 ++++++++++++++++++++++
cpp/submodules/parquet-testing | 2 +-
7 files changed, 76 insertions(+), 23 deletions(-)
diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc
index 9246096c95..f881651737 100644
--- a/cpp/src/parquet/column_reader.cc
+++ b/cpp/src/parquet/column_reader.cc
@@ -261,7 +261,7 @@ class SerializedPageReader : public PageReader {
private:
void UpdateDecryption(const std::shared_ptr<Decryptor>& decryptor, int8_t module_type,
- const std::string& page_aad);
+ std::string* page_aad);
void InitDecryption();
@@ -291,7 +291,7 @@ class SerializedPageReader : public PageReader {
// The ordinal fields in the context below are used for AAD suffix calculation.
CryptoContext crypto_ctx_;
- int16_t page_ordinal_; // page ordinal does not count the dictionary page
+ int32_t page_ordinal_; // page ordinal does not count the dictionary page
// Maximum allowed page size
uint32_t max_page_header_size_;
@@ -329,8 +329,7 @@ void SerializedPageReader::InitDecryption() {
}
void SerializedPageReader::UpdateDecryption(const std::shared_ptr<Decryptor>& decryptor,
- int8_t module_type,
- const std::string& page_aad) {
+ int8_t module_type, std::string* page_aad) {
DCHECK(decryptor != nullptr);
if (crypto_ctx_.start_decrypt_with_dictionary_page) {
std::string aad = encryption::CreateModuleAad(
@@ -338,8 +337,8 @@ void SerializedPageReader::UpdateDecryption(const std::shared_ptr<Decryptor>& de
crypto_ctx_.column_ordinal, kNonPageOrdinal);
decryptor->UpdateAad(aad);
} else {
- encryption::QuickUpdatePageAad(page_aad, page_ordinal_);
- decryptor->UpdateAad(page_aad);
+ encryption::QuickUpdatePageAad(page_ordinal_, page_aad);
+ decryptor->UpdateAad(*page_aad);
}
}
@@ -366,7 +365,7 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
try {
if (crypto_ctx_.meta_decryptor != nullptr) {
UpdateDecryption(crypto_ctx_.meta_decryptor, encryption::kDictionaryPageHeader,
- data_page_header_aad_);
+ &data_page_header_aad_);
}
deserializer.DeserializeMessage(reinterpret_cast<const uint8_t*>(view.data()),
&header_size, &current_page_header_,
@@ -394,7 +393,7 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
if (crypto_ctx_.data_decryptor != nullptr) {
UpdateDecryption(crypto_ctx_.data_decryptor, encryption::kDictionaryPage,
- data_page_aad_);
+ &data_page_aad_);
}
// Read the compressed data page.
diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc
index 7b5ca8810b..1fc13aa3c1 100644
--- a/cpp/src/parquet/column_writer.cc
+++ b/cpp/src/parquet/column_writer.cc
@@ -482,12 +482,12 @@ class SerializedPageWriter : public PageWriter {
break;
}
case encryption::kDataPage: {
- encryption::QuickUpdatePageAad(data_page_aad_, page_ordinal_);
+ encryption::QuickUpdatePageAad(page_ordinal_, &data_page_aad_);
data_encryptor_->UpdateAad(data_page_aad_);
break;
}
case encryption::kDataPageHeader: {
- encryption::QuickUpdatePageAad(data_page_header_aad_, page_ordinal_);
+ encryption::QuickUpdatePageAad(page_ordinal_, &data_page_header_aad_);
meta_encryptor_->UpdateAad(data_page_header_aad_);
break;
}
@@ -516,7 +516,7 @@ class SerializedPageWriter : public PageWriter {
int64_t data_page_offset_;
int64_t total_uncompressed_size_;
int64_t total_compressed_size_;
- int16_t page_ordinal_;
+ int32_t page_ordinal_;
int16_t row_group_ordinal_;
int16_t column_ordinal_;
diff --git a/cpp/src/parquet/encryption/encryption_internal.cc b/cpp/src/parquet/encryption/encryption_internal.cc
index fee3434d09..1c4d3d8dc4 100644
--- a/cpp/src/parquet/encryption/encryption_internal.cc
+++ b/cpp/src/parquet/encryption/encryption_internal.cc
@@ -597,9 +597,19 @@ static std::string ShortToBytesLe(int16_t input) {
return std::string(reinterpret_cast<char const*>(output), 2);
}
+static void CheckPageOrdinal(int32_t page_ordinal) {
+ if (ARROW_PREDICT_FALSE(page_ordinal > std::numeric_limits<int16_t>::max())) {
+ throw ParquetException("Encrypted Parquet files can't have more than " +
+ std::to_string(std::numeric_limits<int16_t>::max()) +
+ " pages per chunk: got " + std::to_string(page_ordinal));
+ }
+}
+
std::string CreateModuleAad(const std::string& file_aad, int8_t module_type,
int16_t row_group_ordinal, int16_t column_ordinal,
- int16_t page_ordinal) {
+ int32_t page_ordinal) {
+ CheckPageOrdinal(page_ordinal);
+ const int16_t page_ordinal_short = static_cast<int16_t>(page_ordinal);
int8_t type_ordinal_bytes[1];
type_ordinal_bytes[0] = module_type;
std::string type_ordinal_bytes_str(reinterpret_cast<char const*>(type_ordinal_bytes),
@@ -616,7 +626,7 @@ std::string CreateModuleAad(const std::string& file_aad, int8_t module_type,
<< column_ordinal_bytes;
return out.str();
}
- std::string page_ordinal_bytes = ShortToBytesLe(page_ordinal);
+ std::string page_ordinal_bytes = ShortToBytesLe(page_ordinal_short);
std::ostringstream out;
out << file_aad << type_ordinal_bytes_str << row_group_ordinal_bytes
<< column_ordinal_bytes << page_ordinal_bytes;
@@ -630,11 +640,11 @@ std::string CreateFooterAad(const std::string& aad_prefix_bytes) {
// Update last two bytes with new page ordinal (instead of creating new page AAD
// from scratch)
-void QuickUpdatePageAad(const std::string& AAD, int16_t new_page_ordinal) {
- std::string page_ordinal_bytes = ShortToBytesLe(new_page_ordinal);
- int length = static_cast<int>(AAD.size());
- std::memcpy(reinterpret_cast<int16_t*>(const_cast<char*>(AAD.c_str() + length - 2)),
- reinterpret_cast<const int16_t*>(page_ordinal_bytes.c_str()), 2);
+void QuickUpdatePageAad(int32_t new_page_ordinal, std::string* AAD) {
+ CheckPageOrdinal(new_page_ordinal);
+ const std::string page_ordinal_bytes =
+ ShortToBytesLe(static_cast<int16_t>(new_page_ordinal));
+ std::memcpy(AAD->data() + AAD->length() - 2, page_ordinal_bytes.data(), 2);
}
void RandBytes(unsigned char* buf, int num) { RAND_bytes(buf, num); }
diff --git a/cpp/src/parquet/encryption/encryption_internal.h b/cpp/src/parquet/encryption/encryption_internal.h
index ede338182f..24093c68be 100644
--- a/cpp/src/parquet/encryption/encryption_internal.h
+++ b/cpp/src/parquet/encryption/encryption_internal.h
@@ -119,12 +119,12 @@ class AesDecryptor {
std::string CreateModuleAad(const std::string& file_aad, int8_t module_type,
int16_t row_group_ordinal, int16_t column_ordinal,
- int16_t page_ordinal);
+ int32_t page_ordinal);
std::string CreateFooterAad(const std::string& aad_prefix_bytes);
// Update last two bytes of page (or page header) module AAD
-void QuickUpdatePageAad(const std::string& AAD, int16_t new_page_ordinal);
+void QuickUpdatePageAad(int32_t new_page_ordinal, std::string* AAD);
// Wraps OpenSSL RAND_bytes function
void RandBytes(unsigned char* buf, int num);
diff --git a/cpp/src/parquet/encryption/encryption_internal_nossl.cc b/cpp/src/parquet/encryption/encryption_internal_nossl.cc
index 540ee16f78..bb203f0fd8 100644
--- a/cpp/src/parquet/encryption/encryption_internal_nossl.cc
+++ b/cpp/src/parquet/encryption/encryption_internal_nossl.cc
@@ -99,7 +99,7 @@ int AesDecryptor::CiphertextSizeDelta() {
std::string CreateModuleAad(const std::string& file_aad, int8_t module_type,
int16_t row_group_ordinal, int16_t column_ordinal,
- int16_t page_ordinal) {
+ int32_t page_ordinal) {
ThrowOpenSSLRequiredException();
return "";
}
@@ -109,7 +109,7 @@ std::string CreateFooterAad(const std::string& aad_prefix_bytes) {
return "";
}
-void QuickUpdatePageAad(const std::string& AAD, int16_t new_page_ordinal) {
+void QuickUpdatePageAad(int32_t new_page_ordinal, std::string* AAD) {
ThrowOpenSSLRequiredException();
}
diff --git a/cpp/src/parquet/reader_test.cc b/cpp/src/parquet/reader_test.cc
index 03e6922d13..e17f7a91f9 100644
--- a/cpp/src/parquet/reader_test.cc
+++ b/cpp/src/parquet/reader_test.cc
@@ -86,6 +86,10 @@ std::string lz4_raw_compressed_larger() {
return data_file("lz4_raw_compressed_larger.parquet");
}
+std::string overflow_i16_page_oridinal() {
+ return data_file("overflow_i16_page_cnt.parquet");
+}
+
// TODO: Assert on definition and repetition levels
template <typename DType, typename ValueType>
void AssertColumnValues(std::shared_ptr<TypedColumnReader<DType>> col, int64_t batch_size,
@@ -1015,4 +1019,44 @@ INSTANTIATE_TEST_SUITE_P(Lz4CodecTests, TestCodec, ::testing::ValuesIn(test_code
testing::PrintToStringParamName());
#endif // ARROW_WITH_LZ4
+// Test reading a data file with a ColumnChunk containing more than
+// INT16_MAX pages (GH-15074).
+TEST(TestFileReader, TestOverflowInt16PageOrdinal) {
+ ReaderProperties reader_props;
+ auto file_reader = ParquetFileReader::OpenFile(overflow_i16_page_oridinal(),
+ /*memory_map=*/false, reader_props);
+ auto metadata_ptr = file_reader->metadata();
+ EXPECT_EQ(1, metadata_ptr->num_row_groups());
+ EXPECT_EQ(1, metadata_ptr->num_columns());
+ auto row_group = file_reader->RowGroup(0);
+
+ {
+ auto column_reader =
+ std::dynamic_pointer_cast<TypedColumnReader<BooleanType>>(row_group->Column(0));
+ EXPECT_NE(nullptr, column_reader);
+ constexpr int kBatchLength = 1024;
+ std::array<bool, kBatchLength> boolean_values{};
+ int64_t total_values = 0;
+ int64_t values_read = 0;
+ do {
+ values_read = 0;
+ column_reader->ReadBatch(kBatchLength, nullptr, nullptr, boolean_values.data(),
+ &values_read);
+ total_values += values_read;
+ for (int i = 0; i < values_read; ++i) {
+ EXPECT_FALSE(boolean_values[i]);
+ }
+ } while (values_read != 0);
+ EXPECT_EQ(40000, total_values);
+ }
+ {
+ auto page_reader = row_group->GetColumnPageReader(0);
+ int32_t page_ordinal = 0;
+ while (page_reader->NextPage() != nullptr) {
+ ++page_ordinal;
+ }
+ EXPECT_EQ(40000, page_ordinal);
+ }
+}
+
} // namespace parquet
diff --git a/cpp/submodules/parquet-testing b/cpp/submodules/parquet-testing
index 8a3d3fd5ff..5b82793ef7 160000
--- a/cpp/submodules/parquet-testing
+++ b/cpp/submodules/parquet-testing
@@ -1 +1 @@
-Subproject commit 8a3d3fd5ff7691ee07ca9802df66290a3106e4b7
+Subproject commit 5b82793ef7196f7b3583e85669ced211cd8b5ff2