You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@parquet.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/09/14 04:33:00 UTC

[jira] [Commented] (PARQUET-1300) [C++] Parquet modular encryption

    [ https://issues.apache.org/jira/browse/PARQUET-1300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16614351#comment-16614351 ] 

ASF GitHub Bot commented on PARQUET-1300:
-----------------------------------------

wesm closed pull request #475: [PARQUET-1300] [C++] parquet encryption modular
URL: https://github.com/apache/parquet-cpp/pull/475
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 698f6d76..44e4eb75 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -537,6 +537,15 @@ else()
   endif()
 endif()
 
+#############################################################
+# OpenSSL linkage
+find_package(OpenSSL REQUIRED)
+message(STATUS "OPENSSL_FOUND ${OPENSSL_FOUND}")
+if(OPENSSL_FOUND)
+  set(OPENSSL_USE_STATIC_LIBS TRUE)
+  include_directories(${OPENSSL_INCLUDE_DIR})
+endif()
+
 #############################################################
 # Apache Arrow linkage
 
@@ -691,6 +700,8 @@ set(LIBPARQUET_SRCS
   src/parquet/types.cc
   src/parquet/util/comparison.cc
   src/parquet/util/memory.cc
+  src/parquet/util/crypto.cc
+  src/parquet/encryption.cc
 )
 
 # # Ensure that thrift compilation is done before using its generated headers
@@ -712,6 +723,10 @@ if (NOT PARQUET_MINIMAL_DEPENDENCY)
 # Although we don't link parquet_objlib against anything, we need it to depend
 # on these libs as we may generate their headers via ExternalProject_Add
   set(PARQUET_DEPENDENCIES ${PARQUET_DEPENDENCIES} ${LIBPARQUET_INTERFACE_LINK_LIBS})
+  set(LIBPARQUET_INTERFACE_LINK_LIBS
+    ${OPENSSL_CRYPTO_LIBRARY}
+    ${LIBPARQUET_INTERFACE_LINK_LIBS}
+  )
 endif()
 
 if(NOT APPLE AND NOT MSVC)
diff --git a/examples/low-level-api/CMakeLists.txt b/examples/low-level-api/CMakeLists.txt
index 64ba110e..10082ad2 100644
--- a/examples/low-level-api/CMakeLists.txt
+++ b/examples/low-level-api/CMakeLists.txt
@@ -18,8 +18,12 @@
 if (PARQUET_BUILD_EXECUTABLES)
   add_executable(reader-writer reader-writer.cc)
   add_executable(reader-writer2 reader-writer2.cc)
+  add_executable(encryption-reader-writer encryption-reader-writer.cc)
   target_include_directories(reader-writer PRIVATE .)
   target_include_directories(reader-writer2 PRIVATE .)
+  target_include_directories(encryption-reader-writer PRIVATE .)
   target_link_libraries(reader-writer parquet_static)
   target_link_libraries(reader-writer2 parquet_static)
+
+  target_link_libraries(encryption-reader-writer parquet_static)
 endif()
diff --git a/examples/low-level-api/encryption-reader-writer.cc b/examples/low-level-api/encryption-reader-writer.cc
new file mode 100644
index 00000000..27dd9ab9
--- /dev/null
+++ b/examples/low-level-api/encryption-reader-writer.cc
@@ -0,0 +1,436 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cassert>
+#include <fstream>
+#include <iostream>
+#include <memory>
+
+#include <reader_writer.h>
+
+/*
+ * This example describes writing and reading Parquet Files in C++ and serves as a
+ * reference to the API.
+ * The file contains all the physical data types supported by Parquet.
+ * This example uses the RowGroupWriter API that supports writing RowGroups optimized for
+ * memory consumption
+ **/
+
+/* Parquet is a structured columnar file format
+ * Parquet File = "Parquet data" + "Parquet Metadata"
+ * "Parquet data" is simply a vector of RowGroups. Each RowGroup is a batch of rows in a
+ * columnar layout
+ * "Parquet Metadata" contains the "file schema" and attributes of the RowGroups and their
+ * Columns
+ * "file schema" is a tree where each node is either a primitive type (leaf nodes) or a
+ * complex (nested) type (internal nodes)
+ * For specific details, please refer to the format here:
+ * https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
+ **/
+
+constexpr int NUM_ROWS_PER_ROW_GROUP = 500;
+const char PARQUET_FILENAME[] = "parquet_cpp_example.parquet.encrypted";
+const std::string encryptionKey = "0123456789012345"; // 16 bytes
+const std::string encryptionKeyCol = "1234567890123450"; // 16 bytes
+
+int main(int argc, char** argv) {
+  /**********************************************************************************
+                             PARQUET WRITER EXAMPLE
+  **********************************************************************************/
+  // parquet::REQUIRED fields do not need definition and repetition level values
+  // parquet::OPTIONAL fields require only definition level values
+  // parquet::REPEATED fields require both definition and repetition level values
+  try {
+    // Create a local file output stream instance.
+    using FileClass = ::arrow::io::FileOutputStream;
+    std::shared_ptr<FileClass> out_file;
+    PARQUET_THROW_NOT_OK(FileClass::Open(PARQUET_FILENAME, &out_file));
+
+    // Setup the parquet schema
+    std::shared_ptr<GroupNode> schema = SetupSchema();
+
+    // Add writer properties
+    parquet::WriterProperties::Builder builder;
+    builder.compression(parquet::Compression::SNAPPY);
+
+    // uniform encryption: set the footer key
+    parquet::FileEncryptionProperties::Builder file_encryption_builder;
+    file_encryption_builder.footer_key(encryptionKey);
+
+    // non-uniform encryption: give column_0 its own key
+    std::map<std::string, std::shared_ptr<parquet::ColumnEncryptionProperties>> encryption_cols;
+    parquet::ColumnEncryptionProperties::Builder encryption_col_builder0("column_0", true);
+    encryption_col_builder0.key(encryptionKeyCol);
+    auto encryption_col0 = encryption_col_builder0.build();
+
+    encryption_cols[encryption_col0->path()] = encryption_col0;
+
+    file_encryption_builder.column_properties(encryption_cols, true);
+
+    builder.encryption(file_encryption_builder.build());
+
+    std::shared_ptr<parquet::WriterProperties> props = builder.build();
+
+    // Create a ParquetFileWriter instance
+    std::shared_ptr<parquet::ParquetFileWriter> file_writer =
+        parquet::ParquetFileWriter::Open(out_file, schema, props);
+
+    // Append a RowGroup with a specific number of rows.
+    parquet::RowGroupWriter* rg_writer = file_writer->AppendRowGroup();
+
+    // Write the Bool column
+    parquet::BoolWriter* bool_writer =
+        static_cast<parquet::BoolWriter*>(rg_writer->NextColumn());
+    for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
+      bool value = ((i % 2) == 0) ? true : false;
+      bool_writer->WriteBatch(1, nullptr, nullptr, &value);
+    }
+
+    // Write the Int32 column
+    parquet::Int32Writer* int32_writer =
+        static_cast<parquet::Int32Writer*>(rg_writer->NextColumn());
+    for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
+      int32_t value = i;
+      int32_writer->WriteBatch(1, nullptr, nullptr, &value);
+    }
+
+    // Write the Int64 column. Each row repeats twice.
+    parquet::Int64Writer* int64_writer =
+        static_cast<parquet::Int64Writer*>(rg_writer->NextColumn());
+    for (int i = 0; i < 2 * NUM_ROWS_PER_ROW_GROUP; i++) {
+      int64_t value = i * 1000 * 1000;
+      value *= 1000 * 1000;
+      int16_t definition_level = 1;
+      int16_t repetition_level = 0;
+      if ((i % 2) == 0) {
+        repetition_level = 1;  // start of a new record
+      }
+      int64_writer->WriteBatch(1, &definition_level, &repetition_level, &value);
+    }
+
+    // Write the INT96 column.
+    parquet::Int96Writer* int96_writer =
+        static_cast<parquet::Int96Writer*>(rg_writer->NextColumn());
+    for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
+      parquet::Int96 value;
+      value.value[0] = i;
+      value.value[1] = i + 1;
+      value.value[2] = i + 2;
+      int96_writer->WriteBatch(1, nullptr, nullptr, &value);
+    }
+
+    // Write the Float column
+    parquet::FloatWriter* float_writer =
+        static_cast<parquet::FloatWriter*>(rg_writer->NextColumn());
+    for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
+      float value = static_cast<float>(i) * 1.1f;
+      float_writer->WriteBatch(1, nullptr, nullptr, &value);
+    }
+
+    // Write the Double column
+    parquet::DoubleWriter* double_writer =
+        static_cast<parquet::DoubleWriter*>(rg_writer->NextColumn());
+    for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
+      double value = i * 1.1111111;
+      double_writer->WriteBatch(1, nullptr, nullptr, &value);
+    }
+
+    // Write the ByteArray column. Make every alternate value NULL
+    parquet::ByteArrayWriter* ba_writer =
+        static_cast<parquet::ByteArrayWriter*>(rg_writer->NextColumn());
+    for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
+      parquet::ByteArray value;
+      char hello[FIXED_LENGTH] = "parquet";
+      hello[7] = static_cast<char>(static_cast<int>('0') + i / 100);
+      hello[8] = static_cast<char>(static_cast<int>('0') + (i / 10) % 10);
+      hello[9] = static_cast<char>(static_cast<int>('0') + i % 10);
+      if (i % 2 == 0) {
+        int16_t definition_level = 1;
+        value.ptr = reinterpret_cast<const uint8_t*>(&hello[0]);
+        value.len = FIXED_LENGTH;
+        ba_writer->WriteBatch(1, &definition_level, nullptr, &value);
+      } else {
+        int16_t definition_level = 0;
+        ba_writer->WriteBatch(1, &definition_level, nullptr, nullptr);
+      }
+    }
+
+    // Write the FixedLengthByteArray column
+    parquet::FixedLenByteArrayWriter* flba_writer =
+        static_cast<parquet::FixedLenByteArrayWriter*>(rg_writer->NextColumn());
+    for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
+      parquet::FixedLenByteArray value;
+      char v = static_cast<char>(i);
+      char flba[FIXED_LENGTH] = {v, v, v, v, v, v, v, v, v, v};
+      value.ptr = reinterpret_cast<const uint8_t*>(&flba[0]);
+
+      flba_writer->WriteBatch(1, nullptr, nullptr, &value);
+    }
+
+    // Close the ParquetFileWriter
+    file_writer->Close();
+
+    // Close the output file; a failed close is reported instead of being ignored
+    PARQUET_THROW_NOT_OK(out_file->Close());
+  } catch (const std::exception& e) {
+    std::cerr << "Parquet write error: " << e.what() << std::endl;
+    return -1;
+  }
+
+  /**********************************************************************************
+                             PARQUET READER EXAMPLE
+  **********************************************************************************/
+
+  try {
+    // Decryption properties: the footer key plus the key for column_0
+    std::shared_ptr<parquet::FileDecryptionProperties> decryption_properties =
+        std::make_shared<parquet::FileDecryptionProperties>(encryptionKey);
+    decryption_properties->SetColumnKey("column_0", encryptionKeyCol);
+
+    parquet::ReaderProperties reader_properties = parquet::default_reader_properties();
+    reader_properties.file_decryption(decryption_properties);
+
+    // Create a ParquetReader instance
+    std::unique_ptr<parquet::ParquetFileReader> parquet_reader =
+        parquet::ParquetFileReader::OpenFile(PARQUET_FILENAME, false, reader_properties);
+
+    // Get the File MetaData
+    std::shared_ptr<parquet::FileMetaData> file_metadata = parquet_reader->metadata();
+
+    // Get the number of RowGroups
+    int num_row_groups = file_metadata->num_row_groups();
+    assert(num_row_groups == 1);
+
+    // Get the number of Columns
+    int num_columns = file_metadata->num_columns();
+    assert(num_columns == 8);
+
+    // Iterate over all the RowGroups in the file
+    for (int r = 0; r < num_row_groups; ++r) {
+      // Get the RowGroup Reader
+      std::shared_ptr<parquet::RowGroupReader> row_group_reader =
+          parquet_reader->RowGroup(r);
+
+      int64_t values_read = 0;
+      int64_t rows_read = 0;
+      int16_t definition_level;
+      int16_t repetition_level;
+      int i;
+      std::shared_ptr<parquet::ColumnReader> column_reader;
+
+      // Get the Column Reader for the boolean column
+      column_reader = row_group_reader->Column(0);
+      parquet::BoolReader* bool_reader =
+          static_cast<parquet::BoolReader*>(column_reader.get());
+
+      // Read all the rows in the column
+      i = 0;
+      while (bool_reader->HasNext()) {
+        bool value;
+        // Read one value at a time. The number of rows read is returned. values_read
+        // contains the number of non-null rows
+        rows_read = bool_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read);
+        // Ensure only one value is read
+        assert(rows_read == 1);
+        // There are no NULL values in the rows written
+        assert(values_read == 1);
+        // Verify the value written
+        bool expected_value = ((i % 2) == 0) ? true : false;
+        assert(value == expected_value);
+        i++;
+      }
+
+      // Get the Column Reader for the Int32 column
+      column_reader = row_group_reader->Column(1);
+      parquet::Int32Reader* int32_reader =
+          static_cast<parquet::Int32Reader*>(column_reader.get());
+      // Read all the rows in the column
+      i = 0;
+      while (int32_reader->HasNext()) {
+        int32_t value;
+        // Read one value at a time. The number of rows read is returned. values_read
+        // contains the number of non-null rows
+        rows_read = int32_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read);
+        // Ensure only one value is read
+        assert(rows_read == 1);
+        // There are no NULL values in the rows written
+        assert(values_read == 1);
+        // Verify the value written
+        assert(value == i);
+        i++;
+      }
+
+      // Get the Column Reader for the Int64 column
+      column_reader = row_group_reader->Column(2);
+      parquet::Int64Reader* int64_reader =
+          static_cast<parquet::Int64Reader*>(column_reader.get());
+      // Read all the rows in the column
+      i = 0;
+      while (int64_reader->HasNext()) {
+        int64_t value;
+        // Read one value at a time. The number of rows read is returned. values_read
+        // contains the number of non-null rows
+        rows_read = int64_reader->ReadBatch(1, &definition_level, &repetition_level,
+                                            &value, &values_read);
+        // Ensure only one value is read
+        assert(rows_read == 1);
+        // There are no NULL values in the rows written
+        assert(values_read == 1);
+        // Verify the value written
+        int64_t expected_value = i * 1000 * 1000;
+        expected_value *= 1000 * 1000;
+        assert(value == expected_value);
+        if ((i % 2) == 0) {
+          assert(repetition_level == 1);
+        } else {
+          assert(repetition_level == 0);
+        }
+        i++;
+      }
+
+      // Get the Column Reader for the Int96 column
+      column_reader = row_group_reader->Column(3);
+      parquet::Int96Reader* int96_reader =
+          static_cast<parquet::Int96Reader*>(column_reader.get());
+      // Read all the rows in the column
+      i = 0;
+      while (int96_reader->HasNext()) {
+        parquet::Int96 value;
+        // Read one value at a time. The number of rows read is returned. values_read
+        // contains the number of non-null rows
+        rows_read = int96_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read);
+        // Ensure only one value is read
+        assert(rows_read == 1);
+        // There are no NULL values in the rows written
+        assert(values_read == 1);
+        // Verify the value written
+        parquet::Int96 expected_value;
+        expected_value.value[0] = i;
+        expected_value.value[1] = i + 1;
+        expected_value.value[2] = i + 2;
+        for (int j = 0; j < 3; j++) {
+          assert(value.value[j] == expected_value.value[j]);
+        }
+        i++;
+      }
+
+      // Get the Column Reader for the Float column
+      column_reader = row_group_reader->Column(4);
+      parquet::FloatReader* float_reader =
+          static_cast<parquet::FloatReader*>(column_reader.get());
+      // Read all the rows in the column
+      i = 0;
+      while (float_reader->HasNext()) {
+        float value;
+        // Read one value at a time. The number of rows read is returned. values_read
+        // contains the number of non-null rows
+        rows_read = float_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read);
+        // Ensure only one value is read
+        assert(rows_read == 1);
+        // There are no NULL values in the rows written
+        assert(values_read == 1);
+        // Verify the value written
+        float expected_value = static_cast<float>(i) * 1.1f;
+        assert(value == expected_value);
+        i++;
+      }
+
+      // Get the Column Reader for the Double column
+      column_reader = row_group_reader->Column(5);
+      parquet::DoubleReader* double_reader =
+          static_cast<parquet::DoubleReader*>(column_reader.get());
+      // Read all the rows in the column
+      i = 0;
+      while (double_reader->HasNext()) {
+        double value;
+        // Read one value at a time. The number of rows read is returned. values_read
+        // contains the number of non-null rows
+        rows_read = double_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read);
+        // Ensure only one value is read
+        assert(rows_read == 1);
+        // There are no NULL values in the rows written
+        assert(values_read == 1);
+        // Verify the value written
+        double expected_value = i * 1.1111111;
+        assert(value == expected_value);
+        i++;
+      }
+
+      // Get the Column Reader for the ByteArray column
+      column_reader = row_group_reader->Column(6);
+      parquet::ByteArrayReader* ba_reader =
+          static_cast<parquet::ByteArrayReader*>(column_reader.get());
+      // Read all the rows in the column
+      i = 0;
+      while (ba_reader->HasNext()) {
+        parquet::ByteArray value;
+        // Read one value at a time. The number of rows read is returned. values_read
+        // contains the number of non-null rows
+        rows_read =
+            ba_reader->ReadBatch(1, &definition_level, nullptr, &value, &values_read);
+        // Ensure only one value is read
+        assert(rows_read == 1);
+        // Verify the value written
+        char expected_value[FIXED_LENGTH] = "parquet";
+        expected_value[7] = static_cast<char>('0' + i / 100);
+        expected_value[8] = static_cast<char>('0' + (i / 10) % 10);
+        expected_value[9] = static_cast<char>('0' + i % 10);
+        if (i % 2 == 0) {  // only alternate values exist
+          // Even rows were written with a non-NULL value
+          assert(values_read == 1);
+          assert(value.len == FIXED_LENGTH);
+          assert(memcmp(value.ptr, &expected_value[0], FIXED_LENGTH) == 0);
+          assert(definition_level == 1);
+        } else {
+          // Odd rows were written as NULL
+          assert(values_read == 0);
+          assert(definition_level == 0);
+        }
+        i++;
+      }
+
+      // Get the Column Reader for the FixedLengthByteArray column
+      column_reader = row_group_reader->Column(7);
+      parquet::FixedLenByteArrayReader* flba_reader =
+          static_cast<parquet::FixedLenByteArrayReader*>(column_reader.get());
+      // Read all the rows in the column
+      i = 0;
+      while (flba_reader->HasNext()) {
+        parquet::FixedLenByteArray value;
+        // Read one value at a time. The number of rows read is returned. values_read
+        // contains the number of non-null rows
+        rows_read = flba_reader->ReadBatch(1, nullptr, nullptr, &value, &values_read);
+        // Ensure only one value is read
+        assert(rows_read == 1);
+        // There are no NULL values in the rows written
+        assert(values_read == 1);
+        // Verify the value written
+        char v = static_cast<char>(i);
+        char expected_value[FIXED_LENGTH] = {v, v, v, v, v, v, v, v, v, v};
+        assert(memcmp(value.ptr, &expected_value[0], FIXED_LENGTH) == 0);
+        i++;
+      }
+    }
+  } catch (const std::exception& e) {
+    std::cerr << "Parquet read error: " << e.what() << std::endl;
+    return -1;
+  }
+
+  std::cout << "Parquet Writing and Reading Complete" << std::endl;
+
+  return 0;
+}
diff --git a/src/parquet/column_reader.cc b/src/parquet/column_reader.cc
index 28d0dcb6..f1da9515 100644
--- a/src/parquet/column_reader.cc
+++ b/src/parquet/column_reader.cc
@@ -33,6 +33,8 @@
 #include "parquet/properties.h"
 #include "parquet/thrift.h"
 
