You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ap...@apache.org on 2022/06/07 11:29:16 UTC

[arrow] branch master updated: ARROW-16546: [Parquet][C++][Python] Make Thrift limits configurable (#13275)

This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new dd33c750e0 ARROW-16546: [Parquet][C++][Python] Make Thrift limits configurable (#13275)
dd33c750e0 is described below

commit dd33c750e068baddba99589076aa12b892268640
Author: Antoine Pitrou <an...@python.org>
AuthorDate: Tue Jun 7 13:29:05 2022 +0200

    ARROW-16546: [Parquet][C++][Python] Make Thrift limits configurable (#13275)
    
    In fringe cases, users may have Parquet files whose deserialization exceeds our default Thrift size limits. This change makes those limits configurable through the reader properties.
    
    
    Authored-by: Antoine Pitrou <an...@python.org>
    Signed-off-by: Antoine Pitrou <an...@python.org>
---
 cpp/cmake_modules/SetupCxxFlags.cmake      |   1 -
 cpp/src/arrow/dataset/file_parquet.cc      |   4 +
 cpp/src/parquet/column_reader.cc           |  31 +++++--
 cpp/src/parquet/column_reader.h            |   5 ++
 cpp/src/parquet/column_writer_test.cc      |   8 +-
 cpp/src/parquet/file_reader.cc             |   4 +-
 cpp/src/parquet/metadata.cc                | 123 ++++++++++++++++---------
 cpp/src/parquet/metadata.h                 |  35 +++++++-
 cpp/src/parquet/properties.h               |  16 ++++
 cpp/src/parquet/statistics_test.cc         |   4 +-
 cpp/src/parquet/thrift_internal.h          | 138 ++++++++++++++++-------------
 python/pyarrow/_dataset_parquet.pyx        |  64 +++++++++++--
 python/pyarrow/_parquet.pxd                |   8 ++
 python/pyarrow/_parquet.pyx                |  18 +++-
 python/pyarrow/parquet/__init__.py         |  61 ++++++++++---
 python/pyarrow/tests/parquet/test_basic.py |  27 ++++++
 python/pyarrow/tests/test_dataset.py       |  11 +++
 17 files changed, 407 insertions(+), 151 deletions(-)

diff --git a/cpp/cmake_modules/SetupCxxFlags.cmake b/cpp/cmake_modules/SetupCxxFlags.cmake
index 713bc5528b..66b84dbc29 100644
--- a/cpp/cmake_modules/SetupCxxFlags.cmake
+++ b/cpp/cmake_modules/SetupCxxFlags.cmake
@@ -290,7 +290,6 @@ if("${BUILD_WARNING_LEVEL}" STREQUAL "CHECKIN")
   elseif(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
     set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wall")
     set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-conversion")
-    set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-deprecated-declarations")
     set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-sign-conversion")
     set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wunused-result")
   elseif(CMAKE_CXX_COMPILER_ID STREQUAL "Intel")
diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc
index f2a3903208..0d95e18171 100644
--- a/cpp/src/arrow/dataset/file_parquet.cc
+++ b/cpp/src/arrow/dataset/file_parquet.cc
@@ -69,6 +69,10 @@ parquet::ReaderProperties MakeReaderProperties(
   properties.set_buffer_size(parquet_scan_options->reader_properties->buffer_size());
   properties.file_decryption_properties(
       parquet_scan_options->reader_properties->file_decryption_properties());
+  properties.set_thrift_string_size_limit(
+      parquet_scan_options->reader_properties->thrift_string_size_limit());
+  properties.set_thrift_container_size_limit(
+      parquet_scan_options->reader_properties->thrift_container_size_limit());
   return properties;
 }
 
diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc
index 76476c5da7..4f82992aeb 100644
--- a/cpp/src/parquet/column_reader.cc
+++ b/cpp/src/parquet/column_reader.cc
@@ -223,14 +223,15 @@ EncodedStatistics ExtractStatsFromHeader(const H& header) {
 class SerializedPageReader : public PageReader {
  public:
   SerializedPageReader(std::shared_ptr<ArrowInputStream> stream, int64_t total_num_rows,
-                       Compression::type codec, ::arrow::MemoryPool* pool,
+                       Compression::type codec, const ReaderProperties& properties,
                        const CryptoContext* crypto_ctx)
-      : stream_(std::move(stream)),
-        decompression_buffer_(AllocateBuffer(pool, 0)),
+      : properties_(properties),
+        stream_(std::move(stream)),
+        decompression_buffer_(AllocateBuffer(properties_.memory_pool(), 0)),
         page_ordinal_(0),
         seen_num_rows_(0),
         total_num_rows_(total_num_rows),
-        decryption_buffer_(AllocateBuffer(pool, 0)) {
+        decryption_buffer_(AllocateBuffer(properties_.memory_pool(), 0)) {
     if (crypto_ctx != nullptr) {
       crypto_ctx_ = *crypto_ctx;
       InitDecryption();
@@ -254,6 +255,7 @@ class SerializedPageReader : public PageReader {
                                              int compressed_len, int uncompressed_len,
                                              int levels_byte_len = 0);
 
+  const ReaderProperties properties_;
   std::shared_ptr<ArrowInputStream> stream_;
 
   format::PageHeader current_page_header_;
@@ -326,9 +328,10 @@ void SerializedPageReader::UpdateDecryption(const std::shared_ptr<Decryptor>& de
 }
 
 std::shared_ptr<Page> SerializedPageReader::NextPage() {
+  ThriftDeserializer deserializer(properties_);
+
   // Loop here because there may be unhandled page types that we skip until
   // finding a page that we do know what to do with
-
   while (seen_num_rows_ < total_num_rows_) {
     uint32_t header_size = 0;
     uint32_t allowed_page_size = kDefaultPageHeaderSize;
@@ -349,8 +352,9 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
           UpdateDecryption(crypto_ctx_.meta_decryptor, encryption::kDictionaryPageHeader,
                            data_page_header_aad_);
         }
-        DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(view.data()), &header_size,
-                             &current_page_header_, crypto_ctx_.meta_decryptor);
+        deserializer.DeserializeMessage(reinterpret_cast<const uint8_t*>(view.data()),
+                                        &header_size, &current_page_header_,
+                                        crypto_ctx_.meta_decryptor);
         break;
       } catch (std::exception& e) {
         // Failed to deserialize. Double the allowed page header size and try again
@@ -508,13 +512,22 @@ std::shared_ptr<Buffer> SerializedPageReader::DecompressIfNeeded(
 
 }  // namespace
 
+std::unique_ptr<PageReader> PageReader::Open(std::shared_ptr<ArrowInputStream> stream,
+                                             int64_t total_num_rows,
+                                             Compression::type codec,
+                                             const ReaderProperties& properties,
+                                             const CryptoContext* ctx) {
+  return std::unique_ptr<PageReader>(new SerializedPageReader(
+      std::move(stream), total_num_rows, codec, properties, ctx));
+}
+
 std::unique_ptr<PageReader> PageReader::Open(std::shared_ptr<ArrowInputStream> stream,
                                              int64_t total_num_rows,
                                              Compression::type codec,
                                              ::arrow::MemoryPool* pool,
                                              const CryptoContext* ctx) {
-  return std::unique_ptr<PageReader>(
-      new SerializedPageReader(std::move(stream), total_num_rows, codec, pool, ctx));
+  return std::unique_ptr<PageReader>(new SerializedPageReader(
+      std::move(stream), total_num_rows, codec, ReaderProperties(pool), ctx));
 }
 
 namespace {
diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h
index 50eac31dc3..c22f9f2fc7 100644
--- a/cpp/src/parquet/column_reader.h
+++ b/cpp/src/parquet/column_reader.h
@@ -25,6 +25,7 @@
 #include "parquet/exception.h"
 #include "parquet/level_conversion.h"
 #include "parquet/platform.h"
+#include "parquet/properties.h"
 #include "parquet/schema.h"
 #include "parquet/types.h"
 
@@ -106,6 +107,10 @@ class PARQUET_EXPORT PageReader {
       std::shared_ptr<ArrowInputStream> stream, int64_t total_num_rows,
       Compression::type codec, ::arrow::MemoryPool* pool = ::arrow::default_memory_pool(),
       const CryptoContext* ctx = NULLPTR);
+  static std::unique_ptr<PageReader> Open(std::shared_ptr<ArrowInputStream> stream,
+                                          int64_t total_num_rows, Compression::type codec,
+                                          const ReaderProperties& properties,
+                                          const CryptoContext* ctx = NULLPTR);
 
   // @returns: shared_ptr<Page>(nullptr) on EOS, std::shared_ptr<Page>
   // containing new Page otherwise
diff --git a/cpp/src/parquet/column_writer_test.cc b/cpp/src/parquet/column_writer_test.cc
index 61f4c47e78..2cd21628b3 100644
--- a/cpp/src/parquet/column_writer_test.cc
+++ b/cpp/src/parquet/column_writer_test.cc
@@ -276,8 +276,8 @@ class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
     // This is because the ColumnChunkMetaData semantics dictate the metadata object is
     // complete (no changes to the metadata buffer can be made after instantiation)
     ApplicationVersion app_version(this->writer_properties_->created_by());
-    auto metadata_accessor =
-        ColumnChunkMetaData::Make(metadata_->contents(), this->descr_, &app_version);
+    auto metadata_accessor = ColumnChunkMetaData::Make(
+        metadata_->contents(), this->descr_, default_reader_properties(), &app_version);
     return metadata_accessor->is_stats_set();
   }
 
@@ -286,8 +286,8 @@ class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
     // This is because the ColumnChunkMetaData semantics dictate the metadata object is
     // complete (no changes to the metadata buffer can be made after instantiation)
     ApplicationVersion app_version(this->writer_properties_->created_by());
-    auto metadata_accessor =
-        ColumnChunkMetaData::Make(metadata_->contents(), this->descr_, &app_version);
+    auto metadata_accessor = ColumnChunkMetaData::Make(
+        metadata_->contents(), this->descr_, default_reader_properties(), &app_version);
     auto encoded_stats = metadata_accessor->statistics()->Encode();
     return {encoded_stats.has_min, encoded_stats.has_max};
   }
diff --git a/cpp/src/parquet/file_reader.cc b/cpp/src/parquet/file_reader.cc
index 3f4c2cb76a..51f90b1730 100644
--- a/cpp/src/parquet/file_reader.cc
+++ b/cpp/src/parquet/file_reader.cc
@@ -543,8 +543,8 @@ uint32_t SerializedFile::ParseUnencryptedFileMetadata(
   }
   uint32_t read_metadata_len = metadata_len;
   // The encrypted read path falls through to here, so pass in the decryptor
-  file_metadata_ =
-      FileMetaData::Make(metadata_buffer->data(), &read_metadata_len, file_decryptor_);
+  file_metadata_ = FileMetaData::Make(metadata_buffer->data(), &read_metadata_len,
+                                      properties_, file_decryptor_);
   return read_metadata_len;
 }
 
diff --git a/cpp/src/parquet/metadata.cc b/cpp/src/parquet/metadata.cc
index 102fa874ad..6226c3ad09 100644
--- a/cpp/src/parquet/metadata.cc
+++ b/cpp/src/parquet/metadata.cc
@@ -176,9 +176,13 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
   explicit ColumnChunkMetaDataImpl(const format::ColumnChunk* column,
                                    const ColumnDescriptor* descr,
                                    int16_t row_group_ordinal, int16_t column_ordinal,
+                                   const ReaderProperties& properties,
                                    const ApplicationVersion* writer_version,
                                    std::shared_ptr<InternalFileDecryptor> file_decryptor)
-      : column_(column), descr_(descr), writer_version_(writer_version) {
+      : column_(column),
+        descr_(descr),
+        properties_(properties),
+        writer_version_(writer_version) {
     column_metadata_ = &column->meta_data;
     if (column->__isset.crypto_metadata) {  // column metadata is encrypted
       format::ColumnCryptoMetaData ccmd = column->crypto_metadata;
@@ -196,7 +200,8 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
           auto decryptor = file_decryptor->GetColumnMetaDecryptor(
               path->ToDotString(), key_metadata, aad_column_metadata);
           auto len = static_cast<uint32_t>(column->encrypted_column_metadata.size());
-          DeserializeThriftMsg(
+          ThriftDeserializer deserializer(properties_);
+          deserializer.DeserializeMessage(
               reinterpret_cast<const uint8_t*>(column->encrypted_column_metadata.c_str()),
               &len, &decrypted_metadata_, decryptor);
           column_metadata_ = &decrypted_metadata_;
@@ -306,26 +311,38 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
   const format::ColumnMetaData* column_metadata_;
   format::ColumnMetaData decrypted_metadata_;
   const ColumnDescriptor* descr_;
+  const ReaderProperties properties_;
   const ApplicationVersion* writer_version_;
 };
 
 std::unique_ptr<ColumnChunkMetaData> ColumnChunkMetaData::Make(
     const void* metadata, const ColumnDescriptor* descr,
-    const ApplicationVersion* writer_version, int16_t row_group_ordinal,
-    int16_t column_ordinal, std::shared_ptr<InternalFileDecryptor> file_decryptor) {
+    const ReaderProperties& properties, const ApplicationVersion* writer_version,
+    int16_t row_group_ordinal, int16_t column_ordinal,
+    std::shared_ptr<InternalFileDecryptor> file_decryptor) {
   return std::unique_ptr<ColumnChunkMetaData>(
       new ColumnChunkMetaData(metadata, descr, row_group_ordinal, column_ordinal,
-                              writer_version, std::move(file_decryptor)));
+                              properties, writer_version, std::move(file_decryptor)));
+}
+
+std::unique_ptr<ColumnChunkMetaData> ColumnChunkMetaData::Make(
+    const void* metadata, const ColumnDescriptor* descr,
+    const ApplicationVersion* writer_version, int16_t row_group_ordinal,
+    int16_t column_ordinal, std::shared_ptr<InternalFileDecryptor> file_decryptor) {
+  return std::unique_ptr<ColumnChunkMetaData>(new ColumnChunkMetaData(
+      metadata, descr, row_group_ordinal, column_ordinal, default_reader_properties(),
+      writer_version, std::move(file_decryptor)));
 }
 
 ColumnChunkMetaData::ColumnChunkMetaData(
     const void* metadata, const ColumnDescriptor* descr, int16_t row_group_ordinal,
-    int16_t column_ordinal, const ApplicationVersion* writer_version,
+    int16_t column_ordinal, const ReaderProperties& properties,
+    const ApplicationVersion* writer_version,
     std::shared_ptr<InternalFileDecryptor> file_decryptor)
     : impl_{new ColumnChunkMetaDataImpl(
           reinterpret_cast<const format::ColumnChunk*>(metadata), descr,
-          row_group_ordinal, column_ordinal, writer_version, std::move(file_decryptor))} {
-}
+          row_group_ordinal, column_ordinal, properties, writer_version,
+          std::move(file_decryptor))} {}
 
 ColumnChunkMetaData::~ColumnChunkMetaData() = default;
 
@@ -403,10 +420,12 @@ class RowGroupMetaData::RowGroupMetaDataImpl {
  public:
   explicit RowGroupMetaDataImpl(const format::RowGroup* row_group,
                                 const SchemaDescriptor* schema,
+                                const ReaderProperties& properties,
                                 const ApplicationVersion* writer_version,
                                 std::shared_ptr<InternalFileDecryptor> file_decryptor)
       : row_group_(row_group),
         schema_(schema),
+        properties_(properties),
         writer_version_(writer_version),
         file_decryptor_(std::move(file_decryptor)) {}
 
@@ -431,7 +450,7 @@ class RowGroupMetaData::RowGroupMetaDataImpl {
   std::unique_ptr<ColumnChunkMetaData> ColumnChunk(int i) {
     if (i < num_columns()) {
       return ColumnChunkMetaData::Make(&row_group_->columns[i], schema_->Column(i),
-                                       writer_version_, row_group_->ordinal,
+                                       properties_, writer_version_, row_group_->ordinal,
                                        static_cast<int16_t>(i), file_decryptor_);
     }
     throw ParquetException("The file only has ", num_columns(),
@@ -441,6 +460,7 @@ class RowGroupMetaData::RowGroupMetaDataImpl {
  private:
   const format::RowGroup* row_group_;
   const SchemaDescriptor* schema_;
+  const ReaderProperties properties_;
   const ApplicationVersion* writer_version_;
   std::shared_ptr<InternalFileDecryptor> file_decryptor_;
 };
@@ -449,16 +469,26 @@ std::unique_ptr<RowGroupMetaData> RowGroupMetaData::Make(
     const void* metadata, const SchemaDescriptor* schema,
     const ApplicationVersion* writer_version,
     std::shared_ptr<InternalFileDecryptor> file_decryptor) {
-  return std::unique_ptr<RowGroupMetaData>(
-      new RowGroupMetaData(metadata, schema, writer_version, std::move(file_decryptor)));
+  return std::unique_ptr<parquet::RowGroupMetaData>(
+      new RowGroupMetaData(metadata, schema, default_reader_properties(), writer_version,
+                           std::move(file_decryptor)));
+}
+
+std::unique_ptr<RowGroupMetaData> RowGroupMetaData::Make(
+    const void* metadata, const SchemaDescriptor* schema,
+    const ReaderProperties& properties, const ApplicationVersion* writer_version,
+    std::shared_ptr<InternalFileDecryptor> file_decryptor) {
+  return std::unique_ptr<parquet::RowGroupMetaData>(new RowGroupMetaData(
+      metadata, schema, properties, writer_version, std::move(file_decryptor)));
 }
 
 RowGroupMetaData::RowGroupMetaData(const void* metadata, const SchemaDescriptor* schema,
+                                   const ReaderProperties& properties,
                                    const ApplicationVersion* writer_version,
                                    std::shared_ptr<InternalFileDecryptor> file_decryptor)
     : impl_{new RowGroupMetaDataImpl(reinterpret_cast<const format::RowGroup*>(metadata),
-                                     schema, writer_version, std::move(file_decryptor))} {
-}
+                                     schema, properties, writer_version,
+                                     std::move(file_decryptor))} {}
 
 RowGroupMetaData::~RowGroupMetaData() = default;
 
@@ -500,16 +530,17 @@ class FileMetaData::FileMetaDataImpl {
   FileMetaDataImpl() = default;
 
   explicit FileMetaDataImpl(
-      const void* metadata, uint32_t* metadata_len,
+      const void* metadata, uint32_t* metadata_len, const ReaderProperties& properties,
       std::shared_ptr<InternalFileDecryptor> file_decryptor = nullptr)
-      : file_decryptor_(file_decryptor) {
+      : properties_(properties), file_decryptor_(file_decryptor) {
     metadata_.reset(new format::FileMetaData);
 
     auto footer_decryptor =
         file_decryptor_ != nullptr ? file_decryptor->GetFooterDecryptor() : nullptr;
 
-    DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(metadata), metadata_len,
-                         metadata_.get(), footer_decryptor);
+    ThriftDeserializer deserializer(properties_);
+    deserializer.DeserializeMessage(reinterpret_cast<const uint8_t*>(metadata),
+                                    metadata_len, metadata_.get(), footer_decryptor);
     metadata_len_ = *metadata_len;
 
     if (metadata_->__isset.created_by) {
@@ -622,8 +653,8 @@ class FileMetaData::FileMetaDataImpl {
          << " row groups, requested metadata for row group: " << i;
       throw ParquetException(ss.str());
     }
-    return RowGroupMetaData::Make(&metadata_->row_groups[i], &schema_, &writer_version_,
-                                  file_decryptor_);
+    return RowGroupMetaData::Make(&metadata_->row_groups[i], &schema_, properties_,
+                                  &writer_version_, file_decryptor_);
   }
 
   bool Equals(const FileMetaDataImpl& other) const {
@@ -718,6 +749,7 @@ class FileMetaData::FileMetaDataImpl {
   SchemaDescriptor schema_;
   ApplicationVersion writer_version_;
   std::shared_ptr<const KeyValueMetadata> key_value_metadata_;
+  const ReaderProperties properties_;
   std::shared_ptr<InternalFileDecryptor> file_decryptor_;
 
   void InitSchema() {
@@ -759,20 +791,26 @@ class FileMetaData::FileMetaDataImpl {
 };
 
 std::shared_ptr<FileMetaData> FileMetaData::Make(
-    const void* metadata, uint32_t* metadata_len,
+    const void* metadata, uint32_t* metadata_len, const ReaderProperties& properties,
     std::shared_ptr<InternalFileDecryptor> file_decryptor) {
   // This FileMetaData ctor is private, not compatible with std::make_shared
   return std::shared_ptr<FileMetaData>(
-      new FileMetaData(metadata, metadata_len, file_decryptor));
+      new FileMetaData(metadata, metadata_len, properties, file_decryptor));
+}
+
+std::shared_ptr<FileMetaData> FileMetaData::Make(
+    const void* metadata, uint32_t* metadata_len,
+    std::shared_ptr<InternalFileDecryptor> file_decryptor) {
+  return std::shared_ptr<FileMetaData>(new FileMetaData(
+      metadata, metadata_len, default_reader_properties(), file_decryptor));
 }
 
 FileMetaData::FileMetaData(const void* metadata, uint32_t* metadata_len,
+                           const ReaderProperties& properties,
                            std::shared_ptr<InternalFileDecryptor> file_decryptor)
-    : impl_{std::unique_ptr<FileMetaDataImpl>(
-          new FileMetaDataImpl(metadata, metadata_len, file_decryptor))} {}
+    : impl_(new FileMetaDataImpl(metadata, metadata_len, properties, file_decryptor)) {}
 
-FileMetaData::FileMetaData()
-    : impl_{std::unique_ptr<FileMetaDataImpl>(new FileMetaDataImpl())} {}
+FileMetaData::FileMetaData() : impl_(new FileMetaDataImpl()) {}
 
 FileMetaData::~FileMetaData() = default;
 
@@ -870,24 +908,27 @@ class FileCryptoMetaData::FileCryptoMetaDataImpl {
  public:
   FileCryptoMetaDataImpl() = default;
 
-  explicit FileCryptoMetaDataImpl(const uint8_t* metadata, uint32_t* metadata_len) {
-    metadata_.reset(new format::FileCryptoMetaData);
-    DeserializeThriftMsg(metadata, metadata_len, metadata_.get());
+  explicit FileCryptoMetaDataImpl(const uint8_t* metadata, uint32_t* metadata_len,
+                                  const ReaderProperties& properties) {
+    ThriftDeserializer deserializer(properties);
+    deserializer.DeserializeMessage(metadata, metadata_len, &metadata_);
     metadata_len_ = *metadata_len;
   }
 
-  EncryptionAlgorithm encryption_algorithm() {
-    return FromThrift(metadata_->encryption_algorithm);
+  EncryptionAlgorithm encryption_algorithm() const {
+    return FromThrift(metadata_.encryption_algorithm);
   }
-  const std::string& key_metadata() { return metadata_->key_metadata; }
+
+  const std::string& key_metadata() const { return metadata_.key_metadata; }
+
   void WriteTo(::arrow::io::OutputStream* dst) const {
     ThriftSerializer serializer;
-    serializer.Serialize(metadata_.get(), dst);
+    serializer.Serialize(&metadata_, dst);
   }
 
  private:
   friend FileMetaDataBuilder;
-  std::unique_ptr<format::FileCryptoMetaData> metadata_;
+  format::FileCryptoMetaData metadata_;
   uint32_t metadata_len_;
 };
 
@@ -900,14 +941,16 @@ const std::string& FileCryptoMetaData::key_metadata() const {
 }
 
 std::shared_ptr<FileCryptoMetaData> FileCryptoMetaData::Make(
-    const uint8_t* serialized_metadata, uint32_t* metadata_len) {
+    const uint8_t* serialized_metadata, uint32_t* metadata_len,
+    const ReaderProperties& properties) {
   return std::shared_ptr<FileCryptoMetaData>(
-      new FileCryptoMetaData(serialized_metadata, metadata_len));
+      new FileCryptoMetaData(serialized_metadata, metadata_len, properties));
 }
 
 FileCryptoMetaData::FileCryptoMetaData(const uint8_t* serialized_metadata,
-                                       uint32_t* metadata_len)
-    : impl_(new FileCryptoMetaDataImpl(serialized_metadata, metadata_len)) {}
+                                       uint32_t* metadata_len,
+                                       const ReaderProperties& properties)
+    : impl_(new FileCryptoMetaDataImpl(serialized_metadata, metadata_len, properties)) {}
 
 FileCryptoMetaData::FileCryptoMetaData() : impl_(new FileCryptoMetaDataImpl()) {}
 
@@ -1754,10 +1797,8 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl {
       crypto_metadata_->__set_key_metadata(key_metadata);
     }
 
-    std::unique_ptr<FileCryptoMetaData> file_crypto_metadata =
-        std::unique_ptr<FileCryptoMetaData>(new FileCryptoMetaData());
-    file_crypto_metadata->impl_->metadata_ = std::move(crypto_metadata_);
-
+    std::unique_ptr<FileCryptoMetaData> file_crypto_metadata(new FileCryptoMetaData());
+    file_crypto_metadata->impl_->metadata_ = std::move(*crypto_metadata_);
     return file_crypto_metadata;
   }
 
diff --git a/cpp/src/parquet/metadata.h b/cpp/src/parquet/metadata.h
index 3dd936d90e..89dca5667b 100644
--- a/cpp/src/parquet/metadata.h
+++ b/cpp/src/parquet/metadata.h
@@ -121,8 +121,17 @@ struct PageEncodingStats {
 class PARQUET_EXPORT ColumnChunkMetaData {
  public:
   // API convenience to get a MetaData accessor
+
+  ARROW_DEPRECATED("Use the ReaderProperties-taking overload")
   static std::unique_ptr<ColumnChunkMetaData> Make(
       const void* metadata, const ColumnDescriptor* descr,
+      const ApplicationVersion* writer_version, int16_t row_group_ordinal = -1,
+      int16_t column_ordinal = -1,
+      std::shared_ptr<InternalFileDecryptor> file_decryptor = NULLPTR);
+
+  static std::unique_ptr<ColumnChunkMetaData> Make(
+      const void* metadata, const ColumnDescriptor* descr,
+      const ReaderProperties& properties = default_reader_properties(),
       const ApplicationVersion* writer_version = NULLPTR, int16_t row_group_ordinal = -1,
       int16_t column_ordinal = -1,
       std::shared_ptr<InternalFileDecryptor> file_decryptor = NULLPTR);
@@ -164,7 +173,8 @@ class PARQUET_EXPORT ColumnChunkMetaData {
  private:
   explicit ColumnChunkMetaData(
       const void* metadata, const ColumnDescriptor* descr, int16_t row_group_ordinal,
-      int16_t column_ordinal, const ApplicationVersion* writer_version = NULLPTR,
+      int16_t column_ordinal, const ReaderProperties& properties,
+      const ApplicationVersion* writer_version = NULLPTR,
       std::shared_ptr<InternalFileDecryptor> file_decryptor = NULLPTR);
   // PIMPL Idiom
   class ColumnChunkMetaDataImpl;
@@ -174,9 +184,16 @@ class PARQUET_EXPORT ColumnChunkMetaData {
 /// \brief RowGroupMetaData is a proxy around format::RowGroupMetaData.
 class PARQUET_EXPORT RowGroupMetaData {
  public:
+  ARROW_DEPRECATED("Use the ReaderProperties-taking overload")
+  static std::unique_ptr<RowGroupMetaData> Make(
+      const void* metadata, const SchemaDescriptor* schema,
+      const ApplicationVersion* writer_version,
+      std::shared_ptr<InternalFileDecryptor> file_decryptor = NULLPTR);
+
   /// \brief Create a RowGroupMetaData from a serialized thrift message.
   static std::unique_ptr<RowGroupMetaData> Make(
       const void* metadata, const SchemaDescriptor* schema,
+      const ReaderProperties& properties = default_reader_properties(),
       const ApplicationVersion* writer_version = NULLPTR,
       std::shared_ptr<InternalFileDecryptor> file_decryptor = NULLPTR);
 
@@ -225,6 +242,7 @@ class PARQUET_EXPORT RowGroupMetaData {
  private:
   explicit RowGroupMetaData(
       const void* metadata, const SchemaDescriptor* schema,
+      const ReaderProperties& properties,
       const ApplicationVersion* writer_version = NULLPTR,
       std::shared_ptr<InternalFileDecryptor> file_decryptor = NULLPTR);
   // PIMPL Idiom
@@ -237,9 +255,15 @@ class FileMetaDataBuilder;
 /// \brief FileMetaData is a proxy around format::FileMetaData.
 class PARQUET_EXPORT FileMetaData {
  public:
+  ARROW_DEPRECATED("Use the ReaderProperties-taking overload")
+  static std::shared_ptr<FileMetaData> Make(
+      const void* serialized_metadata, uint32_t* inout_metadata_len,
+      std::shared_ptr<InternalFileDecryptor> file_decryptor);
+
   /// \brief Create a FileMetaData from a serialized thrift message.
   static std::shared_ptr<FileMetaData> Make(
       const void* serialized_metadata, uint32_t* inout_metadata_len,
+      const ReaderProperties& properties = default_reader_properties(),
       std::shared_ptr<InternalFileDecryptor> file_decryptor = NULLPTR);
 
   ~FileMetaData();
@@ -350,6 +374,7 @@ class PARQUET_EXPORT FileMetaData {
   friend class SerializedFile;
 
   explicit FileMetaData(const void* serialized_metadata, uint32_t* metadata_len,
+                        const ReaderProperties& properties,
                         std::shared_ptr<InternalFileDecryptor> file_decryptor = NULLPTR);
 
   void set_file_decryptor(std::shared_ptr<InternalFileDecryptor> file_decryptor);
@@ -363,8 +388,9 @@ class PARQUET_EXPORT FileMetaData {
 class PARQUET_EXPORT FileCryptoMetaData {
  public:
   // API convenience to get a MetaData accessor
-  static std::shared_ptr<FileCryptoMetaData> Make(const uint8_t* serialized_metadata,
-                                                  uint32_t* metadata_len);
+  static std::shared_ptr<FileCryptoMetaData> Make(
+      const uint8_t* serialized_metadata, uint32_t* metadata_len,
+      const ReaderProperties& properties = default_reader_properties());
   ~FileCryptoMetaData();
 
   EncryptionAlgorithm encryption_algorithm() const;
@@ -374,7 +400,8 @@ class PARQUET_EXPORT FileCryptoMetaData {
 
  private:
   friend FileMetaDataBuilder;
-  FileCryptoMetaData(const uint8_t* serialized_metadata, uint32_t* metadata_len);
+  FileCryptoMetaData(const uint8_t* serialized_metadata, uint32_t* metadata_len,
+                     const ReaderProperties& properties);
 
   // PIMPL Idiom
   FileCryptoMetaData();
diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h
index 5c81c75357..1d5c360cc1 100644
--- a/cpp/src/parquet/properties.h
+++ b/cpp/src/parquet/properties.h
@@ -49,6 +49,12 @@ enum class ParquetDataPageVersion { V1, V2 };
 /// Align the default buffer size to a small multiple of a page size.
 constexpr int64_t kDefaultBufferSize = 4096 * 4;
 
+constexpr int32_t kDefaultThriftStringSizeLimit = 100 * 1000 * 1000;
+// Structs in the thrift definition are relatively large (at least 300 bytes).
+// This limits total memory to the same order of magnitude as
+// kDefaultThriftStringSizeLimit.
+constexpr int32_t kDefaultThriftContainerSizeLimit = 1000 * 1000;
+
 class PARQUET_EXPORT ReaderProperties {
  public:
   explicit ReaderProperties(MemoryPool* pool = ::arrow::default_memory_pool())
@@ -73,6 +79,14 @@ class PARQUET_EXPORT ReaderProperties {
   int64_t buffer_size() const { return buffer_size_; }
   void set_buffer_size(int64_t size) { buffer_size_ = size; }
 
+  int32_t thrift_string_size_limit() const { return thrift_string_size_limit_; }
+  void set_thrift_string_size_limit(int32_t size) { thrift_string_size_limit_ = size; }
+
+  int32_t thrift_container_size_limit() const { return thrift_container_size_limit_; }
+  void set_thrift_container_size_limit(int32_t size) {
+    thrift_container_size_limit_ = size;
+  }
+
   void file_decryption_properties(std::shared_ptr<FileDecryptionProperties> decryption) {
     file_decryption_properties_ = std::move(decryption);
   }
@@ -84,6 +98,8 @@ class PARQUET_EXPORT ReaderProperties {
  private:
   MemoryPool* pool_;
   int64_t buffer_size_ = kDefaultBufferSize;
+  int32_t thrift_string_size_limit_ = kDefaultThriftStringSizeLimit;
+  int32_t thrift_container_size_limit_ = kDefaultThriftContainerSizeLimit;
   bool buffered_stream_enabled_ = false;
   std::shared_ptr<FileDecryptionProperties> file_decryption_properties_;
 };
diff --git a/cpp/src/parquet/statistics_test.cc b/cpp/src/parquet/statistics_test.cc
index e678598919..03da895380 100644
--- a/cpp/src/parquet/statistics_test.cc
+++ b/cpp/src/parquet/statistics_test.cc
@@ -595,8 +595,8 @@ void AssertStatsSet(const ApplicationVersion& version,
                     std::shared_ptr<parquet::WriterProperties> props,
                     const ColumnDescriptor* column, bool expected_is_set) {
   auto metadata_builder = ColumnChunkMetaDataBuilder::Make(props, column);
-  auto column_chunk =
-      ColumnChunkMetaData::Make(metadata_builder->contents(), column, &version);
+  auto column_chunk = ColumnChunkMetaData::Make(metadata_builder->contents(), column,
+                                                default_reader_properties(), &version);
   EncodedStatistics stats;
   stats.set_is_signed(false);
   metadata_builder->SetStatistics(stats);
diff --git a/cpp/src/parquet/thrift_internal.h b/cpp/src/parquet/thrift_internal.h
index 17248f3bdc..3c74dfc07b 100644
--- a/cpp/src/parquet/thrift_internal.h
+++ b/cpp/src/parquet/thrift_internal.h
@@ -42,6 +42,7 @@
 #include "parquet/encryption/internal_file_encryptor.h"
 #include "parquet/exception.h"
 #include "parquet/platform.h"
+#include "parquet/properties.h"
 #include "parquet/statistics.h"
 #include "parquet/types.h"
 
@@ -350,74 +351,84 @@ static inline format::EncryptionAlgorithm ToThrift(EncryptionAlgorithm encryptio
 
 using ThriftBuffer = apache::thrift::transport::TMemoryBuffer;
 
-// On Thrift 0.14.0+, we want to use TConfiguration to raise the max message size
-// limit (ARROW-13655).  If we wanted to protect against huge messages, we could
-// do it ourselves since we know the message size up front.
+class ThriftDeserializer {
+ public:
+  explicit ThriftDeserializer(const ReaderProperties& properties)
+      : ThriftDeserializer(properties.thrift_string_size_limit(),
+                           properties.thrift_container_size_limit()) {}
+
+  ThriftDeserializer(int32_t string_size_limit, int32_t container_size_limit)
+      : string_size_limit_(string_size_limit),
+        container_size_limit_(container_size_limit) {}
 
-inline std::shared_ptr<ThriftBuffer> CreateReadOnlyMemoryBuffer(uint8_t* buf,
-                                                                uint32_t len) {
+  // Deserialize a thrift message from buf/len.  buf/len must at least contain
+  // all the bytes needed to store the thrift message.  On return, len will be
+  // set to the actual length of the (possibly encrypted) serialized message.
+  template <class T>
+  void DeserializeMessage(const uint8_t* buf, uint32_t* len, T* deserialized_msg,
+                          const std::shared_ptr<Decryptor>& decryptor = NULLPTR) {
+    if (decryptor == NULLPTR) {
+      // thrift message is not encrypted
+      DeserializeUnencryptedMessage(buf, len, deserialized_msg);
+    } else {
+      // thrift message is encrypted
+      uint32_t clen;
+      clen = *len;
+      // decrypt
+      auto decrypted_buffer = std::static_pointer_cast<ResizableBuffer>(
+          AllocateBuffer(decryptor->pool(),
+                         static_cast<int64_t>(clen - decryptor->CiphertextSizeDelta())));
+      const uint8_t* cipher_buf = buf;
+      uint32_t decrypted_buffer_len =
+          decryptor->Decrypt(cipher_buf, 0, decrypted_buffer->mutable_data());
+      if (decrypted_buffer_len <= 0) {
+        throw ParquetException("Couldn't decrypt buffer\n");
+      }
+      *len = decrypted_buffer_len + decryptor->CiphertextSizeDelta();
+      DeserializeUnencryptedMessage(decrypted_buffer->data(), &decrypted_buffer_len,
+                                    deserialized_msg);
+    }
+  }
+
+ private:
+  // On Thrift 0.14.0+, we want to use TConfiguration to raise the max message size
+  // limit (ARROW-13655).  If we wanted to protect against huge messages, we could
+  // do it ourselves since we know the message size up front.
+  std::shared_ptr<ThriftBuffer> CreateReadOnlyMemoryBuffer(uint8_t* buf, uint32_t len) {
 #if PARQUET_THRIFT_VERSION_MAJOR > 0 || PARQUET_THRIFT_VERSION_MINOR >= 14
-  auto conf = std::make_shared<apache::thrift::TConfiguration>();
-  conf->setMaxMessageSize(std::numeric_limits<int>::max());
-  return std::shared_ptr<ThriftBuffer>(
-      new ThriftBuffer(buf, len, ThriftBuffer::OBSERVE, conf));
+    auto conf = std::make_shared<apache::thrift::TConfiguration>();
+    conf->setMaxMessageSize(std::numeric_limits<int>::max());
+    return std::shared_ptr<ThriftBuffer>(
+        new ThriftBuffer(buf, len, ThriftBuffer::OBSERVE, conf));
 #else
-  return std::shared_ptr<ThriftBuffer>(new ThriftBuffer(buf, len));
+    return std::shared_ptr<ThriftBuffer>(new ThriftBuffer(buf, len));
 #endif
-}
-
-template <class T>
-inline void DeserializeThriftUnencryptedMsg(const uint8_t* buf, uint32_t* len,
-                                            T* deserialized_msg) {
-  // Deserialize msg bytes into c++ thrift msg using memory transport.
-  auto tmem_transport = CreateReadOnlyMemoryBuffer(const_cast<uint8_t*>(buf), *len);
-  apache::thrift::protocol::TCompactProtocolFactoryT<ThriftBuffer> tproto_factory;
-  // Protect against CPU and memory bombs
-  tproto_factory.setStringSizeLimit(100 * 1000 * 1000);
-  // Structs in the thrift definition are relatively large (at least 300 bytes).
-  // This limits total memory to the same order of magnitude as stringSize.
-  tproto_factory.setContainerSizeLimit(1000 * 1000);
-  std::shared_ptr<apache::thrift::protocol::TProtocol> tproto =  //
-      tproto_factory.getProtocol(tmem_transport);
-  try {
-    deserialized_msg->read(tproto.get());
-  } catch (std::exception& e) {
-    std::stringstream ss;
-    ss << "Couldn't deserialize thrift: " << e.what() << "\n";
-    throw ParquetException(ss.str());
   }
-  uint32_t bytes_left = tmem_transport->available_read();
-  *len = *len - bytes_left;
-}
 
-// Deserialize a thrift message from buf/len.  buf/len must at least contain
-// all the bytes needed to store the thrift message.  On return, len will be
-// set to the actual length of the header.
-template <class T>
-inline void DeserializeThriftMsg(const uint8_t* buf, uint32_t* len, T* deserialized_msg,
-                                 const std::shared_ptr<Decryptor>& decryptor = NULLPTR) {
-  // thrift message is not encrypted
-  if (decryptor == NULLPTR) {
-    DeserializeThriftUnencryptedMsg(buf, len, deserialized_msg);
-  } else {  // thrift message is encrypted
-    uint32_t clen;
-    clen = *len;
-    // decrypt
-    std::shared_ptr<ResizableBuffer> decrypted_buffer =
-        std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(
-            decryptor->pool(),
-            static_cast<int64_t>(clen - decryptor->CiphertextSizeDelta())));
-    const uint8_t* cipher_buf = buf;
-    uint32_t decrypted_buffer_len =
-        decryptor->Decrypt(cipher_buf, 0, decrypted_buffer->mutable_data());
-    if (decrypted_buffer_len <= 0) {
-      throw ParquetException("Couldn't decrypt buffer\n");
+  template <class T>
+  void DeserializeUnencryptedMessage(const uint8_t* buf, uint32_t* len,
+                                     T* deserialized_msg) {
+    // Deserialize msg bytes into c++ thrift msg using memory transport.
+    auto tmem_transport = CreateReadOnlyMemoryBuffer(const_cast<uint8_t*>(buf), *len);
+    apache::thrift::protocol::TCompactProtocolFactoryT<ThriftBuffer> tproto_factory;
+    // Protect against CPU and memory bombs
+    tproto_factory.setStringSizeLimit(string_size_limit_);
+    tproto_factory.setContainerSizeLimit(container_size_limit_);
+    auto tproto = tproto_factory.getProtocol(tmem_transport);
+    try {
+      deserialized_msg->read(tproto.get());
+    } catch (std::exception& e) {
+      std::stringstream ss;
+      ss << "Couldn't deserialize thrift: " << e.what() << "\n";
+      throw ParquetException(ss.str());
     }
-    *len = decrypted_buffer_len + decryptor->CiphertextSizeDelta();
-    DeserializeThriftMsg(decrypted_buffer->data(), &decrypted_buffer_len,
-                         deserialized_msg);
+    uint32_t bytes_left = tmem_transport->available_read();
+    *len = *len - bytes_left;
   }
-}
+
+  const int32_t string_size_limit_;
+  const int32_t container_size_limit_;
+};
 
 /// Utility class to serialize thrift objects to a binary format.  This object
 /// should be reused if possible to reuse the underlying memory.
@@ -478,10 +489,9 @@ class ThriftSerializer {
   int64_t SerializeEncryptedObj(ArrowOutputStream* out, uint8_t* out_buffer,
                                 uint32_t out_length,
                                 const std::shared_ptr<Encryptor>& encryptor) {
-    std::shared_ptr<ResizableBuffer> cipher_buffer =
-        std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(
-            encryptor->pool(),
-            static_cast<int64_t>(encryptor->CiphertextSizeDelta() + out_length)));
+    auto cipher_buffer = std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(
+        encryptor->pool(),
+        static_cast<int64_t>(encryptor->CiphertextSizeDelta() + out_length)));
     int cipher_buffer_len =
         encryptor->Encrypt(out_buffer, out_length, cipher_buffer->mutable_data());
 
diff --git a/python/pyarrow/_dataset_parquet.pyx b/python/pyarrow/_dataset_parquet.pyx
index 7b91d4c2c7..684253fff3 100644
--- a/python/pyarrow/_dataset_parquet.pyx
+++ b/python/pyarrow/_dataset_parquet.pyx
@@ -598,6 +598,14 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
         If enabled, pre-buffer the raw Parquet data instead of issuing one
         read per column chunk. This can improve performance on high-latency
         filesystems.
+    thrift_string_size_limit : int, default None
+        If not None, override the maximum total string size allocated
+        when decoding Thrift structures. The default limit should be
+        sufficient for most Parquet files.
+    thrift_container_size_limit : int, default None
+        If not None, override the maximum total size of containers allocated
+        when decoding Thrift structures. The default limit should be
+        sufficient for most Parquet files.
     """
 
     cdef:
@@ -606,14 +614,20 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
     # Avoid mistakingly creating attributes
     __slots__ = ()
 
-    def __init__(self, bint use_buffered_stream=False,
+    def __init__(self, *, bint use_buffered_stream=False,
                  buffer_size=8192,
-                 bint pre_buffer=False):
+                 bint pre_buffer=False,
+                 thrift_string_size_limit=None,
+                 thrift_container_size_limit=None):
         self.init(shared_ptr[CFragmentScanOptions](
             new CParquetFragmentScanOptions()))
         self.use_buffered_stream = use_buffered_stream
         self.buffer_size = buffer_size
         self.pre_buffer = pre_buffer
+        if thrift_string_size_limit is not None:
+            self.thrift_string_size_limit = thrift_string_size_limit
+        if thrift_container_size_limit is not None:
+            self.thrift_container_size_limit = thrift_container_size_limit
 
     cdef void init(self, const shared_ptr[CFragmentScanOptions]& sp):
         FragmentScanOptions.init(self, sp)
@@ -654,17 +668,49 @@ cdef class ParquetFragmentScanOptions(FragmentScanOptions):
     def pre_buffer(self, bint pre_buffer):
         self.arrow_reader_properties().set_pre_buffer(pre_buffer)
 
+    @property
+    def thrift_string_size_limit(self):
+        return self.reader_properties().thrift_string_size_limit()
+
+    @thrift_string_size_limit.setter
+    def thrift_string_size_limit(self, size):
+        if size <= 0:
+            raise ValueError("size must be larger than zero")
+        self.reader_properties().set_thrift_string_size_limit(size)
+
+    @property
+    def thrift_container_size_limit(self):
+        return self.reader_properties().thrift_container_size_limit()
+
+    @thrift_container_size_limit.setter
+    def thrift_container_size_limit(self, size):
+        if size <= 0:
+            raise ValueError("size must be larger than zero")
+        self.reader_properties().set_thrift_container_size_limit(size)
+
     def equals(self, ParquetFragmentScanOptions other):
-        return (
-            self.use_buffered_stream == other.use_buffered_stream and
-            self.buffer_size == other.buffer_size and
-            self.pre_buffer == other.pre_buffer
-        )
+        attrs = (
+            self.use_buffered_stream, self.buffer_size, self.pre_buffer,
+            self.thrift_string_size_limit, self.thrift_container_size_limit)
+        other_attrs = (
+            other.use_buffered_stream, other.buffer_size, other.pre_buffer,
+            other.thrift_string_size_limit,
+            other.thrift_container_size_limit)
+        return attrs == other_attrs
+
+    @classmethod
+    def _reconstruct(cls, kwargs):
+        return cls(**kwargs)
 
     def __reduce__(self):
-        return ParquetFragmentScanOptions, (
-            self.use_buffered_stream, self.buffer_size, self.pre_buffer
+        kwargs = dict(
+            use_buffered_stream=self.use_buffered_stream,
+            buffer_size=self.buffer_size,
+            pre_buffer=self.pre_buffer,
+            thrift_string_size_limit=self.thrift_string_size_limit,
+            thrift_container_size_limit=self.thrift_container_size_limit,
         )
+        return type(self)._reconstruct, (kwargs,)
 
 
 cdef class ParquetFactoryOptions(_Weakrefable):
diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd
index 98857d5b48..29b625df50 100644
--- a/python/pyarrow/_parquet.pxd
+++ b/python/pyarrow/_parquet.pxd
@@ -359,8 +359,16 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
         c_bool is_buffered_stream_enabled() const
         void enable_buffered_stream()
         void disable_buffered_stream()
+
         void set_buffer_size(int64_t buf_size)
         int64_t buffer_size() const
+
+        void set_thrift_string_size_limit(int32_t size)
+        int32_t thrift_string_size_limit() const
+
+        void set_thrift_container_size_limit(int32_t size)
+        int32_t thrift_container_size_limit() const
+
         void file_decryption_properties(shared_ptr[CFileDecryptionProperties]
                                         decryption)
         shared_ptr[CFileDecryptionProperties] file_decryption_properties() \
diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index 8812ab1059..45f89e8b91 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -1167,11 +1167,13 @@ cdef class ParquetReader(_Weakrefable):
         self.pool = maybe_unbox_memory_pool(memory_pool)
         self._metadata = None
 
-    def open(self, object source not None, bint use_memory_map=True,
+    def open(self, object source not None, *, bint use_memory_map=True,
              read_dictionary=None, FileMetaData metadata=None,
              int buffer_size=0, bint pre_buffer=False,
              coerce_int96_timestamp_unit=None,
-             FileDecryptionProperties decryption_properties=None):
+             FileDecryptionProperties decryption_properties=None,
+             thrift_string_size_limit=None,
+             thrift_container_size_limit=None):
         cdef:
             shared_ptr[CRandomAccessFile] rd_handle
             shared_ptr[CFileMetaData] c_metadata
@@ -1193,6 +1195,18 @@ cdef class ParquetReader(_Weakrefable):
         else:
             raise ValueError('Buffer size must be larger than zero')
 
+        if thrift_string_size_limit is not None:
+            if thrift_string_size_limit <= 0:
+                raise ValueError("thrift_string_size_limit "
+                                 "must be larger than zero")
+            properties.set_thrift_string_size_limit(thrift_string_size_limit)
+        if thrift_container_size_limit is not None:
+            if thrift_container_size_limit <= 0:
+                raise ValueError("thrift_container_size_limit "
+                                 "must be larger than zero")
+            properties.set_thrift_container_size_limit(
+                thrift_container_size_limit)
+
         if decryption_properties is not None:
             properties.file_decryption_properties(
                 decryption_properties.unwrap())
diff --git a/python/pyarrow/parquet/__init__.py b/python/pyarrow/parquet/__init__.py
index 7d1c8a4308..cbdf51b238 100644
--- a/python/pyarrow/parquet/__init__.py
+++ b/python/pyarrow/parquet/__init__.py
@@ -226,6 +226,14 @@ class ParquetFile:
         in nanoseconds.
     decryption_properties : FileDecryptionProperties, default None
         File decryption properties for Parquet Modular Encryption.
+    thrift_string_size_limit : int, default None
+        If not None, override the maximum total string size allocated
+        when decoding Thrift structures. The default limit should be
+        sufficient for most Parquet files.
+    thrift_container_size_limit : int, default None
+        If not None, override the maximum total size of containers allocated
+        when decoding Thrift structures. The default limit should be
+        sufficient for most Parquet files.
 
     Examples
     --------
@@ -269,17 +277,20 @@ class ParquetFile:
     [0,1,2,3,4,5]]
     """
 
-    def __init__(self, source, metadata=None, common_metadata=None,
+    def __init__(self, source, *, metadata=None, common_metadata=None,
                  read_dictionary=None, memory_map=False, buffer_size=0,
                  pre_buffer=False, coerce_int96_timestamp_unit=None,
-                 decryption_properties=None):
+                 decryption_properties=None, thrift_string_size_limit=None,
+                 thrift_container_size_limit=None):
         self.reader = ParquetReader()
         self.reader.open(
             source, use_memory_map=memory_map,
             buffer_size=buffer_size, pre_buffer=pre_buffer,
             read_dictionary=read_dictionary, metadata=metadata,
             coerce_int96_timestamp_unit=coerce_int96_timestamp_unit,
-            decryption_properties=decryption_properties
+            decryption_properties=decryption_properties,
+            thrift_string_size_limit=thrift_string_size_limit,
+            thrift_container_size_limit=thrift_container_size_limit,
         )
         self.common_metadata = common_metadata
         self._nested_paths_by_prefix = self._build_nested_paths()
@@ -1650,7 +1661,9 @@ Examples
                 filters=None, metadata_nthreads=None, read_dictionary=None,
                 memory_map=False, buffer_size=0, partitioning="hive",
                 use_legacy_dataset=None, pre_buffer=True,
-                coerce_int96_timestamp_unit=None):
+                coerce_int96_timestamp_unit=None,
+                thrift_string_size_limit=None,
+                thrift_container_size_limit=None):
         if use_legacy_dataset is None:
             # if a new filesystem is passed -> default to new implementation
             if isinstance(filesystem, FileSystem):
@@ -1673,7 +1686,9 @@ Examples
                 schema=schema, metadata=metadata,
                 split_row_groups=split_row_groups,
                 validate_schema=validate_schema,
-                metadata_nthreads=metadata_nthreads
+                metadata_nthreads=metadata_nthreads,
+                thrift_string_size_limit=thrift_string_size_limit,
+                thrift_container_size_limit=thrift_container_size_limit,
             )
         self = object.__new__(cls)
         return self
@@ -1683,7 +1698,9 @@ Examples
                  filters=None, metadata_nthreads=None, read_dictionary=None,
                  memory_map=False, buffer_size=0, partitioning="hive",
                  use_legacy_dataset=True, pre_buffer=True,
-                 coerce_int96_timestamp_unit=None):
+                 coerce_int96_timestamp_unit=None,
+                 thrift_string_size_limit=None,
+                 thrift_container_size_limit=None):
         if partitioning != "hive":
             raise ValueError(
                 'Only "hive" for hive-like partitioning is supported when '
@@ -2258,11 +2275,13 @@ class _ParquetDatasetV2:
     1       4  Horse  2022
     """
 
-    def __init__(self, path_or_paths, filesystem=None, filters=None,
+    def __init__(self, path_or_paths, filesystem=None, *, filters=None,
                  partitioning="hive", read_dictionary=None, buffer_size=None,
                  memory_map=False, ignore_prefixes=None, pre_buffer=True,
                  coerce_int96_timestamp_unit=None, schema=None,
-                 decryption_properties=None, **kwargs):
+                 decryption_properties=None, thrift_string_size_limit=None,
+                 thrift_container_size_limit=None,
+                 **kwargs):
         import pyarrow.dataset as ds
 
         # Raise error for not supported keywords
@@ -2277,7 +2296,9 @@ class _ParquetDatasetV2:
         # map format arguments
         read_options = {
             "pre_buffer": pre_buffer,
-            "coerce_int96_timestamp_unit": coerce_int96_timestamp_unit
+            "coerce_int96_timestamp_unit": coerce_int96_timestamp_unit,
+            "thrift_string_size_limit": thrift_string_size_limit,
+            "thrift_container_size_limit": thrift_container_size_limit,
         }
         if buffer_size:
             read_options.update(use_buffered_stream=True,
@@ -2636,6 +2657,15 @@ decryption_properties : FileDecryptionProperties or None
     File-level decryption properties.
     The decryption properties can be created using
     ``CryptoFactory.file_decryption_properties()``.
+thrift_string_size_limit : int, default None
+    If not None, override the maximum total string size allocated
+    when decoding Thrift structures. The default limit should be
+    sufficient for most Parquet files.
+thrift_container_size_limit : int, default None
+    If not None, override the maximum total size of containers allocated
+    when decoding Thrift structures. The default limit should be
+    sufficient for most Parquet files.
+
 Returns
 -------
 {2}
@@ -2722,13 +2752,14 @@ Read data from a single Parquet file:
 """
 
 
-def read_table(source, columns=None, use_threads=True, metadata=None,
+def read_table(source, *, columns=None, use_threads=True, metadata=None,
                schema=None, use_pandas_metadata=False, memory_map=False,
                read_dictionary=None, filesystem=None, filters=None,
                buffer_size=0, partitioning="hive", use_legacy_dataset=False,
                ignore_prefixes=None, pre_buffer=True,
                coerce_int96_timestamp_unit=None,
-               decryption_properties=None):
+               decryption_properties=None, thrift_string_size_limit=None,
+               thrift_container_size_limit=None):
     if not use_legacy_dataset:
         if metadata is not None:
             raise ValueError(
@@ -2749,7 +2780,9 @@ def read_table(source, columns=None, use_threads=True, metadata=None,
                 filters=filters,
                 ignore_prefixes=ignore_prefixes,
                 pre_buffer=pre_buffer,
-                coerce_int96_timestamp_unit=coerce_int96_timestamp_unit
+                coerce_int96_timestamp_unit=coerce_int96_timestamp_unit,
+                thrift_string_size_limit=thrift_string_size_limit,
+                thrift_container_size_limit=thrift_container_size_limit,
             )
         except ImportError:
             # fall back on ParquetFile for simple cases when pyarrow.dataset
@@ -2778,7 +2811,9 @@ def read_table(source, columns=None, use_threads=True, metadata=None,
                 memory_map=memory_map, buffer_size=buffer_size,
                 pre_buffer=pre_buffer,
                 coerce_int96_timestamp_unit=coerce_int96_timestamp_unit,
-                decryption_properties=decryption_properties
+                decryption_properties=decryption_properties,
+                thrift_string_size_limit=thrift_string_size_limit,
+                thrift_container_size_limit=thrift_container_size_limit,
             )
 
         return dataset.read(columns=columns, use_threads=use_threads,
diff --git a/python/pyarrow/tests/parquet/test_basic.py b/python/pyarrow/tests/parquet/test_basic.py
index 9d31bdeb60..62ea19d422 100644
--- a/python/pyarrow/tests/parquet/test_basic.py
+++ b/python/pyarrow/tests/parquet/test_basic.py
@@ -802,3 +802,30 @@ def test_read_table_legacy_deprecated(tempdir):
         FutureWarning, match="Passing 'use_legacy_dataset=True'"
     ):
         pq.read_table(path, use_legacy_dataset=True)
+
+
+def test_thrift_size_limits(tempdir):
+    path = tempdir / 'largethrift.parquet'
+
+    array = pa.array(list(range(10)))
+    num_cols = 1000
+    table = pa.table(
+        [array] * num_cols,
+        names=[f'some_long_column_name_{i}' for i in range(num_cols)])
+    pq.write_table(table, path)
+
+    with pytest.raises(
+            OSError,
+            match="Couldn't deserialize thrift:.*Exceeded size limit"):
+        pq.read_table(path, thrift_string_size_limit=50 * num_cols)
+    with pytest.raises(
+            OSError,
+            match="Couldn't deserialize thrift:.*Exceeded size limit"):
+        pq.read_table(path, thrift_container_size_limit=num_cols)
+
+    got = pq.read_table(path, thrift_string_size_limit=100 * num_cols)
+    assert got == table
+    got = pq.read_table(path, thrift_container_size_limit=2 * num_cols)
+    assert got == table
+    got = pq.read_table(path)
+    assert got == table
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index d2210c4b6c..81fa7d1245 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -748,10 +748,15 @@ def test_parquet_scan_options():
     opts3 = ds.ParquetFragmentScanOptions(
         buffer_size=2**13, use_buffered_stream=True)
     opts4 = ds.ParquetFragmentScanOptions(buffer_size=2**13, pre_buffer=True)
+    opts5 = ds.ParquetFragmentScanOptions(
+        thrift_string_size_limit=123456,
+        thrift_container_size_limit=987654,)
 
     assert opts1.use_buffered_stream is False
     assert opts1.buffer_size == 2**13
     assert opts1.pre_buffer is False
+    assert opts1.thrift_string_size_limit == 100_000_000  # default in C++
+    assert opts1.thrift_container_size_limit == 1_000_000  # default in C++
 
     assert opts2.use_buffered_stream is False
     assert opts2.buffer_size == 2**12
@@ -765,10 +770,14 @@ def test_parquet_scan_options():
     assert opts4.buffer_size == 2**13
     assert opts4.pre_buffer is True
 
+    assert opts5.thrift_string_size_limit == 123456
+    assert opts5.thrift_container_size_limit == 987654
+
     assert opts1 == opts1
     assert opts1 != opts2
     assert opts2 != opts3
     assert opts3 != opts4
+    assert opts5 != opts1
 
 
 def test_file_format_pickling():
@@ -795,6 +804,8 @@ def test_file_format_pickling():
             ds.ParquetFileFormat(
                 use_buffered_stream=True,
                 buffer_size=4096,
+                thrift_string_size_limit=123,
+                thrift_container_size_limit=456,
             ),
         ])