You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ap...@apache.org on 2022/05/05 12:31:39 UTC

[arrow] branch master updated: ARROW-14114: [C++][Parquet] Fix multi-threaded read of PME files

This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 88bccab18a ARROW-14114: [C++][Parquet] Fix multi-threaded read of PME files
88bccab18a is described below

commit 88bccab18a4ab818355209e45862cc52f9cf4a0d
Author: Maya Anderson <ma...@il.ibm.com>
AuthorDate: Thu May 5 14:31:32 2022 +0200

    ARROW-14114: [C++][Parquet] Fix multi-threaded read of PME files
    
    Change AesDecryptor to be per Decryptor, instead of shared.
    This solves the problem of reading with PME using multiple threads.
    
    Details:
    It was discovered when exposing high-level PME in PyArrow that reading an encrypted Parquet file in PyArrow intermittently fails during decryption finalization and sometimes fails with a segmentation fault. The same happens in C++ when reading an encrypted Parquet file with FileReader.ReadTable() in multithreaded mode (with set_use_threads(true)).
    The current implementation uses two caches of AesDecryptors, meta_decryptor_ and data_decryptor_, and every Decryptor gets the same AesDecryptor (with its AesDecryptorImpl) from these caches.
    However, AesDecryptor::AesDecryptorImpl::GcmDecrypt() and AesDecryptor::AesDecryptorImpl::CtrDecrypt() use the ctx_ member, an OpenSSL EVP_CIPHER_CTX, and such a context shouldn't be used from multiple threads concurrently.
    So, instead of sharing the same AesDecryptor between all Decryptors, an AesDecryptor will be created per Decryptor, which is per column.
    
    Co-authored-by: Gidon Gershinsky <gg...@apple.com>
    
    CC @thamht4190 @pitrou @revit13
    
    Closes #12778 from andersonm-ibm/multithreaded_read
    
    Lead-authored-by: Maya Anderson <ma...@il.ibm.com>
    Co-authored-by: Gidon Gershinsky <gg...@apple.com>
    Signed-off-by: Antoine Pitrou <an...@python.org>
---
 cpp/src/parquet/encryption/encryption_internal.cc  |  7 +--
 cpp/src/parquet/encryption/encryption_internal.h   | 13 +++++-
 .../encryption/encryption_internal_nossl.cc        |  5 +-
 .../parquet/encryption/internal_file_decryptor.cc  | 54 +++++++---------------
 .../parquet/encryption/internal_file_decryptor.h   | 18 ++------
 python/pyarrow/tests/parquet/test_encryption.py    |  4 +-
 6 files changed, 40 insertions(+), 61 deletions(-)