+#include "parquet/util/crypto.h"
+
 using arrow::MemoryPool;
 
 namespace parquet {
@@ -102,11 +104,15 @@ ReaderProperties default_reader_properties() {
 class SerializedPageReader : public PageReader {
  public:
   SerializedPageReader(std::unique_ptr<InputStream> stream, int64_t total_num_rows,
-                       Compression::type codec, ::arrow::MemoryPool* pool)
+                       Compression::type codec,
+                       std::shared_ptr<EncryptionProperties> encryption,
+                       ::arrow::MemoryPool* pool)
       : stream_(std::move(stream)),
         decompression_buffer_(AllocateBuffer(pool, 0)),
         seen_num_rows_(0),
-        total_num_rows_(total_num_rows) {
+        total_num_rows_(total_num_rows),
+        encryption_(encryption),
+        decryption_buffer_(AllocateBuffer(pool, 0)) {
     max_page_header_size_ = kDefaultMaxPageHeaderSize;
     decompressor_ = GetCodecFromArrow(codec);
   }
@@ -134,6 +140,10 @@ class SerializedPageReader : public PageReader {
 
   // Number of rows in all the data pages
   int64_t total_num_rows_;
+
+  // Encryption
+  std::shared_ptr<EncryptionProperties> encryption_;
+  std::shared_ptr<ResizableBuffer> decryption_buffer_;
 };
 
 std::shared_ptr<Page> SerializedPageReader::NextPage() {
@@ -158,7 +168,8 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
       // This gets used, then set by DeserializeThriftMsg
       header_size = static_cast<uint32_t>(bytes_available);
       try {
-        DeserializeThriftMsg(buffer, &header_size, &current_page_header_);
+        DeserializeThriftMsg(buffer, &header_size, &current_page_header_,
+                             encryption_.get());
         break;
       } catch (std::exception& e) {
         // Failed to deserialize. Double the allowed page header size and try again
@@ -186,6 +197,15 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
       ParquetException::EofException(ss.str());
     }
 
+    // Decrypt it if we need to
+    if (encryption_ != nullptr) {
+      decryption_buffer_->Resize(encryption_->CalculatePlainSize(compressed_len), false);
+      compressed_len = parquet_encryption::Decrypt(
+          encryption_, false, buffer, compressed_len, decryption_buffer_->mutable_data());
+
+      buffer = decryption_buffer_->data();
+    }
+
     // Uncompress it if we need to
     if (decompressor_ != nullptr) {
       // Grow the uncompressed buffer if we need to.
@@ -254,12 +274,11 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
   return std::shared_ptr<Page>(nullptr);
 }
 
-std::unique_ptr<PageReader> PageReader::Open(std::unique_ptr<InputStream> stream,
-                                             int64_t total_num_rows,
-                                             Compression::type codec,
-                                             ::arrow::MemoryPool* pool) {
-  return std::unique_ptr<PageReader>(
-      new SerializedPageReader(std::move(stream), total_num_rows, codec, pool));
+std::unique_ptr<PageReader> PageReader::Open(
+    std::unique_ptr<InputStream> stream, int64_t total_num_rows, Compression::type codec,
+    std::shared_ptr<EncryptionProperties> encryption, ::arrow::MemoryPool* pool) {
+  return std::unique_ptr<PageReader>(new SerializedPageReader(
+      std::move(stream), total_num_rows, codec, encryption, pool));
 }
 
 // ----------------------------------------------------------------------
diff --git a/src/parquet/column_reader.h b/src/parquet/column_reader.h
index 71346320..089e4214 100644
--- a/src/parquet/column_reader.h
+++ b/src/parquet/column_reader.h
@@ -86,7 +86,7 @@ class PARQUET_EXPORT PageReader {
 
   static std::unique_ptr<PageReader> Open(
       std::unique_ptr<InputStream> stream, int64_t total_num_rows,
-      Compression::type codec,
+      Compression::type codec, std::shared_ptr<EncryptionProperties> encryption = nullptr,
       ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
 
   // @returns: shared_ptr<Page>(nullptr) on EOS, std::shared_ptr<Page>
diff --git a/src/parquet/column_writer-test.cc b/src/parquet/column_writer-test.cc
index e87d549b..97475eec 100644
--- a/src/parquet/column_writer-test.cc
+++ b/src/parquet/column_writer-test.cc
@@ -91,8 +91,8 @@ class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
 
     metadata_ = ColumnChunkMetaDataBuilder::Make(
         writer_properties_, this->descr_, reinterpret_cast<uint8_t*>(&thrift_metadata_));
-    std::unique_ptr<PageWriter> pager =
-        PageWriter::Open(sink_.get(), column_properties.compression(), metadata_.get());
+    std::unique_ptr<PageWriter> pager = PageWriter::Open(
+        sink_.get(), column_properties.compression(), nullptr, metadata_.get());
     std::shared_ptr<ColumnWriter> writer =
         ColumnWriter::Make(metadata_.get(), std::move(pager), writer_properties_.get());
     return std::static_pointer_cast<TypedColumnWriter<TestType>>(writer);
diff --git a/src/parquet/column_writer.cc b/src/parquet/column_writer.cc
index a65bda85..4d7a708d 100644
--- a/src/parquet/column_writer.cc
+++ b/src/parquet/column_writer.cc
@@ -28,6 +28,7 @@
 #include "parquet/properties.h"
 #include "parquet/statistics.h"
 #include "parquet/thrift.h"
+#include "parquet/util/crypto.h"
 #include "parquet/util/logging.h"
 #include "parquet/util/memory.h"
 
@@ -129,6 +130,7 @@ static format::Statistics ToThrift(const EncodedStatistics& row_group_statistics
 class SerializedPageWriter : public PageWriter {
  public:
   SerializedPageWriter(OutputStream* sink, Compression::type codec,
+                       const std::shared_ptr<EncryptionProperties>& encryption,
                        ColumnChunkMetaDataBuilder* metadata,
                        ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
       : sink_(sink),
@@ -138,7 +140,8 @@ class SerializedPageWriter : public PageWriter {
         dictionary_page_offset_(0),
         data_page_offset_(0),
         total_uncompressed_size_(0),
-        total_compressed_size_(0) {
+        total_compressed_size_(0),
+        encryption_(encryption) {
     compressor_ = GetCodecFromArrow(codec);
   }
 
@@ -159,10 +162,22 @@ class SerializedPageWriter : public PageWriter {
     dict_page_header.__set_encoding(ToThrift(page.encoding()));
     dict_page_header.__set_is_sorted(page.is_sorted());
 
+    const uint8_t* output_data_buffer = compressed_data->data();
+    int32_t output_data_len = static_cast<int32_t>(compressed_data->size());
+
+    std::shared_ptr<ResizableBuffer> encrypted_data_buffer = AllocateBuffer(pool_, 0);
+    if (encryption_.get()) {
+      encrypted_data_buffer->Resize(encryption_->CalculateCipherSize(output_data_len));
+      output_data_len = parquet_encryption::Encrypt(
+          encryption_, false, compressed_data->data(), output_data_len,
+          encrypted_data_buffer->mutable_data());
+      output_data_buffer = encrypted_data_buffer->data();
+    }
+
     format::PageHeader page_header;
     page_header.__set_type(format::PageType::DICTIONARY_PAGE);
     page_header.__set_uncompressed_page_size(static_cast<int32_t>(uncompressed_size));
-    page_header.__set_compressed_page_size(static_cast<int32_t>(compressed_data->size()));
+    page_header.__set_compressed_page_size(static_cast<int32_t>(output_data_len));
     page_header.__set_dictionary_page_header(dict_page_header);
     // TODO(PARQUET-594) crc checksum
 
@@ -170,12 +185,13 @@ class SerializedPageWriter : public PageWriter {
     if (dictionary_page_offset_ == 0) {
       dictionary_page_offset_ = start_pos;
     }
-    int64_t header_size =
-        SerializeThriftMsg(&page_header, sizeof(format::PageHeader), sink_);
-    sink_->Write(compressed_data->data(), compressed_data->size());
+    int64_t header_size = SerializeThriftMsg(&page_header, sizeof(format::PageHeader),
+                                             sink_, encryption_.get());
+
+    sink_->Write(output_data_buffer, output_data_len);
 
     total_uncompressed_size_ += uncompressed_size + header_size;
-    total_compressed_size_ += compressed_data->size() + header_size;
+    total_compressed_size_ += output_data_len + header_size;
 
     return sink_->Tell() - start_pos;
   }
@@ -224,10 +240,22 @@ class SerializedPageWriter : public PageWriter {
         ToThrift(page.repetition_level_encoding()));
     data_page_header.__set_statistics(ToThrift(page.statistics()));
 
+    const uint8_t* output_data_buffer = compressed_data->data();
+    int32_t output_data_len = static_cast<int32_t>(compressed_data->size());
+
+    std::shared_ptr<ResizableBuffer> encrypted_data_buffer = AllocateBuffer(pool_, 0);
+    if (encryption_.get()) {
+      encrypted_data_buffer->Resize(encryption_->CalculateCipherSize(output_data_len));
+      output_data_len = parquet_encryption::Encrypt(
+          encryption_, false, compressed_data->data(), output_data_len,
+          encrypted_data_buffer->mutable_data());
+      output_data_buffer = encrypted_data_buffer->data();
+    }
+
     format::PageHeader page_header;
     page_header.__set_type(format::PageType::DATA_PAGE);
     page_header.__set_uncompressed_page_size(static_cast<int32_t>(uncompressed_size));
-    page_header.__set_compressed_page_size(static_cast<int32_t>(compressed_data->size()));
+    page_header.__set_compressed_page_size(static_cast<int32_t>(output_data_len));
     page_header.__set_data_page_header(data_page_header);
     // TODO(PARQUET-594) crc checksum
 
@@ -236,12 +264,13 @@ class SerializedPageWriter : public PageWriter {
       data_page_offset_ = start_pos;
     }
 
-    int64_t header_size =
-        SerializeThriftMsg(&page_header, sizeof(format::PageHeader), sink_);
-    sink_->Write(compressed_data->data(), compressed_data->size());
+    int64_t header_size = SerializeThriftMsg(&page_header, sizeof(format::PageHeader),
+                                             sink_, encryption_.get());
+
+    sink_->Write(output_data_buffer, output_data_len);
 
     total_uncompressed_size_ += uncompressed_size + header_size;
-    total_compressed_size_ += compressed_data->size() + header_size;
+    total_compressed_size_ += output_data_len + header_size;
     num_values_ += page.num_values();
 
     return sink_->Tell() - start_pos;
@@ -271,18 +300,23 @@ class SerializedPageWriter : public PageWriter {
 
   // Compression codec to use.
   std::unique_ptr<::arrow::Codec> compressor_;
+
+  std::shared_ptr<EncryptionProperties> encryption_;
 };
 
 // This implementation of the PageWriter writes to the final sink on Close .
 class BufferedPageWriter : public PageWriter {
  public:
   BufferedPageWriter(OutputStream* sink, Compression::type codec,
+                     const std::shared_ptr<EncryptionProperties>& encryption,
                      ColumnChunkMetaDataBuilder* metadata,
                      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
       : final_sink_(sink),
         metadata_(metadata),
         in_memory_sink_(new InMemoryOutputStream(pool)),
-        pager_(new SerializedPageWriter(in_memory_sink_.get(), codec, metadata, pool)) {}
+        pager_(new SerializedPageWriter(in_memory_sink_.get(), codec, encryption,
+                                        metadata, pool)) {
+  }  // TODO: nullptr for EncryptionProperties
 
   int64_t WriteDictionaryPage(const DictionaryPage& page) override {
     return pager_->WriteDictionaryPage(page);
@@ -320,16 +354,17 @@ class BufferedPageWriter : public PageWriter {
   std::unique_ptr<SerializedPageWriter> pager_;
 };
 
-std::unique_ptr<PageWriter> PageWriter::Open(OutputStream* sink, Compression::type codec,
-                                             ColumnChunkMetaDataBuilder* metadata,
-                                             ::arrow::MemoryPool* pool,
-                                             bool buffered_row_group) {
+std::unique_ptr<PageWriter> PageWriter::Open(
+    OutputStream* sink, Compression::type codec,
+    const std::shared_ptr<EncryptionProperties>& encryption,
+    ColumnChunkMetaDataBuilder* metadata, ::arrow::MemoryPool* pool,
+    bool buffered_row_group) {
   if (buffered_row_group) {
     return std::unique_ptr<PageWriter>(
-        new BufferedPageWriter(sink, codec, metadata, pool));
+        new BufferedPageWriter(sink, codec, encryption, metadata, pool));
   } else {
     return std::unique_ptr<PageWriter>(
-        new SerializedPageWriter(sink, codec, metadata, pool));
+        new SerializedPageWriter(sink, codec, encryption, metadata, pool));
   }
 }
 
diff --git a/src/parquet/column_writer.h b/src/parquet/column_writer.h
index 1ba428a9..6fe18e07 100644
--- a/src/parquet/column_writer.h
+++ b/src/parquet/column_writer.h
@@ -74,7 +74,9 @@ class PageWriter {
   virtual ~PageWriter() {}
 
   static std::unique_ptr<PageWriter> Open(
-      OutputStream* sink, Compression::type codec, ColumnChunkMetaDataBuilder* metadata,
+      OutputStream* sink, Compression::type codec,
+      const std::shared_ptr<EncryptionProperties>& encryption,
+      ColumnChunkMetaDataBuilder* metadata,
       ::arrow::MemoryPool* pool = ::arrow::default_memory_pool(),
       bool buffered_row_group = false);
 
diff --git a/src/parquet/encryption.cc b/src/parquet/encryption.cc
new file mode 100644
index 00000000..0a2d9ef9
--- /dev/null
+++ b/src/parquet/encryption.cc
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "encryption.h"
+
+#include <string.h>
+
+namespace parquet {
+
+// integer key retriever
+void IntegerKeyIdRetriever::PutKey(uint32_t key_id, const std::string& key) {
+  key_map_.insert(std::make_pair(key_id, key));
+}
+
+const std::string& IntegerKeyIdRetriever::GetKey(const std::string& key_metadata) {
+  uint32_t key_id;
+  memcpy(reinterpret_cast<uint8_t*>(&key_id), key_metadata.c_str(), 4);
+
+  return key_map_[key_id];
+}
+
+// string key retriever
+void StringKeyIdRetriever::PutKey(const std::string& key_id, const std::string& key) {
+  key_map_.insert(std::make_pair(key_id, key));
+}
+
+const std::string& StringKeyIdRetriever::GetKey(const std::string& key_id) {
+  return key_map_[key_id];
+}
+
+}  // namespace parquet
diff --git a/src/parquet/encryption.h b/src/parquet/encryption.h
new file mode 100644
index 00000000..1dbf0d20
--- /dev/null
+++ b/src/parquet/encryption.h
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_ENCRYPTION_H
+#define PARQUET_ENCRYPTION_H
+
+#include <parquet/util/visibility.h>
+#include <map>
+#include <string>
+
+namespace parquet {
+
+class PARQUET_EXPORT DecryptionKeyRetriever {
+ public:
+  virtual const std::string& GetKey(const std::string& key_metadata) = 0;
+  virtual ~DecryptionKeyRetriever() {}
+};
+
+// Simple integer key retriever
+class PARQUET_EXPORT IntegerKeyIdRetriever : public DecryptionKeyRetriever {
+ public:
+  void PutKey(uint32_t key_id, const std::string& key);
+  const std::string& GetKey(const std::string& key_metadata);
+
+ private:
+  std::map<uint32_t, std::string> key_map_;
+};
+
+// Simple string key retriever
+class PARQUET_EXPORT StringKeyIdRetriever : public DecryptionKeyRetriever {
+ public:
+  void PutKey(const std::string& key_id, const std::string& key);
+  const std::string& GetKey(const std::string& key_metadata);
+
+ private:
+  std::map<std::string, std::string> key_map_;
+};
+
+}  // namespace parquet
+
+#endif  // PARQUET_ENCRYPTION_H
diff --git a/src/parquet/file_reader.cc b/src/parquet/file_reader.cc
index c5a0f342..596180c1 100644
--- a/src/parquet/file_reader.cc
+++ b/src/parquet/file_reader.cc
@@ -53,6 +53,7 @@ namespace parquet {
 static constexpr int64_t DEFAULT_FOOTER_READ_SIZE = 64 * 1024;
 static constexpr uint32_t FOOTER_SIZE = 8;
 static constexpr uint8_t PARQUET_MAGIC[4] = {'P', 'A', 'R', '1'};
+static constexpr uint8_t PARQUET_EMAGIC[4] = {'P', 'A', 'R', 'E'};
 
 // For PARQUET-816
 static constexpr int64_t kMaxDictHeaderSize = 100;
@@ -89,8 +90,12 @@ const RowGroupMetaData* RowGroupReader::metadata() const { return contents_->met
 class SerializedRowGroup : public RowGroupReader::Contents {
  public:
   SerializedRowGroup(RandomAccessSource* source, FileMetaData* file_metadata,
-                     int row_group_number, const ReaderProperties& props)
-      : source_(source), file_metadata_(file_metadata), properties_(props) {
+                     FileCryptoMetaData* file_crypto_metadata, int row_group_number,
+                     const ReaderProperties& props)
+      : source_(source),
+        file_metadata_(file_metadata),
+        file_crypto_metadata_(file_crypto_metadata),
+        properties_(props) {
     row_group_metadata_ = file_metadata->RowGroup(row_group_number);
   }
 
@@ -122,14 +127,65 @@ class SerializedRowGroup : public RowGroupReader::Contents {
     }
 
     stream = properties_.GetStream(source_, col_start, col_length);
+    std::unique_ptr<ColumnCryptoMetaData> crypto_meta_data = col->crypto_meta_data();
+
+    bool encrypted = true;
+
+    // file is unencrypted
+    // or file is encrypted but column is unencrypted
+    if (!file_crypto_metadata_ || !crypto_meta_data) {
+      encrypted = false;
+    }
+
+    if (!encrypted) {
+      return PageReader::Open(std::move(stream), col->num_values(), col->compression(),
+                              nullptr, properties_.memory_pool());
+    }
+
+    // the column is encrypted
+
+    auto file_decryption = properties_.file_decryption();
+
+    // the column is encrypted with footer key
+    if (crypto_meta_data->encrypted_with_footer_key()) {
+      std::string footer_key_metadata = file_crypto_metadata_->footer_key_metadata();
+      std::string footer_key = file_decryption->GetFooterKey(footer_key_metadata);
+
+      if (footer_key.empty()) {
+        throw ParquetException("column is encrypted with null footer key");
+      }
+
+      auto footer_encryption = std::make_shared<EncryptionProperties>(
+          file_crypto_metadata_->encryption_algorithm().algorithm, footer_key,
+          file_decryption->GetAad());
+
+      return PageReader::Open(std::move(stream), col->num_values(), col->compression(),
+                              footer_encryption, properties_.memory_pool());
+    }
+
+    // file is non-uniform encrypted and the column is encrypted with its own key
+
+    std::string column_key_metadata = crypto_meta_data->column_key_metadata();
+    // encrypted with column key
+    std::string column_key =
+        file_decryption->GetColumnKey(col->path_in_schema(), column_key_metadata);
+
+    if (column_key.empty()) {
+      throw ParquetException("column is encrypted with null key, path=" +
+                             col->path_in_schema()->ToDotString());
+    }
+    auto column_encryption = std::make_shared<EncryptionProperties>(
+        file_crypto_metadata_->encryption_algorithm().algorithm, column_key,
+        file_decryption->GetAad());
 
     return PageReader::Open(std::move(stream), col->num_values(), col->compression(),
-                            properties_.memory_pool());
+                            column_encryption, properties_.memory_pool());
   }
 
  private:
   RandomAccessSource* source_;
   FileMetaData* file_metadata_;
+  FileCryptoMetaData* file_crypto_metadata_;
   std::unique_ptr<RowGroupMetaData> row_group_metadata_;
   ReaderProperties properties_;
 };
@@ -157,7 +213,8 @@ class SerializedFile : public ParquetFileReader::Contents {
 
   std::shared_ptr<RowGroupReader> GetRowGroup(int i) override {
     std::unique_ptr<SerializedRowGroup> contents(
-        new SerializedRowGroup(source_.get(), file_metadata_.get(), i, properties_));
+        new SerializedRowGroup(source_.get(), file_metadata_.get(),
+                               file_crypto_metadata_.get(), i, properties_));
     return std::make_shared<RowGroupReader>(std::move(contents));
   }
 
@@ -180,42 +237,104 @@ class SerializedFile : public ParquetFileReader::Contents {
         source_->ReadAt(file_size - footer_read_size, footer_read_size, footer_buffer);
 
     // Check if all bytes are read. Check if last 4 bytes read have the magic bits
-    if (bytes_read != footer_read_size ||
-        memcmp(footer_buffer + footer_read_size - 4, PARQUET_MAGIC, 4) != 0) {
-      throw ParquetException("Invalid parquet file. Corrupt footer.");
-    }
+    // unencrypted file: ends with FileMetaData, its length and the PAR1 magic
+    if (bytes_read == footer_read_size &&
+        memcmp(footer_buffer + footer_read_size - 4, PARQUET_MAGIC, 4) == 0) {
+      uint32_t metadata_len =
+          *reinterpret_cast<uint32_t*>(footer_buffer + footer_read_size - FOOTER_SIZE);
+      int64_t metadata_start = file_size - FOOTER_SIZE - metadata_len;
+      if (FOOTER_SIZE + metadata_len > file_size) {
+        throw ParquetException(
+            "Invalid parquet file. File is less than "
+            "file metadata size.");
+      }
+
+      std::shared_ptr<ResizableBuffer> metadata_buffer =
+          AllocateBuffer(properties_.memory_pool(), metadata_len);
+
+      // Check if the footer_buffer contains the entire metadata
+      if (footer_read_size >= (metadata_len + FOOTER_SIZE)) {
+        memcpy(metadata_buffer->mutable_data(),
+               footer_buffer + (footer_read_size - metadata_len - FOOTER_SIZE),
+               metadata_len);
+      } else {
+        bytes_read = source_->ReadAt(metadata_start, metadata_len,
+                                     metadata_buffer->mutable_data());
+        if (bytes_read != metadata_len) {
+          throw ParquetException("Invalid parquet file. Could not read metadata bytes.");
+        }
+      }
 
-    uint32_t metadata_len =
-        *reinterpret_cast<uint32_t*>(footer_buffer + footer_read_size - FOOTER_SIZE);
-    int64_t metadata_start = file_size - FOOTER_SIZE - metadata_len;
-    if (FOOTER_SIZE + metadata_len > file_size) {
-      throw ParquetException(
-          "Invalid parquet file. File is less than "
-          "file metadata size.");
+      file_metadata_ = FileMetaData::Make(metadata_buffer->data(), &metadata_len);
     }
+    // encrypted file: ends with FileCryptoMetaData, its length and the PARE magic
+    else if (bytes_read == footer_read_size &&
+             memcmp(footer_buffer + footer_read_size - 4, PARQUET_EMAGIC, 4) == 0) {
+      // read crypto metadata
+      uint32_t crypto_metadata_len =
+          *reinterpret_cast<uint32_t*>(footer_buffer + footer_read_size - FOOTER_SIZE);
+      int64_t crypto_metadata_start = file_size - FOOTER_SIZE - crypto_metadata_len;
+
+      if (FOOTER_SIZE + crypto_metadata_len > file_size) {
+        throw ParquetException(
+            "Invalid parquet file. File is less than "
+            "file metadata size.");
+      }
+
+      std::shared_ptr<ResizableBuffer> crypto_metadata_buffer =
+          AllocateBuffer(properties_.memory_pool(), crypto_metadata_len);
+
+      // Check if the footer_buffer contains the entire metadata
+      if (footer_read_size >= (crypto_metadata_len + FOOTER_SIZE)) {
+        memcpy(crypto_metadata_buffer->mutable_data(),
+               footer_buffer + (footer_read_size - crypto_metadata_len - FOOTER_SIZE),
+               crypto_metadata_len);
+      } else {
+        bytes_read = source_->ReadAt(crypto_metadata_start, crypto_metadata_len,
+                                     crypto_metadata_buffer->mutable_data());
+        if (bytes_read != crypto_metadata_len) {
+          throw ParquetException("Invalid parquet file. Could not read metadata bytes.");
+        }
+      }
 
-    std::shared_ptr<ResizableBuffer> metadata_buffer =
-        AllocateBuffer(properties_.memory_pool(), metadata_len);
+      file_crypto_metadata_ =
+          FileCryptoMetaData::Make(crypto_metadata_buffer->data(), &crypto_metadata_len);
 
-    // Check if the footer_buffer contains the entire metadata
-    if (footer_read_size >= (metadata_len + FOOTER_SIZE)) {
-      memcpy(metadata_buffer->mutable_data(),
-             footer_buffer + (footer_read_size - metadata_len - FOOTER_SIZE),
-             metadata_len);
-    } else {
+      int64_t footer_offset = file_crypto_metadata_->footer_offset();
+      uint32_t footer_read_size = (uint32_t)(crypto_metadata_start - footer_offset);
+
+      std::shared_ptr<ResizableBuffer> footer_buffer =
+          AllocateBuffer(properties_.memory_pool(), footer_read_size);
       bytes_read =
-          source_->ReadAt(metadata_start, metadata_len, metadata_buffer->mutable_data());
-      if (bytes_read != metadata_len) {
-        throw ParquetException("Invalid parquet file. Could not read metadata bytes.");
+          source_->ReadAt(footer_offset, footer_read_size, footer_buffer->mutable_data());
+
+      if (file_crypto_metadata_->encrypted_footer()) {
+        // get footer key metadata
+        std::string footer_key_metadata = file_crypto_metadata_->footer_key_metadata();
+
+        auto file_decryption = properties_.file_decryption();
+        std::string footer_key = file_decryption->GetFooterKey(footer_key_metadata);
+
+        auto footer_encryption = std::make_shared<EncryptionProperties>(
+            file_crypto_metadata_->encryption_algorithm().algorithm, footer_key,
+            file_decryption->GetAad());
+
+        file_metadata_ = FileMetaData::Make(footer_buffer->data(), &footer_read_size,
+                                            footer_encryption);
+      } else {
+        file_metadata_ = FileMetaData::Make(footer_buffer->data(), &footer_read_size);
       }
     }
-
-    file_metadata_ = FileMetaData::Make(metadata_buffer->data(), &metadata_len);
+    // error
+    else {
+      throw ParquetException("Invalid parquet file. Corrupt footer.");
+    }
   }
 
  private:
   std::unique_ptr<RandomAccessSource> source_;
   std::shared_ptr<FileMetaData> file_metadata_;
+  std::shared_ptr<FileCryptoMetaData> file_crypto_metadata_;
   ReaderProperties properties_;
 };
 
diff --git a/src/parquet/file_writer.cc b/src/parquet/file_writer.cc
index 30673c59..c6447938 100644
--- a/src/parquet/file_writer.cc
+++ b/src/parquet/file_writer.cc
@@ -34,6 +34,7 @@ namespace parquet {
 
 // FIXME: copied from reader-internal.cc
 static constexpr uint8_t PARQUET_MAGIC[4] = {'P', 'A', 'R', '1'};
+static constexpr uint8_t PARQUET_EMAGIC[4] = {'P', 'A', 'R', 'E'};
 
 // ----------------------------------------------------------------------
 // RowGroupWriter public API
@@ -123,7 +124,8 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
 
     const ColumnDescriptor* column_descr = col_meta->descr();
     std::unique_ptr<PageWriter> pager =
-        PageWriter::Open(sink_, properties_->compression(column_descr->path()), col_meta,
+        PageWriter::Open(sink_, properties_->compression(column_descr->path()),
+                         properties_->encryption(column_descr->path()), col_meta,  // TODO
                          properties_->memory_pool());
     column_writers_[0] = ColumnWriter::Make(col_meta, std::move(pager), properties_);
     return column_writers_[0].get();
@@ -221,7 +223,8 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
       const ColumnDescriptor* column_descr = col_meta->descr();
       std::unique_ptr<PageWriter> pager =
           PageWriter::Open(sink_, properties_->compression(column_descr->path()),
-                           col_meta, properties_->memory_pool(), buffered_row_group_);
+                           properties_->encryption(column_descr->path()), col_meta,
+                           properties_->memory_pool(), buffered_row_group_);
       column_writers_.push_back(
           ColumnWriter::Make(col_meta, std::move(pager), properties_));
     }
@@ -258,7 +261,20 @@ class FileSerializer : public ParquetFileWriter::Contents {
 
       // Write magic bytes and metadata
       auto metadata = metadata_->Finish();
-      WriteFileMetaData(*metadata, sink_.get());
+
+      auto file_encryption = properties_->file_encryption();
+      if (file_encryption == nullptr) {
+        WriteFileMetaData(*metadata, sink_.get());
+      } else {
+        uint64_t metadata_start = static_cast<uint64_t>(sink_->Tell());
+
+        std::shared_ptr<EncryptionProperties> footer_encryption =
+            file_encryption->GetFooterEncryptionProperties();
+        WriteFileMetaData(*metadata, sink_.get(), footer_encryption.get());
+
+        auto crypto_metadata = metadata_->GetCryptoMetaData(metadata_start);
+        WriteFileCryptoMetaData(*crypto_metadata, sink_.get());
+      }
 
       sink_->Close();
       is_open_ = false;
@@ -323,8 +339,12 @@ class FileSerializer : public ParquetFileWriter::Contents {
   std::unique_ptr<RowGroupWriter> row_group_writer_;
 
   void StartFile() {
-    // Parquet files always start with PAR1
-    sink_->Write(PARQUET_MAGIC, 4);
+    if (properties_->file_encryption() == nullptr) {
+      // Unencrypted Parquet files start with the PAR1 magic
+      sink_->Write(PARQUET_MAGIC, 4);
+    } else {
+      sink_->Write(PARQUET_EMAGIC, 4);
+    }
   }
 };
 
@@ -360,16 +380,35 @@ std::unique_ptr<ParquetFileWriter> ParquetFileWriter::Open(
   return result;
 }
 
-void WriteFileMetaData(const FileMetaData& file_metadata, OutputStream* sink) {
-  // Write MetaData
-  uint32_t metadata_len = static_cast<uint32_t>(sink->Tell());
+void WriteFileMetaData(const FileMetaData& file_metadata, OutputStream* sink,
+                       EncryptionProperties* footer_encryption) {
+  if (footer_encryption == nullptr) {
+    // Write MetaData
+    uint32_t metadata_len = static_cast<uint32_t>(sink->Tell());
+
+    file_metadata.WriteTo(sink);
+    metadata_len = static_cast<uint32_t>(sink->Tell()) - metadata_len;
+
+    // Write Footer
+    sink->Write(reinterpret_cast<uint8_t*>(&metadata_len), 4);
+    sink->Write(PARQUET_MAGIC, 4);
+  } else {
+    // encrypt and write to sink
+    file_metadata.WriteTo(sink, footer_encryption);
+  }
+}
+
+void WriteFileCryptoMetaData(const FileCryptoMetaData& crypto_metadata,
+                             OutputStream* sink) {
+  uint64_t crypto_offset = static_cast<uint64_t>(sink->Tell());
+
+  // Serialize the FileCryptoMetaData to the sink
+  crypto_metadata.WriteTo(sink);
 
-  file_metadata.WriteTo(sink);
-  metadata_len = static_cast<uint32_t>(sink->Tell()) - metadata_len;
+  auto crypto_len = static_cast<uint32_t>(sink->Tell()) - crypto_offset;
+  sink->Write(reinterpret_cast<uint8_t*>(&crypto_len), 4);
 
-  // Write Footer
-  sink->Write(reinterpret_cast<uint8_t*>(&metadata_len), 4);
-  sink->Write(PARQUET_MAGIC, 4);
+  sink->Write(PARQUET_EMAGIC, 4);
 }
 
 const SchemaDescriptor* ParquetFileWriter::schema() const { return contents_->schema(); }
diff --git a/src/parquet/file_writer.h b/src/parquet/file_writer.h
index cdfe06cd..e3f9e9e7 100644
--- a/src/parquet/file_writer.h
+++ b/src/parquet/file_writer.h
@@ -101,7 +101,10 @@ class PARQUET_EXPORT RowGroupWriter {
 };
 
 PARQUET_EXPORT
-void WriteFileMetaData(const FileMetaData& file_metadata, OutputStream* sink);
+void WriteFileMetaData(const FileMetaData& file_metadata, OutputStream* sink,
+                       EncryptionProperties* encryption_properties = nullptr);
+void WriteFileCryptoMetaData(const FileCryptoMetaData& crypto_metadata,
+                             OutputStream* sink);
 
 class PARQUET_EXPORT ParquetFileWriter {
  public:
diff --git a/src/parquet/metadata.cc b/src/parquet/metadata.cc
index 1cab51f0..9a894daf 100644
--- a/src/parquet/metadata.cc
+++ b/src/parquet/metadata.cc
@@ -88,6 +88,52 @@ std::shared_ptr<RowGroupStatistics> MakeColumnStats(
 }
 
 // MetaData Accessor
+// ColumnCryptoMetaData
+class ColumnCryptoMetaData::ColumnCryptoMetaDataImpl {
+ public:
+  explicit ColumnCryptoMetaDataImpl(const format::ColumnCryptoMetaData* crypto_metadata)
+      : crypto_metadata_(crypto_metadata) {}
+
+  ~ColumnCryptoMetaDataImpl() {}
+
+  bool encrypted_with_footer_key() const {
+    return crypto_metadata_->__isset.ENCRYPTION_WITH_FOOTER_KEY;
+  }
+  bool encrypted_with_column_key() const {
+    return crypto_metadata_->__isset.ENCRYPTION_WITH_COLUMN_KEY;
+  }
+  const std::vector<std::string>& path_in_schema() const {
+    return crypto_metadata_->ENCRYPTION_WITH_COLUMN_KEY.path_in_schema;
+  }
+  const std::string& column_key_metadata() const {
+    return crypto_metadata_->ENCRYPTION_WITH_COLUMN_KEY.column_key_metadata;
+  }
+
+ private:
+  const format::ColumnCryptoMetaData* crypto_metadata_;
+};
+
+std::unique_ptr<ColumnCryptoMetaData> ColumnCryptoMetaData::Make(
+    const uint8_t* metadata) {
+  return std::unique_ptr<ColumnCryptoMetaData>(new ColumnCryptoMetaData(metadata));
+}
+
+ColumnCryptoMetaData::ColumnCryptoMetaData(const uint8_t* metadata)
+    : impl_(new ColumnCryptoMetaDataImpl(
+          reinterpret_cast<const format::ColumnCryptoMetaData*>(metadata))) {}
+
+ColumnCryptoMetaData::~ColumnCryptoMetaData() {}
+
+const std::vector<std::string>& ColumnCryptoMetaData::path_in_schema() const {
+  return impl_->path_in_schema();
+}
+bool ColumnCryptoMetaData::encrypted_with_footer_key() const {
+  return impl_->encrypted_with_footer_key();
+}
+const std::string& ColumnCryptoMetaData::column_key_metadata() const {
+  return impl_->column_key_metadata();
+}
+
 // ColumnChunk metadata
 class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
  public:
@@ -167,6 +213,15 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
     return column_->meta_data.total_uncompressed_size;
   }
 
+  inline std::unique_ptr<ColumnCryptoMetaData> crypto_meta_data() const {
+    if (column_->__isset.crypto_meta_data) {
+      return ColumnCryptoMetaData::Make(
+          reinterpret_cast<const uint8_t*>(&column_->crypto_meta_data));
+    } else {
+      return nullptr;
+    }
+  }
+
  private:
   mutable std::shared_ptr<RowGroupStatistics> stats_;
   std::vector<Encoding::type> encodings_;
@@ -244,6 +299,10 @@ int64_t ColumnChunkMetaData::total_compressed_size() const {
   return impl_->total_compressed_size();
 }
 
+std::unique_ptr<ColumnCryptoMetaData> ColumnChunkMetaData::crypto_meta_data() const {
+  return impl_->crypto_meta_data();
+}
+
 // row-group metadata
 class RowGroupMetaData::RowGroupMetaDataImpl {
  public:
@@ -311,10 +370,11 @@ class FileMetaData::FileMetaDataImpl {
  public:
   FileMetaDataImpl() : metadata_len_(0) {}
 
-  explicit FileMetaDataImpl(const uint8_t* metadata, uint32_t* metadata_len)
+  explicit FileMetaDataImpl(const uint8_t* metadata, uint32_t* metadata_len,
+                            std::shared_ptr<EncryptionProperties> encryption = nullptr)
       : metadata_len_(0) {
     metadata_.reset(new format::FileMetaData);
-    DeserializeThriftMsg(metadata, metadata_len, metadata_.get());
+    DeserializeThriftMsg(metadata, metadata_len, metadata_.get(), encryption.get());
     metadata_len_ = *metadata_len;
 
     if (metadata_->__isset.created_by) {
@@ -343,8 +403,8 @@ class FileMetaData::FileMetaDataImpl {
 
   const ApplicationVersion& writer_version() const { return writer_version_; }
 
-  void WriteTo(OutputStream* dst) const {
-    SerializeThriftMsg(metadata_.get(), 1024, dst);
+  void WriteTo(OutputStream* dst, EncryptionProperties* encryption) const {
+    SerializeThriftMsg(metadata_.get(), 1024, dst, encryption);
   }
 
   std::unique_ptr<RowGroupMetaData> RowGroup(int i) {
@@ -408,15 +468,18 @@ class FileMetaData::FileMetaDataImpl {
   std::shared_ptr<const KeyValueMetadata> key_value_metadata_;
 };
 
-std::shared_ptr<FileMetaData> FileMetaData::Make(const uint8_t* metadata,
-                                                 uint32_t* metadata_len) {
+std::shared_ptr<FileMetaData> FileMetaData::Make(
+    const uint8_t* metadata, uint32_t* metadata_len,
+    std::shared_ptr<EncryptionProperties> encryption) {
   // This FileMetaData ctor is private, not compatible with std::make_shared
-  return std::shared_ptr<FileMetaData>(new FileMetaData(metadata, metadata_len));
+  return std::shared_ptr<FileMetaData>(
+      new FileMetaData(metadata, metadata_len, encryption));
 }
 
-FileMetaData::FileMetaData(const uint8_t* metadata, uint32_t* metadata_len)
+FileMetaData::FileMetaData(const uint8_t* metadata, uint32_t* metadata_len,
+                           std::shared_ptr<EncryptionProperties> encryption)
     : impl_{std::unique_ptr<FileMetaDataImpl>(
-          new FileMetaDataImpl(metadata, metadata_len))} {}
+          new FileMetaDataImpl(metadata, metadata_len, encryption))} {}
 
 FileMetaData::FileMetaData()
     : impl_{std::unique_ptr<FileMetaDataImpl>(new FileMetaDataImpl())} {}
@@ -462,7 +525,69 @@ std::shared_ptr<const KeyValueMetadata> FileMetaData::key_value_metadata() const
   return impl_->key_value_metadata();
 }
 
-void FileMetaData::WriteTo(OutputStream* dst) const { return impl_->WriteTo(dst); }
+void FileMetaData::WriteTo(OutputStream* dst, EncryptionProperties* encryption) const {
+  return impl_->WriteTo(dst, encryption);
+}
+
+class FileCryptoMetaData::FileCryptoMetaDataImpl {
+ public:
+  FileCryptoMetaDataImpl() {}
+
+  explicit FileCryptoMetaDataImpl(const uint8_t* metadata, uint32_t* metadata_len) {
+    metadata_.reset(new format::FileCryptoMetaData);
+    DeserializeThriftMsg(metadata, metadata_len, metadata_.get());
+    metadata_len_ = *metadata_len;
+  }
+
+  ~FileCryptoMetaDataImpl() {}
+
+  EncryptionAlgorithm encryption_algorithm() {
+    return FromThrift(metadata_->encryption_algorithm);
+  }
+
+  bool encrypted_footer() { return metadata_->encrypted_footer; }
+
+  const std::string& footer_key_metadata() { return metadata_->footer_key_metadata; }
+
+  uint64_t footer_offset() { return metadata_->footer_offset; }
+
+  const std::string& iv_prefix() { return metadata_->iv_prefix; }
+
+  void WriteTo(OutputStream* dst) const {
+    SerializeThriftMsg(metadata_.get(), 1024, dst);
+  }
+
+ private:
+  friend FileMetaDataBuilder;
+  std::unique_ptr<format::FileCryptoMetaData> metadata_;
+  uint32_t metadata_len_;
+};
+
+EncryptionAlgorithm FileCryptoMetaData::encryption_algorithm() {
+  return impl_->encryption_algorithm();
+}
+bool FileCryptoMetaData::encrypted_footer() { return impl_->encrypted_footer(); }
+const std::string& FileCryptoMetaData::footer_key_metadata() {
+  return impl_->footer_key_metadata();
+}
+uint64_t FileCryptoMetaData::footer_offset() { return impl_->footer_offset(); }
+const std::string& FileCryptoMetaData::iv_prefix() { return impl_->iv_prefix(); }
+
+std::shared_ptr<FileCryptoMetaData> FileCryptoMetaData::Make(
+    const uint8_t* serialized_metadata, uint32_t* metadata_len) {
+  return std::shared_ptr<FileCryptoMetaData>(
+      new FileCryptoMetaData(serialized_metadata, metadata_len));
+}
+
+FileCryptoMetaData::FileCryptoMetaData(const uint8_t* serialized_metadata,
+                                       uint32_t* metadata_len)
+    : impl_(new FileCryptoMetaDataImpl(serialized_metadata, metadata_len)) {}
+
+FileCryptoMetaData::FileCryptoMetaData() : impl_(new FileCryptoMetaDataImpl()) {}
+
+FileCryptoMetaData::~FileCryptoMetaData() {}
+
+void FileCryptoMetaData::WriteTo(OutputStream* dst) const { impl_->WriteTo(dst); }
 
 ApplicationVersion::ApplicationVersion(const std::string& application, int major,
                                        int minor, int patch)
@@ -570,10 +695,10 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
                                           uint8_t* contents)
       : properties_(props), column_(column) {
     column_chunk_ = reinterpret_cast<format::ColumnChunk*>(contents);
-    column_chunk_->meta_data.__set_type(ToThrift(column->physical_type()));
-    column_chunk_->meta_data.__set_path_in_schema(column->path()->ToDotVector());
-    column_chunk_->meta_data.__set_codec(
-        ToThrift(properties_->compression(column->path())));
+    column_metadata_ = column_chunk_->meta_data;
+    column_metadata_.__set_type(ToThrift(column->physical_type()));
+    column_metadata_.__set_path_in_schema(column->path()->ToDotVector());
+    column_metadata_.__set_codec(ToThrift(properties_->compression(column->path())));
   }
   ~ColumnChunkMetaDataBuilderImpl() {}
 
@@ -600,7 +725,7 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
       stats.__isset.max = val.has_max;
     }
 
-    column_chunk_->meta_data.__set_statistics(stats);
+    column_metadata_.__set_statistics(stats);
   }
 
   void Finish(int64_t num_values, int64_t dictionary_page_offset,
@@ -608,19 +733,20 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
               int64_t compressed_size, int64_t uncompressed_size, bool has_dictionary,
               bool dictionary_fallback) {
     if (dictionary_page_offset > 0) {
-      column_chunk_->meta_data.__set_dictionary_page_offset(dictionary_page_offset);
+      column_metadata_.__set_dictionary_page_offset(dictionary_page_offset);
       column_chunk_->__set_file_offset(dictionary_page_offset + compressed_size);
     } else {
       column_chunk_->__set_file_offset(data_page_offset + compressed_size);
     }
-    column_chunk_->__isset.meta_data = true;
-    column_chunk_->meta_data.__set_num_values(num_values);
+
+    column_metadata_.__set_num_values(num_values);
     if (index_page_offset >= 0) {
-      column_chunk_->meta_data.__set_index_page_offset(index_page_offset);
+      column_metadata_.__set_index_page_offset(index_page_offset);
     }
-    column_chunk_->meta_data.__set_data_page_offset(data_page_offset);
-    column_chunk_->meta_data.__set_total_uncompressed_size(uncompressed_size);
-    column_chunk_->meta_data.__set_total_compressed_size(compressed_size);
+    column_metadata_.__set_data_page_offset(data_page_offset);
+    column_metadata_.__set_total_uncompressed_size(uncompressed_size);
+    column_metadata_.__set_total_compressed_size(compressed_size);
+
     std::vector<format::Encoding::type> thrift_encodings;
     if (has_dictionary) {
       thrift_encodings.push_back(ToThrift(properties_->dictionary_index_encoding()));
@@ -638,17 +764,68 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
     if (dictionary_fallback) {
       thrift_encodings.push_back(ToThrift(Encoding::PLAIN));
     }
-    column_chunk_->meta_data.__set_encodings(thrift_encodings);
+    column_metadata_.__set_encodings(thrift_encodings);
   }
 
   void WriteTo(OutputStream* sink) {
-    SerializeThriftMsg(column_chunk_, sizeof(format::ColumnChunk), sink);
+    const auto& encrypt_md = properties_->column_encryption_props(column_->path());
+    // Serializes this column chunk to `sink`; encrypt_md is null without file encryption.
+    // Column is unencrypted: embed the plain ColumnMetaData in the ColumnChunk.
+    if (!encrypt_md || !encrypt_md->encrypted()) {
+      column_chunk_->__isset.meta_data = true;
+      column_chunk_->__set_meta_data(column_metadata_);
+      SerializeThriftMsg(column_chunk_, sizeof(format::ColumnChunk), sink);
+    } else {  // column is encrypted
+      column_chunk_->__isset.crypto_meta_data = true;
+      // Record which key the column is encrypted with.
+      // encrypted with footer key
+      format::ColumnCryptoMetaData ccmd;
+      if (encrypt_md->encrypted_with_footer_key()) {
+        ccmd.__isset.ENCRYPTION_WITH_FOOTER_KEY = true;
+        ccmd.__set_ENCRYPTION_WITH_FOOTER_KEY(format::EncryptionWithFooterKey());
+      } else {  // encrypted with column key
+        format::EncryptionWithColumnKey eck;
+        eck.__set_column_key_metadata(encrypt_md->key_metadata());
+        eck.__set_path_in_schema(column_->path()->ToDotVector());
+        ccmd.__isset.ENCRYPTION_WITH_COLUMN_KEY = true;
+        ccmd.__set_ENCRYPTION_WITH_COLUMN_KEY(eck);
+      }
+      column_chunk_->__set_crypto_meta_data(ccmd);
+
+      auto footer_encryption = properties_->footer_encryption();
+
+      // non-uniform: footer is unencrypted, or column is encrypted with a column-specific
+      // key (encrypt_md->encrypted() is always true in this branch)
+      if ((footer_encryption == nullptr && encrypt_md->encrypted()) ||
+          !encrypt_md->encrypted_with_footer_key()) {
+        // Leave meta_data unset: the ColumnMetaData is written separately below.
+        column_chunk_->__isset.meta_data = false;
+
+        // Thrift-serialize the ColumnMetaData structure,
+        // encrypt it with the column key, and write the result to the output stream
+        // (first length, then buffer)
+        auto encrypt_props = properties_->encryption(column_->path());
+        uint64_t metadata_start = sink->Tell();
+
+        SerializeThriftMsg(&column_metadata_, sizeof(format::ColumnMetaData), sink,
+                           encrypt_props.get());
+        // This replaces the file_offset value that Finish() stored earlier.
+        // Set the ColumnMetaData offset at the "file_offset" field in the ColumnChunk.
+        column_chunk_->__set_file_offset(metadata_start);
+      } else {  // footer-key column in a file whose footer is encrypted
+        column_chunk_->__isset.meta_data = true;
+        column_chunk_->__set_meta_data(column_metadata_);
+      }
+
+      SerializeThriftMsg(column_chunk_, sizeof(format::ColumnChunk), sink);
+    }
   }
 
   const ColumnDescriptor* descr() const { return column_; }
 
  private:
   format::ColumnChunk* column_chunk_;
+  format::ColumnMetaData column_metadata_;
   const std::shared_ptr<WriterProperties> properties_;
   const ColumnDescriptor* column_;
 };
@@ -728,20 +905,22 @@ class RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl {
          << " columns are initialized";
       throw ParquetException(ss.str());
     }
-    int64_t total_byte_size = 0;
-
-    for (int i = 0; i < schema_->num_columns(); i++) {
-      if (!(row_group_->columns[i].file_offset >= 0)) {
-        std::stringstream ss;
-        ss << "Column " << i << " is not complete.";
-        throw ParquetException(ss.str());
-      }
-      total_byte_size += row_group_->columns[i].meta_data.total_compressed_size;
-    }
-    DCHECK(total_bytes_written == total_byte_size)
-        << "Total bytes in this RowGroup does not match with compressed sizes of columns";
-
-    row_group_->__set_total_byte_size(total_byte_size);
+    //    int64_t total_byte_size = 0;
+
+    //    for (int i = 0; i < schema_->num_columns(); i++) {
+    //      if (!(row_group_->columns[i].file_offset >= 0)) {
+    //        std::stringstream ss;
+    //        ss << "Column " << i << " is not complete.";
+    //        throw ParquetException(ss.str());
+    //      }
+    //      total_byte_size += row_group_->columns[i].meta_data.total_compressed_size;
+    //    }
+    //    DCHECK(total_bytes_written == total_byte_size)
+    //        << "Total bytes in this RowGroup does not match with compressed sizes of
+    //        columns";
+
+    //    row_group_->__set_total_byte_size(total_byte_size);
+    row_group_->__set_total_byte_size(total_bytes_written);
   }
 
   void set_num_rows(int64_t num_rows) { row_group_->num_rows = num_rows; }
@@ -802,6 +981,9 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl {
       const std::shared_ptr<const KeyValueMetadata>& key_value_metadata)
       : properties_(props), schema_(schema), key_value_metadata_(key_value_metadata) {
     metadata_.reset(new format::FileMetaData());
+    if (props->footer_encryption() != nullptr) {
+      crypto_metadata_.reset(new format::FileCryptoMetaData());
+    }
   }
   ~FileMetaDataBuilderImpl() {}
 
@@ -875,8 +1057,39 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl {
     return file_meta_data;
   }
 
+  // Assembles the file-level crypto metadata from the writer's encryption properties
+  // and hands ownership to the caller. Yields nullptr when there is nothing to build
+  // (crypto_metadata_ is unset, including after a previous call moved it out).
+  std::unique_ptr<FileCryptoMetaData> BuildFileCryptoMetaData(uint64_t footerOffset) {
+    if (!crypto_metadata_) return nullptr;
+    auto* file_props = properties_->file_encryption();
+    const auto footer_props = properties_->footer_encryption();
+
+    // Algorithm comes from the footer settings, AAD metadata from the file settings.
+    EncryptionAlgorithm algo;
+    algo.algorithm = footer_props->algorithm();
+    algo.aad_metadata = file_props->aad_metadata();
+    crypto_metadata_->__set_encryption_algorithm(ToThrift(algo));
+
+    // The footer is flagged as encrypted whenever the footer key is non-empty.
+    const bool footer_is_encrypted = !footer_props->key().empty();
+    crypto_metadata_->__set_encrypted_footer(footer_is_encrypted);
+
+    // Key metadata is only recorded when it is non-empty.
+    const std::string& key_md = file_props->footer_key_metadata();
+    if (!key_md.empty()) crypto_metadata_->__set_footer_key_metadata(key_md);
+    crypto_metadata_->__set_footer_offset(footerOffset);
+    // TODO set iv_prefix???
+
+    // Wrap the Thrift struct in the public accessor type.
+    std::unique_ptr<FileCryptoMetaData> wrapped(new FileCryptoMetaData());
+    wrapped->impl_->metadata_ = std::move(crypto_metadata_);
+    return wrapped;
+  }
+
  protected:
   std::unique_ptr<format::FileMetaData> metadata_;
+  std::unique_ptr<format::FileCryptoMetaData> crypto_metadata_;
 
  private:
   const std::shared_ptr<WriterProperties> properties_;
@@ -907,4 +1120,9 @@ RowGroupMetaDataBuilder* FileMetaDataBuilder::AppendRowGroup() {
 
 std::unique_ptr<FileMetaData> FileMetaDataBuilder::Finish() { return impl_->Finish(); }
 
+std::unique_ptr<FileCryptoMetaData> FileMetaDataBuilder::GetCryptoMetaData(
+    uint64_t footerOffset) {  // footerOffset is passed through unchanged
+  return impl_->BuildFileCryptoMetaData(footerOffset);
+}
+
 }  // namespace parquet
diff --git a/src/parquet/metadata.h b/src/parquet/metadata.h
index 5d51e3d2..2ed4f2e2 100644
--- a/src/parquet/metadata.h
+++ b/src/parquet/metadata.h
@@ -87,6 +87,22 @@ class ApplicationVersion {
                             SortOrder::type sort_order = SortOrder::SIGNED) const;
 };
 
+class PARQUET_EXPORT ColumnCryptoMetaData {  // accessor for a column's crypto metadata
+ public:
+  static std::unique_ptr<ColumnCryptoMetaData> Make(const uint8_t* metadata);
+  ~ColumnCryptoMetaData();
+  // NOTE(review): accessors presumably mirror the Thrift ColumnCryptoMetaData fields - confirm.
+  const std::vector<std::string>& path_in_schema() const;
+  bool encrypted_with_footer_key() const;
+  const std::string& column_key_metadata() const;
+  // Construction goes through Make(); the constructor itself is private.
+ private:
+  explicit ColumnCryptoMetaData(const uint8_t* metadata);
+  // PIMPL Idiom
+  class ColumnCryptoMetaDataImpl;
+  std::unique_ptr<ColumnCryptoMetaDataImpl> impl_;
+};
+
 class PARQUET_EXPORT ColumnChunkMetaData {
  public:
   // API convenience to get a MetaData accessor
@@ -115,6 +131,7 @@ class PARQUET_EXPORT ColumnChunkMetaData {
   int64_t index_page_offset() const;
   int64_t total_compressed_size() const;
   int64_t total_uncompressed_size() const;
+  std::unique_ptr<ColumnCryptoMetaData> crypto_meta_data() const;
 
  private:
   explicit ColumnChunkMetaData(const uint8_t* metadata, const ColumnDescriptor* descr,
@@ -154,8 +171,9 @@ class FileMetaDataBuilder;
 class PARQUET_EXPORT FileMetaData {
  public:
   // API convenience to get a MetaData accessor
-  static std::shared_ptr<FileMetaData> Make(const uint8_t* serialized_metadata,
-                                            uint32_t* metadata_len);
+  static std::shared_ptr<FileMetaData> Make(
+      const uint8_t* serialized_metadata, uint32_t* metadata_len,
+      std::shared_ptr<EncryptionProperties> encryption = nullptr);
 
   ~FileMetaData();
 
@@ -171,7 +189,7 @@ class PARQUET_EXPORT FileMetaData {
 
   const ApplicationVersion& writer_version() const;
 
-  void WriteTo(OutputStream* dst) const;
+  void WriteTo(OutputStream* dst, EncryptionProperties* encryption = nullptr) const;
 
   // Return const-pointer to make it clear that this object is not to be copied
   const SchemaDescriptor* schema() const;
@@ -180,7 +198,8 @@ class PARQUET_EXPORT FileMetaData {
 
  private:
   friend FileMetaDataBuilder;
-  explicit FileMetaData(const uint8_t* serialized_metadata, uint32_t* metadata_len);
+  explicit FileMetaData(const uint8_t* serialized_metadata, uint32_t* metadata_len,
+                        std::shared_ptr<EncryptionProperties> encryption = nullptr);
 
   // PIMPL Idiom
   FileMetaData();
@@ -188,6 +207,31 @@ class PARQUET_EXPORT FileMetaData {
   std::unique_ptr<FileMetaDataImpl> impl_;
 };
 
+class PARQUET_EXPORT FileCryptoMetaData {  // accessor for file-level crypto metadata
+ public:
+  // API convenience to get a MetaData accessor from serialized bytes
+  static std::shared_ptr<FileCryptoMetaData> Make(const uint8_t* serialized_metadata,
+                                                  uint32_t* metadata_len);
+  ~FileCryptoMetaData();
+  // NOTE(review): these getters are declared non-const although they look read-only - confirm.
+  EncryptionAlgorithm encryption_algorithm();
+  bool encrypted_footer();
+  const std::string& footer_key_metadata();
+  uint64_t footer_offset();
+  const std::string& iv_prefix();  // not yet set by FileMetaDataBuilder (see its TODO)
+  // Forwards to the implementation object.
+  void WriteTo(OutputStream* dst) const;
+
+ private:
+  friend FileMetaDataBuilder;  // the builder uses the default constructor and impl_
+  FileCryptoMetaData(const uint8_t* serialized_metadata, uint32_t* metadata_len);
+
+  // PIMPL Idiom
+  FileCryptoMetaData();
+  class FileCryptoMetaDataImpl;
+  std::unique_ptr<FileCryptoMetaDataImpl> impl_;
+};
+
 // Builder API
 class PARQUET_EXPORT ColumnChunkMetaDataBuilder {
  public:
@@ -263,6 +307,9 @@ class PARQUET_EXPORT FileMetaDataBuilder {
   // commit the metadata
   std::unique_ptr<FileMetaData> Finish();
 
+  // crypto metadata
+  std::unique_ptr<FileCryptoMetaData> GetCryptoMetaData(uint64_t footerOffset);
+
  private:
   explicit FileMetaDataBuilder(
       const SchemaDescriptor* schema, const std::shared_ptr<WriterProperties>& props,
diff --git a/src/parquet/properties.h b/src/parquet/properties.h
index 83dc2057..1ac61b1e 100644
--- a/src/parquet/properties.h
+++ b/src/parquet/properties.h
@@ -22,10 +22,12 @@
 #include <string>
 #include <unordered_map>
 
+#include "parquet/encryption.h"
 #include "parquet/exception.h"
 #include "parquet/parquet_version.h"
 #include "parquet/schema.h"
 #include "parquet/types.h"
+#include "parquet/util/logging.h"
 #include "parquet/util/memory.h"
 #include "parquet/util/visibility.h"
 
@@ -38,6 +40,123 @@ struct ParquetVersion {
 static int64_t DEFAULT_BUFFER_SIZE = 0;
 static bool DEFAULT_USE_BUFFERED_STREAM = false;
 
+// Encryption settings for one column: whether it is encrypted and with which key.
+class PARQUET_EXPORT ColumnEncryptionProperties {
+ public:
+  class Builder {
+   public:
+    Builder(const std::string& path, bool encrypt)
+        : path_(path), encrypt_(encrypt), encrypted_with_footer_key_(encrypt) {}
+
+    Builder* key(const std::string& key) {  // column-specific key: 16, 24 or 32 bytes
+      DCHECK(key.length() == 16 || key.length() == 24 || key.length() == 32);
+      DCHECK(encrypt_);
+      key_ = key;
+      encrypted_with_footer_key_ = false;  // own key => no longer on the footer key
+      return this;
+    }
+    Builder* key_metadata(const std::string& key_id) {
+      DCHECK(!key_id.empty());
+      key_metadata_ = key_id;
+      return this;
+    }
+
+    Builder* key_id(uint32_t key_id) {  // stores the raw bytes of key_id (host byte order)
+      return this->key_metadata(
+          std::string(reinterpret_cast<const char*>(&key_id), sizeof(key_id)));
+    }
+
+    std::shared_ptr<ColumnEncryptionProperties> build() {
+      return std::make_shared<ColumnEncryptionProperties>(
+          path_, encrypt_, encrypted_with_footer_key_, key_, key_metadata_);
+    }
+
+   private:
+    std::string path_;
+    bool encrypt_;
+    bool encrypted_with_footer_key_;
+    std::string key_;
+    std::string key_metadata_;
+  };
+
+  ColumnEncryptionProperties() = default;
+  ColumnEncryptionProperties(const ColumnEncryptionProperties& other) = default;
+  ColumnEncryptionProperties(ColumnEncryptionProperties&& other) = default;
+
+  ColumnEncryptionProperties(const std::string& path, bool encrypt, bool encrypted_with_footer_key,
+                             const std::string& key, const std::string& key_metadata)
+      : path_(path), encrypt_(encrypt), encrypted_with_footer_key_(encrypted_with_footer_key),
+        key_(key), key_metadata_(key_metadata) {}
+
+  const std::string& path() const { return path_; }
+  bool encrypted() const { return encrypt_; }
+  bool encrypted_with_footer_key() const { return encrypted_with_footer_key_; }
+  const std::string& key() const { return key_; }
+  const std::string& key_metadata() const { return key_metadata_; }
+
+ private:
+  std::string path_;
+  bool encrypt_ = false;  // defaulted so a default-constructed object is well-defined
+  bool encrypted_with_footer_key_ = false;
+  std::string key_;
+  std::string key_metadata_;
+};
+
+// Reader-side decryption settings: footer key or key retriever, AAD, per-column keys.
+class PARQUET_EXPORT FileDecryptionProperties {
+ public:
+  FileDecryptionProperties(const std::string& footer_key) : footer_key_(footer_key) {
+    DCHECK(footer_key_.length() == 16 || footer_key_.length() == 24 ||
+           footer_key_.length() == 32);
+  }
+
+  FileDecryptionProperties(const std::shared_ptr<DecryptionKeyRetriever>& key_retriever)
+      : key_retriever_(key_retriever) {}
+
+  void SetAad(const std::string& aad) { aad_ = aad; }
+
+  void SetColumnKey(const std::string& name, const std::string& key) {
+    SetColumnKey(std::vector<std::string>(1, name), key);  // single-component path
+  }
+
+  // Stores `key` (16, 24 or 32 bytes) under the dotted form of the column path.
+  void SetColumnKey(const std::vector<std::string>& paths, const std::string& key) {
+    DCHECK(key.length() == 16 || key.length() == 24 || key.length() == 32);
+    const std::string dotted_path = schema::ColumnPath(paths).ToDotString();
+    column_keys_[dotted_path] = key;
+  }
+
+  // With key metadata the key comes from the retriever (ParquetException if there is
+  // none); without it, from the keys registered via SetColumnKey (std::map::at).
+  const std::string& GetColumnKey(const std::shared_ptr<schema::ColumnPath>& columnPath,
+                                  const std::string& key_metadata = "") {
+    if (!key_metadata.empty()) {
+      if (key_retriever_ == nullptr) {
+        throw ParquetException("no key retriever is provided for column key metadata");
+      }
+      return key_retriever_->GetKey(key_metadata);
+    }
+    return column_keys_.at(columnPath->ToDotString());
+  }
+
+  const std::string& GetFooterKey(const std::string& footer_key_metadata = "") {
+    if (!footer_key_metadata.empty()) {
+      if (key_retriever_ == nullptr) {
+        throw ParquetException("no key retriever is provided for footer key metadata");
+      }
+      return key_retriever_->GetKey(footer_key_metadata);
+    }
+    return footer_key_;  // the explicitly supplied footer key
+  }
+  const std::string& GetAad() { return aad_; }
+
+ private:
+  std::string footer_key_;
+  std::string aad_;
+  std::map<std::string, std::string> column_keys_;
+  std::shared_ptr<DecryptionKeyRetriever> key_retriever_;
+};
+
 class PARQUET_EXPORT ReaderProperties {
  public:
   explicit ReaderProperties(::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
@@ -70,10 +189,17 @@ class PARQUET_EXPORT ReaderProperties {
 
   int64_t buffer_size() const { return buffer_size_; }
 
+  void file_decryption(const std::shared_ptr<FileDecryptionProperties>& decryption) {
+    file_decryption_ = decryption;  // shares ownership with the caller
+  }
+
+  FileDecryptionProperties* file_decryption() { return file_decryption_.get(); }  // non-owning; null when none is held
+
  private:
   ::arrow::MemoryPool* pool_;
   int64_t buffer_size_;
   bool buffered_stream_enabled_;
+  std::shared_ptr<FileDecryptionProperties> file_decryption_;
 };
 
 ReaderProperties PARQUET_EXPORT default_reader_properties();
@@ -90,6 +216,10 @@ static constexpr ParquetVersion::type DEFAULT_WRITER_VERSION =
     ParquetVersion::PARQUET_1_0;
 static const char DEFAULT_CREATED_BY[] = CREATED_BY_VERSION;
 static constexpr Compression::type DEFAULT_COMPRESSION_TYPE = Compression::UNCOMPRESSED;
+static constexpr Encryption::type DEFAULT_ENCRYPTION_ALGORITHM = Encryption::AES_GCM_V1;
+static constexpr int32_t MAXIMAL_KEY_METADATA_LENGTH = 256;  // exclusive bound checked by the builder
+static constexpr int32_t MAXIMAL_AAD_METADATA_LENGTH = 256;  // exclusive bound checked by the builder
+static constexpr bool DEFAULT_ENCRYPT_THE_REST = true;  // unlisted columns use the footer key
 
 class PARQUET_EXPORT ColumnProperties {
  public:
@@ -138,6 +268,187 @@ class PARQUET_EXPORT ColumnProperties {
   size_t max_stats_size_;
 };
 
+// Writer-side file encryption settings: footer key, algorithm, AAD and per-column overrides.
+class PARQUET_EXPORT FileEncryptionProperties {
+ public:
+  class Builder {
+   public:
+    Builder() : algorithm_(DEFAULT_ENCRYPTION_ALGORITHM), uniform_encryption_(true) {}
+
+    // Starts from a footer key, which must be 16, 24 or 32 bytes long.
+    Builder(const std::string& key)
+        : algorithm_(DEFAULT_ENCRYPTION_ALGORITHM), uniform_encryption_(true) {
+      DCHECK(key.length() == 16 || key.length() == 24 || key.length() == 32);
+      footer_key_ = key;
+    }
+
+    Builder* algorithm(Encryption::type algorithm) {
+      algorithm_ = algorithm;
+      return this;
+    }
+
+    Builder* footer_key(const std::string& key) {
+      DCHECK(key.length() == 16 || key.length() == 24 || key.length() == 32);
+      footer_key_ = key;
+      return this;
+    }
+
+    // Retrieval metadata for the footer key; the footer key must already be set.
+    Builder* footer_key_metadata(const std::string& key_metadata) {
+      DCHECK(!footer_key_.empty());
+      DCHECK(!key_metadata.empty() && key_metadata.length() < MAXIMAL_KEY_METADATA_LENGTH);
+      footer_key_metadata_ = key_metadata;
+      return this;
+    }
+
+    Builder* aad(const std::string& aad) {
+      DCHECK(!aad.empty());
+      aad_ = aad;
+      return this;
+    }
+
+    // Retrieval metadata for the AAD; the AAD itself must already be set.
+    Builder* aad_metadata(const std::string& aad_metadata) {
+      DCHECK(!aad_.empty());
+      DCHECK(!aad_metadata.empty() && aad_metadata.length() < MAXIMAL_AAD_METADATA_LENGTH);
+      aad_metadata_ = aad_metadata;
+      return this;
+    }
+
+    /**
+     * Registers per-column settings, keyed by dotted column path. Columns that are not
+     * listed are encrypted with the footer key when encrypt_the_rest is true and left
+     * unencrypted otherwise.
+     * Throws ParquetException when no footer key is set and encrypt_the_rest is true,
+     * an encrypted column has no key of its own, or no listed column is encrypted.
+     */
+    Builder* column_properties(
+        const std::map<std::string, std::shared_ptr<ColumnEncryptionProperties>>&
+            column_properties,
+        bool encrypt_the_rest = DEFAULT_ENCRYPT_THE_REST) {
+      encrypt_the_rest_ = encrypt_the_rest;
+      column_properties_ = column_properties;
+      if (!footer_key_.empty()) {
+        // Uniform means every column, listed or not, is on the footer key; that cannot
+        // hold when unlisted columns are to stay unencrypted.
+        uniform_encryption_ = encrypt_the_rest;
+        for (const auto& col : column_properties) {
+          if (col.second->key().compare(footer_key_) != 0) {
+            uniform_encryption_ = false;
+            break;
+          }
+        }
+        return this;
+      }
+
+      // No footer key: the footer stays unencrypted, so the file is never uniform.
+      // Leaving the flag at its initial `true` made every column report footer-key use.
+      uniform_encryption_ = false;
+      if (encrypt_the_rest) {
+        throw ParquetException("Encrypt the rest with null footer key");
+      }
+      bool all_are_unencrypted = true;
+      for (const auto& col : column_properties) {
+        if (col.second->encrypted()) {
+          if (col.second->key().empty()) {
+            throw ParquetException("Encrypt column with null footer key");
+          }
+          all_are_unencrypted = false;
+        }
+      }
+      if (all_are_unencrypted) {
+        throw ParquetException("Footer and all columns unencrypted");
+      }
+      return this;
+    }
+
+    std::shared_ptr<FileEncryptionProperties> build() {
+      std::shared_ptr<EncryptionProperties> footer_encryption;
+      if (!footer_key_.empty()) {
+        footer_encryption =
+            std::make_shared<EncryptionProperties>(algorithm_, footer_key_, aad_);
+      }
+      return std::make_shared<FileEncryptionProperties>(
+          footer_encryption, footer_key_metadata_, aad_metadata_, uniform_encryption_,
+          column_properties_, encrypt_the_rest_, algorithm_, aad_);
+    }
+
+   private:
+    Encryption::type algorithm_;
+    std::string footer_key_;
+    std::string footer_key_metadata_;
+    std::string aad_;
+    std::string aad_metadata_;
+    bool uniform_encryption_;
+    std::map<std::string, std::shared_ptr<ColumnEncryptionProperties>> column_properties_;
+    bool encrypt_the_rest_ = DEFAULT_ENCRYPT_THE_REST;  // build() may run before column_properties()
+  };
+
+  // algorithm/aad are used only when footer_encryption is null; defaulted for old callers.
+  FileEncryptionProperties(
+      const std::shared_ptr<EncryptionProperties>& footer_encryption,
+      const std::string& footer_key_metadata, const std::string& aad_metadata,
+      bool uniform_encryption,
+      const std::map<std::string, std::shared_ptr<ColumnEncryptionProperties>>&
+          column_properties,
+      bool encrypt_the_rest, Encryption::type algorithm = DEFAULT_ENCRYPTION_ALGORITHM,
+      const std::string& aad = std::string())
+      : footer_encryption_(footer_encryption), footer_key_metadata_(footer_key_metadata),
+        aad_metadata_(aad_metadata), uniform_encryption_(uniform_encryption),
+        column_properties_(column_properties), encrypt_the_rest_(encrypt_the_rest),
+        algorithm_(algorithm), aad_(aad) {}
+
+  std::shared_ptr<EncryptionProperties> GetFooterEncryptionProperties() {
+    return footer_encryption_;  // null when the footer is not encrypted
+  }
+
+  const std::string& footer_key_metadata() const { return footer_key_metadata_; }
+  const std::string& aad_metadata() const { return aad_metadata_; }
+
+  // Describes how the column at `path` is encrypted.
+  std::shared_ptr<ColumnEncryptionProperties> GetColumnCryptoMetaData(
+      const std::shared_ptr<schema::ColumnPath>& path) {
+    const std::string path_str = path->ToDotString();
+    if (!uniform_encryption_) {
+      const auto it = column_properties_.find(path_str);
+      if (it != column_properties_.end()) return it->second;
+      if (!encrypt_the_rest_) {
+        return ColumnEncryptionProperties::Builder(path_str, false).build();
+      }
+    }
+    // Uniform encryption, or an unlisted column covered by "encrypt the rest".
+    return ColumnEncryptionProperties::Builder(path_str, true).build();
+  }
+
+  // Key material for the column at `path`, or null when the column is unencrypted.
+  std::shared_ptr<EncryptionProperties> GetColumnEncryptionProperties(
+      const std::shared_ptr<schema::ColumnPath>& path) {
+    if (uniform_encryption_) return footer_encryption_;
+    const auto it = column_properties_.find(path->ToDotString());
+    if (it == column_properties_.end()) {
+      return encrypt_the_rest_ ? footer_encryption_ : nullptr;
+    }
+    const ColumnEncryptionProperties& column = *it->second;
+    if (!column.encrypted()) return nullptr;
+    if (column.key().empty()) return footer_encryption_;  // column uses the footer key
+    // footer_encryption_ is null for an unencrypted footer: use the stored algorithm/AAD.
+    const bool has_footer = footer_encryption_ != nullptr;
+    return std::make_shared<EncryptionProperties>(
+        has_footer ? footer_encryption_->algorithm() : algorithm_, column.key(),
+        has_footer ? footer_encryption_->aad() : aad_);
+  }
+
+ private:
+  std::shared_ptr<EncryptionProperties> footer_encryption_;
+  std::string footer_key_metadata_;
+  std::string aad_metadata_;
+  bool uniform_encryption_;
+  std::map<std::string, std::shared_ptr<ColumnEncryptionProperties>> column_properties_;
+  bool encrypt_the_rest_;
+  Encryption::type algorithm_;
+  std::string aad_;
+};
+
 class PARQUET_EXPORT WriterProperties {
  public:
   class Builder {
@@ -278,6 +589,11 @@ class PARQUET_EXPORT WriterProperties {
       return this->compression(path->ToDotString(), codec);
     }
 
+    Builder* encryption(const std::shared_ptr<FileEncryptionProperties>& file_encryption) {
+      file_encryption_ = file_encryption;  // handed on to WriterProperties by build()
+      return this;
+    }
+
     Builder* enable_statistics() {
       default_column_properties_.set_statistics_enabled(true);
       return this;
@@ -323,10 +639,10 @@ class PARQUET_EXPORT WriterProperties {
       for (const auto& item : statistics_enabled_)
         get(item.first).set_statistics_enabled(item.second);
 
-      return std::shared_ptr<WriterProperties>(
-          new WriterProperties(pool_, dictionary_pagesize_limit_, write_batch_size_,
-                               max_row_group_length_, pagesize_, version_, created_by_,
-                               default_column_properties_, column_properties));
+      return std::shared_ptr<WriterProperties>(new WriterProperties(
+          pool_, dictionary_pagesize_limit_, write_batch_size_, max_row_group_length_,
+          pagesize_, version_, created_by_, std::move(file_encryption_),
+          default_column_properties_, column_properties));
     }
 
    private:
@@ -337,6 +653,7 @@ class PARQUET_EXPORT WriterProperties {
     int64_t pagesize_;
     ParquetVersion::type version_;
     std::string created_by_;
+    std::shared_ptr<FileEncryptionProperties> file_encryption_;
 
     // Settings used for each column unless overridden in any of the maps below
     ColumnProperties default_column_properties_;
@@ -360,6 +677,18 @@ class PARQUET_EXPORT WriterProperties {
 
   inline std::string created_by() const { return parquet_created_by_; }
 
+  inline FileEncryptionProperties* file_encryption() const {
+    return parquet_file_encryption_.get();  // non-owning; null when encryption is not configured
+  }
+
+  // Footer encryption settings; nullptr when no file encryption is configured.
+  inline std::shared_ptr<EncryptionProperties> footer_encryption() const {
+    if (parquet_file_encryption_ != nullptr) {
+      return parquet_file_encryption_->GetFooterEncryptionProperties();
+    }
+    return nullptr;
+  }
+
   inline Encoding::type dictionary_index_encoding() const {
     if (parquet_version_ == ParquetVersion::PARQUET_1_0) {
       return Encoding::PLAIN_DICTIONARY;
@@ -403,11 +732,30 @@ class PARQUET_EXPORT WriterProperties {
     return column_properties(path).max_statistics_size();
   }
 
+  // How the column at `path` is encrypted; nullptr when no file encryption is configured.
+  std::shared_ptr<ColumnEncryptionProperties> column_encryption_props(
+      const std::shared_ptr<schema::ColumnPath>& path) const {
+    if (!parquet_file_encryption_) {
+      return nullptr;
+    }
+    return parquet_file_encryption_->GetColumnCryptoMetaData(path);
+  }
+
+  // Key material for the column at `path`; nullptr when no file encryption is configured.
+  std::shared_ptr<EncryptionProperties> encryption(
+      const std::shared_ptr<schema::ColumnPath>& path) const {
+    if (!parquet_file_encryption_) {
+      return nullptr;
+    }
+    return parquet_file_encryption_->GetColumnEncryptionProperties(path);
+  }
+
  private:
   explicit WriterProperties(
       ::arrow::MemoryPool* pool, int64_t dictionary_pagesize_limit,
       int64_t write_batch_size, int64_t max_row_group_length, int64_t pagesize,
       ParquetVersion::type version, const std::string& created_by,
+      std::shared_ptr<FileEncryptionProperties> file_encryption,
       const ColumnProperties& default_column_properties,
       const std::unordered_map<std::string, ColumnProperties>& column_properties)
       : pool_(pool),
@@ -417,6 +765,7 @@ class PARQUET_EXPORT WriterProperties {
         pagesize_(pagesize),
         parquet_version_(version),
         parquet_created_by_(created_by),
+        parquet_file_encryption_(file_encryption),
         default_column_properties_(default_column_properties),
         column_properties_(column_properties) {}
 
@@ -427,6 +776,7 @@ class PARQUET_EXPORT WriterProperties {
   int64_t pagesize_;
   ParquetVersion::type parquet_version_;
   std::string parquet_created_by_;
+  std::shared_ptr<FileEncryptionProperties> parquet_file_encryption_;
   ColumnProperties default_column_properties_;
   std::unordered_map<std::string, ColumnProperties> column_properties_;
 };
@@ -435,4 +785,4 @@ std::shared_ptr<WriterProperties> PARQUET_EXPORT default_writer_properties();
 
 }  // namespace parquet
 
-#endif  // PARQUET_COLUMN_PROPERTIES_H
+#endif  // PARQUET_COLUMN_PROPERTIES_H
\ No newline at end of file
diff --git a/src/parquet/thrift.h b/src/parquet/thrift.h
index ec7ac906..d759836b 100644
--- a/src/parquet/thrift.h
+++ b/src/parquet/thrift.h
@@ -40,6 +40,8 @@
 
 #include "parquet/exception.h"
 #include "parquet/parquet_types.h"
+#include "parquet/types.h"
+#include "parquet/util/crypto.h"
 #include "parquet/util/logging.h"
 #include "parquet/util/memory.h"
 
@@ -77,6 +79,16 @@ static inline Compression::type FromThrift(format::CompressionCodec::type type)
   return static_cast<Compression::type>(type);
 }
 
+// Throws ParquetException when neither known algorithm is set in the Thrift value.
+static inline EncryptionAlgorithm FromThrift(const format::EncryptionAlgorithm& encryption) {
+  if (encryption.__isset.AES_GCM_V1) {
+    return EncryptionAlgorithm{Encryption::AES_GCM_V1, encryption.AES_GCM_V1.aad_metadata};
+  } else if (encryption.__isset.AES_GCM_CTR_V1) {
+    return EncryptionAlgorithm{Encryption::AES_GCM_CTR_V1, encryption.AES_GCM_CTR_V1.aad_metadata};
+  }
+  throw ParquetException("Unknown or missing encryption algorithm in file metadata");
+}
+
 static inline format::Type::type ToThrift(Type::type type) {
   return static_cast<format::Type::type>(type);
 }
@@ -99,6 +111,20 @@ static inline format::CompressionCodec::type ToThrift(Compression::type type) {
   return static_cast<format::CompressionCodec::type>(type);
 }
 
+static inline format::EncryptionAlgorithm ToThrift(EncryptionAlgorithm encryption) {
+  format::EncryptionAlgorithm encryption_algorithm;  // filled through generated __set_* calls
+  if (encryption.algorithm == Encryption::AES_GCM_V1) {
+    format::AesGcmV1 gcm;
+    gcm.__set_aad_metadata(encryption.aad_metadata);  // unlike plain assignment, marks it present
+    encryption_algorithm.__set_AES_GCM_V1(gcm);
+  } else {  // every other value maps to AES_GCM_CTR_V1, as before
+    format::AesGcmCtrV1 gcm_ctr;
+    gcm_ctr.__set_aad_metadata(encryption.aad_metadata);
+    encryption_algorithm.__set_AES_GCM_CTR_V1(gcm_ctr);
+  }
+  return encryption_algorithm;
+}
+
 // ----------------------------------------------------------------------
 // Thrift struct serialization / deserialization utilities
 
@@ -106,31 +132,57 @@ static inline format::CompressionCodec::type ToThrift(Compression::type type) {
 // all the bytes needed to store the thrift message.  On return, len will be
 // set to the actual length of the header.
 template <class T>
-inline void DeserializeThriftMsg(const uint8_t* buf, uint32_t* len, T* deserialized_msg) {
-  // Deserialize msg bytes into c++ thrift msg using memory transport.
-  shared_ptr<apache::thrift::transport::TMemoryBuffer> tmem_transport(
-      new apache::thrift::transport::TMemoryBuffer(const_cast<uint8_t*>(buf), *len));
-  apache::thrift::protocol::TCompactProtocolFactoryT<
-      apache::thrift::transport::TMemoryBuffer>
-      tproto_factory;
-  shared_ptr<apache::thrift::protocol::TProtocol> tproto =
-      tproto_factory.getProtocol(tmem_transport);
-  try {
-    deserialized_msg->read(tproto.get());
-  } catch (std::exception& e) {
-    std::stringstream ss;
-    ss << "Couldn't deserialize thrift: " << e.what() << "\n";
-    throw ParquetException(ss.str());
+inline void DeserializeThriftMsg(const uint8_t* buf, uint32_t* len, T* deserialized_msg,
+                                 const EncryptionProperties* encryption = nullptr) {
+  // With a null `encryption`, the *len bytes at buf are parsed as a compact-protocol
+  // thrift message and *len is reduced by the bytes left unread. Otherwise buf holds
+  // a 4-byte ciphertext length followed by the ciphertext; it is decrypted, parsed
+  // through the plaintext path, and *len is set to 4 + ciphertext length.
+  // Throws ParquetException when the message cannot be decrypted or parsed.
+  if (encryption == nullptr) {
+    // Deserialize msg bytes into c++ thrift msg using memory transport.
+    shared_ptr<apache::thrift::transport::TMemoryBuffer> tmem_transport(
+        new apache::thrift::transport::TMemoryBuffer(const_cast<uint8_t*>(buf), *len));
+    apache::thrift::protocol::TCompactProtocolFactoryT<
+        apache::thrift::transport::TMemoryBuffer>
+        tproto_factory;
+    shared_ptr<apache::thrift::protocol::TProtocol> tproto =
+        tproto_factory.getProtocol(tmem_transport);
+    try {
+      deserialized_msg->read(tproto.get());
+    } catch (std::exception& e) {
+      std::stringstream ss;
+      ss << "Couldn't deserialize thrift: " << e.what() << "\n";
+      throw ParquetException(ss.str());
+    }
+    uint32_t bytes_left = tmem_transport->available_read();
+    *len = *len - bytes_left;
+  } else {
+    // first 4 bytes for length, in host byte order as SerializeThriftMsg writes them
+    const uint32_t kLengthPrefixSize = 4;
+    if (*len < kLengthPrefixSize) {
+      throw ParquetException("Couldn't decrypt buffer\n");
+    }
+    // memcpy straight into the integer: buf carries no alignment guarantee, and
+    // reading a byte array through a uint32_t* violates strict aliasing.
+    uint32_t clen = 0;
+    memcpy(&clen, buf, kLengthPrefixSize);
+
+    // Reject a ciphertext that runs past the bytes the caller supplied, or that is
+    // shorter than the per-message overhead (CalculateCipherSize(0)); the latter
+    // would make CalculatePlainSize wrap around.
+    if (clen > *len - kLengthPrefixSize || clen < encryption->CalculateCipherSize(0)) {
+      throw ParquetException("Couldn't decrypt buffer\n");
+    }
+
+    // decrypt
+    std::vector<uint8_t> decrypted_buffer(encryption->CalculatePlainSize(clen));
+
+    // Held in a signed int so a negative failure code is not mistaken for a length.
+    // NOTE(review): assumes Decrypt reports failure with a non-positive int, as
+    // Encrypt's result is stored in an int by SerializeThriftMsg — confirm.
+    const int decrypted_len = parquet_encryption::Decrypt(
+        encryption->algorithm(), true, buf + kLengthPrefixSize, clen,
+        encryption->key_bytes(), encryption->key_length(), encryption->aad_bytes(),
+        encryption->aad_length(), decrypted_buffer.data());
+
+    if (decrypted_len <= 0) {
+      throw ParquetException("Couldn't decrypt buffer\n");
+    }
+
+    uint32_t decrypted_buffer_len = static_cast<uint32_t>(decrypted_len);
+    DeserializeThriftMsg(decrypted_buffer.data(), &decrypted_buffer_len,
+                         deserialized_msg);
+
+    *len = kLengthPrefixSize + clen;
   }
-  uint32_t bytes_left = tmem_transport->available_read();
-  *len = *len - bytes_left;
 }
 
 // Serialize obj into a buffer. The result is returned as a string.
 // The arguments are the object to be serialized and
 // the expected size of the serialized object
 template <class T>
-inline int64_t SerializeThriftMsg(T* obj, uint32_t len, OutputStream* out) {
+inline int64_t SerializeThriftMsg(T* obj, uint32_t len, OutputStream* out,
+                                  const EncryptionProperties* encryption = nullptr) {
   shared_ptr<apache::thrift::transport::TMemoryBuffer> mem_buffer(
       new apache::thrift::transport::TMemoryBuffer(len));
   apache::thrift::protocol::TCompactProtocolFactoryT<
@@ -150,8 +202,22 @@ inline int64_t SerializeThriftMsg(T* obj, uint32_t len, OutputStream* out) {
   uint8_t* out_buffer;
   uint32_t out_length;
   mem_buffer->getBuffer(&out_buffer, &out_length);
-  out->Write(out_buffer, out_length);
-  return out_length;
+  if (encryption == nullptr) {
+    out->Write(out_buffer, out_length);
+
+    return out_length;
+  } else {
+    std::vector<uint8_t> cipher_buffer(encryption->CalculateCipherSize(len));
+    int cipher_buffer_len = parquet_encryption::Encrypt(
+        encryption->algorithm(), true, out_buffer, out_length, encryption->key_bytes(),
+        encryption->key_length(), encryption->aad_bytes(), encryption->aad_length(),
+        cipher_buffer.data());
+
+    out->Write(reinterpret_cast<uint8_t*>(&cipher_buffer_len), 4);
+    out->Write(cipher_buffer.data(), cipher_buffer_len);
+
+    return cipher_buffer_len + 4;
+  }
 }
 
 }  // namespace parquet
diff --git a/src/parquet/types.h b/src/parquet/types.h
index 10789cbf..36c01de3 100644
--- a/src/parquet/types.h
+++ b/src/parquet/types.h
@@ -117,6 +117,64 @@ struct Encryption {
   enum type { AES_GCM_V1 = 0, AES_GCM_CTR_V1 = 1 };
 };
 
+struct EncryptionAlgorithm {  // algorithm selector paired with its AAD metadata
+  Encryption::type algorithm;  // Encryption::AES_GCM_V1 or Encryption::AES_GCM_CTR_V1
+  std::string aad_metadata;  // copied to/from the thrift aad_metadata field by FromThrift/ToThrift
+};
+
+// Bundles an encryption algorithm with its key and additional authenticated data
+// (AAD), and converts between plaintext and ciphertext buffer sizes for that
+// algorithm. The key is overwritten with zero bytes when the object is destroyed.
+class PARQUET_EXPORT EncryptionProperties {
+ private:
+  // Exposes the characters of str as raw bytes; an empty string yields nullptr.
+  // The const_cast only adapts the pointer type for byte-oriented callers; the
+  // pointed-to data still belongs to str and must not be written through.
+  static inline uint8_t* str2bytes(const std::string& str) {
+    if (str.empty()) return nullptr;
+
+    char* cbytes = const_cast<char*>(str.c_str());
+    return reinterpret_cast<uint8_t*>(cbytes);
+  }
+
+  // Number of bytes a ciphertext is longer than its plaintext for algorithm_;
+  // 0 for an algorithm value this class does not know.
+  // NOTE(review): 28 is presumably a 12-byte nonce plus a 16-byte GCM tag, and 16
+  // a CTR IV — confirm against the Encrypt/Decrypt implementation.
+  uint32_t CipherOverhead() const {
+    if (algorithm_ == Encryption::AES_GCM_V1) {
+      return 28;
+    } else if (algorithm_ == Encryption::AES_GCM_CTR_V1) {
+      return 16;
+    }
+    return 0;
+  }
+
+ public:
+  // Creates properties with an empty key and AAD; the algorithm defaults to
+  // AES_GCM_V1 so that algorithm() never reads an indeterminate value.
+  EncryptionProperties() = default;
+  EncryptionProperties(Encryption::type algorithm, const std::string& key,
+                       const std::string& aad = "")
+      : algorithm_(algorithm), key_(key), aad_(aad) {}
+
+  // Overwrites every key character with '\0' before the string is released.
+  ~EncryptionProperties() { key_.replace(0, key_.length(), key_.length(), '\0'); }
+
+  int key_length() const { return static_cast<int>(key_.length()); }
+  uint8_t* key_bytes() const { return str2bytes(key_); }
+
+  void aad(const std::string& aad) { aad_ = aad; }
+  int aad_length() const { return static_cast<int>(aad_.length()); }
+  uint8_t* aad_bytes() const { return str2bytes(aad_); }
+
+  Encryption::type algorithm() const { return algorithm_; }
+
+  const std::string& key() const { return key_; }
+  const std::string& aad() const { return aad_; }
+
+  // Size of the ciphertext produced for plain_len bytes of plaintext.
+  uint32_t CalculateCipherSize(uint32_t plain_len) const {
+    return plain_len + CipherOverhead();
+  }
+
+  // Size of the plaintext recovered from cipher_len bytes of ciphertext. Returns 0
+  // when cipher_len is smaller than the overhead, rather than letting the unsigned
+  // subtraction wrap around to a value close to 4 GiB.
+  uint32_t CalculatePlainSize(uint32_t cipher_len) const {
+    const uint32_t overhead = CipherOverhead();
+    if (cipher_len < overhead) return 0;
+    return cipher_len - overhead;
+  }
+
+ private:
+  Encryption::type algorithm_ = Encryption::AES_GCM_V1;  // encryption algorithm
+  std::string key_;  // encryption key, should have 16, 24, 32-byte length
+  std::string aad_;  // encryption additional authenticated data
+};
+
 // parquet::PageType
 struct PageType {
   enum type { DATA_PAGE, INDEX_PAGE, DICTIONARY_PAGE, DATA_PAGE_V2 };


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> [C++] Parquet modular encryption
> --------------------------------
>
>                 Key: PARQUET-1300
>                 URL: https://issues.apache.org/jira/browse/PARQUET-1300
>             Project: Parquet
>          Issue Type: New Feature
>          Components: parquet-cpp
>            Reporter: Gidon Gershinsky
>            Assignee: Deepak Majeti
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: column_reader.cc, column_writer.cc, file_reader.cc, file_writer.cc, thrift.h
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> C++ version of a mechanism for modular encryption and decryption of Parquet files. It allows the data to be kept fully encrypted in storage, while enabling a client to extract a required subset (footer, column(s), pages) and to authenticate / decrypt the extracted data.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)