You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ap...@apache.org on 2022/06/07 11:29:16 UTC
[arrow] branch master updated: ARROW-16546: [Parquet][C++][Python] Make Thrift limits configurable (#13275)
This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new dd33c750e0 ARROW-16546: [Parquet][C++][Python] Make Thrift limits configurable (#13275)
dd33c750e0 is described below
commit dd33c750e068baddba99589076aa12b892268640
Author: Antoine Pitrou <an...@python.org>
AuthorDate: Tue Jun 7 13:29:05 2022 +0200
ARROW-16546: [Parquet][C++][Python] Make Thrift limits configurable (#13275)
In fringe cases, users may have Parquet files where deserializing exceeds our default Thrift size limits.
Authored-by: Antoine Pitrou <an...@python.org>
Signed-off-by: Antoine Pitrou <an...@python.org>
---
cpp/cmake_modules/SetupCxxFlags.cmake | 1 -
cpp/src/arrow/dataset/file_parquet.cc | 4 +
cpp/src/parquet/column_reader.cc | 31 +++++--
cpp/src/parquet/column_reader.h | 5 ++
cpp/src/parquet/column_writer_test.cc | 8 +-
cpp/src/parquet/file_reader.cc | 4 +-
cpp/src/parquet/metadata.cc | 123 ++++++++++++++++---------
cpp/src/parquet/metadata.h | 35 +++++++-
cpp/src/parquet/properties.h | 16 ++++
cpp/src/parquet/statistics_test.cc | 4 +-
cpp/src/parquet/thrift_internal.h | 138 ++++++++++++++++-------------
python/pyarrow/_dataset_parquet.pyx | 64 +++++++++++--
python/pyarrow/_parquet.pxd | 8 ++
python/pyarrow/_parquet.pyx | 18 +++-
python/pyarrow/parquet/__init__.py | 61 ++++++++++---
python/pyarrow/tests/parquet/test_basic.py | 27 ++++++
python/pyarrow/tests/test_dataset.py | 11 +++
17 files changed, 407 insertions(+), 151 deletions(-)
diff --git a/cpp/cmake_modules/SetupCxxFlags.cmake b/cpp/cmake_modules/SetupCxxFlags.cmake
index 713bc5528b..66b84dbc29 100644
--- a/cpp/cmake_modules/SetupCxxFlags.cmake
+++ b/cpp/cmake_modules/SetupCxxFlags.cmake
@@ -290,7 +290,6 @@ if("${BUILD_WARNING_LEVEL}" STREQUAL "CHECKIN")
elseif(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wall")
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-conversion")
- set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-deprecated-declarations")
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-sign-conversion")
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wunused-result")
elseif(CMAKE_CXX_COMPILER_ID STREQUAL "Intel")
diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc
index f2a3903208..0d95e18171 100644
--- a/cpp/src/arrow/dataset/file_parquet.cc
+++ b/cpp/src/arrow/dataset/file_parquet.cc
@@ -69,6 +69,10 @@ parquet::ReaderProperties MakeReaderProperties(
properties.set_buffer_size(parquet_scan_options->reader_properties->buffer_size());
properties.file_decryption_properties(
parquet_scan_options->reader_properties->file_decryption_properties());
+ properties.set_thrift_string_size_limit(
+ parquet_scan_options->reader_properties->thrift_string_size_limit());
+ properties.set_thrift_container_size_limit(
+ parquet_scan_options->reader_properties->thrift_container_size_limit());
return properties;
}
diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc
index 76476c5da7..4f82992aeb 100644
--- a/cpp/src/parquet/column_reader.cc
+++ b/cpp/src/parquet/column_reader.cc
@@ -223,14 +223,15 @@ EncodedStatistics ExtractStatsFromHeader(const H& header) {
class SerializedPageReader : public PageReader {
public:
SerializedPageReader(std::shared_ptr<ArrowInputStream> stream, int64_t total_num_rows,
- Compression::type codec, ::arrow::MemoryPool* pool,
+ Compression::type codec, const ReaderProperties& properties,
const CryptoContext* crypto_ctx)
- : stream_(std::move(stream)),
- decompression_buffer_(AllocateBuffer(pool, 0)),
+ : properties_(properties),
+ stream_(std::move(stream)),
+ decompression_buffer_(AllocateBuffer(properties_.memory_pool(), 0)),
page_ordinal_(0),
seen_num_rows_(0),
total_num_rows_(total_num_rows),
- decryption_buffer_(AllocateBuffer(pool, 0)) {
+ decryption_buffer_(AllocateBuffer(properties_.memory_pool(), 0)) {
if (crypto_ctx != nullptr) {
crypto_ctx_ = *crypto_ctx;
InitDecryption();
@@ -254,6 +255,7 @@ class SerializedPageReader : public PageReader {
int compressed_len, int uncompressed_len,
int levels_byte_len = 0);
+ const ReaderProperties properties_;
std::shared_ptr<ArrowInputStream> stream_;
format::PageHeader current_page_header_;
@@ -326,9 +328,10 @@ void SerializedPageReader::UpdateDecryption(const std::shared_ptr<Decryptor>& de
}
std::shared_ptr<Page> SerializedPageReader::NextPage() {
+ ThriftDeserializer deserializer(properties_);
+
// Loop here because there may be unhandled page types that we skip until
// finding a page that we do know what to do with
-
while (seen_num_rows_ < total_num_rows_) {
uint32_t header_size = 0;
uint32_t allowed_page_size = kDefaultPageHeaderSize;
@@ -349,8 +352,9 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
UpdateDecryption(crypto_ctx_.meta_decryptor, encryption::kDictionaryPageHeader,
data_page_header_aad_);
}
- DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(view.data()), &header_size,
- &current_page_header_, crypto_ctx_.meta_decryptor);
+ deserializer.DeserializeMessage(reinterpret_cast<const uint8_t*>(view.data()),
+ &header_size, &current_page_header_,
+ crypto_ctx_.meta_decryptor);
break;
} catch (std::exception& e) {
// Failed to deserialize. Double the allowed page header size and try again
@@ -508,13 +512,22 @@ std::shared_ptr<Buffer> SerializedPageReader::DecompressIfNeeded(
} // namespace
+std::unique_ptr<PageReader> PageReader::Open(std::shared_ptr<ArrowInputStream> stream,
+ int64_t total_num_rows,
+ Compression::type codec,
+ const ReaderProperties& properties,
+ const CryptoContext* ctx) {
+ return std::unique_ptr<PageReader>(new SerializedPageReader(
+ std::move(stream), total_num_rows, codec, properties, ctx));
+}
+
std::unique_ptr<PageReader> PageReader::Open(std::shared_ptr<ArrowInputStream> stream,
int64_t total_num_rows,
Compression::type codec,
::arrow::MemoryPool* pool,
const CryptoContext* ctx) {
- return std::unique_ptr<PageReader>(
- new SerializedPageReader(std::move(stream), total_num_rows, codec, pool, ctx));
+ return std::unique_ptr<PageReader>(new SerializedPageReader(
+ std::move(stream), total_num_rows, codec, ReaderProperties(pool), ctx));
}
namespace {
diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h
index 50eac31dc3..c22f9f2fc7 100644
--- a/cpp/src/parquet/column_reader.h
+++ b/cpp/src/parquet/column_reader.h
@@ -25,6 +25,7 @@
#include "parquet/exception.h"
#include "parquet/level_conversion.h"
#include "parquet/platform.h"
+#include "parquet/properties.h"
#include "parquet/schema.h"
#include "parquet/types.h"
@@ -106,6 +107,10 @@ class PARQUET_EXPORT PageReader {
std::shared_ptr<ArrowInputStream> stream, int64_t total_num_rows,
Compression::type codec, ::arrow::MemoryPool* pool = ::arrow::default_memory_pool(),
const CryptoContext* ctx = NULLPTR);
+ static std::unique_ptr<PageReader> Open(std::shared_ptr<ArrowInputStream> stream,
+ int64_t total_num_rows, Compression::type codec,
+ const ReaderProperties& properties,
+ const CryptoContext* ctx = NULLPTR);
// @returns: shared_ptr<Page>(nullptr) on EOS, std::shared_ptr<Page>
// containing new Page otherwise
diff --git a/cpp/src/parquet/column_writer_test.cc b/cpp/src/parquet/column_writer_test.cc
index 61f4c47e78..2cd21628b3 100644
--- a/cpp/src/parquet/column_writer_test.cc
+++ b/cpp/src/parquet/column_writer_test.cc
@@ -276,8 +276,8 @@ class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
// This is because the ColumnChunkMetaData semantics dictate the metadata object is
// complete (no changes to the metadata buffer can be made after instantiation)
ApplicationVersion app_version(this->writer_properties_->created_by());
- auto metadata_accessor =
- ColumnChunkMetaData::Make(metadata_->contents(), this->descr_, &app_version);
+ auto metadata_accessor = ColumnChunkMetaData::Make(
+ metadata_->contents(), this->descr_, default_reader_properties(), &app_version);
return metadata_accessor->is_stats_set();
}
@@ -286,8 +286,8 @@ class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
// This is because the ColumnChunkMetaData semantics dictate the metadata object is
// complete (no changes to the metadata buffer can be made after instantiation)
ApplicationVersion app_version(this->writer_properties_->created_by());
- auto metadata_accessor =
- ColumnChunkMetaData::Make(metadata_->contents(), this->descr_, &app_version);
+ auto metadata_accessor = ColumnChunkMetaData::Make(
+ metadata_->contents(), this->descr_, default_reader_properties(), &app_version);
auto encoded_stats = metadata_accessor->statistics()->Encode();
return {encoded_stats.has_min, encoded_stats.has_max};
}
diff --git a/cpp/src/parquet/file_reader.cc b/cpp/src/parquet/file_reader.cc
index 3f4c2cb76a..51f90b1730 100644
--- a/cpp/src/parquet/file_reader.cc
+++ b/cpp/src/parquet/file_reader.cc
@@ -543,8 +543,8 @@ uint32_t SerializedFile::ParseUnencryptedFileMetadata(
}
uint32_t read_metadata_len = metadata_len;
// The encrypted read path falls through to here, so pass in the decryptor
- file_metadata_ =
- FileMetaData::Make(metadata_buffer->data(), &read_metadata_len, file_decryptor_);
+ file_metadata_ = FileMetaData::Make(metadata_buffer->data(), &read_metadata_len,
+ properties_, file_decryptor_);
return read_metadata_len;
}
diff --git a/cpp/src/parquet/metadata.cc b/cpp/src/parquet/metadata.cc
index 102fa874ad..6226c3ad09 100644
--- a/cpp/src/parquet/metadata.cc
+++ b/cpp/src/parquet/metadata.cc
@@ -176,9 +176,13 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
explicit ColumnChunkMetaDataImpl(const format::ColumnChunk* column,
const ColumnDescriptor* descr,
int16_t row_group_ordinal, int16_t column_ordinal,
+ const ReaderProperties& properties,
const ApplicationVersion* writer_version,
std::shared_ptr<InternalFileDecryptor> file_decryptor)
- : column_(column), descr_(descr), writer_version_(writer_version) {
+ : column_(column),
+ descr_(descr),
+ properties_(properties),
+ writer_version_(writer_version) {
column_metadata_ = &column->meta_data;
if (column->__isset.crypto_metadata) { // column metadata is encrypted
format::ColumnCryptoMetaData ccmd = column->crypto_metadata;
@@ -196,7 +200,8 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
auto decryptor = file_decryptor->GetColumnMetaDecryptor(
path->ToDotString(), key_metadata, aad_column_metadata);
auto len = static_cast<uint32_t>(column->encrypted_column_metadata.size());
- DeserializeThriftMsg(
+ ThriftDeserializer deserializer(properties_);
+ deserializer.DeserializeMessage(
reinterpret_cast<const uint8_t*>(column->encrypted_column_metadata.c_str()),
&len, &decrypted_metadata_, decryptor);
column_metadata_ = &decrypted_metadata_;
@@ -306,26 +311,38 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
const format::ColumnMetaData* column_metadata_;
format::ColumnMetaData decrypted_metadata_;
const ColumnDescriptor* descr_;
+ const ReaderProperties properties_;
const ApplicationVersion* writer_version_;
};
std::unique_ptr<ColumnChunkMetaData> ColumnChunkMetaData::Make(
const void* metadata, const ColumnDescriptor* descr,
- const ApplicationVersion* writer_version, int16_t row_group_ordinal,
- int16_t column_ordinal, std::shared_ptr<InternalFileDecryptor> file_decryptor) {
+ const ReaderProperties& properties, const ApplicationVersion* writer_version,
+ int16_t row_group_ordinal, int16_t column_ordinal,
+ std::shared_ptr<InternalFileDecryptor> file_decryptor) {
return std::unique_ptr<ColumnChunkMetaData>(
new ColumnChunkMetaData(metadata, descr, row_group_ordinal, column_ordinal,
- writer_version, std::move(file_decryptor)));
+ properties, writer_version, std::move(file_decryptor)));
+}
+
+std::unique_ptr<ColumnChunkMetaData> ColumnChunkMetaData::Make(
+ const void* metadata, const ColumnDescriptor* descr,
+ const ApplicationVersion* writer_version, int16_t row_group_ordinal,
+ int16_t column_ordinal, std::shared_ptr<InternalFileDecryptor> file_decryptor) {
+ return std::unique_ptr<ColumnChunkMetaData>(new ColumnChunkMetaData(
+ metadata, descr, row_group_ordinal, column_ordinal, default_reader_properties(),
+ writer_version, std::move(file_decryptor)));
}
ColumnChunkMetaData::ColumnChunkMetaData(
const void* metadata, const ColumnDescriptor* descr, int16_t row_group_ordinal,
- int16_t column_ordinal, const ApplicationVersion* writer_version,
+ int16_t column_ordinal, const ReaderProperties& properties,
+ const ApplicationVersion* writer_version,
std::shared_ptr<InternalFileDecryptor> file_decryptor)
: impl_{new ColumnChunkMetaDataImpl(
reinterpret_cast<const format::ColumnChunk*>(metadata), descr,
- row_group_ordinal, column_ordinal, writer_version, std::move(file_decryptor))} {
-}
+ row_group_ordinal, column_ordinal, properties, writer_version,
+ std::move(file_decryptor))} {}
ColumnChunkMetaData::~ColumnChunkMetaData() = default;
@@ -403,10 +420,12 @@ class RowGroupMetaData::RowGroupMetaDataImpl {
public:
explicit RowGroupMetaDataImpl(const format::RowGroup* row_group,
const SchemaDescriptor* schema,
+ const ReaderProperties& properties,
const ApplicationVersion* writer_version,
std::shared_ptr<InternalFileDecryptor> file_decryptor)
: row_group_(row_group),
schema_(schema),
+ properties_(properties),
writer_version_(writer_version),
file_decryptor_(std::move(file_decryptor)) {}
@@ -431,7 +450,7 @@ class RowGroupMetaData::RowGroupMetaDataImpl {
std::unique_ptr<ColumnChunkMetaData> ColumnChunk(int i) {
if (i < num_columns()) {
return ColumnChunkMetaData::Make(&row_group_->columns[i], schema_->Column(i),
- writer_version_, row_group_->ordinal,
+ properties_, writer_version_, row_group_->ordinal,
static_cast<int16_t>(i), file_decryptor_);
}
throw ParquetException("The file only has ", num_columns(),
@@ -441,6 +460,7 @@ class RowGroupMetaData::RowGroupMetaDataImpl {
private:
const format::RowGroup* row_group_;
const SchemaDescriptor* schema_;
+ const ReaderProperties properties_;
const ApplicationVersion* writer_version_;
std::shared_ptr<InternalFileDecryptor> file_decryptor_;
};
@@ -449,16 +469,26 @@ std::unique_ptr<RowGroupMetaData> RowGroupMetaData::Make(
const void* metadata, const SchemaDescriptor* schema,
const ApplicationVersion* writer_version,
std::shared_ptr<InternalFileDecryptor> file_decryptor) {
- return std::unique_ptr<RowGroupMetaData>(
- new RowGroupMetaData(metadata, schema, writer_version, std::move(file_decryptor)));
+ return std::unique_ptr<parquet::RowGroupMetaData>(
+ new RowGroupMetaData(metadata, schema, default_reader_properties(), writer_version,
+ std::move(file_decryptor)));
+}
+
+std::unique_ptr<RowGroupMetaData> RowGroupMetaData::Make(
+ const void* metadata, const SchemaDescriptor* schema,
+ const ReaderProperties& properties, const ApplicationVersion* writer_version,
+ std::shared_ptr<InternalFileDecryptor> file_decryptor) {
+ return std::unique_ptr<parquet::RowGroupMetaData>(new RowGroupMetaData(
+ metadata, schema, properties, writer_version, std::move(file_decryptor)));
}
RowGroupMetaData::RowGroupMetaData(const void* metadata, const SchemaDescriptor* schema,
+ const ReaderProperties& properties,
const ApplicationVersion* writer_version,
std::shared_ptr<InternalFileDecryptor> file_decryptor)
: impl_{new RowGroupMetaDataImpl(reinterpret_cast<const format::RowGroup*>(metadata),
- schema, writer_version, std::move(file_decryptor))} {
-}
+ schema, properties, writer_version,
+ std::move(file_decryptor))} {}
RowGroupMetaData::~RowGroupMetaData() = default;
@@ -500,16 +530,17 @@ class FileMetaData::FileMetaDataImpl {
FileMetaDataImpl() = default;
explicit FileMetaDataImpl(
- const void* metadata, uint32_t* metadata_len,
+ const void* metadata, uint32_t* metadata_len, const ReaderProperties& properties,
std::shared_ptr<InternalFileDecryptor> file_decryptor = nullptr)
- : file_decryptor_(file_decryptor) {
+ : properties_(properties), file_decryptor_(file_decryptor) {
metadata_.reset(new format::FileMetaData);
auto footer_decryptor =
file_decryptor_ != nullptr ? file_decryptor->GetFooterDecryptor() : nullptr;
- DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(metadata), metadata_len,
- metadata_.get(), footer_decryptor);
+ ThriftDeserializer deserializer(properties_);
+ deserializer.DeserializeMessage(reinterpret_cast<const uint8_t*>(metadata),
+ metadata_len, metadata_.get(), footer_decryptor);
metadata_len_ = *metadata_len;
if (metadata_->__isset.created_by) {
@@ -622,8 +653,8 @@ class FileMetaData::FileMetaDataImpl {
<< " row groups, requested metadata for row group: " << i;
throw ParquetException(ss.str());
}
- return RowGroupMetaData::Make(&metadata_->row_groups[i], &schema_, &writer_version_,
- file_decryptor_);
+ return RowGroupMetaData::Make(&metadata_->row_groups[i], &schema_, properties_,
+ &writer_version_, file_decryptor_);
}
bool Equals(const FileMetaDataImpl& other) const {
@@ -718,6 +749,7 @@ class FileMetaData::FileMetaDataImpl {
SchemaDescriptor schema_;
ApplicationVersion writer_version_;
std::shared_ptr<const KeyValueMetadata> key_value_metadata_;
+ const ReaderProperties properties_;
std::shared_ptr<InternalFileDecryptor> file_decryptor_;
void InitSchema() {
@@ -759,20 +791,26 @@ class FileMetaData::FileMetaDataImpl {
};
std::shared_ptr<FileMetaData> FileMetaData::Make(
- const void* metadata, uint32_t* metadata_len,
+ const void* metadata, uint32_t* metadata_len, const ReaderProperties& properties,
std::shared_ptr<InternalFileDecryptor> file_decryptor) {
// This FileMetaData ctor is private, not compatible with std::make_shared
return std::shared_ptr<FileMetaData>(
- new FileMetaData(metadata, metadata_len, file_decryptor));
+ new FileMetaData(metadata, metadata_len, properties, file_decryptor));
+}
+
+std::shared_ptr<FileMetaData> FileMetaData::Make(
+ const void* metadata, uint32_t* metadata_len,
+ std::shared_ptr<InternalFileDecryptor> file_decryptor) {
+ return std::shared_ptr<FileMetaData>(new FileMetaData(
+ metadata, metadata_len, default_reader_properties(), file_decryptor));
}
FileMetaData::FileMetaData(const void* metadata, uint32_t* metadata_len,
+ const ReaderProperties& properties,
std::shared_ptr<InternalFileDecryptor> file_decryptor)
- : impl_{std::unique_ptr<FileMetaDataImpl>(
- new FileMetaDataImpl(metadata, metadata_len, file_decryptor))} {}
+ : impl_(new FileMetaDataImpl(metadata, metadata_len, properties, file_decryptor)) {}
-FileMetaData::FileMetaData()
- : impl_{std::unique_ptr<FileMetaDataImpl>(new FileMetaDataImpl())} {}
+FileMetaData::FileMetaData() : impl_(new FileMetaDataImpl()) {}
FileMetaData::~FileMetaData() = default;
@@ -870,24 +908,27 @@ class FileCryptoMetaData::FileCryptoMetaDataImpl {
public:
FileCryptoMetaDataImpl() = default;
- explicit FileCryptoMetaDataImpl(const uint8_t* metadata, uint32_t* metadata_len) {
- metadata_.reset(new format::FileCryptoMetaData);
- DeserializeThriftMsg(metadata, metadata_len, metadata_.get());
+ explicit FileCryptoMetaDataImpl(const uint8_t* metadata, uint32_t* metadata_len,
+ const ReaderProperties& properties) {
+ ThriftDeserializer deserializer(properties);
+ deserializer.DeserializeMessage(metadata, metadata_len, &metadata_);
metadata_len_ = *metadata_len;
}
- EncryptionAlgorithm encryption_algorithm() {
- return FromThrift(metadata_->encryption_algorithm);
+ EncryptionAlgorithm encryption_algorithm() const {
+ return FromThrift(metadata_.encryption_algorithm);
}
- const std::string& key_metadata() { return metadata_->key_metadata; }
+
+ const std::string& key_metadata() const { return metadata_.key_metadata; }
+
void WriteTo(::arrow::io::OutputStream* dst) const {
ThriftSerializer serializer;
- serializer.Serialize(metadata_.get(), dst);
+ serializer.Serialize(&metadata_, dst);
}
private:
friend FileMetaDataBuilder;
- std::unique_ptr<format::FileCryptoMetaData> metadata_;
+ format::FileCryptoMetaData metadata_;
uint32_t metadata_len_;
};
@@ -900,14 +941,16 @@ const std::string& FileCryptoMetaData::key_metadata() const {
}
std::shared_ptr<FileCryptoMetaData> FileCryptoMetaData::Make(
- const uint8_t* serialized_metadata, uint32_t* metadata_len) {
+ const uint8_t* serialized_metadata, uint32_t* metadata_len,
+ const ReaderProperties& properties) {
return std::shared_ptr<FileCryptoMetaData>(
- new FileCryptoMetaData(serialized_metadata, metadata_len));
+ new FileCryptoMetaData(serialized_metadata, metadata_len, properties));
}
FileCryptoMetaData::FileCryptoMetaData(const uint8_t* serialized_metadata,
- uint32_t* metadata_len)
- : impl_(new FileCryptoMetaDataImpl(serialized_metadata, metadata_len)) {}
+ uint32_t* metadata_len,
+ const ReaderProperties& properties)
+ : impl_(new FileCryptoMetaDataImpl(serialized_metadata, metadata_len, properties)) {}
FileCryptoMetaData::FileCryptoMetaData() : impl_(new FileCryptoMetaDataImpl()) {}
@@ -1754,10 +1797,8 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl {
crypto_metadata_->__set_key_metadata(key_metadata);
}
- std::unique_ptr<FileCryptoMetaData> file_crypto_metadata =
- std::unique_ptr<FileCryptoMetaData>(new FileCryptoMetaData());
- file_crypto_metadata->impl_->metadata_ = std::move(crypto_metadata_);
-
+ std::unique_ptr<FileCryptoMetaData> file_crypto_metadata(new FileCryptoMetaData());
+ file_crypto_metadata->impl_->metadata_ = std::move(*crypto_metadata_);
return file_crypto_metadata;
}
diff --git a/cpp/src/parquet/metadata.h b/cpp/src/parquet/metadata.h
index 3dd936d90e..89dca5667b 100644
--- a/cpp/src/parquet/metadata.h
+++ b/cpp/src/parquet/metadata.h
@@ -121,8 +121,17 @@ struct PageEncodingStats {
class PARQUET_EXPORT ColumnChunkMetaData {
public:
// API convenience to get a MetaData accessor
+
+ ARROW_DEPRECATED("Use the ReaderProperties-taking overload")
static std::unique_ptr<ColumnChunkMetaData> Make(
const void* metadata, const ColumnDescriptor* descr,
+ const ApplicationVersion* writer_version, int16_t row_group_ordinal = -1,
+ int16_t column_ordinal = -1,
+ std::shared_ptr<InternalFileDecryptor> file_decryptor = NULLPTR);
+
+ static std::unique_ptr<ColumnChunkMetaData> Make(
+ const void* metadata, const ColumnDescriptor* descr,
+ const ReaderProperties& properties = default_reader_properties(),
const ApplicationVersion* writer_version = NULLPTR, int16_t row_group_ordinal = -1,
int16_t column_ordinal = -1,
std::shared_ptr<InternalFileDecryptor> file_decryptor = NULLPTR);
@@ -164,7 +173,8 @@ class PARQUET_EXPORT ColumnChunkMetaData {
private:
explicit ColumnChunkMetaData(
const void* metadata, const ColumnDescriptor* descr, int16_t row_group_ordinal,
- int16_t column_ordinal, const ApplicationVersion* writer_version = NULLPTR,
+ int16_t column_ordinal, const ReaderProperties& properties,
+ const ApplicationVersion* writer_version = NULLPTR,
std::shared_ptr<InternalFileDecryptor> file_decryptor = NULLPTR);
// PIMPL Idiom
class ColumnChunkMetaDataImpl;
@@ -174,9 +184,16 @@ class PARQUET_EXPORT ColumnChunkMetaData {
/// \brief RowGroupMetaData is a proxy around format::RowGroupMetaData.
class PARQUET_EXPORT RowGroupMetaData {
public:
+ ARROW_DEPRECATED("Use the ReaderProperties-taking overload")
+ static std::unique_ptr<RowGroupMetaData> Make(
+ const void* metadata, const SchemaDescriptor* schema,
+ const ApplicationVersion* writer_version,
+ std::shared_ptr<InternalFileDecryptor> file_decryptor = NULLPTR);
+
/// \brief Create a RowGroupMetaData from a serialized thrift message.
static std::unique_ptr<RowGroupMetaData> Make(
const void* metadata, const SchemaDescriptor* schema,
+ const ReaderProperties& properties = default_reader_properties(),
const ApplicationVersion* writer_version = NULLPTR,
std::shared_ptr<InternalFileDecryptor> file_decryptor = NULLPTR);
@@ -225,6 +242,7 @@ class PARQUET_EXPORT RowGroupMetaData {
private:
explicit RowGroupMetaData(
const void* metadata, const SchemaDescriptor* schema,
+ const ReaderProperties& properties,
const ApplicationVersion* writer_version = NULLPTR,
std::shared_ptr<InternalFileDecryptor> file_decryptor = NULLPTR);
// PIMPL Idiom
@@ -237,9 +255,15 @@ class FileMetaDataBuilder;
/// \brief FileMetaData is a proxy around format::FileMetaData.
class PARQUET_EXPORT FileMetaData {
public:
+ ARROW_DEPRECATED("Use the ReaderProperties-taking overload")
+ static std::shared_ptr<FileMetaData> Make(
+ const void* serialized_metadata, uint32_t* inout_metadata_len,
+ std::shared_ptr<InternalFileDecryptor> file_decryptor);
+
/// \brief Create a FileMetaData from a serialized thrift message.
static std::shared_ptr<FileMetaData> Make(
const void* serialized_metadata, uint32_t* inout_metadata_len,
+ const ReaderProperties& properties = default_reader_properties(),
std::shared_ptr<InternalFileDecryptor> file_decryptor = NULLPTR);
~FileMetaData();
@@ -350,6 +374,7 @@ class PARQUET_EXPORT FileMetaData {
friend class SerializedFile;
explicit FileMetaData(const void* serialized_metadata, uint32_t* metadata_len,
+ const ReaderProperties& properties,
std::shared_ptr<InternalFileDecryptor> file_decryptor = NULLPTR);
void set_file_decryptor(std::shared_ptr<InternalFileDecryptor> file_decryptor);
@@ -363,8 +388,9 @@ class PARQUET_EXPORT FileMetaData {
class PARQUET_EXPORT FileCryptoMetaData {
public:
// API convenience to get a MetaData accessor
- static std::shared_ptr<FileCryptoMetaData> Make(const uint8_t* serialized_metadata,
- uint32_t* metadata_len);
+ static std::shared_ptr<FileCryptoMetaData> Make(
+ const uint8_t* serialized_metadata, uint32_t* metadata_len,
+ const ReaderProperties& properties = default_reader_properties());
~FileCryptoMetaData();
EncryptionAlgorithm encryption_algorithm() const;
@@ -374,7 +400,8 @@ class PARQUET_EXPORT FileCryptoMetaData {
private:
friend FileMetaDataBuilder;
- FileCryptoMetaData(const uint8_t* serialized_metadata, uint32_t* metadata_len);
+ FileCryptoMetaData(const uint8_t* serialized_metadata, uint32_t* metadata_len,
+ const ReaderProperties& properties);
// PIMPL Idiom
FileCryptoMetaData();
diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h
index 5c81c75357..1d5c360cc1 100644
--- a/cpp/src/parquet/properties.h
+++ b/cpp/src/parquet/properties.h
@@ -49,6 +49,12 @@ enum class ParquetDataPageVersion { V1, V2 };
/// Align the default buffer size to a small multiple of a page size.
constexpr int64_t kDefaultBufferSize = 4096 * 4;
+constexpr int32_t kDefaultThriftStringSizeLimit = 100 * 1000 * 1000;
+// Structs in the thrift definition are relatively large (at least 300 bytes).
+// This limits total memory to the same order of magnitude as
+// kDefaultThriftStringSizeLimit.
+constexpr int32_t kDefaultThriftContainerSizeLimit = 1000 * 1000;
+
class PARQUET_EXPORT ReaderProperties {
public:
explicit ReaderProperties(MemoryPool* pool = ::arrow::default_memory_pool())
@@ -73,6 +79,14 @@ class PARQUET_EXPORT ReaderProperties {
int64_t buffer_size() const { return buffer_size_; }
void set_buffer_size(int64_t size) { buffer_size_ = size; }
+ int32_t thrift_string_size_limit() const { return thrift_string_size_limit_; }
+ void set_thrift_string_size_limit(int32_t size) { thrift_string_size_limit_ = size; }
+
+ int32_t thrift_container_size_limit() const { return thrift_container_size_limit_; }
+ void set_thrift_container_size_limit(int32_t size) {
+ thrift_container_size_limit_ = size;
+ }
+
void file_decryption_properties(std::shared_ptr<FileDecryptionProperties> decryption) {
file_decryption_properties_ = std::move(decryption);
}
@@ -84,6 +98,8 @@ class PARQUET_EXPORT ReaderProperties {
private:
MemoryPool* pool_;
int64_t buffer_size_ = kDefaultBufferSize;
+ int32_t thrift_string_size_limit_ = kDefaultThriftStringSizeLimit;
+ int32_t thrift_container_size_limit_ = kDefaultThriftContainerSizeLimit;
bool buffered_stream_enabled_ = false;
std::shared_ptr<FileDecryptionProperties> file_decryption_properties_;
};
diff --git a/cpp/src/parquet/statistics_test.cc b/cpp/src/parquet/statistics_test.cc
index e678598919..03da895380 100644
--- a/cpp/src/parquet/statistics_test.cc
+++ b/cpp/src/parquet/statistics_test.cc
@@ -595,8 +595,8 @@ void AssertStatsSet(const ApplicationVersion& version,
std::shared_ptr<parquet::WriterProperties> props,
const ColumnDescriptor* column, bool expected_is_set) {
auto metadata_builder = ColumnChunkMetaDataBuilder::Make(props, column);
- auto column_chunk =
- ColumnChunkMetaData::Make(metadata_builder->contents(), column, &version);
+ auto column_chunk = ColumnChunkMetaData::Make(metadata_builder->contents(), column,
+ default_reader_properties(), &version);
EncodedStatistics stats;
stats.set_is_signed(false);
metadata_builder->SetStatistics(stats);
diff --git a/cpp/src/parquet/thrift_internal.h b/cpp/src/parquet/thrift_internal.h
index 17248f3bdc..3c74dfc07b 100644
--- a/cpp/src/parquet/thrift_internal.h
+++ b/cpp/src/parquet/thrift_internal.h
@@ -42,6 +42,7 @@
#include "parquet/encryption/internal_file_encryptor.h"
#include "parquet/exception.h"
#include "parquet/platform.h"
+#include "parquet/properties.h"
#include "parquet/statistics.h"
#include "parquet/types.h"
@@ -350,74 +351,84 @@ static inline format::EncryptionAlgorithm ToThrift(EncryptionAlgorithm encryptio
using ThriftBuffer = apache::thrift::transport::TMemoryBuffer;
-// On Thrift 0.14.0+, we want to use TConfiguration to raise the max message size
-// limit (ARROW-13655). If we wanted to protect against huge messages, we could
-// do it ourselves since we know the message size up front.
+class ThriftDeserializer {
+ public:
+ explicit ThriftDeserializer(const ReaderProperties& properties)
+ : ThriftDeserializer(properties.thrift_string_size_limit(),
+ properties.thrift_container_size_limit()) {}
+
+ ThriftDeserializer(int32_t string_size_limit, int32_t container_size_limit)
+ : string_size_limit_(string_size_limit),
+ container_size_limit_(container_size_limit) {}
-inline std::shared_ptr<ThriftBuffer> CreateReadOnlyMemoryBuffer(uint8_t* buf,
- uint32_t len) {
+ // Deserialize a thrift message from buf/len. buf/len must at least contain
+ // all the bytes needed to store the thrift message. On return, len will be
+ // set to the actual length of the serialized message.
+ template <class T>
+ void DeserializeMessage(const uint8_t* buf, uint32_t* len, T* deserialized_msg,
+ const std::shared_ptr<Decryptor>& decryptor = NULLPTR) {
+ if (decryptor == NULLPTR) {
+ // thrift message is not encrypted
+ DeserializeUnencryptedMessage(buf, len, deserialized_msg);
+ } else {
+ // thrift message is encrypted
+ uint32_t clen;
+ clen = *len;
+ // decrypt
+ auto decrypted_buffer = std::static_pointer_cast<ResizableBuffer>(
+ AllocateBuffer(decryptor->pool(),
+ static_cast<int64_t>(clen - decryptor->CiphertextSizeDelta())));
+ const uint8_t* cipher_buf = buf;
+ uint32_t decrypted_buffer_len =
+ decryptor->Decrypt(cipher_buf, 0, decrypted_buffer->mutable_data());
+ if (decrypted_buffer_len <= 0) {
+ throw ParquetException("Couldn't decrypt buffer\n");
+ }
+ *len = decrypted_buffer_len + decryptor->CiphertextSizeDelta();
+ DeserializeUnencryptedMessage(decrypted_buffer->data(), &decrypted_buffer_len,
+ deserialized_msg);
+ }
+ }
+
+ private:
+ // On Thrift 0.14.0+, we want to use TConfiguration to raise the max message size
+ // limit (ARROW-13655). If we wanted to protect against huge messages, we could
+ // do it ourselves since we know the message size up front.
+ std::shared_ptr<ThriftBuffer> CreateReadOnlyMemoryBuffer(uint8_t* buf, uint32_t len) {
#if PARQUET_THRIFT_VERSION_MAJOR > 0 || PARQUET_THRIFT_VERSION_MINOR >= 14
- auto conf = std::make_shared<apache::thrift::TConfiguration>();
- conf->setMaxMessageSize(std::numeric_limits<int>::max());
- return std::shared_ptr<ThriftBuffer>(
- new ThriftBuffer(buf, len, ThriftBuffer::OBSERVE, conf));
+ auto conf = std::make_shared<apache::thrift::TConfiguration>();
+ conf->setMaxMessageSize(std::numeric_limits<int>::max());
+ return std::shared_ptr<ThriftBuffer>(
+ new ThriftBuffer(buf, len, ThriftBuffer::OBSERVE, conf));
#else
- return std::shared_ptr<ThriftBuffer>(new ThriftBuffer(buf, len));
+ return std::shared_ptr<ThriftBuffer>(new ThriftBuffer(buf, len));
#endif
-}
-
-template <class T>
-inline void DeserializeThriftUnencryptedMsg(const uint8_t* buf, uint32_t* len,
- T* deserialized_msg) {
- // Deserialize msg bytes into c++ thrift msg using memory transport.
- auto tmem_transport = CreateReadOnlyMemoryBuffer(const_cast<uint8_t*>(buf), *len);
- apache::thrift::protocol::TCompactProtocolFactoryT<ThriftBuffer> tproto_factory;
- // Protect against CPU and memory bombs
- tproto_factory.setStringSizeLimit(100 * 1000 * 1000);
- // Structs in the thrift definition are relatively large (at least 300 bytes).
- // This limits total memory to the same order of magnitude as stringSize.
- tproto_factory.setContainerSizeLimit(1000 * 1000);
- std::shared_ptr<apache::thrift::protocol::TProtocol> tproto = //
- tproto_factory.getProtocol(tmem_transport);
- try {
- deserialized_msg->read(tproto.get());
- } catch (std::exception& e) {
- std::stringstream ss;
- ss << "Couldn't deserialize thrift: " << e.what() << "\n";
- throw ParquetException(ss.str());
}
- uint32_t bytes_left = tmem_transport->available_read();
- *len = *len - bytes_left;
-}
-// Deserialize a thrift message from buf/len. buf/len must at least contain
-// all the bytes needed to store the thrift message. On return, len will be
-// set to the actual length of the header.
-template <class T>
-inline void DeserializeThriftMsg(const uint8_t* buf, uint32_t* len, T* deserialized_msg,
- const std::shared_ptr<Decryptor>& decryptor = NULLPTR) {
- // thrift message is not encrypted
- if (decryptor == NULLPTR) {
- DeserializeThriftUnencryptedMsg(buf, len, deserialized_msg);
- } else { // thrift message is encrypted
- uint32_t clen;
- clen = *len;
- // decrypt
- std::shared_ptr<ResizableBuffer> decrypted_buffer =
- std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(
- decryptor->pool(),
- static_cast<int64_t>(clen - decryptor->CiphertextSizeDelta())));
- const uint8_t* cipher_buf = buf;
- uint32_t decrypted_buffer_len =
- decryptor->Decrypt(cipher_buf, 0, decrypted_buffer->mutable_data());
- if (decrypted_buffer_len <= 0) {
- throw ParquetException("Couldn't decrypt buffer\n");
+ template <class T>
+ void DeserializeUnencryptedMessage(const uint8_t* buf, uint32_t* len,
+ T* deserialized_msg) {
+ // Deserialize msg bytes into c++ thrift msg using memory transport.
+ auto tmem_transport = CreateReadOnlyMemoryBuffer(const_cast<uint8_t*>(buf), *len);
+ apache::thrift::protocol::TCompactProtocolFactoryT<ThriftBuffer> tproto_factory;
+ // Protect against CPU and memory bombs
+ tproto_factory.setStringSizeLimit(string_size_limit_);
+ tproto_factory.setContainerSizeLimit(container_size_limit_);
+ auto tproto = tproto_factory.getProtocol(tmem_transport);
+ try {
+ deserialized_msg->read(tproto.get());
+ } catch (std::exception& e) {
+ std::stringstream ss;
+ ss << "Couldn't deserialize thrift: " << e.what() << "\n";
+ throw ParquetException(ss.str());
}
- *len = decrypted_buffer_len + decryptor->CiphertextSizeDelta();
- DeserializeThriftMsg(decrypted_buffer->data(), &decrypted_buffer_len,
- deserialized_msg);
+ uint32_t bytes_left = tmem_transport->available_read();
+ *len = *len - bytes_left;
}
-}
+
+ const int32_t string_size_limit_;
+ const int32_t container_size_limit_;
+};
/// Utility class to serialize thrift objects to a binary format. This object
/// should be reused if possible to reuse the underlying memory.
@@ -478,10 +489,9 @@ class ThriftSerializer {
int64_t SerializeEncryptedObj(ArrowOutputStream* out, uint8_t* out_buffer,
uint32_t out_length,
const std::shared_ptr<Encryptor>& encryptor) {
- std::shared_ptr<ResizableBuffer> cipher_buffer =
- std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(
- encryptor->pool(),
- static_cast<int64_t>(encryptor->CiphertextSizeDelta() + out_length)));
+ auto cipher_buffer = std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(
+ encryptor->pool(),
+ static_cast<int64_t>(encryptor->CiphertextSizeDelta() + out_length)));
int cipher_buffer_len =
encryptor->Encrypt(out_buffer, out_length, cipher_buffer->mutable_data());
diff --git a/python/pyarrow/_dataset_parquet.pyx b/python/pyarrow/_dataset_parquet.pyx
index 7b91d4c2c7..684253fff3 100644
--- a/python/pyarrow/_dataset_parquet.pyx
+++ b/python/pyarrow/_dataset_parquet.pyx
@@ -598,6 +598,14 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
If enabled, pre-buffer the raw Parquet data instead of issuing one
read per column chunk. This can improve performance on high-latency
filesystems.
+ thrift_string_size_limit : int, default None
+ If not None, override the maximum size, in bytes, of any single
+ string or binary field decoded from Thrift structures. The default
+ limit should be sufficient for most Parquet files.
+ thrift_container_size_limit : int, default None
+ If not None, override the maximum number of elements in any single
+ list, set or map decoded from Thrift structures. The default limit
+ should be sufficient for most Parquet files.
"""
cdef:
@@ -606,14 +614,20 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
# Avoid mistakingly creating attributes
__slots__ = ()
- def __init__(self, bint use_buffered_stream=False,
+ def __init__(self, *, bint use_buffered_stream=False,
buffer_size=8192,
- bint pre_buffer=False):
+ bint pre_buffer=False,
+ thrift_string_size_limit=None,
+ thrift_container_size_limit=None):
self.init(shared_ptr[CFragmentScanOptions](
new CParquetFragmentScanOptions()))
self.use_buffered_stream = use_buffered_stream
self.buffer_size = buffer_size
self.pre_buffer = pre_buffer
+ if thrift_string_size_limit is not None:
+ self.thrift_string_size_limit = thrift_string_size_limit
+ if thrift_container_size_limit is not None:
+ self.thrift_container_size_limit = thrift_container_size_limit
cdef void init(self, const shared_ptr[CFragmentScanOptions]& sp):
FragmentScanOptions.init(self, sp)
@@ -654,17 +668,49 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
def pre_buffer(self, bint pre_buffer):
self.arrow_reader_properties().set_pre_buffer(pre_buffer)
+ @property
+ def thrift_string_size_limit(self):
+ return self.reader_properties().thrift_string_size_limit()
+
+ @thrift_string_size_limit.setter
+ def thrift_string_size_limit(self, size):
+ if size <= 0:
+ raise ValueError("size must be larger than zero")
+ self.reader_properties().set_thrift_string_size_limit(size)
+
+ @property
+ def thrift_container_size_limit(self):
+ return self.reader_properties().thrift_container_size_limit()
+
+ @thrift_container_size_limit.setter
+ def thrift_container_size_limit(self, size):
+ if size <= 0:
+ raise ValueError("size must be larger than zero")
+ self.reader_properties().set_thrift_container_size_limit(size)
+
def equals(self, ParquetFragmentScanOptions other):
- return (
- self.use_buffered_stream == other.use_buffered_stream and
- self.buffer_size == other.buffer_size and
- self.pre_buffer == other.pre_buffer
- )
+ attrs = (
+ self.use_buffered_stream, self.buffer_size, self.pre_buffer,
+ self.thrift_string_size_limit, self.thrift_container_size_limit)
+ other_attrs = (
+ other.use_buffered_stream, other.buffer_size, other.pre_buffer,
+ other.thrift_string_size_limit,
+ other.thrift_container_size_limit)
+ return attrs == other_attrs
+
+ @classmethod
+ def _reconstruct(cls, kwargs):
+ return cls(**kwargs)
def __reduce__(self):
- return ParquetFragmentScanOptions, (
- self.use_buffered_stream, self.buffer_size, self.pre_buffer
+ kwargs = dict(
+ use_buffered_stream=self.use_buffered_stream,
+ buffer_size=self.buffer_size,
+ pre_buffer=self.pre_buffer,
+ thrift_string_size_limit=self.thrift_string_size_limit,
+ thrift_container_size_limit=self.thrift_container_size_limit,
)
+ return type(self)._reconstruct, (kwargs,)
cdef class ParquetFactoryOptions(_Weakrefable):
diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd
index 98857d5b48..29b625df50 100644
--- a/python/pyarrow/_parquet.pxd
+++ b/python/pyarrow/_parquet.pxd
@@ -359,8 +359,16 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
c_bool is_buffered_stream_enabled() const
void enable_buffered_stream()
void disable_buffered_stream()
+
void set_buffer_size(int64_t buf_size)
int64_t buffer_size() const
+
+ void set_thrift_string_size_limit(int32_t size)
+ int32_t thrift_string_size_limit() const
+
+ void set_thrift_container_size_limit(int32_t size)
+ int32_t thrift_container_size_limit() const
+
void file_decryption_properties(shared_ptr[CFileDecryptionProperties]
decryption)
shared_ptr[CFileDecryptionProperties] file_decryption_properties() \
diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index 8812ab1059..45f89e8b91 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -1167,11 +1167,13 @@ cdef class ParquetReader(_Weakrefable):
self.pool = maybe_unbox_memory_pool(memory_pool)
self._metadata = None
- def open(self, object source not None, bint use_memory_map=True,
+ def open(self, object source not None, *, bint use_memory_map=True,
read_dictionary=None, FileMetaData metadata=None,
int buffer_size=0, bint pre_buffer=False,
coerce_int96_timestamp_unit=None,
- FileDecryptionProperties decryption_properties=None):
+ FileDecryptionProperties decryption_properties=None,
+ thrift_string_size_limit=None,
+ thrift_container_size_limit=None):
cdef:
shared_ptr[CRandomAccessFile] rd_handle
shared_ptr[CFileMetaData] c_metadata
@@ -1193,6 +1195,18 @@ cdef class ParquetReader(_Weakrefable):
else:
raise ValueError('Buffer size must be larger than zero')
+ if thrift_string_size_limit is not None:
+ if thrift_string_size_limit <= 0:
+ raise ValueError("thrift_string_size_limit "
+ "must be larger than zero")
+ properties.set_thrift_string_size_limit(thrift_string_size_limit)
+ if thrift_container_size_limit is not None:
+ if thrift_container_size_limit <= 0:
+ raise ValueError("thrift_container_size_limit "
+ "must be larger than zero")
+ properties.set_thrift_container_size_limit(
+ thrift_container_size_limit)
+
if decryption_properties is not None:
properties.file_decryption_properties(
decryption_properties.unwrap())
diff --git a/python/pyarrow/parquet/__init__.py b/python/pyarrow/parquet/__init__.py
index 7d1c8a4308..cbdf51b238 100644
--- a/python/pyarrow/parquet/__init__.py
+++ b/python/pyarrow/parquet/__init__.py
@@ -226,6 +226,14 @@ class ParquetFile:
in nanoseconds.
decryption_properties : FileDecryptionProperties, default None
File decryption properties for Parquet Modular Encryption.
+ thrift_string_size_limit : int, default None
+ If not None, override the maximum size, in bytes, of any single
+ string or binary field decoded from Thrift structures. The default
+ limit should be sufficient for most Parquet files.
+ thrift_container_size_limit : int, default None
+ If not None, override the maximum number of elements in any single
+ list, set or map decoded from Thrift structures. The default limit
+ should be sufficient for most Parquet files.
Examples
--------
@@ -269,17 +277,20 @@ class ParquetFile:
[0,1,2,3,4,5]]
"""
- def __init__(self, source, metadata=None, common_metadata=None,
+ def __init__(self, source, *, metadata=None, common_metadata=None,
read_dictionary=None, memory_map=False, buffer_size=0,
pre_buffer=False, coerce_int96_timestamp_unit=None,
- decryption_properties=None):
+ decryption_properties=None, thrift_string_size_limit=None,
+ thrift_container_size_limit=None):
self.reader = ParquetReader()
self.reader.open(
source, use_memory_map=memory_map,
buffer_size=buffer_size, pre_buffer=pre_buffer,
read_dictionary=read_dictionary, metadata=metadata,
coerce_int96_timestamp_unit=coerce_int96_timestamp_unit,
- decryption_properties=decryption_properties
+ decryption_properties=decryption_properties,
+ thrift_string_size_limit=thrift_string_size_limit,
+ thrift_container_size_limit=thrift_container_size_limit,
)
self.common_metadata = common_metadata
self._nested_paths_by_prefix = self._build_nested_paths()
@@ -1650,7 +1661,9 @@ Examples
filters=None, metadata_nthreads=None, read_dictionary=None,
memory_map=False, buffer_size=0, partitioning="hive",
use_legacy_dataset=None, pre_buffer=True,
- coerce_int96_timestamp_unit=None):
+ coerce_int96_timestamp_unit=None,
+ thrift_string_size_limit=None,
+ thrift_container_size_limit=None):
if use_legacy_dataset is None:
# if a new filesystem is passed -> default to new implementation
if isinstance(filesystem, FileSystem):
@@ -1673,7 +1686,9 @@ Examples
schema=schema, metadata=metadata,
split_row_groups=split_row_groups,
validate_schema=validate_schema,
- metadata_nthreads=metadata_nthreads
+ metadata_nthreads=metadata_nthreads,
+ thrift_string_size_limit=thrift_string_size_limit,
+ thrift_container_size_limit=thrift_container_size_limit,
)
self = object.__new__(cls)
return self
@@ -1683,7 +1698,9 @@ Examples
filters=None, metadata_nthreads=None, read_dictionary=None,
memory_map=False, buffer_size=0, partitioning="hive",
use_legacy_dataset=True, pre_buffer=True,
- coerce_int96_timestamp_unit=None):
+ coerce_int96_timestamp_unit=None,
+ thrift_string_size_limit=None,
+ thrift_container_size_limit=None):
if partitioning != "hive":
raise ValueError(
'Only "hive" for hive-like partitioning is supported when '
@@ -2258,11 +2275,13 @@ class _ParquetDatasetV2:
1 4 Horse 2022
"""
- def __init__(self, path_or_paths, filesystem=None, filters=None,
+ def __init__(self, path_or_paths, filesystem=None, *, filters=None,
partitioning="hive", read_dictionary=None, buffer_size=None,
memory_map=False, ignore_prefixes=None, pre_buffer=True,
coerce_int96_timestamp_unit=None, schema=None,
- decryption_properties=None, **kwargs):
+ decryption_properties=None, thrift_string_size_limit=None,
+ thrift_container_size_limit=None,
+ **kwargs):
import pyarrow.dataset as ds
# Raise error for not supported keywords
@@ -2277,7 +2296,9 @@ class _ParquetDatasetV2:
# map format arguments
read_options = {
"pre_buffer": pre_buffer,
- "coerce_int96_timestamp_unit": coerce_int96_timestamp_unit
+ "coerce_int96_timestamp_unit": coerce_int96_timestamp_unit,
+ "thrift_string_size_limit": thrift_string_size_limit,
+ "thrift_container_size_limit": thrift_container_size_limit,
}
if buffer_size:
read_options.update(use_buffered_stream=True,
@@ -2636,6 +2657,15 @@ decryption_properties : FileDecryptionProperties or None
File-level decryption properties.
The decryption properties can be created using
``CryptoFactory.file_decryption_properties()``.
+thrift_string_size_limit : int, default None
+ If not None, override the maximum size, in bytes, of any single
+ string or binary field decoded from Thrift structures. The default
+ limit should be sufficient for most Parquet files.
+thrift_container_size_limit : int, default None
+ If not None, override the maximum number of elements in any single
+ list, set or map decoded from Thrift structures. The default limit
+ should be sufficient for most Parquet files.
+
Returns
-------
{2}
@@ -2722,13 +2752,14 @@ Read data from a single Parquet file:
"""
-def read_table(source, columns=None, use_threads=True, metadata=None,
+def read_table(source, *, columns=None, use_threads=True, metadata=None,
schema=None, use_pandas_metadata=False, memory_map=False,
read_dictionary=None, filesystem=None, filters=None,
buffer_size=0, partitioning="hive", use_legacy_dataset=False,
ignore_prefixes=None, pre_buffer=True,
coerce_int96_timestamp_unit=None,
- decryption_properties=None):
+ decryption_properties=None, thrift_string_size_limit=None,
+ thrift_container_size_limit=None):
if not use_legacy_dataset:
if metadata is not None:
raise ValueError(
@@ -2749,7 +2780,9 @@ def read_table(source, columns=None, use_threads=True, metadata=None,
filters=filters,
ignore_prefixes=ignore_prefixes,
pre_buffer=pre_buffer,
- coerce_int96_timestamp_unit=coerce_int96_timestamp_unit
+ coerce_int96_timestamp_unit=coerce_int96_timestamp_unit,
+ thrift_string_size_limit=thrift_string_size_limit,
+ thrift_container_size_limit=thrift_container_size_limit,
)
except ImportError:
# fall back on ParquetFile for simple cases when pyarrow.dataset
@@ -2778,7 +2811,9 @@ def read_table(source, columns=None, use_threads=True, metadata=None,
memory_map=memory_map, buffer_size=buffer_size,
pre_buffer=pre_buffer,
coerce_int96_timestamp_unit=coerce_int96_timestamp_unit,
- decryption_properties=decryption_properties
+ decryption_properties=decryption_properties,
+ thrift_string_size_limit=thrift_string_size_limit,
+ thrift_container_size_limit=thrift_container_size_limit,
)
return dataset.read(columns=columns, use_threads=use_threads,
diff --git a/python/pyarrow/tests/parquet/test_basic.py b/python/pyarrow/tests/parquet/test_basic.py
index 9d31bdeb60..62ea19d422 100644
--- a/python/pyarrow/tests/parquet/test_basic.py
+++ b/python/pyarrow/tests/parquet/test_basic.py
@@ -802,3 +802,30 @@ def test_read_table_legacy_deprecated(tempdir):
FutureWarning, match="Passing 'use_legacy_dataset=True'"
):
pq.read_table(path, use_legacy_dataset=True)
+
+
+def test_thrift_size_limits(tempdir):
+ path = tempdir / 'largethrift.parquet'
+
+ array = pa.array(list(range(10)))
+ num_cols = 1000
+ table = pa.table(
+ [array] * num_cols,
+ names=[f'some_long_column_name_{i}' for i in range(num_cols)])
+ pq.write_table(table, path)
+
+ with pytest.raises(
+ OSError,
+ match="Couldn't deserialize thrift:.*Exceeded size limit"):
+ pq.read_table(path, thrift_string_size_limit=50 * num_cols)
+ with pytest.raises(
+ OSError,
+ match="Couldn't deserialize thrift:.*Exceeded size limit"):
+ pq.read_table(path, thrift_container_size_limit=num_cols)
+
+ got = pq.read_table(path, thrift_string_size_limit=100 * num_cols)
+ assert got == table
+ got = pq.read_table(path, thrift_container_size_limit=2 * num_cols)
+ assert got == table
+ got = pq.read_table(path)
+ assert got == table
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index d2210c4b6c..81fa7d1245 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -748,10 +748,15 @@ def test_parquet_scan_options():
opts3 = ds.ParquetFragmentScanOptions(
buffer_size=2**13, use_buffered_stream=True)
opts4 = ds.ParquetFragmentScanOptions(buffer_size=2**13, pre_buffer=True)
+ opts5 = ds.ParquetFragmentScanOptions(
+ thrift_string_size_limit=123456,
+ thrift_container_size_limit=987654,)
assert opts1.use_buffered_stream is False
assert opts1.buffer_size == 2**13
assert opts1.pre_buffer is False
+ assert opts1.thrift_string_size_limit == 100_000_000 # default in C++
+ assert opts1.thrift_container_size_limit == 1_000_000 # default in C++
assert opts2.use_buffered_stream is False
assert opts2.buffer_size == 2**12
@@ -765,10 +770,14 @@ def test_parquet_scan_options():
assert opts4.buffer_size == 2**13
assert opts4.pre_buffer is True
+ assert opts5.thrift_string_size_limit == 123456
+ assert opts5.thrift_container_size_limit == 987654
+
assert opts1 == opts1
assert opts1 != opts2
assert opts2 != opts3
assert opts3 != opts4
+ assert opts5 != opts1
def test_file_format_pickling():
@@ -795,6 +804,8 @@ def test_file_format_pickling():
ds.ParquetFileFormat(
use_buffered_stream=True,
buffer_size=4096,
+ thrift_string_size_limit=123,
+ thrift_container_size_limit=456,
),
])