diff --git a/cpp/src/parquet/encryption/encryption_internal.cc b/cpp/src/parquet/encryption/encryption_internal.cc
index 46b5e9b52b..fee3434d09 100644
--- a/cpp/src/parquet/encryption/encryption_internal.cc
+++ b/cpp/src/parquet/encryption/encryption_internal.cc
@@ -427,15 +427,16 @@ AesDecryptor::AesDecryptor(ParquetCipher::type alg_id, int key_len, bool metadat
     : impl_{std::unique_ptr<AesDecryptorImpl>(
           new AesDecryptorImpl(alg_id, key_len, metadata, contains_length))} {}
 
-AesDecryptor* AesDecryptor::Make(ParquetCipher::type alg_id, int key_len, bool metadata,
-                                 std::vector<AesDecryptor*>* all_decryptors) {
+std::shared_ptr<AesDecryptor> AesDecryptor::Make(
+    ParquetCipher::type alg_id, int key_len, bool metadata,
+    std::vector<std::weak_ptr<AesDecryptor>>* all_decryptors) {
   if (ParquetCipher::AES_GCM_V1 != alg_id && ParquetCipher::AES_GCM_CTR_V1 != alg_id) {
     std::stringstream ss;
     ss << "Crypto algorithm " << alg_id << " is not supported";
     throw ParquetException(ss.str());
   }
 
-  AesDecryptor* decryptor = new AesDecryptor(alg_id, key_len, metadata);
+  auto decryptor = std::make_shared<AesDecryptor>(alg_id, key_len, metadata);
   if (all_decryptors != nullptr) {
     all_decryptors->push_back(decryptor);
   }
diff --git a/cpp/src/parquet/encryption/encryption_internal.h b/cpp/src/parquet/encryption/encryption_internal.h
index f18afbbf00..ede338182f 100644
--- a/cpp/src/parquet/encryption/encryption_internal.h
+++ b/cpp/src/parquet/encryption/encryption_internal.h
@@ -88,8 +88,17 @@ class AesDecryptor {
   explicit AesDecryptor(ParquetCipher::type alg_id, int key_len, bool metadata,
                         bool contains_length = true);
 
-  static AesDecryptor* Make(ParquetCipher::type alg_id, int key_len, bool metadata,
-                            std::vector<AesDecryptor*>* all_decryptors);
+  /// \brief Factory function to create an AesDecryptor
+  ///
+  /// \param alg_id the encryption algorithm to use
+  /// \param key_len key length. Possible values: 16, 24, 32 bytes.
+  /// \param metadata if true then this is a metadata decryptor
+  /// \param all_decryptors A weak reference to all decryptors that need to be wiped
+  /// out when decryption is finished
+  /// \return shared pointer to a new AesDecryptor
+  static std::shared_ptr<AesDecryptor> Make(
+      ParquetCipher::type alg_id, int key_len, bool metadata,
+      std::vector<std::weak_ptr<AesDecryptor>>* all_decryptors);
 
   ~AesDecryptor();
   void WipeOut();
diff --git a/cpp/src/parquet/encryption/encryption_internal_nossl.cc b/cpp/src/parquet/encryption/encryption_internal_nossl.cc
index 39af711fa2..540ee16f78 100644
--- a/cpp/src/parquet/encryption/encryption_internal_nossl.cc
+++ b/cpp/src/parquet/encryption/encryption_internal_nossl.cc
@@ -86,8 +86,9 @@ AesDecryptor::AesDecryptor(ParquetCipher::type alg_id, int key_len, bool metadat
   ThrowOpenSSLRequiredException();
 }
 
-AesDecryptor* AesDecryptor::Make(ParquetCipher::type alg_id, int key_len, bool metadata,
-                                 std::vector<AesDecryptor*>* all_decryptors) {
+std::shared_ptr<AesDecryptor> AesDecryptor::Make(
+    ParquetCipher::type alg_id, int key_len, bool metadata,
+    std::vector<std::weak_ptr<AesDecryptor>>* all_decryptors) {
   return NULLPTR;
 }
 
diff --git a/cpp/src/parquet/encryption/internal_file_decryptor.cc b/cpp/src/parquet/encryption/internal_file_decryptor.cc
index 6381e4f37f..87bfc2bd12 100644
--- a/cpp/src/parquet/encryption/internal_file_decryptor.cc
+++ b/cpp/src/parquet/encryption/internal_file_decryptor.cc
@@ -22,9 +22,9 @@
 namespace parquet {
 
 // Decryptor
-Decryptor::Decryptor(encryption::AesDecryptor* aes_decryptor, const std::string& key,
-                     const std::string& file_aad, const std::string& aad,
-                     ::arrow::MemoryPool* pool)
+Decryptor::Decryptor(std::shared_ptr<encryption::AesDecryptor> aes_decryptor,
+                     const std::string& key, const std::string& file_aad,
+                     const std::string& aad, ::arrow::MemoryPool* pool)
     : aes_decryptor_(aes_decryptor),
       key_(key),
       file_aad_(file_aad),
@@ -61,7 +61,9 @@ InternalFileDecryptor::InternalFileDecryptor(FileDecryptionProperties* propertie
 void InternalFileDecryptor::WipeOutDecryptionKeys() {
   properties_->WipeOutDecryptionKeys();
   for (auto const& i : all_decryptors_) {
-    i->WipeOut();
+    if (auto aes_decryptor = i.lock()) {
+      aes_decryptor->WipeOut();
+    }
   }
 }
 
@@ -134,8 +136,11 @@ std::shared_ptr<Decryptor> InternalFileDecryptor::GetFooterDecryptor(
 
   // Create both data and metadata decryptors to avoid redundant retrieval of key
   // from the key_retriever.
-  auto aes_metadata_decryptor = GetMetaAesDecryptor(footer_key.size());
-  auto aes_data_decryptor = GetDataAesDecryptor(footer_key.size());
+  int key_len = static_cast<int>(footer_key.size());
+  auto aes_metadata_decryptor = encryption::AesDecryptor::Make(
+      algorithm_, key_len, /*metadata=*/true, &all_decryptors_);
+  auto aes_data_decryptor = encryption::AesDecryptor::Make(
+      algorithm_, key_len, /*metadata=*/false, &all_decryptors_);
 
   footer_metadata_decryptor_ = std::make_shared<Decryptor>(
       aes_metadata_decryptor, footer_key, file_aad_, aad, pool_);
@@ -195,8 +200,11 @@ std::shared_ptr<Decryptor> InternalFileDecryptor::GetColumnDecryptor(
 
   // Create both data and metadata decryptors to avoid redundant retrieval of key
   // using the key_retriever.
-  auto aes_metadata_decryptor = GetMetaAesDecryptor(column_key.size());
-  auto aes_data_decryptor = GetDataAesDecryptor(column_key.size());
+  int key_len = static_cast<int>(column_key.size());
+  auto aes_metadata_decryptor = encryption::AesDecryptor::Make(
+      algorithm_, key_len, /*metadata=*/true, &all_decryptors_);
+  auto aes_data_decryptor = encryption::AesDecryptor::Make(
+      algorithm_, key_len, /*metadata=*/false, &all_decryptors_);
 
   column_metadata_map_[column_path] = std::make_shared<Decryptor>(
       aes_metadata_decryptor, column_key, file_aad_, aad, pool_);
@@ -207,34 +215,4 @@ std::shared_ptr<Decryptor> InternalFileDecryptor::GetColumnDecryptor(
   return column_data_map_[column_path];
 }
 
-int InternalFileDecryptor::MapKeyLenToDecryptorArrayIndex(int key_len) {
-  if (key_len == 16)
-    return 0;
-  else if (key_len == 24)
-    return 1;
-  else if (key_len == 32)
-    return 2;
-  throw ParquetException("decryption key must be 16, 24 or 32 bytes in length");
-}
-
-encryption::AesDecryptor* InternalFileDecryptor::GetMetaAesDecryptor(size_t key_size) {
-  int key_len = static_cast<int>(key_size);
-  int index = MapKeyLenToDecryptorArrayIndex(key_len);
-  if (meta_decryptor_[index] == nullptr) {
-    meta_decryptor_[index].reset(
-        encryption::AesDecryptor::Make(algorithm_, key_len, true, &all_decryptors_));
-  }
-  return meta_decryptor_[index].get();
-}
-
-encryption::AesDecryptor* InternalFileDecryptor::GetDataAesDecryptor(size_t key_size) {
-  int key_len = static_cast<int>(key_size);
-  int index = MapKeyLenToDecryptorArrayIndex(key_len);
-  if (data_decryptor_[index] == nullptr) {
-    data_decryptor_[index].reset(
-        encryption::AesDecryptor::Make(algorithm_, key_len, false, &all_decryptors_));
-  }
-  return data_decryptor_[index].get();
-}
-
 }  // namespace parquet
diff --git a/cpp/src/parquet/encryption/internal_file_decryptor.h b/cpp/src/parquet/encryption/internal_file_decryptor.h
index 011c4acbeb..2f9c3952af 100644
--- a/cpp/src/parquet/encryption/internal_file_decryptor.h
+++ b/cpp/src/parquet/encryption/internal_file_decryptor.h
@@ -35,7 +35,7 @@ class FileDecryptionProperties;
 
 class PARQUET_EXPORT Decryptor {
  public:
-  Decryptor(encryption::AesDecryptor* decryptor, const std::string& key,
+  Decryptor(std::shared_ptr<encryption::AesDecryptor> decryptor, const std::string& key,
             const std::string& file_aad, const std::string& aad,
             ::arrow::MemoryPool* pool);
 
@@ -47,7 +47,7 @@ class PARQUET_EXPORT Decryptor {
   int Decrypt(const uint8_t* ciphertext, int ciphertext_len, uint8_t* plaintext);
 
  private:
-  encryption::AesDecryptor* aes_decryptor_;
+  std::shared_ptr<encryption::AesDecryptor> aes_decryptor_;
   std::string key_;
   std::string file_aad_;
   std::string aad_;
@@ -97,12 +97,9 @@ class InternalFileDecryptor {
   std::shared_ptr<Decryptor> footer_data_decryptor_;
   ParquetCipher::type algorithm_;
   std::string footer_key_metadata_;
-  std::vector<encryption::AesDecryptor*> all_decryptors_;
-
-  /// Key must be 16, 24 or 32 bytes in length. Thus there could be up to three
-  // types of meta_decryptors and data_decryptors.
-  std::unique_ptr<encryption::AesDecryptor> meta_decryptor_[3];
-  std::unique_ptr<encryption::AesDecryptor> data_decryptor_[3];
+  // A weak reference to all decryptors that need to be wiped out when decryption is
+  // finished
+  std::vector<std::weak_ptr<encryption::AesDecryptor>> all_decryptors_;
 
   ::arrow::MemoryPool* pool_;
 
@@ -111,11 +108,6 @@ class InternalFileDecryptor {
                                                 const std::string& column_key_metadata,
                                                 const std::string& aad,
                                                 bool metadata = false);
-
-  encryption::AesDecryptor* GetMetaAesDecryptor(size_t key_size);
-  encryption::AesDecryptor* GetDataAesDecryptor(size_t key_size);
-
-  int MapKeyLenToDecryptorArrayIndex(int key_len);
 };
 
 }  // namespace parquet
diff --git a/python/pyarrow/tests/parquet/test_encryption.py b/python/pyarrow/tests/parquet/test_encryption.py
index a862ba3454..fe28b1e0f4 100644
--- a/python/pyarrow/tests/parquet/test_encryption.py
+++ b/python/pyarrow/tests/parquet/test_encryption.py
@@ -126,7 +126,7 @@ def read_encrypted_parquet(path, decryption_config,
 
     result = pq.ParquetFile(
         path, decryption_properties=file_decryption_properties)
-    return result.read(use_threads=False)
+    return result.read(use_threads=True)
 
 
 def test_encrypted_parquet_write_read_wrong_key(tempdir, data_table):
@@ -486,8 +486,6 @@ def test_encrypted_parquet_write_external(tempdir, data_table):
                             kms_connection_config, crypto_factory)
 
 
-@pytest.mark.skip(reason="ARROW-14114: Multithreaded read sometimes fails"
-                  "decryption finalization or with Segmentation fault")
 def test_encrypted_parquet_loop(tempdir, data_table, basic_encryption_config):
     """Write an encrypted parquet, verify it's encrypted,
     and then read it multithreaded in a loop."""