Posted to commits@parquet.apache.org by ga...@apache.org on 2020/05/29 10:07:30 UTC

[parquet-mr] branch encryption updated: PARQUET-1229: Parquet MR encryption (#776)

This is an automated email from the ASF dual-hosted git repository.

gabor pushed a commit to branch encryption
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/encryption by this push:
     new 06b5372  PARQUET-1229: Parquet MR encryption (#776)
06b5372 is described below

commit 06b5372ddf0f1108c2109cc9011b2497a136830f
Author: ggershinsky <gg...@users.noreply.github.com>
AuthorDate: Fri May 29 13:07:03 2020 +0300

    PARQUET-1229: Parquet MR encryption (#776)
---
 dev/travis-before_install-encryption.sh            |  29 --
 .../internal/column/columnindex/OffsetIndex.java   |   7 +
 .../column/columnindex/OffsetIndexBuilder.java     |   6 +-
 .../org/apache/parquet/format/BlockCipher.java     |   8 +-
 .../java/org/apache/parquet/HadoopReadOptions.java |  32 +-
 .../org/apache/parquet/ParquetReadOptions.java     |  19 +-
 .../apache/parquet/crypto/AADPrefixVerifier.java   |   6 +-
 .../java/org/apache/parquet/crypto/AesCipher.java  |  56 +-
 .../org/apache/parquet/crypto/AesCtrDecryptor.java |  16 +-
 .../org/apache/parquet/crypto/AesCtrEncryptor.java |  16 +-
 .../org/apache/parquet/crypto/AesGcmDecryptor.java |  19 +-
 .../org/apache/parquet/crypto/AesGcmEncryptor.java |  16 +-
 .../parquet/crypto/DecryptionKeyRetriever.java     |   6 +-
 .../crypto/DecryptionPropertiesFactory.java        |   8 +-
 .../crypto/EncryptionPropertiesFactory.java        |   6 +-
 .../crypto/InternalColumnDecryptionSetup.java      |   6 +-
 .../crypto/InternalColumnEncryptionSetup.java      |   6 +-
 .../parquet/crypto/InternalFileDecryptor.java      |  70 +--
 .../parquet/crypto/InternalFileEncryptor.java      |  31 +-
 .../parquet/crypto/KeyAccessDeniedException.java   |  17 +-
 .../apache/parquet/crypto/ModuleCipherFactory.java |  12 +-
 .../crypto/ParquetCryptoRuntimeException.java      |   2 +-
 .../parquet/crypto/StringKeyIdRetriever.java       |  39 --
 ...xception.java => TagVerificationException.java} |  21 +-
 .../format/converter/ParquetMetadataConverter.java | 370 +++++++++++--
 .../parquet/hadoop/ColumnChunkPageReadStore.java   | 127 +++--
 .../parquet/hadoop/ColumnChunkPageWriteStore.java  | 165 +++++-
 .../parquet/hadoop/ColumnIndexFilterUtils.java     |   5 +
 .../parquet/hadoop/DictionaryPageReader.java       |  19 +-
 .../hadoop/InternalParquetRecordWriter.java        |  10 +-
 .../apache/parquet/hadoop/ParquetFileReader.java   | 276 ++++++++--
 .../apache/parquet/hadoop/ParquetFileWriter.java   | 243 +++++++--
 .../apache/parquet/hadoop/ParquetOutputFormat.java |  24 +-
 .../org/apache/parquet/hadoop/ParquetReader.java   |  18 +-
 .../apache/parquet/hadoop/ParquetRecordReader.java |   2 +-
 .../org/apache/parquet/hadoop/ParquetWriter.java   |  30 +-
 .../parquet/hadoop/metadata/BlockMetaData.java     |  20 +-
 .../hadoop/metadata/ColumnChunkMetaData.java       | 166 +++++-
 .../parquet/hadoop/metadata/FileMetaData.java      |  11 +
 .../parquet/hadoop/util/HadoopInputFile.java       |   4 +
 .../apache/parquet/hadoop/TestBloomEncryption.java | 313 +++++++++++
 .../parquet/hadoop/TestColumnIndexEncryption.java  | 571 +++++++++++++++++++++
 42 files changed, 2382 insertions(+), 446 deletions(-)

diff --git a/dev/travis-before_install-encryption.sh b/dev/travis-before_install-encryption.sh
deleted file mode 100755
index 0e3a3f6..0000000
--- a/dev/travis-before_install-encryption.sh
+++ /dev/null
@@ -1,29 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-################################################################################
-# This is a branch-specific script that gets invoked at the end of
-# travis-before_install.sh. It is run for the bloom-filter branch only.
-################################################################################
-
-cd ..
-git clone https://github.com/apache/parquet-format.git
-cd parquet-format
-mvn install -DskipTests --batch-mode
-cd $TRAVIS_BUILD_DIR
-
-
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndex.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndex.java
index ba984eb..02d58af 100644
--- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndex.java
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndex.java
@@ -49,6 +49,13 @@ public interface OffsetIndex {
    * @return the index of the first row in the page
    */
   public long getFirstRowIndex(int pageIndex);
+  
+  /**
+   * @param pageIndex
+   *         the index of the page
+   * @return the original ordinal of the page in the column chunk
+   */
+  public int getPageOrdinal(int pageIndex);
 
   /**
    * @param pageIndex
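
A usage sketch (not part of this commit) for the new OffsetIndex.getPageOrdinal(): the likely motivation is that, once column-index filtering skips pages, the index of a page in the filtered offset index no longer matches the ordinal the page was encrypted with, so decryption code has to map back before updating the page AAD. The helper class below is hypothetical; AesCipher.quickUpdatePageAAD() is taken from this commit.

    import org.apache.parquet.crypto.AesCipher;
    import org.apache.parquet.internal.column.columnindex.OffsetIndex;

    public class PageAadUpdater {
      // Refresh the trailing two AAD bytes before decrypting the page at 'pageIndex'.
      public static void updateAadForPage(OffsetIndex offsetIndex, byte[] dataPageAAD, int pageIndex) {
        // getPageOrdinal() maps the (possibly filtered) page index back to the
        // ordinal the page had when it was written and encrypted.
        int pageOrdinal = offsetIndex.getPageOrdinal(pageIndex);
        AesCipher.quickUpdatePageAAD(dataPageAAD, pageOrdinal);
      }
    }
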
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndexBuilder.java
index e4907b5..4909744 100644
--- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndexBuilder.java
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndexBuilder.java
@@ -65,6 +65,11 @@ public class OffsetIndexBuilder {
     public long getFirstRowIndex(int pageIndex) {
       return firstRowIndexes[pageIndex];
     }
+    
+    @Override
+    public int getPageOrdinal(int pageIndex) {
+      return pageIndex;
+    }
   }
 
   private static final OffsetIndexBuilder NO_OP_BUILDER = new OffsetIndexBuilder() {
@@ -171,5 +176,4 @@ public class OffsetIndexBuilder {
 
     return offsetIndex;
   }
-
 }
diff --git a/parquet-format-structures/src/main/java/org/apache/parquet/format/BlockCipher.java b/parquet-format-structures/src/main/java/org/apache/parquet/format/BlockCipher.java
index 48c0bf2..37b0b58 100755
--- a/parquet-format-structures/src/main/java/org/apache/parquet/format/BlockCipher.java
+++ b/parquet-format-structures/src/main/java/org/apache/parquet/format/BlockCipher.java
@@ -35,9 +35,8 @@ public interface BlockCipher{
      * The ciphertext starts at offset 4  and fills up the rest of the returned byte array.
      * The ciphertext includes the nonce and (in case of GCM cipher) the tag, as detailed in the 
      * Parquet Modular Encryption specification.
-     * @throws IOException thrown upon any crypto problem encountered during encryption
      */
-    public byte[] encrypt(byte[] plaintext, byte[] AAD) throws IOException;
+    public byte[] encrypt(byte[] plaintext, byte[] AAD);
   }
 
 
@@ -51,9 +50,8 @@ public interface BlockCipher{
      * Parquet Modular Encryption specification.
      * @param AAD - Additional Authenticated Data for the decryption (ignored in case of CTR cipher)
      * @return plaintext - starts at offset 0 of the output value, and fills up the entire byte array.
-     * @throws IOException thrown upon any crypto problem encountered during decryption
      */
-    public byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD) throws IOException;
+    public byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD);
 
     /**
      * Convenience decryption method that reads the length and ciphertext from the input stream.
@@ -61,7 +59,7 @@ public interface BlockCipher{
      * @param from Input stream with length and ciphertext.
      * @param AAD - Additional Authenticated Data for the decryption (ignored in case of CTR cipher)
      * @return plaintext -  starts at offset 0 of the output, and fills up the entire byte array.
-     * @throws IOException thrown upon any crypto or IO problem encountered during decryption
+     * @throws IOException - Stream I/O problems
      */
     public byte[] decrypt(InputStream from, byte[] AAD) throws IOException;
   }
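
The Encryptor/Decryptor contract above now reports crypto failures as unchecked exceptions; only the stream-based decrypt() still declares IOException. A minimal round-trip sketch (not part of this commit), using the ModuleCipherFactory entry points that appear later in this diff; the key and AAD values are illustrative only.

    import java.nio.charset.StandardCharsets;
    import org.apache.parquet.crypto.AesMode;
    import org.apache.parquet.crypto.ModuleCipherFactory;
    import org.apache.parquet.format.BlockCipher;

    public class BlockCipherRoundTrip {
      public static void main(String[] args) {
        byte[] key = new byte[16];  // all-zero AES-128 demo key
        byte[] aad = "demo-aad".getBytes(StandardCharsets.UTF_8);
        byte[] plaintext = "page bytes".getBytes(StandardCharsets.UTF_8);

        BlockCipher.Encryptor encryptor = ModuleCipherFactory.getEncryptor(AesMode.GCM, key);
        // Returned layout: 4-byte ciphertext length, then nonce, ciphertext and GCM tag.
        byte[] lengthAndCiphertext = encryptor.encrypt(plaintext, aad);

        BlockCipher.Decryptor decryptor = ModuleCipherFactory.getDecryptor(AesMode.GCM, key);
        byte[] decrypted = decryptor.decrypt(lengthAndCiphertext, aad);
        System.out.println(new String(decrypted, StandardCharsets.UTF_8));
      }
    }
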
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java
index b16a8c4..43f0f1d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java
@@ -20,8 +20,11 @@
 package org.apache.parquet;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.crypto.DecryptionPropertiesFactory;
+import org.apache.parquet.crypto.FileDecryptionProperties;
 import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.format.converter.ParquetMetadataConverter.MetadataFilter;
 import org.apache.parquet.hadoop.util.HadoopCodecs;
@@ -55,11 +58,12 @@ public class HadoopReadOptions extends ParquetReadOptions {
                             ByteBufferAllocator allocator,
                             int maxAllocationSize,
                             Map<String, String> properties,
-                            Configuration conf) {
+                            Configuration conf,
+                            FileDecryptionProperties fileDecryptionProperties) {
     super(
         useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, useColumnIndexFilter,
         usePageChecksumVerification, useBloomFilter, recordFilter, metadataFilter, codecFactory, allocator,
-        maxAllocationSize, properties
+        maxAllocationSize, properties, fileDecryptionProperties
     );
     this.conf = conf;
   }
@@ -81,11 +85,21 @@ public class HadoopReadOptions extends ParquetReadOptions {
     return new Builder(conf);
   }
 
+  public static Builder builder(Configuration conf, Path filePath) {
+    return new Builder(conf, filePath);
+  }
+
   public static class Builder extends ParquetReadOptions.Builder {
     private final Configuration conf;
+    private final Path filePath;
 
     public Builder(Configuration conf) {
+      this(conf, null);
+    }
+
+    public Builder(Configuration conf, Path filePath) {
       this.conf = conf;
+      this.filePath = filePath;
       useSignedStringMinMax(conf.getBoolean("parquet.strings.signed-min-max.enabled", false));
       useDictionaryFilter(conf.getBoolean(DICTIONARY_FILTERING_ENABLED, true));
       useStatsFilter(conf.getBoolean(STATS_FILTERING_ENABLED, true));
@@ -105,10 +119,22 @@ public class HadoopReadOptions extends ParquetReadOptions {
 
     @Override
     public ParquetReadOptions build() {
+      if (null == fileDecryptionProperties) {
+        // if not set, check if Hadoop conf defines decryption factory and properties 
+        fileDecryptionProperties = createDecryptionProperties(filePath, conf);
+      }
       return new HadoopReadOptions(
         useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter,
         useColumnIndexFilter, usePageChecksumVerification, useBloomFilter, recordFilter, metadataFilter,
-        codecFactory, allocator, maxAllocationSize, properties, conf);
+        codecFactory, allocator, maxAllocationSize, properties, conf, fileDecryptionProperties);
+    }
+  }
+
+  private static FileDecryptionProperties createDecryptionProperties(Path file, Configuration hadoopConfig) {
+    DecryptionPropertiesFactory cryptoFactory = DecryptionPropertiesFactory.loadFactory(hadoopConfig);
+    if (null == cryptoFactory) {
+      return null;
     }
+    return cryptoFactory.getFileDecryptionProperties(hadoopConfig, file);
   }
 }
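
With this change, build() falls back to a DecryptionPropertiesFactory registered in the Hadoop Configuration whenever no FileDecryptionProperties were set explicitly. A sketch of both paths (not part of this commit); only builder(), withDecryption() and build() from this diff are used.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.HadoopReadOptions;
    import org.apache.parquet.ParquetReadOptions;
    import org.apache.parquet.crypto.FileDecryptionProperties;

    public class ReadOptionsExamples {
      // Explicit properties win; build() only consults the factory when none are set.
      public static ParquetReadOptions explicit(Configuration conf, Path file,
                                                FileDecryptionProperties decryptionProperties) {
        return HadoopReadOptions.builder(conf, file)
            .withDecryption(decryptionProperties)
            .build();
      }

      // If a DecryptionPropertiesFactory is configured in 'conf', build() calls its
      // getFileDecryptionProperties(conf, file); otherwise decryption stays disabled.
      public static ParquetReadOptions viaFactory(Configuration conf, Path file) {
        return HadoopReadOptions.builder(conf, file).build();
      }
    }
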
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
index 2fdca3b..f478cef 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
@@ -22,6 +22,7 @@ package org.apache.parquet;
 import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.bytes.HeapByteBufferAllocator;
 import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.crypto.FileDecryptionProperties;
 import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.util.HadoopCodecs;
@@ -57,6 +58,7 @@ public class ParquetReadOptions {
   private final ByteBufferAllocator allocator;
   private final int maxAllocationSize;
   private final Map<String, String> properties;
+  private final FileDecryptionProperties fileDecryptionProperties;
 
   ParquetReadOptions(boolean useSignedStringMinMax,
                      boolean useStatsFilter,
@@ -70,7 +72,8 @@ public class ParquetReadOptions {
                      CompressionCodecFactory codecFactory,
                      ByteBufferAllocator allocator,
                      int maxAllocationSize,
-                     Map<String, String> properties) {
+                     Map<String, String> properties,
+                     FileDecryptionProperties fileDecryptionProperties) {
     this.useSignedStringMinMax = useSignedStringMinMax;
     this.useStatsFilter = useStatsFilter;
     this.useDictionaryFilter = useDictionaryFilter;
@@ -84,6 +87,7 @@ public class ParquetReadOptions {
     this.allocator = allocator;
     this.maxAllocationSize = maxAllocationSize;
     this.properties = Collections.unmodifiableMap(properties);
+    this.fileDecryptionProperties = fileDecryptionProperties;
   }
 
   public boolean useSignedStringMinMax() {
@@ -142,6 +146,10 @@ public class ParquetReadOptions {
     return properties.get(property);
   }
 
+  public FileDecryptionProperties getDecryptionProperties() {
+    return fileDecryptionProperties;
+  }
+
   public boolean isEnabled(String property, boolean defaultValue) {
     Optional<String> propValue = Optional.ofNullable(properties.get(property));
     return propValue.isPresent() ? Boolean.valueOf(propValue.get())
@@ -167,6 +175,7 @@ public class ParquetReadOptions {
     protected ByteBufferAllocator allocator = new HeapByteBufferAllocator();
     protected int maxAllocationSize = ALLOCATION_SIZE_DEFAULT;
     protected Map<String, String> properties = new HashMap<>();
+    protected FileDecryptionProperties fileDecryptionProperties = null;
 
     public Builder useSignedStringMinMax(boolean useSignedStringMinMax) {
       this.useSignedStringMinMax = useSignedStringMinMax;
@@ -277,6 +286,11 @@ public class ParquetReadOptions {
       return this;
     }
 
+    public Builder withDecryption(FileDecryptionProperties fileDecryptionProperties) {
+      this.fileDecryptionProperties = fileDecryptionProperties;
+      return this;
+    }
+
     public Builder set(String key, String value) {
       properties.put(key, value);
       return this;
@@ -292,6 +306,7 @@ public class ParquetReadOptions {
       withCodecFactory(options.codecFactory);
       withAllocator(options.allocator);
       withPageChecksumVerification(options.usePageChecksumVerification);
+      withDecryption(options.fileDecryptionProperties);
       for (Map.Entry<String, String> keyValue : options.properties.entrySet()) {
         set(keyValue.getKey(), keyValue.getValue());
       }
@@ -302,7 +317,7 @@ public class ParquetReadOptions {
       return new ParquetReadOptions(
         useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter,
         useColumnIndexFilter, usePageChecksumVerification, useBloomFilter, recordFilter, metadataFilter,
-        codecFactory, allocator, maxAllocationSize, properties);
+        codecFactory, allocator, maxAllocationSize, properties, fileDecryptionProperties);
     }
   }
 }
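
ParquetReadOptions itself now carries the decryption properties, settable through withDecryption() and readable through getDecryptionProperties(). A small consumer sketch (not part of this commit) that checks whether decryption was configured for a reader.

    import org.apache.parquet.ParquetReadOptions;
    import org.apache.parquet.crypto.FileDecryptionProperties;

    public class DecryptionCheck {
      public static boolean isDecryptionConfigured(ParquetReadOptions options) {
        FileDecryptionProperties properties = options.getDecryptionProperties();
        // Null means the file is expected to be plaintext, or that no
        // DecryptionPropertiesFactory was configured for this reader.
        return properties != null;
      }
    }
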
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AADPrefixVerifier.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AADPrefixVerifier.java
index a0a8029..ee5169d 100755
--- a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AADPrefixVerifier.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AADPrefixVerifier.java
@@ -19,8 +19,6 @@
 
 package org.apache.parquet.crypto;
 
-import java.io.IOException;
-
 public interface AADPrefixVerifier {
 
   /**
@@ -28,7 +26,7 @@ public interface AADPrefixVerifier {
    * Must be thread-safe.
    * 
    * @param aadPrefix AAD Prefix
-   * @throws IOException Throw exception if AAD prefix is wrong.
+   * @throws ParquetCryptoRuntimeException Throw exception if AAD prefix is wrong.
    */
-  public void verify(byte[] aadPrefix) throws IOException;
+  public void verify(byte[] aadPrefix) throws ParquetCryptoRuntimeException;
 }
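
A sample implementation of the updated AADPrefixVerifier contract (illustrative, not part of this commit): reject any AAD prefix that differs from an expected value supplied by the caller.

    import java.util.Arrays;
    import org.apache.parquet.crypto.AADPrefixVerifier;
    import org.apache.parquet.crypto.ParquetCryptoRuntimeException;

    public class FixedAADPrefixVerifier implements AADPrefixVerifier {
      private final byte[] expectedPrefix;

      public FixedAADPrefixVerifier(byte[] expectedPrefix) {
        this.expectedPrefix = expectedPrefix.clone();
      }

      @Override
      public void verify(byte[] aadPrefix) throws ParquetCryptoRuntimeException {
        // Thread-safe: reads only immutable state.
        if (!Arrays.equals(expectedPrefix, aadPrefix)) {
          throw new ParquetCryptoRuntimeException("Unexpected AAD prefix");
        }
      }
    }
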
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCipher.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCipher.java
index fb9588d..6b9f24c 100755
--- a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCipher.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCipher.java
@@ -24,7 +24,6 @@ import javax.crypto.spec.SecretKeySpec;
 
 import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
 
-import java.io.IOException;
 import java.security.SecureRandom;
 
 public class AesCipher {
@@ -44,7 +43,7 @@ public class AesCipher {
   protected Cipher cipher;
   protected final byte[] localNonce;
 
-  AesCipher(AesMode mode, byte[] keyBytes) throws IllegalArgumentException, IOException {
+  AesCipher(AesMode mode, byte[] keyBytes) {
     if (null == keyBytes) {
       throw new IllegalArgumentException("Null key bytes");
     }
@@ -67,30 +66,69 @@ public class AesCipher {
   }
 
   public static byte[] createModuleAAD(byte[] fileAAD, ModuleType moduleType, 
-      short rowGroupOrdinal, short columnOrdinal, short pageOrdinal) {
+      int rowGroupOrdinal, int columnOrdinal, int pageOrdinal) {
+    
     byte[] typeOrdinalBytes = new byte[1];
     typeOrdinalBytes[0] = moduleType.getValue();
+    
     if (ModuleType.Footer == moduleType) {
       return concatByteArrays(fileAAD, typeOrdinalBytes);      
     }
 
-    byte[] rowGroupOrdinalBytes = shortToBytesLE(rowGroupOrdinal);
-    byte[] columnOrdinalBytes = shortToBytesLE(columnOrdinal);
+    if (rowGroupOrdinal < 0) {
+      throw new IllegalArgumentException("Wrong row group ordinal: " + rowGroupOrdinal);
+    }
+    short shortRGOrdinal = (short) rowGroupOrdinal;
+    if (shortRGOrdinal != rowGroupOrdinal) {
+      throw new ParquetCryptoRuntimeException("Encrypted parquet files can't have "
+          + "more than " + Short.MAX_VALUE + " row groups: " + rowGroupOrdinal);
+    }
+    byte[] rowGroupOrdinalBytes = shortToBytesLE(shortRGOrdinal);
+    
+    if (columnOrdinal < 0) {
+      throw new IllegalArgumentException("Wrong column ordinal: " + columnOrdinal);
+    }
+    short shortColumOrdinal = (short) columnOrdinal;
+    if (shortColumOrdinal != columnOrdinal) {
+      throw new ParquetCryptoRuntimeException("Encrypted parquet files can't have "
+          + "more than " + Short.MAX_VALUE + " columns: " + columnOrdinal);
+    }
+    byte[] columnOrdinalBytes = shortToBytesLE(shortColumOrdinal);
+    
     if (ModuleType.DataPage != moduleType && ModuleType.DataPageHeader != moduleType) {
       return concatByteArrays(fileAAD, typeOrdinalBytes, rowGroupOrdinalBytes, columnOrdinalBytes); 
     }
 
-    byte[] pageOrdinalBytes = shortToBytesLE(pageOrdinal);
+    if (pageOrdinal < 0) {
+      throw new IllegalArgumentException("Wrong page ordinal: " + pageOrdinal);
+    }
+    short shortPageOrdinal = (short) pageOrdinal;
+    if (shortPageOrdinal != pageOrdinal) {
+      throw new ParquetCryptoRuntimeException("Encrypted parquet files can't have "
+          + "more than " + Short.MAX_VALUE + " pages per chunk: " + pageOrdinal);
+    }
+    byte[] pageOrdinalBytes = shortToBytesLE(shortPageOrdinal);
+    
     return concatByteArrays(fileAAD, typeOrdinalBytes, rowGroupOrdinalBytes, columnOrdinalBytes, pageOrdinalBytes);
   }
 
   public static byte[] createFooterAAD(byte[] aadPrefixBytes) {
-    return createModuleAAD(aadPrefixBytes, ModuleType.Footer, (short) -1, (short) -1, (short) -1);
+    return createModuleAAD(aadPrefixBytes, ModuleType.Footer, -1, -1, -1);
   }
 
   // Update last two bytes with new page ordinal (instead of creating new page AAD from scratch)
-  public static void quickUpdatePageAAD(byte[] pageAAD, short newPageOrdinal) {
-    byte[] pageOrdinalBytes = shortToBytesLE(newPageOrdinal);
+  public static void quickUpdatePageAAD(byte[] pageAAD, int newPageOrdinal) {
+    java.util.Objects.requireNonNull(pageAAD);
+    if (newPageOrdinal < 0) {
+      throw new IllegalArgumentException("Wrong page ordinal: " + newPageOrdinal);
+    }
+    short shortPageOrdinal = (short) newPageOrdinal;
+    if (shortPageOrdinal != newPageOrdinal) {
+      throw new ParquetCryptoRuntimeException("Encrypted parquet files can't have "
+          + "more than " + Short.MAX_VALUE + " pages per chunk: " + newPageOrdinal);
+    }
+    
+    byte[] pageOrdinalBytes = shortToBytesLE(shortPageOrdinal);
     System.arraycopy(pageOrdinalBytes, 0, pageAAD, pageAAD.length - 2, 2);
   }
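
createModuleAAD() now takes int ordinals and rejects values that are negative or do not fit in a short before encoding each as two little-endian bytes. A sketch of the helpers in use (not part of this commit); fileAAD would normally come from the file encryptor or decryptor.

    import org.apache.parquet.crypto.AesCipher;
    import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;

    public class ModuleAadExample {
      // Footer AAD is fileAAD + module type byte; data page AAD additionally
      // carries the row group, column and page ordinals (two bytes each).
      public static byte[] firstDataPageAAD(byte[] fileAAD, int rowGroupOrdinal, int columnOrdinal) {
        return AesCipher.createModuleAAD(fileAAD, ModuleType.DataPage, rowGroupOrdinal, columnOrdinal, 0);
      }

      // Rewrites only the trailing two bytes instead of rebuilding the whole AAD.
      public static void moveToPage(byte[] dataPageAAD, int nextPageOrdinal) {
        AesCipher.quickUpdatePageAAD(dataPageAAD, nextPageOrdinal);
      }
    }
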
 
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCtrDecryptor.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCtrDecryptor.java
index bf17b8c..447404a 100755
--- a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCtrDecryptor.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCtrDecryptor.java
@@ -32,13 +32,13 @@ public class AesCtrDecryptor extends AesCipher implements BlockCipher.Decryptor{
 
   private final byte[] ctrIV;
 
-  AesCtrDecryptor(byte[] keyBytes) throws IllegalArgumentException, IOException {
+  AesCtrDecryptor(byte[] keyBytes) {
     super(AesMode.CTR, keyBytes);
 
     try {
       cipher = Cipher.getInstance(AesMode.CTR.getCipherName());
     } catch (GeneralSecurityException e) {
-      throw new IOException("Failed to create CTR cipher", e);
+      throw new ParquetCryptoRuntimeException("Failed to create CTR cipher", e);
     }
     ctrIV = new byte[CTR_IV_LENGTH];
     // Setting last bit of initial CTR counter to 1
@@ -46,19 +46,18 @@ public class AesCtrDecryptor extends AesCipher implements BlockCipher.Decryptor{
   }
 
   @Override
-  public byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD)  throws IOException {
+  public byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD) {
     int cipherTextOffset = SIZE_LENGTH;
     int cipherTextLength = lengthAndCiphertext.length - SIZE_LENGTH;
 
     return decrypt(lengthAndCiphertext, cipherTextOffset, cipherTextLength, AAD);
   }
 
-  public byte[] decrypt(byte[] ciphertext, int cipherTextOffset, int cipherTextLength, 
-      byte[] AAD)  throws IOException {
+  public byte[] decrypt(byte[] ciphertext, int cipherTextOffset, int cipherTextLength, byte[] AAD) {
 
     int plainTextLength = cipherTextLength - NONCE_LENGTH;
     if (plainTextLength < 1) {
-      throw new IOException("Wrong input length " + plainTextLength);
+      throw new ParquetCryptoRuntimeException("Wrong input length " + plainTextLength);
     }
 
     // Get the nonce from ciphertext
@@ -82,7 +81,7 @@ public class AesCtrDecryptor extends AesCipher implements BlockCipher.Decryptor{
 
       cipher.doFinal(ciphertext, inputOffset, inputLength, plainText, outputOffset);
     } catch (GeneralSecurityException e) {
-      throw new IOException("Failed to decrypt", e);
+      throw new ParquetCryptoRuntimeException("Failed to decrypt", e);
     }
 
     return plainText;
@@ -118,8 +117,7 @@ public class AesCtrDecryptor extends AesCipher implements BlockCipher.Decryptor{
     while (gotBytes < ciphertextLength) {
       int n = from.read(ciphertextBuffer, gotBytes, ciphertextLength - gotBytes);
       if (n <= 0) {
-        throw new IOException("Tried to read " + ciphertextLength + 
-            " bytes, but only got " + gotBytes + " bytes.");
+        throw new IOException("Tried to read " + ciphertextLength + " bytes, but only got " + gotBytes + " bytes.");
       }
       gotBytes += n;
     }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCtrEncryptor.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCtrEncryptor.java
index ec1f8d4..537789c 100755
--- a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCtrEncryptor.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCtrEncryptor.java
@@ -25,20 +25,19 @@ import javax.crypto.spec.IvParameterSpec;
 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.format.BlockCipher;
 
-import java.io.IOException;
 import java.security.GeneralSecurityException;
 
 public class AesCtrEncryptor extends AesCipher implements BlockCipher.Encryptor{
 
   private final byte[] ctrIV;
 
-  AesCtrEncryptor(byte[] keyBytes) throws IllegalArgumentException, IOException {
+  AesCtrEncryptor(byte[] keyBytes) {
     super(AesMode.CTR, keyBytes);
 
     try {
       cipher = Cipher.getInstance(AesMode.CTR.getCipherName());
     } catch (GeneralSecurityException e) {
-      throw new IOException("Failed to create CTR cipher", e);
+      throw new ParquetCryptoRuntimeException("Failed to create CTR cipher", e);
     }
 
     ctrIV = new byte[CTR_IV_LENGTH];
@@ -47,20 +46,19 @@ public class AesCtrEncryptor extends AesCipher implements BlockCipher.Encryptor{
   }
 
   @Override
-  public byte[] encrypt(byte[] plainText, byte[] AAD)  throws IOException {
+  public byte[] encrypt(byte[] plainText, byte[] AAD) {
     return encrypt(true, plainText, AAD);
   }
 
-  public byte[] encrypt(boolean writeLength, byte[] plainText, byte[] AAD)  throws IOException {
+  public byte[] encrypt(boolean writeLength, byte[] plainText, byte[] AAD) {
     randomGenerator.nextBytes(localNonce);
     return encrypt(writeLength, plainText, localNonce, AAD);
   }
 
-  public byte[] encrypt(boolean writeLength, byte[] plainText, byte[] nonce, byte[] AAD)  
-      throws IOException {
+  public byte[] encrypt(boolean writeLength, byte[] plainText, byte[] nonce, byte[] AAD) { 
 
     if (nonce.length != NONCE_LENGTH) {
-      throw new IOException("Wrong nonce length " + nonce.length);
+      throw new ParquetCryptoRuntimeException("Wrong nonce length " + nonce.length);
     }
     int plainTextLength = plainText.length;
     int cipherTextLength = NONCE_LENGTH + plainTextLength;
@@ -84,7 +82,7 @@ public class AesCtrEncryptor extends AesCipher implements BlockCipher.Encryptor{
 
       cipher.doFinal(plainText, inputOffset, inputLength, cipherText, outputOffset);
     }  catch (GeneralSecurityException e) {
-      throw new IOException("Failed to encrypt", e);
+      throw new ParquetCryptoRuntimeException("Failed to encrypt", e);
     }
 
     // Add ciphertext length
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesGcmDecryptor.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesGcmDecryptor.java
index d7f1486..1524d8e 100755
--- a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesGcmDecryptor.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesGcmDecryptor.java
@@ -19,6 +19,7 @@
 
 package org.apache.parquet.crypto;
 
+import javax.crypto.AEADBadTagException;
 import javax.crypto.Cipher;
 import javax.crypto.spec.GCMParameterSpec;
 
@@ -30,31 +31,29 @@ import java.security.GeneralSecurityException;
 
 public class AesGcmDecryptor extends AesCipher implements BlockCipher.Decryptor{
 
-
-  AesGcmDecryptor(byte[] keyBytes) throws IllegalArgumentException, IOException {
+  AesGcmDecryptor(byte[] keyBytes) {
     super(AesMode.GCM, keyBytes);
 
     try {
       cipher = Cipher.getInstance(AesMode.GCM.getCipherName());
     } catch (GeneralSecurityException e) {
-      throw new IOException("Failed to create GCM cipher", e);
+      throw new ParquetCryptoRuntimeException("Failed to create GCM cipher", e);
     }
   }
 
   @Override
-  public byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD)  throws IOException {
+  public byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD)  {
     int cipherTextOffset = SIZE_LENGTH;
     int cipherTextLength = lengthAndCiphertext.length - SIZE_LENGTH;
 
     return decrypt(lengthAndCiphertext, cipherTextOffset, cipherTextLength, AAD);
   }
 
-  public byte[] decrypt(byte[] ciphertext, int cipherTextOffset, int cipherTextLength, byte[] AAD)  
-      throws IOException {
+  public byte[] decrypt(byte[] ciphertext, int cipherTextOffset, int cipherTextLength, byte[] AAD) { 
 
     int plainTextLength = cipherTextLength - GCM_TAG_LENGTH - NONCE_LENGTH;
     if (plainTextLength < 1) {
-      throw new IOException("Wrong input length " + plainTextLength);
+      throw new ParquetCryptoRuntimeException("Wrong input length " + plainTextLength);
     }
 
     // Get the nonce from ciphertext
@@ -70,8 +69,10 @@ public class AesGcmDecryptor extends AesCipher implements BlockCipher.Decryptor{
       if (null != AAD) cipher.updateAAD(AAD);
 
       cipher.doFinal(ciphertext, inputOffset, inputLength, plainText, outputOffset);
-    }  catch (GeneralSecurityException e) {
-      throw new IOException("Failed to decrypt", e);
+    }  catch (AEADBadTagException e) {
+      throw new TagVerificationException("GCM tag check failed", e);
+    } catch (GeneralSecurityException e) {
+      throw new ParquetCryptoRuntimeException("Failed to decrypt", e);
     }
 
     return plainText;
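
A failed GCM integrity check is now surfaced as a dedicated TagVerificationException rather than a generic IOException, so callers can tell tampering apart from other crypto errors. A hypothetical caller sketch (not part of this commit); the decryptor, ciphertext and AAD are assumed to be supplied by the surrounding reader code.

    import org.apache.parquet.crypto.TagVerificationException;
    import org.apache.parquet.format.BlockCipher;

    public class DecryptWithTagCheck {
      public static byte[] decryptModule(BlockCipher.Decryptor decryptor, byte[] lengthAndCiphertext,
                                         byte[] moduleAAD, String moduleDescription) {
        try {
          return decryptor.decrypt(lengthAndCiphertext, moduleAAD);
        } catch (TagVerificationException e) {
          // The GCM tag did not verify: the ciphertext or the AAD was modified.
          throw new TagVerificationException("Integrity check failed for " + moduleDescription, e);
        }
        // Other ParquetCryptoRuntimeExceptions (wrong key, cipher init failure, ...) propagate as-is.
      }
    }
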
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesGcmEncryptor.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesGcmEncryptor.java
index bdfed7f..d456447 100755
--- a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesGcmEncryptor.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesGcmEncryptor.java
@@ -25,36 +25,34 @@ import javax.crypto.spec.GCMParameterSpec;
 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.format.BlockCipher;
 
-import java.io.IOException;
 import java.security.GeneralSecurityException;
 
 public class AesGcmEncryptor extends AesCipher implements BlockCipher.Encryptor{
 
-  AesGcmEncryptor(byte[] keyBytes) throws IllegalArgumentException, IOException {
+  AesGcmEncryptor(byte[] keyBytes) {
     super(AesMode.GCM, keyBytes);
 
     try {
       cipher = Cipher.getInstance(AesMode.GCM.getCipherName());
     } catch (GeneralSecurityException e) {
-      throw new IOException("Failed to create GCM cipher", e);
+      throw new ParquetCryptoRuntimeException("Failed to create GCM cipher", e);
     }
   }
 
   @Override
-  public byte[] encrypt(byte[] plainText, byte[] AAD)  throws IOException {
+  public byte[] encrypt(byte[] plainText, byte[] AAD)  {
     return encrypt(true, plainText, AAD);
   }
 
-  public byte[] encrypt(boolean writeLength, byte[] plainText, byte[] AAD)  throws IOException {
+  public byte[] encrypt(boolean writeLength, byte[] plainText, byte[] AAD) {
     randomGenerator.nextBytes(localNonce);
     return encrypt(writeLength, plainText, localNonce, AAD);
   }
 
-  public byte[] encrypt(boolean writeLength, byte[] plainText, byte[] nonce, byte[] AAD)  
-      throws IOException {
+  public byte[] encrypt(boolean writeLength, byte[] plainText, byte[] nonce, byte[] AAD) { 
 
     if (nonce.length != NONCE_LENGTH) {
-      throw new IOException("Wrong nonce length " + nonce.length);
+      throw new ParquetCryptoRuntimeException("Wrong nonce length " + nonce.length);
     }
     int plainTextLength = plainText.length;
     int cipherTextLength = NONCE_LENGTH + plainTextLength + GCM_TAG_LENGTH;
@@ -71,7 +69,7 @@ public class AesGcmEncryptor extends AesCipher implements BlockCipher.Encryptor{
 
       cipher.doFinal(plainText, inputOffset, inputLength, cipherText, outputOffset);
     } catch (GeneralSecurityException e) {
-      throw new IOException("Failed to encrypt", e);
+      throw new ParquetCryptoRuntimeException("Failed to encrypt", e);
     }
 
     // Add ciphertext length
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/DecryptionKeyRetriever.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/DecryptionKeyRetriever.java
index f134bb8..efa3fd2 100755
--- a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/DecryptionKeyRetriever.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/DecryptionKeyRetriever.java
@@ -19,8 +19,6 @@
 
 package org.apache.parquet.crypto;
 
-import java.io.IOException;
-
 /**
  * Interface for classes retrieving encryption keys using the key metadata.
  * Implementations must be thread-safe, if same KeyRetriever object is passed to multiple file readers.
@@ -35,7 +33,7 @@ public interface DecryptionKeyRetriever {
    * @param keyMetaData arbitrary byte array with encryption key metadata
    * @return encryption key. Key length can be either 16, 24 or 32 bytes.
    * @throws KeyAccessDeniedException thrown upon access control problems (authentication or authorization)
-   * @throws IOException thrown upon key retrieval problems unrelated to access control
+   * @throws ParquetCryptoRuntimeException thrown upon key retrieval problems unrelated to access control
    */
-  public byte[] getKey(byte[] keyMetaData) throws KeyAccessDeniedException, IOException;
+  public byte[] getKey(byte[] keyMetaData) throws KeyAccessDeniedException, ParquetCryptoRuntimeException;
 }
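
getKey() now throws only unchecked exceptions. This commit also removes StringKeyIdRetriever; below is a comparable in-memory retriever (illustrative, not part of the commit) that interprets the key metadata as a UTF-8 key id.

    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.ConcurrentHashMap;
    import org.apache.parquet.crypto.DecryptionKeyRetriever;
    import org.apache.parquet.crypto.KeyAccessDeniedException;
    import org.apache.parquet.crypto.ParquetCryptoRuntimeException;

    public class InMemoryKeyRetriever implements DecryptionKeyRetriever {
      // ConcurrentHashMap keeps the retriever thread-safe, as the interface requires.
      private final ConcurrentHashMap<String, byte[]> keyMap = new ConcurrentHashMap<>();

      public void putKey(String keyId, byte[] keyBytes) {
        keyMap.put(keyId, keyBytes);
      }

      @Override
      public byte[] getKey(byte[] keyMetaData) throws KeyAccessDeniedException, ParquetCryptoRuntimeException {
        String keyId = new String(keyMetaData, StandardCharsets.UTF_8);
        // Returning null signals that the key is unavailable for this metadata; a real
        // retriever could instead throw KeyAccessDeniedException on authorization failures.
        return keyMap.get(keyId);  // must be 16, 24 or 32 bytes when non-null
      }
    }
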
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/DecryptionPropertiesFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/DecryptionPropertiesFactory.java
index 4e1f5ac..c2a3d56 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/DecryptionPropertiesFactory.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/DecryptionPropertiesFactory.java
@@ -28,7 +28,7 @@ import org.slf4j.LoggerFactory;
 /**
  * DecryptionPropertiesFactory interface enables transparent activation of Parquet decryption.
  *
- * It's customized implementations produce decryption properties for each Parquet file, using the input information
+ * Its customized implementations produce decryption properties for each Parquet file, using the input information
  * available in Parquet file readers: file path and Hadoop configuration properties that can pass custom parameters
  * required by a crypto factory. A factory implementation can use or ignore any of these inputs.
  *
@@ -52,7 +52,8 @@ public interface DecryptionPropertiesFactory {
    *
    * @param conf Configuration where user specifies the class path
    * @return object with class DecryptionPropertiesFactory if user specified the class path and invoking of
-   * the class succeeds, null if user doesn't specify the class path
+   * the class succeeds. Null if user doesn't specify the class path (no decryption factory then - not required for plaintext files.
+   * Or for plaintext columns in encrypted files with plaintext footer).
    * @throws BadConfigurationException if the instantiation of the configured class fails
    */
   static DecryptionPropertiesFactory loadFactory(Configuration conf) {
@@ -79,7 +80,8 @@ public interface DecryptionPropertiesFactory {
    * @param hadoopConfig Configuration that is used to pass the needed information, e.g. KMS uri
    * @param filePath File path of the parquet file
    *                 Can be used for AAD prefix verification, part of key metadata etc
-   * @return object with class of FileDecryptionProperties
+   * @return object with class of FileDecryptionProperties. Null return value means no decryption properties
+   * are available for the file (not required for plaintext files. Or for plaintext columns in encrypted files with plaintext footer).
    * @throws ParquetCryptoRuntimeException if there is an exception while creating the object
    */
   FileDecryptionProperties getFileDecryptionProperties(Configuration hadoopConfig, Path filePath) throws ParquetCryptoRuntimeException;
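
A minimal DecryptionPropertiesFactory sketch that always supplies the same footer key. FileDecryptionProperties.builder() and withFooterKey() are assumed from the existing crypto API and are not shown in this diff; a real factory would resolve keys from a KMS using values passed in hadoopConfig, and how the factory class itself is registered in the Configuration is omitted here.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.crypto.DecryptionPropertiesFactory;
    import org.apache.parquet.crypto.FileDecryptionProperties;
    import org.apache.parquet.crypto.ParquetCryptoRuntimeException;

    public class FixedKeyDecryptionPropertiesFactory implements DecryptionPropertiesFactory {
      private static final byte[] DEMO_FOOTER_KEY = new byte[16];  // all-zero demo key, never use in production

      @Override
      public FileDecryptionProperties getFileDecryptionProperties(Configuration hadoopConfig, Path filePath)
          throws ParquetCryptoRuntimeException {
        // Returning null instead would tell the reader that no decryption
        // properties are available (plaintext file, or plaintext columns only).
        return FileDecryptionProperties.builder()   // assumed builder API
            .withFooterKey(DEMO_FOOTER_KEY)         // assumed builder API
            .build();
      }
    }
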
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/EncryptionPropertiesFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/EncryptionPropertiesFactory.java
index 84cbadd..59d2ee3 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/EncryptionPropertiesFactory.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/EncryptionPropertiesFactory.java
@@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory;
 /**
  * EncryptionPropertiesFactory interface enables transparent activation of Parquet encryption.
  *
- * It's customized implementations produce encryption properties for each Parquet file, using the input information
+ * Its customized implementations produce encryption properties for each Parquet file, using the input information
  * available in Parquet file writers: file path, file extended schema - and also Hadoop configuration properties that
  * can pass custom parameters required by a crypto factory. A factory implementation can use or ignore any of these
  * inputs.
@@ -55,7 +55,7 @@ public interface EncryptionPropertiesFactory {
    *
    * @param conf Configuration where user specifies the class path
    * @return object with class EncryptionPropertiesFactory if user specified the class path and invoking of
-   * the class succeeds, null if user doesn't specify the class path
+   * the class succeeds. Null if user doesn't specify the class path (no encryption then).
    * @throws BadConfigurationException if the instantiation of the configured class fails
    */
   static EncryptionPropertiesFactory loadFactory(Configuration conf) {
@@ -85,7 +85,7 @@ public interface EncryptionPropertiesFactory {
    *                     Implementations must not presume the path is permanent,
    *                     as the file can be moved or renamed later
    * @param fileWriteContext WriteContext to provide information like schema to build the FileEncryptionProperties
-   * @return object with class of FileEncryptionProperties
+   * @return object with class of FileEncryptionProperties. Null return value means the file should not be encrypted.
    * @throws ParquetCryptoRuntimeException if there is an exception while creating the object
    */
   FileEncryptionProperties getFileEncryptionProperties(Configuration fileHadoopConfig, Path tempFilePath,
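
The writer-side counterpart: a minimal EncryptionPropertiesFactory sketch that encrypts every file with one footer key. FileEncryptionProperties.builder(byte[]) and the WriteSupport.WriteContext parameter type are assumed from the existing API rather than shown in this diff; returning null would leave the file unencrypted.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.crypto.EncryptionPropertiesFactory;
    import org.apache.parquet.crypto.FileEncryptionProperties;
    import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
    import org.apache.parquet.hadoop.api.WriteSupport;

    public class UniformEncryptionPropertiesFactory implements EncryptionPropertiesFactory {
      private static final byte[] DEMO_FOOTER_KEY = new byte[16];  // all-zero demo key, never use in production

      @Override
      public FileEncryptionProperties getFileEncryptionProperties(Configuration fileHadoopConfig,
          Path tempFilePath, WriteSupport.WriteContext fileWriteContext) throws ParquetCryptoRuntimeException {
        // With no per-column properties, all columns are encrypted with the footer key.
        return FileEncryptionProperties.builder(DEMO_FOOTER_KEY).build();  // assumed builder API
      }
    }
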
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalColumnDecryptionSetup.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalColumnDecryptionSetup.java
index 769c8d2..bab7c43 100755
--- a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalColumnDecryptionSetup.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalColumnDecryptionSetup.java
@@ -29,12 +29,12 @@ public class InternalColumnDecryptionSetup {
   private final boolean isEncryptedWithFooterKey;
   private final BlockCipher.Decryptor dataDecryptor;
   private final BlockCipher.Decryptor metaDataDecryptor;
-  private final short columnOrdinal;
+  private final int columnOrdinal;
   private final byte[] keyMetadata;
 
   InternalColumnDecryptionSetup(ColumnPath path, boolean encrypted, 
       boolean isEncryptedWithFooterKey, BlockCipher.Decryptor dataDecryptor, 
-      BlockCipher.Decryptor metaDataDecryptor, short columnOrdinal, byte[] keyMetadata) {
+      BlockCipher.Decryptor metaDataDecryptor, int columnOrdinal, byte[] keyMetadata) {
     this.columnPath = path;
     this.isEncrypted = encrypted;
     this.isEncryptedWithFooterKey = isEncryptedWithFooterKey;
@@ -64,7 +64,7 @@ public class InternalColumnDecryptionSetup {
     return columnPath;
   }
 
-  public short getOrdinal() {
+  public int getOrdinal() {
     return columnOrdinal;
   }
 
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalColumnEncryptionSetup.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalColumnEncryptionSetup.java
index d4d2058..9be396a 100755
--- a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalColumnEncryptionSetup.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalColumnEncryptionSetup.java
@@ -32,9 +32,9 @@ public class InternalColumnEncryptionSetup {
   private final BlockCipher.Encryptor metadataEncryptor;
   private final BlockCipher.Encryptor dataEncryptor;
   private final ColumnCryptoMetaData columnCryptoMetaData;
-  private final short ordinal;
+  private final int ordinal;
 
-  InternalColumnEncryptionSetup(ColumnEncryptionProperties encryptionProperties, short ordinal,
+  InternalColumnEncryptionSetup(ColumnEncryptionProperties encryptionProperties, int ordinal,
       BlockCipher.Encryptor dataEncryptor, BlockCipher.Encryptor metaDataEncryptor) {
     this.encryptionProperties = encryptionProperties;
     this.dataEncryptor = dataEncryptor;
@@ -72,7 +72,7 @@ public class InternalColumnEncryptionSetup {
     return columnCryptoMetaData;
   }
 
-  public short getOrdinal() {
+  public int getOrdinal() {
     return ordinal;
   }
 
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalFileDecryptor.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalFileDecryptor.java
index d104401..683155d 100755
--- a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalFileDecryptor.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalFileDecryptor.java
@@ -22,7 +22,6 @@ package org.apache.parquet.crypto;
 import org.apache.parquet.format.BlockCipher;
 import org.apache.parquet.format.EncryptionAlgorithm;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
-import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
 
@@ -45,7 +44,7 @@ public class InternalFileDecryptor {
   private BlockCipher.Decryptor aesCtrDecryptorWithFooterKey;
   private boolean plaintextFile;
 
-  public InternalFileDecryptor(FileDecryptionProperties fileDecryptionProperties) throws IOException {
+  public InternalFileDecryptor(FileDecryptionProperties fileDecryptionProperties) {
     this.fileDecryptionProperties= fileDecryptionProperties;
     checkPlaintextFooterIntegrity = fileDecryptionProperties.checkFooterIntegrity();
     footerKey = fileDecryptionProperties.getFooterKey();
@@ -56,7 +55,7 @@ public class InternalFileDecryptor {
     this.plaintextFile = false;
   }
 
-  private BlockCipher.Decryptor getThriftModuleDecryptor(byte[] columnKey) throws IOException {
+  private BlockCipher.Decryptor getThriftModuleDecryptor(byte[] columnKey) {
     if (null == columnKey) { // Decryptor with footer key
       if (null == aesGcmDecryptorWithFooterKey) {
         aesGcmDecryptorWithFooterKey = ModuleCipherFactory.getDecryptor(AesMode.GCM, footerKey);
@@ -67,7 +66,7 @@ public class InternalFileDecryptor {
     }
   }
 
-  private BlockCipher.Decryptor getDataModuleDecryptor(byte[] columnKey) throws IOException {
+  private BlockCipher.Decryptor getDataModuleDecryptor(byte[] columnKey) {
     if (algorithm.isSetAES_GCM_V1()) {
       return getThriftModuleDecryptor(columnKey);
     }
@@ -83,21 +82,21 @@ public class InternalFileDecryptor {
     }
   }
 
-  public InternalColumnDecryptionSetup getColumnSetup(ColumnPath path) throws IOException {
+  public InternalColumnDecryptionSetup getColumnSetup(ColumnPath path) {
     if (!fileCryptoMetaDataProcessed) {
-      throw new IOException("Haven't parsed the file crypto metadata yet");
+      throw new ParquetCryptoRuntimeException("Haven't parsed the file crypto metadata yet");
     }
     InternalColumnDecryptionSetup columnDecryptionSetup = columnMap.get(path);
     if (null == columnDecryptionSetup) {
-      throw new IOException("Failed to find decryption setup for column " + path);
+      throw new ParquetCryptoRuntimeException("Failed to find decryption setup for column " + path);
     }
 
     return columnDecryptionSetup;
   }
 
-  public BlockCipher.Decryptor getFooterDecryptor() throws IOException {
+  public BlockCipher.Decryptor fetchFooterDecryptor() {
     if (!fileCryptoMetaDataProcessed) {
-      throw new IOException("Haven't parsed the file crypto metadata yet");
+      throw new ParquetCryptoRuntimeException("Haven't parsed the file crypto metadata yet");
     }
     if (!encryptedFooter) {
       return null;
@@ -107,7 +106,7 @@ public class InternalFileDecryptor {
   }
 
   public void setFileCryptoMetaData(EncryptionAlgorithm algorithm, 
-      boolean encryptedFooter, byte[] footerKeyMetaData) throws IOException {
+      boolean encryptedFooter, byte[] footerKeyMetaData) {
 
     // first use of the decryptor
     if (!fileCryptoMetaDataProcessed) {
@@ -137,19 +136,20 @@ public class InternalFileDecryptor {
         mustSupplyAadPrefix = algorithm.getAES_GCM_CTR_V1().isSupply_aad_prefix();
         aadFileUnique = algorithm.getAES_GCM_CTR_V1().getAad_file_unique();
       } else {
-        throw new IOException("Unsupported algorithm: " + algorithm);
+        throw new ParquetCryptoRuntimeException("Unsupported algorithm: " + algorithm);
       }
 
       // Handle AAD prefix
       byte[] aadPrefix = aadPrefixInProperties;
       if (mustSupplyAadPrefix && (null == aadPrefixInProperties)) {
-        throw new IOException("AAD prefix used for file encryption, but not stored in file and not supplied in decryption properties");
+        throw new ParquetCryptoRuntimeException("AAD prefix used for file encryption, "
+            + "but not stored in file and not supplied in decryption properties");
       }
 
       if (fileHasAadPrefix) {
         if (null != aadPrefixInProperties) {
           if (!Arrays.equals(aadPrefixInProperties, aadPrefixInFile)) {
-            throw new IOException("AAD Prefix in file and in decryption properties is not the same");
+            throw new ParquetCryptoRuntimeException("AAD Prefix in file and in decryption properties is not the same");
           }
         }
         if (null != aadPrefixVerifier) {
@@ -159,10 +159,10 @@ public class InternalFileDecryptor {
       }
       else {
         if (!mustSupplyAadPrefix && (null != aadPrefixInProperties)) {
-          throw new IOException("AAD Prefix set in decryption properties, but was not used for file encryption");
+          throw new ParquetCryptoRuntimeException("AAD Prefix set in decryption properties, but was not used for file encryption");
         }
         if (null != aadPrefixVerifier) {
-          throw new IOException("AAD Prefix Verifier is set, but AAD Prefix not found in file");
+          throw new ParquetCryptoRuntimeException("AAD Prefix Verifier is set, but AAD Prefix not found in file");
         }
       }
 
@@ -176,20 +176,20 @@ public class InternalFileDecryptor {
       if (null == footerKey) { // ignore footer key metadata if footer key is explicitly set via API
         if (encryptedFooter || checkPlaintextFooterIntegrity) {
           if (null == footerKeyMetaData) {
-            throw new IOException("No footer key or key metadata");
+            throw new ParquetCryptoRuntimeException("No footer key or key metadata");
           }
           if (null == keyRetriever) {
-            throw new IOException("No footer key or key retriever");
+            throw new ParquetCryptoRuntimeException("No footer key or key retriever");
           }
 
           try {
             footerKey = keyRetriever.getKey(footerKeyMetaData);
           } catch (KeyAccessDeniedException e) {
-            throw new IOException("Footer key: access denied", e);
+            throw new KeyAccessDeniedException("Footer key: access denied", e);
           }
 
           if (null == footerKey) {
-            throw new IOException("Footer key unavailable");
+            throw new ParquetCryptoRuntimeException("Footer key unavailable");
           }
         }
       }
@@ -197,35 +197,35 @@ public class InternalFileDecryptor {
       // re-use of the decryptor 
       // check the crypto metadata.
       if (!this.algorithm.equals(algorithm)) {
-        throw new IOException("Decryptor re-use: Different algorithm");
+        throw new ParquetCryptoRuntimeException("Decryptor re-use: Different algorithm");
       }
       if (encryptedFooter != this.encryptedFooter) {
-        throw new IOException("Decryptor re-use: Different footer encryption");
+        throw new ParquetCryptoRuntimeException("Decryptor re-use: Different footer encryption");
       }
       if (!Arrays.equals(this.footerKeyMetaData, footerKeyMetaData)) {
-        throw new IOException("Decryptor re-use: Different footer key metadata ");
+        throw new ParquetCryptoRuntimeException("Decryptor re-use: Different footer key metadata");
       }
     }
   }
 
   public InternalColumnDecryptionSetup setColumnCryptoMetadata(ColumnPath path, boolean encrypted, 
-      boolean encryptedWithFooterKey, byte[] keyMetadata, short columnOrdinal) throws IOException {
+      boolean encryptedWithFooterKey, byte[] keyMetadata, int columnOrdinal) {
 
     if (!fileCryptoMetaDataProcessed) {
-      throw new IOException("Haven't parsed the file crypto metadata yet");
+      throw new ParquetCryptoRuntimeException("Haven't parsed the file crypto metadata yet");
     }
 
     InternalColumnDecryptionSetup columnDecryptionSetup = columnMap.get(path);
     if (null != columnDecryptionSetup) {
       if (columnDecryptionSetup.isEncrypted() != encrypted) {
-        throw new IOException("Re-use: wrong encrypted flag. Column: " + path);
+        throw new ParquetCryptoRuntimeException("Re-use: wrong encrypted flag. Column: " + path);
       }
       if (encrypted) {
         if (encryptedWithFooterKey != columnDecryptionSetup.isEncryptedWithFooterKey()) {
-          throw new IOException("Re-use: wrong encryption key (column vs footer). Column: " + path);
+          throw new ParquetCryptoRuntimeException("Re-use: wrong encryption key (column vs footer). Column: " + path);
         }
         if (!encryptedWithFooterKey && !Arrays.equals(columnDecryptionSetup.getKeyMetadata(), keyMetadata)) {
-          throw new IOException("Decryptor re-use: Different footer key metadata ");
+          throw new ParquetCryptoRuntimeException("Decryptor re-use: Different footer key metadata ");
         }
       }
       return columnDecryptionSetup;
@@ -236,7 +236,7 @@ public class InternalFileDecryptor {
     } else {
       if (encryptedWithFooterKey) {
         if (null == footerKey) {
-          throw new IOException("Column " + path + " is encrypted with NULL footer key");
+          throw new ParquetCryptoRuntimeException("Column " + path + " is encrypted with NULL footer key");
         }
         columnDecryptionSetup = new InternalColumnDecryptionSetup(path, true, true, 
             getDataModuleDecryptor(null), getThriftModuleDecryptor(null), columnOrdinal, null);
@@ -248,12 +248,12 @@ public class InternalFileDecryptor {
           try {
             columnKeyBytes = keyRetriever.getKey(keyMetadata);
           } catch (KeyAccessDeniedException e) {
-            throw new IOException("Column " + path + ": key access denied", e);
+            throw new KeyAccessDeniedException("Column " + path + ": key access denied", e);
           }
         }
 
         if (null == columnKeyBytes) { // Hidden column: encrypted, but key unavailable
-          throw new IOException("Column " + path + ": key unavailable");
+          throw new ParquetCryptoRuntimeException("Column " + path + ": key unavailable");
         } else { // Key is available
           columnDecryptionSetup = new InternalColumnDecryptionSetup(path, true, false, 
               getDataModuleDecryptor(columnKeyBytes), getThriftModuleDecryptor(columnKeyBytes), columnOrdinal, keyMetadata);
@@ -269,12 +269,12 @@ public class InternalFileDecryptor {
     return this.fileAAD;
   }
 
-  public AesGcmEncryptor getSignedFooterEncryptor() throws IOException  {
+  public AesGcmEncryptor createSignedFooterEncryptor() {
     if (!fileCryptoMetaDataProcessed) {
-      throw new IOException("Haven't parsed the file crypto metadata yet");
+      throw new ParquetCryptoRuntimeException("Haven't parsed the file crypto metadata yet");
     }
     if (encryptedFooter) {
-      throw new IOException("Requesting signed footer encryptor in file with encrypted footer");
+      throw new ParquetCryptoRuntimeException("Requesting signed footer encryptor in file with encrypted footer");
     }
 
     return (AesGcmEncryptor) ModuleCipherFactory.getEncryptor(AesMode.GCM, footerKey);
@@ -295,5 +295,9 @@ public class InternalFileDecryptor {
   public boolean plaintextFile() {
     return plaintextFile;
   }
+
+  public FileDecryptionProperties getDecryptionProperties() {
+    return fileDecryptionProperties;
+  }
 }
 
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalFileEncryptor.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalFileEncryptor.java
index 9d408ef..c167a5e 100755
--- a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalFileEncryptor.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalFileEncryptor.java
@@ -24,7 +24,6 @@ import org.apache.parquet.format.FileCryptoMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.format.EncryptionAlgorithm;
 
-import java.io.IOException;
 import java.util.HashMap;
 
 public class InternalFileEncryptor {
@@ -41,7 +40,7 @@ public class InternalFileEncryptor {
   private BlockCipher.Encryptor aesCtrEncryptorWithFooterKey;
   private boolean fileCryptoMetaDataCreated;
 
-  public InternalFileEncryptor(FileEncryptionProperties fileEncryptionProperties) throws IOException {
+  public InternalFileEncryptor(FileEncryptionProperties fileEncryptionProperties) {
     this.fileEncryptionProperties = fileEncryptionProperties;
     algorithm = fileEncryptionProperties.getAlgorithm();
     footerKey = fileEncryptionProperties.getFooterKey();
@@ -52,7 +51,7 @@ public class InternalFileEncryptor {
     fileCryptoMetaDataCreated = false;
   }
 
-  private BlockCipher.Encryptor getThriftModuleEncryptor(byte[] columnKey) throws IOException {
+  private BlockCipher.Encryptor getThriftModuleEncryptor(byte[] columnKey) {
     if (null == columnKey) { // Encryptor with footer key
       if (null == aesGcmEncryptorWithFooterKey) {
         aesGcmEncryptorWithFooterKey = ModuleCipherFactory.getEncryptor(AesMode.GCM, footerKey);
@@ -63,7 +62,7 @@ public class InternalFileEncryptor {
     }
   }
 
-  private BlockCipher.Encryptor getDataModuleEncryptor(byte[] columnKey) throws IOException {
+  private BlockCipher.Encryptor getDataModuleEncryptor(byte[] columnKey) {
     if (algorithm.isSetAES_GCM_V1()) {
       return getThriftModuleEncryptor(columnKey);
     }
@@ -79,27 +78,27 @@ public class InternalFileEncryptor {
   }
 
   public InternalColumnEncryptionSetup getColumnSetup(ColumnPath columnPath, 
-      boolean createIfNull, short ordinal) throws IOException {
+      boolean createIfNull, int ordinal) {
     InternalColumnEncryptionSetup internalColumnProperties = columnMap.get(columnPath);
 
     if (null != internalColumnProperties) {
       if (ordinal != internalColumnProperties.getOrdinal()) {
-        throw new IOException("Column ordinal doesnt match " + columnPath + 
+        throw new ParquetCryptoRuntimeException("Column ordinal doesnt match " + columnPath + 
             ": " + ordinal + ", "+internalColumnProperties.getOrdinal());
       }
       return internalColumnProperties;
     }
 
     if (!createIfNull) {
-      throw new IOException("No encryption setup found for column " + columnPath);
+      throw new ParquetCryptoRuntimeException("No encryption setup found for column " + columnPath);
     }
     if (fileCryptoMetaDataCreated) {
-      throw new IOException("Re-use: No encryption setup for column " + columnPath);
+      throw new ParquetCryptoRuntimeException("Re-use: No encryption setup for column " + columnPath);
     }
 
     ColumnEncryptionProperties columnProperties = fileEncryptionProperties.getColumnProperties(columnPath);
     if (null == columnProperties) {
-      throw new IOException("No encryption properties for column " + columnPath);
+      throw new ParquetCryptoRuntimeException("No encryption properties for column " + columnPath);
     }
     if (columnProperties.isEncrypted()) {
       if (columnProperties.isEncryptedWithFooterKey()) {
@@ -118,14 +117,14 @@ public class InternalFileEncryptor {
     return internalColumnProperties;
   }
 
-  public BlockCipher.Encryptor getFooterEncryptor() throws IOException  {
+  public BlockCipher.Encryptor getFooterEncryptor()  {
     if (!encryptFooter) return null;
     return getThriftModuleEncryptor(null);
   }
 
-  public FileCryptoMetaData getFileCryptoMetaData() throws IOException {
+  public FileCryptoMetaData getFileCryptoMetaData() {
     if (!encryptFooter) {
-      throw new IOException("Requesting FileCryptoMetaData in file with unencrypted footer");
+      throw new ParquetCryptoRuntimeException("Requesting FileCryptoMetaData in file with unencrypted footer");
     }
     FileCryptoMetaData fileCryptoMetaData = new FileCryptoMetaData(algorithm);
     if (null != footerKeyMetadata) {
@@ -159,16 +158,16 @@ public class InternalFileEncryptor {
     return this.fileAAD;
   }
 
-  public byte[] getFooterSigningKeyMetaData()  throws IOException {
+  public byte[] getFooterSigningKeyMetaData() {
     if (encryptFooter) {
-      throw new IOException("Requesting signing footer key metadata in file with encrypted footer");
+      throw new ParquetCryptoRuntimeException("Requesting signing footer key metadata in file with encrypted footer");
     }
     return footerKeyMetadata;
   }
 
-  public AesGcmEncryptor getSignedFooterEncryptor() throws IOException  {
+  public AesGcmEncryptor getSignedFooterEncryptor() {
     if (encryptFooter) {
-      throw new IOException("Requesting signed footer encryptor in file with encrypted footer");
+      throw new ParquetCryptoRuntimeException("Requesting signed footer encryptor in file with encrypted footer");
     }
     return (AesGcmEncryptor) ModuleCipherFactory.getEncryptor(AesMode.GCM, footerKey);
   }
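
For context on the module encryptors above: each is an AES cipher bound to one key and applied per Parquet module with a module-specific AAD. A minimal illustration of the underlying AES-GCM operation using plain javax.crypto follows; it is a sketch only (class and method names here are made up), not the AesGcmEncryptor implementation, and it assumes a 12-byte nonce and 128-bit tag.

    import java.security.SecureRandom;
    import javax.crypto.Cipher;
    import javax.crypto.spec.GCMParameterSpec;
    import javax.crypto.spec.SecretKeySpec;

    // Illustrative only: encrypt one module buffer with AES-GCM and a caller-supplied AAD.
    // Output layout is nonce || ciphertext || tag (the tag is the trailing 16 bytes).
    public class GcmModuleExample {
      public static byte[] gcmEncryptModule(byte[] key, byte[] plaintext, byte[] moduleAAD)
          throws Exception {
        byte[] nonce = new byte[12];                   // 96-bit nonce, fresh for every module
        new SecureRandom().nextBytes(nonce);
        Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
        cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(key, "AES"),
            new GCMParameterSpec(128, nonce));         // 128-bit authentication tag
        if (moduleAAD != null) {
          cipher.updateAAD(moduleAAD);                 // binds the ciphertext to its module
        }
        byte[] ciphertextAndTag = cipher.doFinal(plaintext);
        byte[] out = new byte[nonce.length + ciphertextAndTag.length];
        System.arraycopy(nonce, 0, out, 0, nonce.length);
        System.arraycopy(ciphertextAndTag, 0, out, nonce.length, ciphertextAndTag.length);
        return out;
      }
    }
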
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/KeyAccessDeniedException.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/KeyAccessDeniedException.java
index 0d9d84f..bff495a 100755
--- a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/KeyAccessDeniedException.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/KeyAccessDeniedException.java
@@ -19,12 +19,21 @@
 
 package org.apache.parquet.crypto;
 
-import java.security.KeyException;
 
-public class KeyAccessDeniedException extends KeyException {
+public class KeyAccessDeniedException extends ParquetCryptoRuntimeException {
   private static final long serialVersionUID = 1L;
 
-  public KeyAccessDeniedException(String keyID) {
-    super(keyID);
+  public KeyAccessDeniedException() {}
+
+  public KeyAccessDeniedException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public KeyAccessDeniedException(String message) {
+    super(message);
+  }
+
+  public KeyAccessDeniedException(Throwable cause) {
+    super(cause);
   }
 }
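
Note that KeyAccessDeniedException is now unchecked, so key-access failures no longer appear in method signatures; callers that want to degrade gracefully have to catch it explicitly. A hedged usage sketch (readEncryptedColumn is a hypothetical application method):

    try {
      readEncryptedColumn();
    } catch (KeyAccessDeniedException e) {
      // the key retriever or KMS refused the key: skip the column or report an authorization error
    }
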
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/ModuleCipherFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/ModuleCipherFactory.java
index bbde66e..1c25313 100755
--- a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/ModuleCipherFactory.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/ModuleCipherFactory.java
@@ -21,8 +21,6 @@ package org.apache.parquet.crypto;
 
 import org.apache.parquet.format.BlockCipher;
 
-import java.io.IOException;
-
 public class ModuleCipherFactory {
 
   // Parquet Module types
@@ -34,7 +32,9 @@ public class ModuleCipherFactory {
     DataPageHeader((byte)4),
     DictionaryPageHeader((byte)5),
     ColumnIndex((byte)6),
-    OffsetIndex((byte)7);
+    OffsetIndex((byte)7),
+    BloomFilterHeader((byte)8),
+    BloomFilterBitset((byte)9);
 
     private final byte value;
 
@@ -49,8 +49,7 @@ public class ModuleCipherFactory {
 
   public static final int SIZE_LENGTH = 4;
 
-  public static BlockCipher.Encryptor getEncryptor(AesMode mode, byte[] keyBytes) 
-      throws IllegalArgumentException, IOException {
+  public static BlockCipher.Encryptor getEncryptor(AesMode mode, byte[] keyBytes) {
     switch (mode) {
     case GCM:
       return new AesGcmEncryptor(keyBytes);
@@ -61,8 +60,7 @@ public class ModuleCipherFactory {
     }
   }
 
-  public static BlockCipher.Decryptor getDecryptor(AesMode mode, byte[] keyBytes) 
-      throws IllegalArgumentException, IOException {
+  public static BlockCipher.Decryptor getDecryptor(AesMode mode, byte[] keyBytes) {
     switch (mode) {
     case GCM:
       return new AesGcmDecryptor(keyBytes);
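
A short usage sketch of the factory with the simplified, non-throwing signatures above. The key and AAD values are placeholders, the encrypt/decrypt calls follow the BlockCipher signatures used elsewhere in this commit, and the import paths are assumptions:

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import org.apache.parquet.crypto.AesMode;
    import org.apache.parquet.crypto.ModuleCipherFactory;
    import org.apache.parquet.format.BlockCipher;

    public class ModuleCipherRoundTrip {
      public static void roundTrip() throws IOException {
        byte[] keyBytes = new byte[16];                // placeholder 128-bit AES key
        BlockCipher.Encryptor encryptor = ModuleCipherFactory.getEncryptor(AesMode.GCM, keyBytes);
        BlockCipher.Decryptor decryptor = ModuleCipherFactory.getDecryptor(AesMode.GCM, keyBytes);

        byte[] moduleAAD = new byte[0];                // placeholder module AAD
        byte[] ciphertext = encryptor.encrypt("page bytes".getBytes(StandardCharsets.UTF_8), moduleAAD);
        byte[] recovered = decryptor.decrypt(ciphertext, moduleAAD);
      }
    }
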
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/ParquetCryptoRuntimeException.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/ParquetCryptoRuntimeException.java
index 60255b5..cb9489c 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/ParquetCryptoRuntimeException.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/ParquetCryptoRuntimeException.java
@@ -21,7 +21,7 @@ package org.apache.parquet.crypto;
 import org.apache.parquet.ParquetRuntimeException;
 
 /**
- * Thrown when an encryption or decryption operation problem is occurred
+ * Thrown upon a problem in an encryption or decryption operation
  */
 public class ParquetCryptoRuntimeException extends ParquetRuntimeException {
   private static final long serialVersionUID = 1L;
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/StringKeyIdRetriever.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/StringKeyIdRetriever.java
deleted file mode 100755
index 7b15089..0000000
--- a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/StringKeyIdRetriever.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.parquet.crypto;
-
-import java.nio.charset.StandardCharsets;
-import java.util.Hashtable;
-
-// Simple key retriever, based on UTF8 strings as key identifiers
-public class StringKeyIdRetriever implements DecryptionKeyRetriever{
-
-  private final Hashtable<String,byte[]> keyMap = new Hashtable<String,byte[]>();
-
-  public void putKey(String keyId, byte[] keyBytes) {
-    keyMap.put(keyId, keyBytes);
-  }
-
-  @Override
-  public byte[] getKey(byte[] keyMetaData) {
-    String keyId = new String(keyMetaData, StandardCharsets.UTF_8);
-    return keyMap.get(keyId);
-  }
-}
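
Since this sample retriever is dropped from parquet-hadoop, applications that relied on it can keep an equivalent map-backed implementation on their side. A hedged sketch follows; the class name and the choice to throw KeyAccessDeniedException on a missing key are application decisions, not part of this commit:

    import java.nio.charset.StandardCharsets;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    import org.apache.parquet.crypto.DecryptionKeyRetriever;
    import org.apache.parquet.crypto.KeyAccessDeniedException;

    // Application-side replacement for the removed sample: UTF-8 key ids mapped to key bytes.
    public class InMemoryKeyRetriever implements DecryptionKeyRetriever {
      private final Map<String, byte[]> keyMap = new ConcurrentHashMap<>();

      public void putKey(String keyId, byte[] keyBytes) {
        keyMap.put(keyId, keyBytes);
      }

      @Override
      public byte[] getKey(byte[] keyMetaData) {
        String keyId = new String(keyMetaData, StandardCharsets.UTF_8);
        byte[] key = keyMap.get(keyId);
        if (null == key) {
          throw new KeyAccessDeniedException("No key found for key id: " + keyId);
        }
        return key;
      }
    }
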
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/HiddenColumnException.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/TagVerificationException.java
old mode 100755
new mode 100644
similarity index 69%
rename from parquet-hadoop/src/main/java/org/apache/parquet/crypto/HiddenColumnException.java
rename to parquet-hadoop/src/main/java/org/apache/parquet/crypto/TagVerificationException.java
index e4c3dca..5fc6acd
--- a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/HiddenColumnException.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/TagVerificationException.java
@@ -19,20 +19,21 @@
 
 package org.apache.parquet.crypto;
 
-import org.apache.parquet.ParquetRuntimeException;
 
-/**
- * Reader doesn't have key for encrypted column,
- * but tries to access its contents
- */
-public class HiddenColumnException extends ParquetRuntimeException {
+public class TagVerificationException extends ParquetCryptoRuntimeException {
   private static final long serialVersionUID = 1L;
 
-  public HiddenColumnException(String string) {
-    super(string);
+  public TagVerificationException() {}
+
+  public TagVerificationException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public TagVerificationException(String message) {
+    super(message);
   }
 
-  public HiddenColumnException(String string, Exception e) {
-    super(string, e);
+  public TagVerificationException(Throwable cause) {
+    super(cause);
   }
 }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
index 3908463..2c93d31 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@ -22,8 +22,10 @@ import static java.util.Optional.empty;
 
 import static java.util.Optional.of;
 import static org.apache.parquet.format.Util.readFileMetaData;
+import static org.apache.parquet.format.Util.writeColumnMetaData;
 import static org.apache.parquet.format.Util.writePageHeader;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -43,9 +45,19 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.CorruptStatistics;
 import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.statistics.BinaryStatistics;
 import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.crypto.AesCipher;
+import org.apache.parquet.crypto.AesGcmEncryptor;
+import org.apache.parquet.crypto.InternalColumnEncryptionSetup;
+import org.apache.parquet.crypto.InternalFileDecryptor;
+import org.apache.parquet.crypto.InternalFileEncryptor;
+import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
+import org.apache.parquet.crypto.TagVerificationException;
+import org.apache.parquet.format.BlockCipher;
 import org.apache.parquet.format.BloomFilterAlgorithm;
 import org.apache.parquet.format.BloomFilterCompression;
 import org.apache.parquet.format.BloomFilterHash;
@@ -75,6 +87,7 @@ import org.apache.parquet.format.XxHash;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.format.BoundaryOrder;
 import org.apache.parquet.format.ColumnChunk;
+import org.apache.parquet.format.ColumnCryptoMetaData;
 import org.apache.parquet.format.ColumnIndex;
 import org.apache.parquet.format.ColumnMetaData;
 import org.apache.parquet.format.ColumnOrder;
@@ -83,6 +96,7 @@ import org.apache.parquet.format.DataPageHeader;
 import org.apache.parquet.format.DataPageHeaderV2;
 import org.apache.parquet.format.DictionaryPageHeader;
 import org.apache.parquet.format.Encoding;
+import org.apache.parquet.format.EncryptionWithColumnKey;
 import org.apache.parquet.format.FieldRepetitionType;
 import org.apache.parquet.format.FileMetaData;
 import org.apache.parquet.format.KeyValue;
@@ -176,12 +190,17 @@ public class ParquetMetadataConverter {
       cachedEncodingSets = new ConcurrentHashMap<Set<org.apache.parquet.column.Encoding>, Set<org.apache.parquet.column.Encoding>>();
 
   public FileMetaData toParquetMetadata(int currentVersion, ParquetMetadata parquetMetadata) {
+    return toParquetMetadata(currentVersion, parquetMetadata, null);
+  }
+
+  public FileMetaData toParquetMetadata(int currentVersion, ParquetMetadata parquetMetadata,
+      InternalFileEncryptor fileEncryptor) {
     List<BlockMetaData> blocks = parquetMetadata.getBlocks();
     List<RowGroup> rowGroups = new ArrayList<RowGroup>();
     long numRows = 0;
     for (BlockMetaData block : blocks) {
       numRows += block.getRowCount();
-      addRowGroup(parquetMetadata, rowGroups, block);
+      addRowGroup(parquetMetadata, rowGroups, block, fileEncryptor);
     }
     FileMetaData fileMetaData = new FileMetaData(
         currentVersion,
@@ -463,14 +482,29 @@ public class ParquetMetadataConverter {
     }
   }
 
-  private void addRowGroup(ParquetMetadata parquetMetadata, List<RowGroup> rowGroups, BlockMetaData block) {
+  private void addRowGroup(ParquetMetadata parquetMetadata, List<RowGroup> rowGroups, BlockMetaData block,
+      InternalFileEncryptor fileEncryptor) {
+    
     //rowGroup.total_byte_size = ;
     List<ColumnChunkMetaData> columns = block.getColumns();
     List<ColumnChunk> parquetColumns = new ArrayList<ColumnChunk>();
+    int rowGroupOrdinal = rowGroups.size();
+    int columnOrdinal = -1;
+    ByteArrayOutputStream tempOutStream = null;
     for (ColumnChunkMetaData columnMetaData : columns) {
       ColumnChunk columnChunk = new ColumnChunk(columnMetaData.getFirstDataPageOffset()); // verify this is the right offset
       columnChunk.file_path = block.getPath(); // they are in the same file for now
-      columnChunk.meta_data = new ColumnMetaData(
+      InternalColumnEncryptionSetup columnSetup = null;
+      boolean writeCryptoMetadata = false;
+      boolean encryptMetaData = false;
+      ColumnPath path = columnMetaData.getPath();
+      if (null != fileEncryptor) {
+        columnOrdinal++;
+        columnSetup = fileEncryptor.getColumnSetup(path, false, columnOrdinal);
+        writeCryptoMetadata = columnSetup.isEncrypted();
+        encryptMetaData = fileEncryptor.encryptColumnMetaData(columnSetup);
+      }
+      ColumnMetaData metaData = new ColumnMetaData(
           getType(columnMetaData.getType()),
           toFormatEncodings(columnMetaData.getEncodings()),
           Arrays.asList(columnMetaData.getPath().toArray()),
@@ -479,14 +513,45 @@ public class ParquetMetadataConverter {
           columnMetaData.getTotalUncompressedSize(),
           columnMetaData.getTotalSize(),
           columnMetaData.getFirstDataPageOffset());
-      columnChunk.meta_data.dictionary_page_offset = columnMetaData.getDictionaryPageOffset();
-      columnChunk.meta_data.setBloom_filter_offset(columnMetaData.getBloomFilterOffset());
+      metaData.dictionary_page_offset = columnMetaData.getDictionaryPageOffset();
+      metaData.setBloom_filter_offset(columnMetaData.getBloomFilterOffset());
       if (!columnMetaData.getStatistics().isEmpty()) {
-        columnChunk.meta_data.setStatistics(toParquetStatistics(columnMetaData.getStatistics(), this.statisticsTruncateLength));
+        metaData.setStatistics(toParquetStatistics(columnMetaData.getStatistics(), this.statisticsTruncateLength));
       }
       if (columnMetaData.getEncodingStats() != null) {
-        columnChunk.meta_data.setEncoding_stats(convertEncodingStats(columnMetaData.getEncodingStats()));
+        metaData.setEncoding_stats(convertEncodingStats(columnMetaData.getEncodingStats()));
       }
+      
+      if (!encryptMetaData) {
+        columnChunk.setMeta_data(metaData);
+      }  else {
+        // Serialize and encrypt ColumnMetadata separately 
+        byte[] columnMetaDataAAD = AesCipher.createModuleAAD(fileEncryptor.getFileAAD(), 
+            ModuleType.ColumnMetaData, rowGroupOrdinal, columnSetup.getOrdinal(), -1);
+        if (null == tempOutStream) {
+          tempOutStream = new ByteArrayOutputStream();
+        }  else {
+          tempOutStream.reset();
+        }
+        try {
+          writeColumnMetaData(metaData, tempOutStream, columnSetup.getMetaDataEncryptor(), columnMetaDataAAD);
+        } catch (IOException e) {
+          throw new ParquetCryptoRuntimeException("Failed to serialize and encrypt ColumnMetadata for " +
+                                                  columnMetaData.getPath(), e);
+        }
+        columnChunk.setEncrypted_column_metadata(tempOutStream.toByteArray());
+        // Keep redacted metadata version for old readers
+        if (!fileEncryptor.isFooterEncrypted()) {
+          ColumnMetaData metaDataRedacted  = metaData.deepCopy();
+          if (metaDataRedacted.isSetStatistics()) metaDataRedacted.unsetStatistics();
+          if (metaDataRedacted.isSetEncoding_stats()) metaDataRedacted.unsetEncoding_stats();
+          columnChunk.setMeta_data(metaDataRedacted);
+        }
+      }
+      if (writeCryptoMetadata) {
+        columnChunk.setCrypto_metadata(columnSetup.getColumnCryptoMetaData());
+      }
+      
 //      columnChunk.meta_data.index_page_offset = ;
 //      columnChunk.meta_data.key_value_metadata = ; // nothing yet
 
@@ -504,6 +569,9 @@ public class ParquetMetadataConverter {
       parquetColumns.add(columnChunk);
     }
     RowGroup rowGroup = new RowGroup(parquetColumns, block.getTotalByteSize(), block.getRowCount());
+    rowGroup.setFile_offset(block.getStartingPos()); 
+    rowGroup.setTotal_compressed_size(block.getCompressedSize());
+    rowGroup.setOrdinal((short) rowGroupOrdinal);
     rowGroups.add(rowGroup);
   }
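
The separate serialization and encryption of ColumnMetaData above relies on per-module AADs derived from the file AAD plus the module type and ordinals. A brief sketch of the calls involved, using the same AesCipher helpers as the code above (fileEncryptor and the ordinal variables are assumed to be in scope):

    byte[] fileAAD = fileEncryptor.getFileAAD();
    // page ordinal -1: the module is not a data page (ColumnMetaData, dictionary page, etc.)
    byte[] columnMetaDataAAD = AesCipher.createModuleAAD(
        fileAAD, ModuleType.ColumnMetaData, rowGroupOrdinal, columnOrdinal, -1);
    byte[] dataPageAAD = AesCipher.createModuleAAD(
        fileAAD, ModuleType.DataPage, rowGroupOrdinal, columnOrdinal, 0);
    // data page AADs are then updated in place as pages are written or read:
    AesCipher.quickUpdatePageAAD(dataPageAAD, nextPageOrdinal);
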
 
@@ -1145,15 +1213,28 @@ public class ParquetMetadataConverter {
     List<RowGroup> newRowGroups = new ArrayList<RowGroup>();
     for (RowGroup rowGroup : rowGroups) {
       long totalSize = 0;
-      long startIndex = getOffset(rowGroup.getColumns().get(0));
-      for (ColumnChunk col : rowGroup.getColumns()) {
-        totalSize += col.getMeta_data().getTotal_compressed_size();
+      long startIndex;
+      
+      if (rowGroup.isSetFile_offset()) {
+        startIndex = rowGroup.getFile_offset();
+      } else {
+        startIndex = getOffset(rowGroup.getColumns().get(0));
       }
+      
+      if (rowGroup.isSetTotal_compressed_size()) {
+        totalSize = rowGroup.getTotal_compressed_size();
+      } else {
+        for (ColumnChunk col : rowGroup.getColumns()) {
+          totalSize += col.getMeta_data().getTotal_compressed_size();
+        }
+      }
+      
       long midPoint = startIndex + totalSize / 2;
       if (filter.contains(midPoint)) {
         newRowGroups.add(rowGroup);
       }
     }
+    
     metaData.setRow_groups(newRowGroups);
     return metaData;
   }
@@ -1163,7 +1244,13 @@ public class ParquetMetadataConverter {
     List<RowGroup> rowGroups = metaData.getRow_groups();
     List<RowGroup> newRowGroups = new ArrayList<RowGroup>();
     for (RowGroup rowGroup : rowGroups) {
-      long startIndex = getOffset(rowGroup.getColumns().get(0));
+      long startIndex;
+      if (rowGroup.isSetFile_offset()) {
+        startIndex = rowGroup.getFile_offset();
+      } else {
+        startIndex = getOffset(rowGroup.getColumns().get(0));
+      }
+      
       if (filter.contains(startIndex)) {
         newRowGroups.add(rowGroup);
       }
@@ -1173,8 +1260,12 @@ public class ParquetMetadataConverter {
   }
 
   static long getOffset(RowGroup rowGroup) {
+    if (rowGroup.isSetFile_offset()) {
+      return rowGroup.getFile_offset();
+    }
     return getOffset(rowGroup.getColumns().get(0));
   }
+  
   // Visible for testing
   static long getOffset(ColumnChunk columnChunk) {
     ColumnMetaData md = columnChunk.getMeta_data();
@@ -1185,70 +1276,186 @@ public class ParquetMetadataConverter {
     return offset;
   }
 
+  private static void verifyFooterIntegrity(InputStream from, InternalFileDecryptor fileDecryptor, 
+      int combinedFooterLength) throws IOException {
+    
+    byte[] nonce = new byte[AesCipher.NONCE_LENGTH];
+    from.read(nonce);
+    byte[] gcmTag = new byte[AesCipher.GCM_TAG_LENGTH];
+    from.read(gcmTag);
+    
+    AesGcmEncryptor footerSigner =  fileDecryptor.createSignedFooterEncryptor();
+    
+    byte[] footerAndSignature = ((ByteBufferInputStream) from).slice(0).array();
+    int footerSignatureLength = AesCipher.NONCE_LENGTH + AesCipher.GCM_TAG_LENGTH;
+    byte[] serializedFooter = new byte[combinedFooterLength - footerSignatureLength];
+    System.arraycopy(footerAndSignature, 0, serializedFooter, 0, serializedFooter.length);
+
+    byte[] signedFooterAAD = AesCipher.createFooterAAD(fileDecryptor.getFileAAD());
+    byte[] encryptedFooterBytes = footerSigner.encrypt(false, serializedFooter, nonce, signedFooterAAD);
+    byte[] calculatedTag = new byte[AesCipher.GCM_TAG_LENGTH];
+    System.arraycopy(encryptedFooterBytes, encryptedFooterBytes.length - AesCipher.GCM_TAG_LENGTH, 
+        calculatedTag, 0, AesCipher.GCM_TAG_LENGTH);
+    if (!Arrays.equals(gcmTag, calculatedTag)) {
+      throw new TagVerificationException("Signature mismatch in plaintext footer");
+    }
+  }
+
   public ParquetMetadata readParquetMetadata(final InputStream from, MetadataFilter filter) throws IOException {
+    return readParquetMetadata(from, filter, null, false, 0);
+  }
+
+  public ParquetMetadata readParquetMetadata(final InputStream from, MetadataFilter filter,
+      final InternalFileDecryptor fileDecryptor, final boolean encryptedFooter, 
+      final int combinedFooterLength) throws IOException {
+    
+    final BlockCipher.Decryptor footerDecryptor = (encryptedFooter? fileDecryptor.fetchFooterDecryptor() : null);
+    final byte[] encryptedFooterAAD = (encryptedFooter? AesCipher.createFooterAAD(fileDecryptor.getFileAAD()) : null);
+    
     FileMetaData fileMetaData = filter.accept(new MetadataFilterVisitor<FileMetaData, IOException>() {
       @Override
       public FileMetaData visit(NoFilter filter) throws IOException {
-        return readFileMetaData(from);
+        return readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
       }
 
       @Override
       public FileMetaData visit(SkipMetadataFilter filter) throws IOException {
-        return readFileMetaData(from, true);
+        return readFileMetaData(from, true, footerDecryptor, encryptedFooterAAD);
       }
 
       @Override
       public FileMetaData visit(OffsetMetadataFilter filter) throws IOException {
-        return filterFileMetaDataByStart(readFileMetaData(from), filter);
+        return filterFileMetaDataByStart(readFileMetaData(from, footerDecryptor, encryptedFooterAAD), filter);
       }
 
       @Override
       public FileMetaData visit(RangeMetadataFilter filter) throws IOException {
-        return filterFileMetaDataByMidpoint(readFileMetaData(from), filter);
+        return filterFileMetaDataByMidpoint(readFileMetaData(from, footerDecryptor, encryptedFooterAAD), filter);
       }
     });
     LOG.debug("{}", fileMetaData);
-    ParquetMetadata parquetMetadata = fromParquetMetadata(fileMetaData);
+    
+    if (!encryptedFooter && null != fileDecryptor) {
+      if (!fileMetaData.isSetEncryption_algorithm()) { // Plaintext file
+        fileDecryptor.setPlaintextFile();
+        // Detects files that were mistakenly left unencrypted
+        if (!fileDecryptor.plaintextFilesAllowed()) {
+          throw new ParquetCryptoRuntimeException("Applying decryptor on plaintext file");
+        }
+      } else {  // Encrypted file with plaintext footer
+        // if no fileDecryptor, can still read plaintext columns
+        fileDecryptor.setFileCryptoMetaData(fileMetaData.getEncryption_algorithm(), false, 
+            fileMetaData.getFooter_signing_key_metadata());
+        if (fileDecryptor.checkFooterIntegrity()) {
+          verifyFooterIntegrity(from, fileDecryptor, combinedFooterLength);
+        }
+      }
+    }
+    
+    ParquetMetadata parquetMetadata = fromParquetMetadata(fileMetaData, fileDecryptor, encryptedFooter);
     if (LOG.isDebugEnabled()) LOG.debug(ParquetMetadata.toPrettyJSON(parquetMetadata));
     return parquetMetadata;
   }
+  
+  public ColumnChunkMetaData buildColumnChunkMetaData(ColumnMetaData metaData, ColumnPath columnPath, PrimitiveType type, String createdBy) {
+    return ColumnChunkMetaData.get(
+        columnPath,
+        type,
+        fromFormatCodec(metaData.codec),
+        convertEncodingStats(metaData.getEncoding_stats()),
+        fromFormatEncodings(metaData.encodings),
+        fromParquetStatistics(
+            createdBy,
+            metaData.statistics,
+            type),
+        metaData.data_page_offset,
+        metaData.dictionary_page_offset,
+        metaData.num_values,
+        metaData.total_compressed_size,
+        metaData.total_uncompressed_size);
+  }
 
   public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata) throws IOException {
+    return fromParquetMetadata(parquetMetadata, null, false);
+  }
+
+  public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata, 
+      InternalFileDecryptor fileDecryptor, boolean encryptedFooter) throws IOException {
     MessageType messageType = fromParquetSchema(parquetMetadata.getSchema(), parquetMetadata.getColumn_orders());
     List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
     List<RowGroup> row_groups = parquetMetadata.getRow_groups();
+    
     if (row_groups != null) {
       for (RowGroup rowGroup : row_groups) {
         BlockMetaData blockMetaData = new BlockMetaData();
         blockMetaData.setRowCount(rowGroup.getNum_rows());
         blockMetaData.setTotalByteSize(rowGroup.getTotal_byte_size());
+        // not set in legacy files
+        if (rowGroup.isSetOrdinal()) {
+          blockMetaData.setOrdinal(rowGroup.getOrdinal());
+        }
         List<ColumnChunk> columns = rowGroup.getColumns();
         String filePath = columns.get(0).getFile_path();
+        int columnOrdinal = -1;
         for (ColumnChunk columnChunk : columns) {
+          columnOrdinal++;
           if ((filePath == null && columnChunk.getFile_path() != null)
               || (filePath != null && !filePath.equals(columnChunk.getFile_path()))) {
             throw new ParquetDecodingException("all column chunks of the same row group must be in the same file for now");
           }
           ColumnMetaData metaData = columnChunk.meta_data;
-          ColumnPath path = getPath(metaData);
-          ColumnChunkMetaData column = ColumnChunkMetaData.get(
-              path,
-              messageType.getType(path.toArray()).asPrimitiveType(),
-              fromFormatCodec(metaData.codec),
-              convertEncodingStats(metaData.getEncoding_stats()),
-              fromFormatEncodings(metaData.encodings),
-              fromParquetStatistics(
-                  parquetMetadata.getCreated_by(),
-                  metaData.statistics,
-                  messageType.getType(path.toArray()).asPrimitiveType()),
-              metaData.data_page_offset,
-              metaData.dictionary_page_offset,
-              metaData.num_values,
-              metaData.total_compressed_size,
-              metaData.total_uncompressed_size);
+          ColumnCryptoMetaData cryptoMetaData = columnChunk.getCrypto_metadata();
+          ColumnChunkMetaData column = null;
+          ColumnPath columnPath = null;
+          boolean encryptedMetadata = false;
+          
+          if (null == cryptoMetaData) { // Plaintext column
+            columnPath = getPath(metaData);
+            if (null != fileDecryptor && !fileDecryptor.plaintextFile()) {
+              // mark this column as plaintext in encrypted file decryptor
+              fileDecryptor.setColumnCryptoMetadata(columnPath, false, false, (byte[]) null, columnOrdinal);
+            }
+          } else {  // Encrypted column
+            boolean encryptedWithFooterKey = cryptoMetaData.isSetENCRYPTION_WITH_FOOTER_KEY();
+            if (encryptedWithFooterKey) { // Column encrypted with footer key
+              if (!encryptedFooter) {
+                throw new ParquetCryptoRuntimeException("Column encrypted with footer key in file with plaintext footer");
+              }
+              if (null == metaData) {
+                throw new ParquetCryptoRuntimeException("ColumnMetaData not set in Encryption with Footer key");
+              }
+              if (null == fileDecryptor) {
+                throw new ParquetCryptoRuntimeException("Column encrypted with footer key: No keys available");
+              }
+              columnPath = getPath(metaData);
+              fileDecryptor.setColumnCryptoMetadata(columnPath, true, true, (byte[]) null, columnOrdinal);
+            }  else { // Column encrypted with column key
+              // setColumnCryptoMetadata triggers KMS interaction, hence delayed until this column is projected
+              encryptedMetadata = true;
+            }
+          }
+          
+          String createdBy = parquetMetadata.getCreated_by();
+          if (!encryptedMetadata) { // unencrypted column, or encrypted with footer key
+            column = buildColumnChunkMetaData(metaData, columnPath, 
+                messageType.getType(columnPath.toArray()).asPrimitiveType(), createdBy);
+            column.setRowGroupOrdinal(rowGroup.getOrdinal());
+            column.setBloomFilterOffset(metaData.bloom_filter_offset);
+          }  else { // column encrypted with column key
+            // Metadata will be decrypted later, if this column is accessed
+            EncryptionWithColumnKey columnKeyStruct = cryptoMetaData.getENCRYPTION_WITH_COLUMN_KEY();
+            List<String> pathList = columnKeyStruct.getPath_in_schema();
+            byte[] columnKeyMetadata = columnKeyStruct.getKey_metadata();
+            columnPath = ColumnPath.get(pathList.toArray(new String[pathList.size()]));
+            byte[] encryptedMetadataBuffer = columnChunk.getEncrypted_column_metadata();
+            column = ColumnChunkMetaData.getWithEncryptedMetadata(this, columnPath, 
+                messageType.getType(columnPath.toArray()).asPrimitiveType(), encryptedMetadataBuffer, 
+                columnKeyMetadata, fileDecryptor, rowGroup.getOrdinal(), columnOrdinal, createdBy);
+          }
+          
           column.setColumnIndexReference(toColumnIndexReference(columnChunk));
           column.setOffsetIndexReference(toOffsetIndexReference(columnChunk));
-          column.setBloomFilterOffset(metaData.bloom_filter_offset);
+          
           // TODO
           // index_page_offset
           // key_value_metadata
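
For clarity on verifyFooterIntegrity above: a plaintext footer is signed by AES-GCM-encrypting the serialized footer with the footer key and the stored nonce, then comparing the resulting tag with the tag stored after the footer. A self-contained illustration with plain javax.crypto follows; it assumes the standard 12-byte nonce and 16-byte tag and is not the AesCipher code itself:

    import java.security.MessageDigest;
    import javax.crypto.Cipher;
    import javax.crypto.spec.GCMParameterSpec;
    import javax.crypto.spec.SecretKeySpec;

    // Illustrative only: recompute the GCM tag over the serialized plaintext footer
    // and compare it with the tag stored in the footer signature.
    public class FooterSignatureExample {
      public static boolean footerSignatureValid(byte[] footerKey, byte[] serializedFooter,
          byte[] nonce, byte[] storedTag, byte[] signedFooterAAD) throws Exception {
        Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
        cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(footerKey, "AES"),
            new GCMParameterSpec(128, nonce));         // 16-byte (128-bit) tag
        cipher.updateAAD(signedFooterAAD);
        byte[] ciphertextAndTag = cipher.doFinal(serializedFooter);
        byte[] calculatedTag = new byte[16];
        System.arraycopy(ciphertextAndTag, ciphertextAndTag.length - 16, calculatedTag, 0, 16);
        return MessageDigest.isEqual(storedTag, calculatedTag);  // constant-time comparison
      }
    }
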
@@ -1266,7 +1473,7 @@ public class ParquetMetadataConverter {
       }
     }
     return new ParquetMetadata(
-        new org.apache.parquet.hadoop.metadata.FileMetaData(messageType, keyValueMetaData, parquetMetadata.getCreated_by()),
+        new org.apache.parquet.hadoop.metadata.FileMetaData(messageType, keyValueMetaData, parquetMetadata.getCreated_by(), fileDecryptor),
         blocks);
   }
 
@@ -1467,19 +1674,34 @@ public class ParquetMetadataConverter {
   }
 
   public void writeDataPageV1Header(
-    int uncompressedSize,
-    int compressedSize,
-    int valueCount,
-    org.apache.parquet.column.Encoding rlEncoding,
-    org.apache.parquet.column.Encoding dlEncoding,
-    org.apache.parquet.column.Encoding valuesEncoding,
-    OutputStream to) throws IOException {
+      int uncompressedSize,
+      int compressedSize,
+      int valueCount,
+      org.apache.parquet.column.Encoding rlEncoding,
+      org.apache.parquet.column.Encoding dlEncoding,
+      org.apache.parquet.column.Encoding valuesEncoding,
+      OutputStream to) throws IOException {
+    writeDataPageV1Header(uncompressedSize, compressedSize, valueCount,
+        rlEncoding, dlEncoding, valuesEncoding, to, null, null);
+  }
+
+  public void writeDataPageV1Header(
+      int uncompressedSize,
+      int compressedSize,
+      int valueCount,
+      org.apache.parquet.column.Encoding rlEncoding,
+      org.apache.parquet.column.Encoding dlEncoding,
+      org.apache.parquet.column.Encoding valuesEncoding,
+      OutputStream to,
+      BlockCipher.Encryptor blockEncryptor,
+      byte[] AAD) throws IOException {
     writePageHeader(newDataPageHeader(uncompressedSize,
-      compressedSize,
-      valueCount,
-      rlEncoding,
-      dlEncoding,
-      valuesEncoding), to);
+                                      compressedSize,
+                                      valueCount,
+                                      rlEncoding,
+                                      dlEncoding,
+                                      valuesEncoding), 
+                    to, blockEncryptor, AAD);
   }
 
   public void writeDataPageV1Header(
@@ -1491,13 +1713,29 @@ public class ParquetMetadataConverter {
       org.apache.parquet.column.Encoding valuesEncoding,
       int crc,
       OutputStream to) throws IOException {
+    writeDataPageV1Header(uncompressedSize, compressedSize, valueCount,
+        rlEncoding, dlEncoding, valuesEncoding, crc, to, null, null);
+  }
+
+  public void writeDataPageV1Header(
+      int uncompressedSize,
+      int compressedSize,
+      int valueCount,
+      org.apache.parquet.column.Encoding rlEncoding,
+      org.apache.parquet.column.Encoding dlEncoding,
+      org.apache.parquet.column.Encoding valuesEncoding,
+      int crc,
+      OutputStream to,
+      BlockCipher.Encryptor blockEncryptor,
+      byte[] AAD) throws IOException {
     writePageHeader(newDataPageHeader(uncompressedSize,
                                       compressedSize,
                                       valueCount,
                                       rlEncoding,
                                       dlEncoding,
                                       valuesEncoding,
-                                      crc), to);
+                                      crc), 
+                    to, blockEncryptor, AAD);
   }
 
   public void writeDataPageV2Header(
@@ -1506,12 +1744,24 @@ public class ParquetMetadataConverter {
       org.apache.parquet.column.Encoding dataEncoding,
       int rlByteLength, int dlByteLength,
       OutputStream to) throws IOException {
+    writeDataPageV2Header(uncompressedSize, compressedSize,
+        valueCount, nullCount, rowCount, dataEncoding,
+        rlByteLength, dlByteLength, to, null, null);
+  }
+
+  public void writeDataPageV2Header(
+      int uncompressedSize, int compressedSize,
+      int valueCount, int nullCount, int rowCount,
+      org.apache.parquet.column.Encoding dataEncoding,
+      int rlByteLength, int dlByteLength,
+      OutputStream to, BlockCipher.Encryptor blockEncryptor,
+      byte[] AAD) throws IOException {
     writePageHeader(
         newDataPageV2Header(
             uncompressedSize, compressedSize,
             valueCount, nullCount, rowCount,
             dataEncoding,
-            rlByteLength, dlByteLength), to);
+            rlByteLength, dlByteLength), to, blockEncryptor, AAD);
   }
 
   private PageHeader newDataPageV2Header(
@@ -1530,20 +1780,36 @@ public class ParquetMetadataConverter {
   }
 
   public void writeDictionaryPageHeader(
-    int uncompressedSize, int compressedSize, int valueCount,
-    org.apache.parquet.column.Encoding valuesEncoding, OutputStream to) throws IOException {
+      int uncompressedSize, int compressedSize, int valueCount,
+      org.apache.parquet.column.Encoding valuesEncoding, OutputStream to) throws IOException {
+    writeDictionaryPageHeader(uncompressedSize, compressedSize, valueCount,
+        valuesEncoding, to, null, null);
+  }
+
+  public void writeDictionaryPageHeader(
+      int uncompressedSize, int compressedSize, int valueCount,
+      org.apache.parquet.column.Encoding valuesEncoding, OutputStream to,
+      BlockCipher.Encryptor blockEncryptor, byte[] AAD) throws IOException {
     PageHeader pageHeader = new PageHeader(PageType.DICTIONARY_PAGE, uncompressedSize, compressedSize);
     pageHeader.setDictionary_page_header(new DictionaryPageHeader(valueCount, getEncoding(valuesEncoding)));
-    writePageHeader(pageHeader, to);
+    writePageHeader(pageHeader, to, blockEncryptor, AAD);
   }
 
   public void writeDictionaryPageHeader(
       int uncompressedSize, int compressedSize, int valueCount,
       org.apache.parquet.column.Encoding valuesEncoding, int crc, OutputStream to) throws IOException {
+    writeDictionaryPageHeader(uncompressedSize, compressedSize, valueCount,
+        valuesEncoding, crc, to, null, null);
+  }
+
+  public void writeDictionaryPageHeader(
+      int uncompressedSize, int compressedSize, int valueCount,
+      org.apache.parquet.column.Encoding valuesEncoding, int crc, OutputStream to,
+      BlockCipher.Encryptor blockEncryptor, byte[] AAD) throws IOException {
     PageHeader pageHeader = new PageHeader(PageType.DICTIONARY_PAGE, uncompressedSize, compressedSize);
     pageHeader.setCrc(crc);
     pageHeader.setDictionary_page_header(new DictionaryPageHeader(valueCount, getEncoding(valuesEncoding)));
-    writePageHeader(pageHeader, to);
+    writePageHeader(pageHeader, to, blockEncryptor, AAD);
   }
 
   private static BoundaryOrder toParquetBoundaryOrder(
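
All of the header-writing overloads introduced above follow the same pattern: a null encryptor and AAD keep the legacy plaintext behavior, while a non-null pair encrypts the serialized page header. A short usage sketch, with variable names borrowed from the ColumnChunkPageWriteStore changes further down and assumed to be in scope:

    // Plaintext header (legacy behavior, via the delegating overload):
    converter.writeDataPageV1Header(uncompressedSize, compressedSize, valueCount,
        rlEncoding, dlEncoding, valuesEncoding, out);

    // Encrypted header: refresh the page-header AAD for this page, then pass encryptor + AAD.
    AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
    converter.writeDataPageV1Header(uncompressedSize, compressedSize, valueCount,
        rlEncoding, dlEncoding, valuesEncoding, out, headerBlockEncryptor, dataPageHeaderAAD);
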
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
index 6f21fa3..3d1bafe 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
@@ -37,6 +37,9 @@ import org.apache.parquet.column.page.DictionaryPageReadStore;
 import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.column.page.PageReader;
 import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
+import org.apache.parquet.crypto.AesCipher;
+import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
+import org.apache.parquet.format.BlockCipher;
 import org.apache.parquet.internal.column.columnindex.OffsetIndex;
 import org.apache.parquet.internal.filter2.columnindex.RowRanges;
 import org.apache.parquet.io.ParquetDecodingException;
@@ -69,9 +72,15 @@ class ColumnChunkPageReadStore implements PageReadStore, DictionaryPageReadStore
     private final OffsetIndex offsetIndex;
     private final long rowCount;
     private int pageIndex = 0;
+    
+    private final BlockCipher.Decryptor blockDecryptor;
+    private final byte[] dataPageAAD;
+    private final byte[] dictionaryPageAAD;
 
     ColumnChunkPageReader(BytesInputDecompressor decompressor, List<DataPage> compressedPages,
-        DictionaryPage compressedDictionaryPage, OffsetIndex offsetIndex, long rowCount) {
+        DictionaryPage compressedDictionaryPage, OffsetIndex offsetIndex, long rowCount,
+        BlockCipher.Decryptor blockDecryptor, byte[] fileAAD, 
+        int rowGroupOrdinal, int columnOrdinal) {
       this.decompressor = decompressor;
       this.compressedPages = new ArrayDeque<DataPage>(compressedPages);
       this.compressedDictionaryPage = compressedDictionaryPage;
@@ -82,6 +91,24 @@ class ColumnChunkPageReadStore implements PageReadStore, DictionaryPageReadStore
       this.valueCount = count;
       this.offsetIndex = offsetIndex;
       this.rowCount = rowCount;
+      
+      this.blockDecryptor = blockDecryptor;
+ 
+      if (null != blockDecryptor) {
+        dataPageAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DataPage, rowGroupOrdinal, columnOrdinal, 0);
+        dictionaryPageAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DictionaryPage, rowGroupOrdinal, columnOrdinal, -1);
+      } else {
+        dataPageAAD = null;
+        dictionaryPageAAD = null;
+      }
+    }
+    
+    private int getPageOrdinal(int currentPageIndex) {
+      if (null == offsetIndex) {
+        return currentPageIndex;
+      }
+      
+      return offsetIndex.getPageOrdinal(currentPageIndex);
     }
 
     @Override
@@ -96,11 +123,21 @@ class ColumnChunkPageReadStore implements PageReadStore, DictionaryPageReadStore
         return null;
       }
       final int currentPageIndex = pageIndex++;
+      
+      if (null != blockDecryptor) {
+        AesCipher.quickUpdatePageAAD(dataPageAAD, getPageOrdinal(currentPageIndex));
+      }
+      
       return compressedPage.accept(new DataPage.Visitor<DataPage>() {
         @Override
         public DataPage visit(DataPageV1 dataPageV1) {
           try {
-            BytesInput decompressed = decompressor.decompress(dataPageV1.getBytes(), dataPageV1.getUncompressedSize());
+            BytesInput bytes = dataPageV1.getBytes();
+            if (null != blockDecryptor) {
+              bytes = BytesInput.from(blockDecryptor.decrypt(bytes.toByteArray(), dataPageAAD));
+            }
+            BytesInput decompressed = decompressor.decompress(bytes, dataPageV1.getUncompressedSize());
+            
             final DataPageV1 decompressedPage;
             if (offsetIndex == null) {
               decompressedPage = new DataPageV1(
@@ -135,54 +172,53 @@ class ColumnChunkPageReadStore implements PageReadStore, DictionaryPageReadStore
 
         @Override
         public DataPage visit(DataPageV2 dataPageV2) {
-          if (!dataPageV2.isCompressed()) {
-            if (offsetIndex == null) {
-              return dataPageV2;
-            } else {
-              return DataPageV2.uncompressed(
-                  dataPageV2.getRowCount(),
-                  dataPageV2.getNullCount(),
-                  dataPageV2.getValueCount(),
-                  offsetIndex.getFirstRowIndex(currentPageIndex),
-                  dataPageV2.getRepetitionLevels(),
-                  dataPageV2.getDefinitionLevels(),
-                  dataPageV2.getDataEncoding(),
-                  dataPageV2.getData(),
-                  dataPageV2.getStatistics());
+          if (!dataPageV2.isCompressed() &&  offsetIndex == null && null == blockDecryptor) {
+            return dataPageV2;
+          }
+          BytesInput pageBytes = dataPageV2.getData();
+          
+          if (null != blockDecryptor) {
+            try {
+              pageBytes = BytesInput.from(blockDecryptor.decrypt(pageBytes.toByteArray(), dataPageAAD));
+            } catch (IOException e) {
+              throw new ParquetDecodingException("could not convert page ByteInput to byte array", e);
             }
           }
-          try {
+          if (dataPageV2.isCompressed()) {
             int uncompressedSize = Math.toIntExact(
                 dataPageV2.getUncompressedSize()
                     - dataPageV2.getDefinitionLevels().size()
                     - dataPageV2.getRepetitionLevels().size());
-            BytesInput decompressed = decompressor.decompress(dataPageV2.getData(), uncompressedSize);
-            if (offsetIndex == null) {
-              return DataPageV2.uncompressed(
-                  dataPageV2.getRowCount(),
-                  dataPageV2.getNullCount(),
-                  dataPageV2.getValueCount(),
-                  dataPageV2.getRepetitionLevels(),
-                  dataPageV2.getDefinitionLevels(),
-                  dataPageV2.getDataEncoding(),
-                  decompressed,
-                  dataPageV2.getStatistics());
-            } else {
-              return DataPageV2.uncompressed(
-                  dataPageV2.getRowCount(),
-                  dataPageV2.getNullCount(),
-                  dataPageV2.getValueCount(),
-                  offsetIndex.getFirstRowIndex(currentPageIndex),
-                  dataPageV2.getRepetitionLevels(),
-                  dataPageV2.getDefinitionLevels(),
-                  dataPageV2.getDataEncoding(),
-                  decompressed,
-                  dataPageV2.getStatistics());
+            try {
+              pageBytes = decompressor.decompress(pageBytes, uncompressedSize);
+            } catch (IOException e) {
+              throw new ParquetDecodingException("could not decompress page", e);
             }
-          } catch (IOException e) {
-            throw new ParquetDecodingException("could not decompress page", e);
           }
-        }
+          
+          if (offsetIndex == null) {
+            return DataPageV2.uncompressed(
+                dataPageV2.getRowCount(),
+                dataPageV2.getNullCount(),
+                dataPageV2.getValueCount(),
+                dataPageV2.getRepetitionLevels(),
+                dataPageV2.getDefinitionLevels(),
+                dataPageV2.getDataEncoding(),
+                pageBytes,
+                dataPageV2.getStatistics());
+          } else {
+            return DataPageV2.uncompressed(
+                dataPageV2.getRowCount(),
+                dataPageV2.getNullCount(),
+                dataPageV2.getValueCount(),
+                offsetIndex.getFirstRowIndex(currentPageIndex),
+                dataPageV2.getRepetitionLevels(),
+                dataPageV2.getDefinitionLevels(),
+                dataPageV2.getDataEncoding(),
+                pageBytes,
+                dataPageV2.getStatistics());
+          }
+        } 
       });
     }
 
@@ -192,8 +228,12 @@ class ColumnChunkPageReadStore implements PageReadStore, DictionaryPageReadStore
         return null;
       }
       try {
+        BytesInput bytes = compressedDictionaryPage.getBytes();
+        if (null != blockDecryptor) {
+          bytes = BytesInput.from(blockDecryptor.decrypt(bytes.toByteArray(), dictionaryPageAAD));
+        }
         DictionaryPage decompressedPage = new DictionaryPage(
-          decompressor.decompress(compressedDictionaryPage.getBytes(), compressedDictionaryPage.getUncompressedSize()),
+          decompressor.decompress(bytes, compressedDictionaryPage.getUncompressedSize()),
           compressedDictionaryPage.getDictionarySize(),
           compressedDictionaryPage.getEncoding());
         if (compressedDictionaryPage.getCrc().isPresent()) {
@@ -249,5 +289,4 @@ class ColumnChunkPageReadStore implements PageReadStore, DictionaryPageReadStore
       throw new RuntimeException(path+ " was added twice");
     }
   }
-
 }
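
The read path above mirrors the write path in reverse: pages are compressed and then encrypted on write, so the reader must decrypt first and decompress second. A condensed sketch of that order, with the variable names used in the visitor above:

    BytesInput pageBytes = dataPageV1.getBytes();
    if (null != blockDecryptor) {
      // pages were encrypted after compression, so decrypt before decompressing
      pageBytes = BytesInput.from(blockDecryptor.decrypt(pageBytes.toByteArray(), dataPageAAD));
    }
    BytesInput decompressed = decompressor.decompress(pageBytes, dataPageV1.getUncompressedSize());
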
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
index d2e4c96..134f3da 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
@@ -40,8 +40,14 @@ import org.apache.parquet.column.statistics.Statistics;
 import org.apache.parquet.column.values.bloomfilter.BloomFilter;
 import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
 import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
+import org.apache.parquet.crypto.AesCipher;
+import org.apache.parquet.crypto.InternalColumnEncryptionSetup;
+import org.apache.parquet.crypto.InternalFileEncryptor;
+import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
+import org.apache.parquet.format.BlockCipher;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder;
 import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder;
 import org.apache.parquet.io.ParquetEncodingException;
@@ -82,12 +88,26 @@ class ColumnChunkPageWriteStore implements PageWriteStore, BloomFilterWriteStore
 
     private final CRC32 crc;
     boolean pageWriteChecksumEnabled;
+    
+    private final BlockCipher.Encryptor headerBlockEncryptor;
+    private final BlockCipher.Encryptor pageBlockEncryptor;
+    private final int rowGroupOrdinal;
+    private final int columnOrdinal;
+    private int pageOrdinal;
+    private final byte[] dataPageAAD;
+    private final byte[] dataPageHeaderAAD;
+    private final byte[] fileAAD;
 
     private ColumnChunkPageWriter(ColumnDescriptor path,
                                   BytesCompressor compressor,
                                   ByteBufferAllocator allocator,
                                   int columnIndexTruncateLength,
-                                  boolean pageWriteChecksumEnabled) {
+                                  boolean pageWriteChecksumEnabled,
+                                  BlockCipher.Encryptor headerBlockEncryptor,
+                                  BlockCipher.Encryptor pageBlockEncryptor,
+                                  byte[] fileAAD,
+                                  int rowGroupOrdinal,
+                                  int columnOrdinal) {
       this.path = path;
       this.compressor = compressor;
       this.allocator = allocator;
@@ -96,6 +116,25 @@ class ColumnChunkPageWriteStore implements PageWriteStore, BloomFilterWriteStore
       this.offsetIndexBuilder = OffsetIndexBuilder.getBuilder();
       this.pageWriteChecksumEnabled = pageWriteChecksumEnabled;
       this.crc = pageWriteChecksumEnabled ? new CRC32() : null;
+      
+      this.headerBlockEncryptor = headerBlockEncryptor;
+      this.pageBlockEncryptor = pageBlockEncryptor;
+      this.fileAAD = fileAAD;
+      this.rowGroupOrdinal = rowGroupOrdinal;
+      this.columnOrdinal = columnOrdinal;
+      this.pageOrdinal = -1;
+      if (null != headerBlockEncryptor) {
+        dataPageHeaderAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DataPageHeader, 
+            rowGroupOrdinal, columnOrdinal, 0);
+      } else {
+        dataPageHeaderAAD = null;
+      }
+      if (null != pageBlockEncryptor) {
+        dataPageAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DataPage, 
+            rowGroupOrdinal, columnOrdinal, 0);
+      } else {
+        dataPageAAD = null;
+      }
     }
 
     @Override
@@ -117,6 +156,7 @@ class ColumnChunkPageWriteStore implements PageWriteStore, BloomFilterWriteStore
                           Encoding rlEncoding,
                           Encoding dlEncoding,
                           Encoding valuesEncoding) throws IOException {
+      pageOrdinal++;
       long uncompressedSize = bytes.size();
       if (uncompressedSize > Integer.MAX_VALUE) {
         throw new ParquetEncodingException(
@@ -124,6 +164,10 @@ class ColumnChunkPageWriteStore implements PageWriteStore, BloomFilterWriteStore
                 uncompressedSize);
       }
       BytesInput compressedBytes = compressor.compress(bytes);
+      if (null != pageBlockEncryptor) {
+        AesCipher.quickUpdatePageAAD(dataPageAAD, pageOrdinal);
+        compressedBytes = BytesInput.from(pageBlockEncryptor.encrypt(compressedBytes.toByteArray(), dataPageAAD));
+      }
       long compressedSize = compressedBytes.size();
       if (compressedSize > Integer.MAX_VALUE) {
         throw new ParquetEncodingException(
@@ -131,6 +175,9 @@ class ColumnChunkPageWriteStore implements PageWriteStore, BloomFilterWriteStore
                 + compressedSize);
       }
       tempOutputStream.reset();
+      if (null != headerBlockEncryptor) {
+        AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
+      }
       if (pageWriteChecksumEnabled) {
         crc.reset();
         crc.update(compressedBytes.toByteArray());
@@ -142,7 +189,9 @@ class ColumnChunkPageWriteStore implements PageWriteStore, BloomFilterWriteStore
           dlEncoding,
           valuesEncoding,
           (int) crc.getValue(),
-          tempOutputStream);
+          tempOutputStream,
+          headerBlockEncryptor,
+          dataPageHeaderAAD);
       } else {
         parquetMetadataConverter.writeDataPageV1Header(
           (int)uncompressedSize,
@@ -151,7 +200,9 @@ class ColumnChunkPageWriteStore implements PageWriteStore, BloomFilterWriteStore
           rlEncoding,
           dlEncoding,
           valuesEncoding,
-          tempOutputStream);
+          tempOutputStream,
+          headerBlockEncryptor,
+          dataPageHeaderAAD);
       }
       this.uncompressedLength += uncompressedSize;
       this.compressedLength += compressedSize;
@@ -182,6 +233,8 @@ class ColumnChunkPageWriteStore implements PageWriteStore, BloomFilterWriteStore
         BytesInput repetitionLevels, BytesInput definitionLevels,
         Encoding dataEncoding, BytesInput data,
         Statistics<?> statistics) throws IOException {
+      pageOrdinal++;
+      
       int rlByteLength = toIntWithCheck(repetitionLevels.size());
       int dlByteLength = toIntWithCheck(definitionLevels.size());
       int uncompressedSize = toIntWithCheck(
@@ -189,17 +242,26 @@ class ColumnChunkPageWriteStore implements PageWriteStore, BloomFilterWriteStore
       );
       // TODO: decide if we compress
       BytesInput compressedData = compressor.compress(data);
+      if (null != pageBlockEncryptor) {
+        AesCipher.quickUpdatePageAAD(dataPageAAD, pageOrdinal);
+        compressedData = BytesInput.from(pageBlockEncryptor.encrypt(compressedData.toByteArray(), dataPageAAD));
+      }
       int compressedSize = toIntWithCheck(
           compressedData.size() + repetitionLevels.size() + definitionLevels.size()
       );
       tempOutputStream.reset();
+      if (null != headerBlockEncryptor) {
+        AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
+      }
       parquetMetadataConverter.writeDataPageV2Header(
           uncompressedSize, compressedSize,
           valueCount, nullCount, rowCount,
           dataEncoding,
           rlByteLength,
           dlByteLength,
-          tempOutputStream);
+          tempOutputStream,
+          headerBlockEncryptor,
+          dataPageHeaderAAD);
       this.uncompressedLength += uncompressedSize;
       this.compressedLength += compressedSize;
       this.totalValueCount += valueCount;
@@ -242,21 +304,43 @@ class ColumnChunkPageWriteStore implements PageWriteStore, BloomFilterWriteStore
     }
 
     public void writeToFileWriter(ParquetFileWriter writer) throws IOException {
-      writer.writeColumnChunk(
-          path,
-          totalValueCount,
-          compressor.getCodecName(),
-          dictionaryPage,
-          buf,
-          uncompressedLength,
-          compressedLength,
-          totalStatistics,
-          columnIndexBuilder,
-          offsetIndexBuilder,
-          bloomFilter,
-          rlEncodings,
-          dlEncodings,
-          dataEncodings);
+      if (null == headerBlockEncryptor) {
+        writer.writeColumnChunk(
+            path,
+            totalValueCount,
+            compressor.getCodecName(),
+            dictionaryPage,
+            buf,
+            uncompressedLength,
+            compressedLength,
+            totalStatistics,
+            columnIndexBuilder,
+            offsetIndexBuilder,
+            bloomFilter,
+            rlEncodings,
+            dlEncodings,
+            dataEncodings);
+      } else {
+        writer.writeColumnChunk(
+            path,
+            totalValueCount,
+            compressor.getCodecName(),
+            dictionaryPage,
+            buf,
+            uncompressedLength,
+            compressedLength,
+            totalStatistics,
+            columnIndexBuilder,
+            offsetIndexBuilder,
+            bloomFilter,
+            rlEncodings,
+            dlEncodings,
+            dataEncodings,
+            headerBlockEncryptor,
+            rowGroupOrdinal,
+            columnOrdinal, 
+            fileAAD);
+      }
       if (LOG.isDebugEnabled()) {
         LOG.debug(
             String.format(
@@ -271,6 +355,7 @@ class ColumnChunkPageWriteStore implements PageWriteStore, BloomFilterWriteStore
       dlEncodings.clear();
       dataEncodings.clear();
       pageCount = 0;
+      pageOrdinal = -1;
     }
 
     @Override
@@ -286,7 +371,13 @@ class ColumnChunkPageWriteStore implements PageWriteStore, BloomFilterWriteStore
       BytesInput dictionaryBytes = dictionaryPage.getBytes();
       int uncompressedSize = (int)dictionaryBytes.size();
       BytesInput compressedBytes = compressor.compress(dictionaryBytes);
-      this.dictionaryPage = new DictionaryPage(BytesInput.copy(compressedBytes), uncompressedSize, dictionaryPage.getDictionarySize(), dictionaryPage.getEncoding());
+      if (null != pageBlockEncryptor) {
+        byte[] dictionaryPageAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DictionaryPage, 
+            rowGroupOrdinal, columnOrdinal, -1);
+        compressedBytes = BytesInput.from(pageBlockEncryptor.encrypt(compressedBytes.toByteArray(), dictionaryPageAAD));
+      }
+      this.dictionaryPage = new DictionaryPage(BytesInput.copy(compressedBytes), uncompressedSize, 
+          dictionaryPage.getDictionarySize(), dictionaryPage.getEncoding());
     }
 
     @Override
@@ -313,7 +404,39 @@ class ColumnChunkPageWriteStore implements PageWriteStore, BloomFilterWriteStore
       int columnIndexTruncateLength, boolean pageWriteChecksumEnabled) {
     this.schema = schema;
     for (ColumnDescriptor path : schema.getColumns()) {
-      writers.put(path, new ColumnChunkPageWriter(path, compressor, allocator, columnIndexTruncateLength, pageWriteChecksumEnabled));
+      writers.put(path, new ColumnChunkPageWriter(path, compressor, allocator, columnIndexTruncateLength, 
+          pageWriteChecksumEnabled, null, null, null, -1, -1));
+    }
+  }
+  
+  public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, ByteBufferAllocator allocator,
+      int columnIndexTruncateLength, boolean pageWriteChecksumEnabled, InternalFileEncryptor fileEncryptor, int rowGroupOrdinal) {
+    this.schema = schema;
+    if (null == fileEncryptor) {
+      for (ColumnDescriptor path : schema.getColumns()) {
+        writers.put(path, new ColumnChunkPageWriter(path, compressor, allocator, columnIndexTruncateLength, 
+            pageWriteChecksumEnabled, null, null, null, -1, -1));
+      }
+      return;
+    }
+    
+    // Encrypted file
+    int columnOrdinal = -1;
+    byte[] fileAAD = fileEncryptor.getFileAAD();
+    for (ColumnDescriptor path : schema.getColumns()) {
+      columnOrdinal++;
+      BlockCipher.Encryptor headerBlockEncryptor = null;
+      BlockCipher.Encryptor pageBlockEncryptor = null;
+      ColumnPath columnPath = ColumnPath.get(path.getPath());
+      
+      InternalColumnEncryptionSetup columnSetup = fileEncryptor.getColumnSetup(columnPath, true, columnOrdinal);
+      if (columnSetup.isEncrypted()) {
+        headerBlockEncryptor = columnSetup.getMetaDataEncryptor();
+        pageBlockEncryptor = columnSetup.getDataEncryptor();
+      }
+
+      writers.put(path, new ColumnChunkPageWriter(path, compressor, allocator, columnIndexTruncateLength, pageWriteChecksumEnabled,
+          headerBlockEncryptor, pageBlockEncryptor, fileAAD, rowGroupOrdinal, columnOrdinal));
     }
   }
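
    As an aside for reviewers tracing the AAD plumbing above: a minimal, standalone Java sketch
    (not part of this patch) of how module AADs are derived for dictionary and data page headers.
    The byte[8] file AAD is a placeholder; real code obtains it from InternalFileEncryptor.getFileAAD().

    import org.apache.parquet.crypto.AesCipher;
    import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;

    public class ModuleAADSketch {
      public static void main(String[] args) {
        byte[] fileAAD = new byte[8];   // placeholder; normally InternalFileEncryptor.getFileAAD()
        int rowGroupOrdinal = 0;
        int columnOrdinal = 2;

        // Dictionary pages use -1 as the page ordinal
        byte[] dictPageAAD = AesCipher.createModuleAAD(
            fileAAD, ModuleType.DictionaryPage, rowGroupOrdinal, columnOrdinal, -1);

        // Data page header AAD for page 0; later pages update the same buffer in place
        byte[] dataPageHeaderAAD = AesCipher.createModuleAAD(
            fileAAD, ModuleType.DataPageHeader, rowGroupOrdinal, columnOrdinal, 0);
        for (int pageOrdinal = 1; pageOrdinal < 3; pageOrdinal++) {
          AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
          // encrypt the next page header with dataPageHeaderAAD here
        }
        System.out.println("dictionary AAD length: " + dictPageAAD.length);
      }
    }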
 
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnIndexFilterUtils.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnIndexFilterUtils.java
index 448515e..fbec3b2 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnIndexFilterUtils.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnIndexFilterUtils.java
@@ -69,6 +69,11 @@ class ColumnIndexFilterUtils {
       this.offsetIndex = offsetIndex;
       this.indexMap = indexMap;
     }
+    
+    @Override
+    public int getPageOrdinal(int pageIndex) {
+      return indexMap[pageIndex];
+    }
 
     @Override
     public int getPageCount() {
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java
index 99643ed..d90fda1 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java
@@ -20,8 +20,6 @@ package org.apache.parquet.hadoop;
 
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.Encoding;
-import org.apache.parquet.column.EncodingStats;
 import org.apache.parquet.column.page.DictionaryPage;
 import org.apache.parquet.column.page.DictionaryPageReadStore;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
@@ -32,12 +30,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
-import static org.apache.parquet.column.Encoding.PLAIN_DICTIONARY;
-import static org.apache.parquet.column.Encoding.RLE_DICTIONARY;
-
 /**
  * A {@link DictionaryPageReadStore} implementation that reads dictionaries from
  * an open {@link ParquetFileReader}.
@@ -101,7 +95,7 @@ class DictionaryPageReader implements DictionaryPageReadStore {
     return dictionaryPageCache.computeIfAbsent(dotPath, key -> {
       try {
         final DictionaryPage dict =
-            hasDictionaryPage(column) ? reader.readDictionary(column) : null;
+            column.hasDictionaryPage() ? reader.readDictionary(column) : null;
 
         // Copy the dictionary to ensure it can be reused if it is returned
         // more than once. This can happen when a DictionaryFilter has two or
@@ -118,15 +112,4 @@ class DictionaryPageReader implements DictionaryPageReadStore {
     return new DictionaryPage(BytesInput.from(dict.getBytes().toByteArray()),
         dict.getDictionarySize(), dict.getEncoding());
   }
-
-  private boolean hasDictionaryPage(ColumnChunkMetaData column) {
-    EncodingStats stats = column.getEncodingStats();
-    if (stats != null) {
-      // ensure there is a dictionary page and that it is used to encode data pages
-      return stats.hasDictionaryPages() && stats.hasDictionaryEncodedPages();
-    }
-
-    Set<Encoding> encodings = column.getEncodings();
-    return (encodings.contains(PLAIN_DICTIONARY) || encodings.contains(RLE_DICTIONARY));
-  }
 }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
index 205509c..0ecdabf 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
@@ -29,6 +29,7 @@ import java.util.Objects;
 import org.apache.parquet.column.ColumnWriteStore;
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
+import org.apache.parquet.crypto.InternalFileEncryptor;
 import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.hadoop.api.WriteSupport.FinalizedWriteContext;
@@ -67,6 +68,9 @@ class InternalParquetRecordWriter<T> {
   private ColumnChunkPageWriteStore pageStore;
   private BloomFilterWriteStore bloomFilterWriteStore;
   private RecordConsumer recordConsumer;
+  
+  private InternalFileEncryptor fileEncryptor;
+  private int rowGroupOrdinal;
 
   /**
    * @param parquetFileWriter the file to write to
@@ -95,6 +99,8 @@ class InternalParquetRecordWriter<T> {
     this.compressor = compressor;
     this.validating = validating;
     this.props = props;
+    this.fileEncryptor = parquetFileWriter.getEncryptor();
+    this.rowGroupOrdinal = 0;
     initStore();
   }
 
@@ -104,7 +110,8 @@ class InternalParquetRecordWriter<T> {
 
   private void initStore() {
     ColumnChunkPageWriteStore columnChunkPageWriteStore = new ColumnChunkPageWriteStore(compressor,
-        schema, props.getAllocator(), props.getColumnIndexTruncateLength(), props.getPageWriteChecksumEnabled());
+        schema, props.getAllocator(), props.getColumnIndexTruncateLength(), props.getPageWriteChecksumEnabled(),
+        fileEncryptor, rowGroupOrdinal);
     pageStore = columnChunkPageWriteStore;
     bloomFilterWriteStore = columnChunkPageWriteStore;
 
@@ -173,6 +180,7 @@ class InternalParquetRecordWriter<T> {
     }
 
     if (recordCount > 0) {
+      rowGroupOrdinal++;
       parquetFileWriter.startBlock(recordCount);
       columnStore.flush();
       pageStore.flushToFileWriter(parquetFileWriter);
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 18fbf6d..0590638 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -22,17 +22,19 @@ import static org.apache.parquet.bytes.BytesUtils.readIntLittleEndian;
 import static org.apache.parquet.filter2.compat.RowGroupFilter.FilterLevel.BLOOMFILTER;
 import static org.apache.parquet.filter2.compat.RowGroupFilter.FilterLevel.DICTIONARY;
 import static org.apache.parquet.filter2.compat.RowGroupFilter.FilterLevel.STATISTICS;
+import static org.apache.parquet.format.Util.readFileCryptoMetaData;
 import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
 import static org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS;
 import static org.apache.parquet.hadoop.ColumnIndexFilterUtils.calculateOffsetRanges;
 import static org.apache.parquet.hadoop.ColumnIndexFilterUtils.filterOffsetIndex;
+import static org.apache.parquet.hadoop.ParquetFileWriter.EFMAGIC;
 import static org.apache.parquet.hadoop.ParquetFileWriter.MAGIC;
 import static org.apache.parquet.hadoop.ParquetFileWriter.PARQUET_COMMON_METADATA_FILE;
 import static org.apache.parquet.hadoop.ParquetFileWriter.PARQUET_METADATA_FILE;
 
 import java.io.Closeable;
-import java.io.InputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.SequenceInputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -63,7 +65,6 @@ import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.page.DataPage;
 import org.apache.parquet.column.page.DataPageV1;
 import org.apache.parquet.column.page.DataPageV2;
@@ -73,12 +74,20 @@ import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter;
 import org.apache.parquet.column.values.bloomfilter.BloomFilter;
 import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
+import org.apache.parquet.crypto.AesCipher;
+import org.apache.parquet.crypto.FileDecryptionProperties;
+import org.apache.parquet.crypto.InternalColumnDecryptionSetup;
+import org.apache.parquet.crypto.InternalFileDecryptor;
+import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
 import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.filter2.compat.RowGroupFilter;
+import org.apache.parquet.format.BlockCipher;
 import org.apache.parquet.format.BloomFilterHeader;
 import org.apache.parquet.format.DataPageHeader;
 import org.apache.parquet.format.DataPageHeaderV2;
 import org.apache.parquet.format.DictionaryPageHeader;
+import org.apache.parquet.format.FileCryptoMetaData;
 import org.apache.parquet.format.PageHeader;
 import org.apache.parquet.format.Util;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
@@ -501,7 +510,8 @@ public class ParquetFileReader implements Closeable {
   public static final ParquetMetadata readFooter(InputFile file, MetadataFilter filter) throws IOException {
     ParquetReadOptions options;
     if (file instanceof HadoopInputFile) {
-      options = HadoopReadOptions.builder(((HadoopInputFile) file).getConfiguration())
+      HadoopInputFile hadoopFile = (HadoopInputFile) file;
+      options = HadoopReadOptions.builder(hadoopFile.getConfiguration(), hadoopFile.getPath())
           .withMetadataFilter(filter).build();
     } else {
       options = ParquetReadOptions.builder().withMetadataFilter(filter).build();
@@ -517,37 +527,71 @@ public class ParquetFileReader implements Closeable {
     return readFooter(file, options, f, converter);
   }
 
-  private static final ParquetMetadata readFooter(InputFile file, ParquetReadOptions options, SeekableInputStream f, ParquetMetadataConverter converter) throws IOException {
+  private static final ParquetMetadata readFooter(InputFile file, ParquetReadOptions options, 
+      SeekableInputStream f, ParquetMetadataConverter converter) throws IOException {
+
     long fileLen = file.getLength();
+    String filePath = file.toString();
     LOG.debug("File length {}", fileLen);
+
     int FOOTER_LENGTH_SIZE = 4;
     if (fileLen < MAGIC.length + FOOTER_LENGTH_SIZE + MAGIC.length) { // MAGIC + data + footer + footerIndex + MAGIC
-      throw new RuntimeException(file.toString() + " is not a Parquet file (too small length: " + fileLen + ")");
+      throw new RuntimeException(filePath + " is not a Parquet file (length is too low: " + fileLen + ")");
     }
-    long footerLengthIndex = fileLen - FOOTER_LENGTH_SIZE - MAGIC.length;
-    LOG.debug("reading footer index at {}", footerLengthIndex);
 
-    f.seek(footerLengthIndex);
-    int footerLength = readIntLittleEndian(f);
+    // Read footer length and magic string - with a single seek
     byte[] magic = new byte[MAGIC.length];
+    long fileMetadataLengthIndex = fileLen - magic.length - FOOTER_LENGTH_SIZE;
+    LOG.debug("reading footer index at {}", fileMetadataLengthIndex);
+    f.seek(fileMetadataLengthIndex);
+    int fileMetadataLength = readIntLittleEndian(f);
     f.readFully(magic);
-    if (!Arrays.equals(MAGIC, magic)) {
-      throw new RuntimeException(file.toString() + " is not a Parquet file. expected magic number at tail " + Arrays.toString(MAGIC) + " but found " + Arrays.toString(magic));
+
+    boolean encryptedFooterMode;
+    if (Arrays.equals(MAGIC, magic)) {
+      encryptedFooterMode = false;
+    } else if (Arrays.equals(EFMAGIC, magic)) {
+      encryptedFooterMode = true;
+    } else {
+      throw new RuntimeException(filePath + " is not a Parquet file. Expected magic number at tail, but found " + Arrays.toString(magic));
+    }
+
+    long fileMetadataIndex = fileMetadataLengthIndex - fileMetadataLength;
+    LOG.debug("read footer length: {}, footer index: {}", fileMetadataLength, fileMetadataIndex);
+    if (fileMetadataIndex < magic.length || fileMetadataIndex >= fileMetadataLengthIndex) {
+      throw new RuntimeException("corrupted file: the footer index is not within the file: " + fileMetadataIndex);
     }
-    long footerIndex = footerLengthIndex - footerLength;
-    LOG.debug("read footer length: {}, footer index: {}", footerLength, footerIndex);
-    if (footerIndex < MAGIC.length || footerIndex >= footerLengthIndex) {
-      throw new RuntimeException("corrupted file: the footer index is not within the file: " + footerIndex);
+    f.seek(fileMetadataIndex);
+
+    FileDecryptionProperties fileDecryptionProperties = options.getDecryptionProperties();
+    InternalFileDecryptor fileDecryptor = null;
+    if (null != fileDecryptionProperties) {
+      fileDecryptor = new InternalFileDecryptor(fileDecryptionProperties);
     }
-    f.seek(footerIndex);
+
     // Read all the footer bytes in one time to avoid multiple read operations,
     // since it can be pretty time consuming for a single read operation in HDFS.
-    ByteBuffer footerBytesBuffer = ByteBuffer.allocate(footerLength);
+    ByteBuffer footerBytesBuffer = ByteBuffer.allocate(fileMetadataLength);
     f.readFully(footerBytesBuffer);
     LOG.debug("Finished to read all footer bytes.");
     footerBytesBuffer.flip();
     InputStream footerBytesStream = ByteBufferInputStream.wrap(footerBytesBuffer);
-    return converter.readParquetMetadata(footerBytesStream, options.getMetadataFilter());
+
+    // Regular file, or encrypted file with plaintext footer
+    if (!encryptedFooterMode) {
+      return converter.readParquetMetadata(footerBytesStream, options.getMetadataFilter(), fileDecryptor, false, 
+          fileMetadataLength);
+    }
+
+    // Encrypted file with encrypted footer
+    if (null == fileDecryptor) {
+      throw new ParquetCryptoRuntimeException("Trying to read file with encrypted footer. No keys available");
+    }
+    FileCryptoMetaData fileCryptoMetaData = readFileCryptoMetaData(footerBytesStream);
+    fileDecryptor.setFileCryptoMetaData(fileCryptoMetaData.getEncryption_algorithm(), 
+        true, fileCryptoMetaData.getKey_metadata());
+    // footer length is required only for signed plaintext footers
+    return converter.readParquetMetadata(footerBytesStream, options.getMetadataFilter(), fileDecryptor, true, 0);
   }
 
   /**
@@ -560,7 +604,7 @@ public class ParquetFileReader implements Closeable {
   @Deprecated
   public static ParquetFileReader open(Configuration conf, Path file) throws IOException {
     return new ParquetFileReader(HadoopInputFile.fromPath(file, conf),
-        HadoopReadOptions.builder(conf).build());
+        HadoopReadOptions.builder(conf, file).build());
   }
 
   /**
@@ -574,7 +618,7 @@ public class ParquetFileReader implements Closeable {
   @Deprecated
   public static ParquetFileReader open(Configuration conf, Path file, MetadataFilter filter) throws IOException {
     return open(HadoopInputFile.fromPath(file, conf),
-        HadoopReadOptions.builder(conf).withMetadataFilter(filter).build());
+        HadoopReadOptions.builder(conf, file).withMetadataFilter(filter).build());
   }
 
   /**
@@ -613,6 +657,7 @@ public class ParquetFileReader implements Closeable {
     return new ParquetFileReader(file, options);
   }
 
+
   private final InputFile file;
   private final SeekableInputStream f;
   private final ParquetReadOptions options;
@@ -629,6 +674,8 @@ public class ParquetFileReader implements Closeable {
   private ColumnChunkPageReadStore currentRowGroup = null;
   private DictionaryPageReader nextDictionaryReader = null;
 
+  private InternalFileDecryptor fileDecryptor = null;
+
   /**
    * @param configuration the Hadoop conf
    * @param filePath Path for the parquet file
@@ -660,7 +707,14 @@ public class ParquetFileReader implements Closeable {
     this.file = HadoopInputFile.fromPath(filePath, configuration);
     this.fileMetaData = fileMetaData;
     this.f = file.newStream();
-    this.options = HadoopReadOptions.builder(configuration).build();
+    this.fileDecryptor = fileMetaData.getFileDecryptor();
+    if (null == fileDecryptor) {
+      this.options = HadoopReadOptions.builder(configuration).build();
+    } else {
+      this.options = HadoopReadOptions.builder(configuration)
+                                      .withDecryption(fileDecryptor.getDecryptionProperties())
+                                      .build();
+    }
     this.blocks = filterRowGroups(blocks);
     this.blockIndexStores = listWithNulls(this.blocks.size());
     this.blockRowRanges = listWithNulls(this.blocks.size());
@@ -680,7 +734,7 @@ public class ParquetFileReader implements Closeable {
   @Deprecated
   public ParquetFileReader(Configuration conf, Path file, MetadataFilter filter) throws IOException {
     this(HadoopInputFile.fromPath(file, conf),
-        HadoopReadOptions.builder(conf).withMetadataFilter(filter).build());
+        HadoopReadOptions.builder(conf, file).withMetadataFilter(filter).build());
   }
 
   /**
@@ -695,9 +749,16 @@ public class ParquetFileReader implements Closeable {
     this.converter = new ParquetMetadataConverter(conf);
     this.file = HadoopInputFile.fromPath(file, conf);
     this.f = this.file.newStream();
-    this.options = HadoopReadOptions.builder(conf).build();
-    this.footer = footer;
     this.fileMetaData = footer.getFileMetaData();
+    this.fileDecryptor = fileMetaData.getFileDecryptor();
+    if (null == fileDecryptor) {
+      this.options = HadoopReadOptions.builder(conf).build();
+    } else {
+      this.options = HadoopReadOptions.builder(conf)
+                                      .withDecryption(fileDecryptor.getDecryptionProperties())
+                                      .build();
+    }
+    this.footer = footer;
     this.blocks = filterRowGroups(footer.getBlocks());
     this.blockIndexStores = listWithNulls(this.blocks.size());
     this.blockRowRanges = listWithNulls(this.blocks.size());
@@ -721,6 +782,11 @@ public class ParquetFileReader implements Closeable {
       throw e;
     }
     this.fileMetaData = footer.getFileMetaData();
+    this.fileDecryptor = fileMetaData.getFileDecryptor(); // must be called before filterRowGroups!
+    if (null != fileDecryptor && fileDecryptor.plaintextFile()) {
+      this.fileDecryptor = null; // Plaintext file. No need for a decryptor
+    }
+
     this.blocks = filterRowGroups(footer.getBlocks());
     this.blockIndexStores = listWithNulls(this.blocks.size());
     this.blockRowRanges = listWithNulls(this.blocks.size());
@@ -843,9 +909,9 @@ public class ParquetFileReader implements Closeable {
     ConsecutivePartList currentParts = null;
     for (ColumnChunkMetaData mc : block.getColumns()) {
       ColumnPath pathKey = mc.getPath();
-      BenchmarkCounter.incrementTotalBytes(mc.getTotalSize());
       ColumnDescriptor columnDescriptor = paths.get(pathKey);
       if (columnDescriptor != null) {
+        BenchmarkCounter.incrementTotalBytes(mc.getTotalSize());
         long startingPos = mc.getStartingPos();
         // first part or not consecutive => new list
         if (currentParts == null || currentParts.endPos() != startingPos) {
@@ -861,7 +927,7 @@ public class ParquetFileReader implements Closeable {
       consecutiveChunks.readAll(f, builder);
     }
     for (Chunk chunk : builder.build()) {
-      currentRowGroup.addColumn(chunk.descriptor.col, chunk.readAllPages());
+      readChunkPages(chunk, block);
     }
 
     // avoid re-reading bytes the dictionary reader is used after this call
@@ -941,7 +1007,7 @@ public class ParquetFileReader implements Closeable {
       consecutiveChunks.readAll(f, builder);
     }
     for (Chunk chunk : builder.build()) {
-      currentRowGroup.addColumn(chunk.descriptor.col, chunk.readAllPages());
+      readChunkPages(chunk, block);
     }
 
     // avoid re-reading bytes the dictionary reader is used after this call
@@ -954,6 +1020,23 @@ public class ParquetFileReader implements Closeable {
     return currentRowGroup;
   }
 
+  private void readChunkPages(Chunk chunk, BlockMetaData block) throws IOException {
+    if (null == fileDecryptor || fileDecryptor.plaintextFile()) {
+      currentRowGroup.addColumn(chunk.descriptor.col, chunk.readAllPages());
+      return;
+    }
+    // Encrypted file
+    ColumnPath columnPath = ColumnPath.get(chunk.descriptor.col.getPath());
+    InternalColumnDecryptionSetup columnDecryptionSetup = fileDecryptor.getColumnSetup(columnPath);
+    if (!columnDecryptionSetup.isEncrypted()) { // plaintext column
+      currentRowGroup.addColumn(chunk.descriptor.col, chunk.readAllPages());
+    } else { // encrypted column
+      currentRowGroup.addColumn(chunk.descriptor.col, 
+          chunk.readAllPages(columnDecryptionSetup.getMetaDataDecryptor(), columnDecryptionSetup.getDataDecryptor(), 
+              fileDecryptor.getFileAAD(), block.getOrdinal(), columnDecryptionSetup.getOrdinal()));
+    }
+  }
+
   private ColumnIndexStore getColumnIndexStore(int blockIndex) {
     ColumnIndexStore ciStore = blockIndexStores.get(blockIndex);
     if (ciStore == null) {
@@ -1017,8 +1100,7 @@ public class ParquetFileReader implements Closeable {
    * @throws IOException if there is an error while reading the dictionary
    */
   DictionaryPage readDictionary(ColumnChunkMetaData meta) throws IOException {
-    if (!meta.getEncodings().contains(Encoding.PLAIN_DICTIONARY) &&
-        !meta.getEncodings().contains(Encoding.RLE_DICTIONARY)) {
+    if (!meta.hasDictionaryPage()) {
       return null;
     }
 
@@ -1027,12 +1109,34 @@ public class ParquetFileReader implements Closeable {
       f.seek(meta.getStartingPos());
     }
 
-    PageHeader pageHeader = Util.readPageHeader(f);
+    boolean encryptedColumn = false;
+    InternalColumnDecryptionSetup columnDecryptionSetup = null;
+    byte[] dictionaryPageAAD = null;
+    BlockCipher.Decryptor pageDecryptor = null;
+    if (null != fileDecryptor && !fileDecryptor.plaintextFile()) {
+      columnDecryptionSetup = fileDecryptor.getColumnSetup(meta.getPath());
+      if (columnDecryptionSetup.isEncrypted()) {
+        encryptedColumn = true;
+      }
+    }
+
+    PageHeader pageHeader;
+    if (!encryptedColumn) {
+      pageHeader = Util.readPageHeader(f);
+    } else {
+      byte[] dictionaryPageHeaderAAD = AesCipher.createModuleAAD(fileDecryptor.getFileAAD(), ModuleType.DictionaryPageHeader, 
+          meta.getRowGroupOrdinal(), columnDecryptionSetup.getOrdinal(), -1);
+      pageHeader = Util.readPageHeader(f, columnDecryptionSetup.getMetaDataDecryptor(), dictionaryPageHeaderAAD);
+      dictionaryPageAAD = AesCipher.createModuleAAD(fileDecryptor.getFileAAD(), ModuleType.DictionaryPage, 
+          meta.getRowGroupOrdinal(), columnDecryptionSetup.getOrdinal(), -1);
+      pageDecryptor = columnDecryptionSetup.getDataDecryptor();
+    }
+
     if (!pageHeader.isSetDictionary_page_header()) {
       return null; // TODO: should this complain?
     }
 
-    DictionaryPage compressedPage = readCompressedDictionary(pageHeader, f);
+    DictionaryPage compressedPage = readCompressedDictionary(pageHeader, f, pageDecryptor, dictionaryPageAAD);
     BytesInputDecompressor decompressor = options.getCodecFactory().getDecompressor(meta.getCodec());
 
     return new DictionaryPage(
@@ -1042,7 +1146,8 @@ public class ParquetFileReader implements Closeable {
   }
 
   private DictionaryPage readCompressedDictionary(
-      PageHeader pageHeader, SeekableInputStream fin) throws IOException {
+      PageHeader pageHeader, SeekableInputStream fin,
+      BlockCipher.Decryptor pageDecryptor, byte[] dictionaryPageAAD) throws IOException {
     DictionaryPageHeader dictHeader = pageHeader.getDictionary_page_header();
 
     int uncompressedPageSize = pageHeader.getUncompressed_page_size();
@@ -1053,6 +1158,10 @@ public class ParquetFileReader implements Closeable {
 
     BytesInput bin = BytesInput.from(dictPageBytes);
 
+    if (null != pageDecryptor) {
+      bin = BytesInput.from(pageDecryptor.decrypt(bin.toByteArray(), dictionaryPageAAD));
+    }
+
     return new DictionaryPage(
         bin, uncompressedPageSize, dictHeader.getNum_values(),
         converter.getEncoding(dictHeader.getEncoding()));
@@ -1071,12 +1180,30 @@ public class ParquetFileReader implements Closeable {
    */
   public BloomFilter readBloomFilter(ColumnChunkMetaData meta) throws IOException {
     long bloomFilterOffset = meta.getBloomFilterOffset();
-    f.seek(bloomFilterOffset);
-    BloomFilterHeader bloomFilterHeader;
+    if (0 == bloomFilterOffset) {
+      return null;
+    }
+
+    // Prepare to decrypt Bloom filter (for encrypted columns)
+    BlockCipher.Decryptor bloomFilterDecryptor = null;
+    byte[] bloomFilterHeaderAAD = null;
+    byte[] bloomFilterBitsetAAD = null;
+    if (null != fileDecryptor && !fileDecryptor.plaintextFile()) {
+      InternalColumnDecryptionSetup columnDecryptionSetup = fileDecryptor.getColumnSetup(meta.getPath());
+      if (columnDecryptionSetup.isEncrypted()) {
+        bloomFilterDecryptor = columnDecryptionSetup.getMetaDataDecryptor();
+        bloomFilterHeaderAAD = AesCipher.createModuleAAD(fileDecryptor.getFileAAD(), ModuleType.BloomFilterHeader, 
+            meta.getRowGroupOrdinal(), columnDecryptionSetup.getOrdinal(), -1);
+        bloomFilterBitsetAAD = AesCipher.createModuleAAD(fileDecryptor.getFileAAD(), ModuleType.BloomFilterBitset, 
+            meta.getRowGroupOrdinal(), columnDecryptionSetup.getOrdinal(), -1);
+      }
+    }
 
     // Read Bloom filter data header.
+    f.seek(bloomFilterOffset);
+    BloomFilterHeader bloomFilterHeader;
     try {
-      bloomFilterHeader = Util.readBloomFilterHeader(f);
+      bloomFilterHeader = Util.readBloomFilterHeader(f, bloomFilterDecryptor, bloomFilterHeaderAAD);
     } catch (IOException e) {
       LOG.warn("read no bloom filter");
       return null;
@@ -1095,8 +1222,16 @@ public class ParquetFileReader implements Closeable {
       return null;
     }
 
-    byte[] bitset = new byte[numBytes];
-    f.readFully(bitset);
+    byte[] bitset;
+    if (null == bloomFilterDecryptor) {
+      bitset = new byte[numBytes];
+      f.readFully(bitset);
+    } else {
+      bitset = bloomFilterDecryptor.decrypt(f, bloomFilterBitsetAAD);
+      if (bitset.length != numBytes) {
+        throw new ParquetCryptoRuntimeException("Wrong length of decrypted bloom filter bitset");
+      }
+    }
     return new BlockSplitBloomFilter(bitset);
   }
 
@@ -1114,7 +1249,19 @@ public class ParquetFileReader implements Closeable {
       return null;
     }
     f.seek(ref.getOffset());
-    return ParquetMetadataConverter.fromParquetColumnIndex(column.getPrimitiveType(), Util.readColumnIndex(f));
+
+    BlockCipher.Decryptor columnIndexDecryptor = null;
+    byte[] columnIndexAAD = null;
+    if (null != fileDecryptor && !fileDecryptor.plaintextFile()) {
+      InternalColumnDecryptionSetup columnDecryptionSetup = fileDecryptor.getColumnSetup(column.getPath());
+      if (columnDecryptionSetup.isEncrypted()) {
+        columnIndexDecryptor = columnDecryptionSetup.getMetaDataDecryptor();
+        columnIndexAAD = AesCipher.createModuleAAD(fileDecryptor.getFileAAD(), ModuleType.ColumnIndex, 
+            column.getRowGroupOrdinal(), columnDecryptionSetup.getOrdinal(), -1);
+      }
+    }
+    return ParquetMetadataConverter.fromParquetColumnIndex(column.getPrimitiveType(), 
+        Util.readColumnIndex(f, columnIndexDecryptor, columnIndexAAD));
   }
 
   /**
@@ -1131,7 +1278,18 @@ public class ParquetFileReader implements Closeable {
       return null;
     }
     f.seek(ref.getOffset());
-    return ParquetMetadataConverter.fromParquetOffsetIndex(Util.readOffsetIndex(f));
+
+    BlockCipher.Decryptor offsetIndexDecryptor = null;
+    byte[] offsetIndexAAD = null;
+    if (null != fileDecryptor && !fileDecryptor.plaintextFile()) {
+      InternalColumnDecryptionSetup columnDecryptionSetup = fileDecryptor.getColumnSetup(column.getPath());
+      if (columnDecryptionSetup.isEncrypted()) {
+        offsetIndexDecryptor = columnDecryptionSetup.getMetaDataDecryptor();
+        offsetIndexAAD = AesCipher.createModuleAAD(fileDecryptor.getFileAAD(), ModuleType.OffsetIndex, 
+            column.getRowGroupOrdinal(), columnDecryptionSetup.getOrdinal(), -1);
+      }
+    }
+    return ParquetMetadataConverter.fromParquetOffsetIndex(Util.readOffsetIndex(f, offsetIndexDecryptor, offsetIndexAAD));
   }
 
   @Override
@@ -1217,7 +1375,11 @@ public class ParquetFileReader implements Closeable {
     }
 
     protected PageHeader readPageHeader() throws IOException {
-      return Util.readPageHeader(stream);
+      return readPageHeader(null, null);
+    }
+
+    protected PageHeader readPageHeader(BlockCipher.Decryptor blockDecryptor, byte[] pageHeaderAAD) throws IOException {
+      return Util.readPageHeader(stream, blockDecryptor, pageHeaderAAD);
     }
 
     /**
@@ -1237,14 +1399,34 @@ public class ParquetFileReader implements Closeable {
      * @return the list of pages
      */
     public ColumnChunkPageReader readAllPages() throws IOException {
+      return readAllPages(null, null, null, -1, -1);
+    }
+
+    public ColumnChunkPageReader readAllPages(BlockCipher.Decryptor headerBlockDecryptor, BlockCipher.Decryptor pageBlockDecryptor, 
+        byte[] aadPrefix, int rowGroupOrdinal, int columnOrdinal) throws IOException {
       List<DataPage> pagesInChunk = new ArrayList<DataPage>();
       DictionaryPage dictionaryPage = null;
       PrimitiveType type = getFileMetaData().getSchema()
           .getType(descriptor.col.getPath()).asPrimitiveType();
       long valuesCountReadSoFar = 0;
       int dataPageCountReadSoFar = 0;
+      byte[] dataPageHeaderAAD = null;
+      if (null != headerBlockDecryptor) {
+        dataPageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DataPageHeader, rowGroupOrdinal, 
+            columnOrdinal, getPageOrdinal(dataPageCountReadSoFar));
+      }
       while (hasMorePages(valuesCountReadSoFar, dataPageCountReadSoFar)) {
-        PageHeader pageHeader = readPageHeader();
+        byte[] pageHeaderAAD = dataPageHeaderAAD;
+        if (null != headerBlockDecryptor) {
+          // Important: this verifies file integrity (makes sure the dictionary page has not been removed)
+          if (null == dictionaryPage && descriptor.metadata.hasDictionaryPage()) {
+            pageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DictionaryPageHeader, rowGroupOrdinal, columnOrdinal, -1);
+          } else {
+            int pageOrdinal = getPageOrdinal(dataPageCountReadSoFar);
+            AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
+          }
+        }
+        PageHeader pageHeader = readPageHeader(headerBlockDecryptor, pageHeaderAAD);
         int uncompressedPageSize = pageHeader.getUncompressed_page_size();
         int compressedPageSize = pageHeader.getCompressed_page_size();
         final BytesInput pageBytes;
@@ -1336,7 +1518,7 @@ public class ParquetFileReader implements Closeable {
       }
       BytesInputDecompressor decompressor = options.getCodecFactory().getDecompressor(descriptor.metadata.getCodec());
       return new ColumnChunkPageReader(decompressor, pagesInChunk, dictionaryPage, offsetIndex,
-          blocks.get(currentBlock).getRowCount());
+          blocks.get(currentBlock).getRowCount(), pageBlockDecryptor, aadPrefix, rowGroupOrdinal, columnOrdinal);
     }
 
     private boolean hasMorePages(long valuesCountReadSoFar, int dataPageCountReadSoFar) {
@@ -1344,6 +1526,14 @@ public class ParquetFileReader implements Closeable {
           : dataPageCountReadSoFar < offsetIndex.getPageCount();
     }
 
+    private int getPageOrdinal(int dataPageCountReadSoFar) {
+      if (null == offsetIndex) {
+        return dataPageCountReadSoFar;
+      }
+
+      return offsetIndex.getPageOrdinal(dataPageCountReadSoFar);
+    }
+
     /**
      * @param size the size of the page
      * @return the page
@@ -1530,7 +1720,5 @@ public class ParquetFileReader implements Closeable {
     public long endPos() {
       return offset + length;
     }
-
   }
-
 }
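
    For context, a hedged end-to-end sketch of opening an encrypted file through the reader path
    above. The HadoopReadOptions.builder(conf, path) and withDecryption(...) calls are taken from
    this patch; the FileDecryptionProperties builder with withFooterKey(...) is an assumption about
    the crypto package, and the key and path are placeholders.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.HadoopReadOptions;
    import org.apache.parquet.ParquetReadOptions;
    import org.apache.parquet.crypto.FileDecryptionProperties;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.util.HadoopInputFile;

    public class EncryptedReadSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path file = new Path("/tmp/encrypted.parquet");   // hypothetical input path

        byte[] footerKey = new byte[16];                  // placeholder 128-bit key
        // Assumed builder API for FileDecryptionProperties (not shown in this hunk)
        FileDecryptionProperties decryption = FileDecryptionProperties.builder()
            .withFooterKey(footerKey)
            .build();

        ParquetReadOptions options = HadoopReadOptions.builder(conf, file)
            .withDecryption(decryption)
            .build();

        try (ParquetFileReader reader =
            ParquetFileReader.open(HadoopInputFile.fromPath(file, conf), options)) {
          System.out.println("Row groups: " + reader.getFooter().getBlocks().size());
        }
      }
    }

    When no explicit properties are passed, the builder(conf, path) variant added here presumably
    lets a DecryptionPropertiesFactory configured in the Hadoop configuration supply them instead.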
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index c6ee1e9..a1017ca 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -18,14 +18,17 @@
  */
 package org.apache.parquet.hadoop;
 
+import static org.apache.parquet.format.Util.writeFileCryptoMetaData;
 import static org.apache.parquet.format.Util.writeFileMetaData;
 import static org.apache.parquet.format.converter.ParquetMetadataConverter.MAX_STATS_SIZE;
 import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
 import static org.apache.parquet.hadoop.ParquetWriter.MAX_PADDING_SIZE_DEFAULT;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
@@ -51,8 +54,17 @@ import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.page.DictionaryPage;
 import org.apache.parquet.column.statistics.Statistics;
 import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.crypto.AesCipher;
+import org.apache.parquet.crypto.ColumnEncryptionProperties;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.crypto.InternalColumnEncryptionSetup;
+import org.apache.parquet.crypto.InternalFileEncryptor;
+import org.apache.parquet.crypto.ModuleCipherFactory;
+import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
 import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.format.BlockCipher;
 import org.apache.parquet.format.Util;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
@@ -90,6 +102,8 @@ public class ParquetFileWriter {
   public static final String PARQUET_METADATA_FILE = "_metadata";
   public static final String MAGIC_STR = "PAR1";
   public static final byte[] MAGIC = MAGIC_STR.getBytes(StandardCharsets.US_ASCII);
+  public static final String EF_MAGIC_STR = "PARE";
+  public static final byte[] EFMAGIC = EF_MAGIC_STR.getBytes(StandardCharsets.US_ASCII);
   public static final String PARQUET_COMMON_METADATA_FILE = "_common_metadata";
   public static final int CURRENT_VERSION = 1;
 
@@ -113,6 +127,9 @@ public class ParquetFileWriter {
 
   // The Bloom filters
   private final List<Map<String, BloomFilter>> bloomFilters = new ArrayList<>();
+  
+  // The file encryptor
+  private final InternalFileEncryptor fileEncryptor;
 
   // row group data
   private BlockMetaData currentBlock; // appended to by endColumn
@@ -280,8 +297,17 @@ public class ParquetFileWriter {
    * @throws IOException if the file can not be created
    */
   public ParquetFileWriter(OutputFile file, MessageType schema, Mode mode,
+      long rowGroupSize, int maxPaddingSize, int columnIndexTruncateLength,
+      int statisticsTruncateLength, boolean pageWriteChecksumEnabled) 
+          throws IOException {
+    this(file, schema, mode, rowGroupSize, maxPaddingSize, columnIndexTruncateLength,
+      statisticsTruncateLength, pageWriteChecksumEnabled, null);
+  }
+  
+  public ParquetFileWriter(OutputFile file, MessageType schema, Mode mode,
                            long rowGroupSize, int maxPaddingSize, int columnIndexTruncateLength,
-                           int statisticsTruncateLength, boolean pageWriteChecksumEnabled)
+                           int statisticsTruncateLength, boolean pageWriteChecksumEnabled,
+                           FileEncryptionProperties encryptionProperties)
     throws IOException {
     TypeUtil.checkValidWriteSchema(schema);
 
@@ -307,6 +333,22 @@ public class ParquetFileWriter {
     this.crc = pageWriteChecksumEnabled ? new CRC32() : null;
 
     this.metadataConverter = new ParquetMetadataConverter(statisticsTruncateLength);
+    
+    if (null == encryptionProperties) {
+      this.fileEncryptor = null;
+    } else {
+      // Verify that every encrypted column is in file schema
+      Map<ColumnPath, ColumnEncryptionProperties> columnEncryptionProperties = encryptionProperties.getEncryptedColumns();
+      if (null != columnEncryptionProperties) { // if null, every column in file schema will be encrypted with footer key
+        for (Map.Entry<ColumnPath, ColumnEncryptionProperties> entry : columnEncryptionProperties.entrySet()) {
+          String[] path = entry.getKey().toArray();
+          if(!schema.containsPath(path)) {
+            throw new ParquetCryptoRuntimeException("Encrypted column " + Arrays.toString(path) + " not in file schema");
+          }
+        }
+      }
+      this.fileEncryptor = new InternalFileEncryptor(encryptionProperties);
+    }
   }
 
   /**
@@ -334,6 +376,7 @@ public class ParquetFileWriter {
     this.pageWriteChecksumEnabled = ParquetOutputFormat.getPageWriteChecksumEnabled(configuration);
     this.crc = pageWriteChecksumEnabled ? new CRC32() : null;
     this.metadataConverter = new ParquetMetadataConverter(ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH);
+    this.fileEncryptor = null;
   }
   /**
    * start the file
@@ -342,7 +385,15 @@ public class ParquetFileWriter {
   public void start() throws IOException {
     state = state.start();
     LOG.debug("{}: start", out.getPos());
-    out.write(MAGIC);
+    byte[] magic = MAGIC;
+    if (null != fileEncryptor && fileEncryptor.isFooterEncrypted()) {
+      magic = EFMAGIC;
+    }
+    out.write(magic);
+  }
+  
+  InternalFileEncryptor getEncryptor() {
+    return fileEncryptor;
   }
 
   /**
@@ -400,6 +451,11 @@ public class ParquetFileWriter {
    * @throws IOException if there is an error while writing
    */
   public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException {
+    writeDictionaryPage(dictionaryPage, null, null);
+  }
+  
+  public void writeDictionaryPage(DictionaryPage dictionaryPage, 
+      BlockCipher.Encryptor headerBlockEncryptor, byte[] AAD) throws IOException {
     state = state.write();
     LOG.debug("{}: write dictionary page: {} values", out.getPos(), dictionaryPage.getDictionarySize());
     currentChunkDictionaryPageOffset = out.getPos();
@@ -414,20 +470,24 @@ public class ParquetFileWriter {
         dictionaryPage.getDictionarySize(),
         dictionaryPage.getEncoding(),
         (int) crc.getValue(),
-        out);
+        out,
+        headerBlockEncryptor, 
+        AAD);
     } else {
       metadataConverter.writeDictionaryPageHeader(
         uncompressedSize,
         compressedPageSize,
         dictionaryPage.getDictionarySize(),
         dictionaryPage.getEncoding(),
-        out);
+        out,
+        headerBlockEncryptor, 
+        AAD);
     }
     long headerSize = out.getPos() - currentChunkDictionaryPageOffset;
     this.uncompressedLength += uncompressedSize + headerSize;
     this.compressedLength += compressedPageSize + headerSize;
     LOG.debug("{}: write dictionary page content {}", out.getPos(), compressedPageSize);
-    dictionaryPage.getBytes().writeAllTo(out);
+    dictionaryPage.getBytes().writeAllTo(out); // for an encrypted column, the dictionary page bytes are already encrypted
     encodingStatsBuilder.addDictEncoding(dictionaryPage.getEncoding());
     currentEncodings.add(dictionaryPage.getEncoding());
   }
@@ -690,11 +750,39 @@ public class ParquetFileWriter {
       Set<Encoding> rlEncodings,
       Set<Encoding> dlEncodings,
       List<Encoding> dataEncodings) throws IOException {
+    writeColumnChunk(descriptor, valueCount, compressionCodecName, dictionaryPage, bytes,
+      uncompressedTotalPageSize, compressedTotalPageSize, totalStats, columnIndexBuilder, offsetIndexBuilder,
+      bloomFilter, rlEncodings, dlEncodings, dataEncodings, null, 0, 0, null);
+  }
+  
+  void writeColumnChunk(ColumnDescriptor descriptor,
+      long valueCount,
+      CompressionCodecName compressionCodecName,
+      DictionaryPage dictionaryPage,
+      BytesInput bytes,
+      long uncompressedTotalPageSize,
+      long compressedTotalPageSize,
+      Statistics<?> totalStats,
+      ColumnIndexBuilder columnIndexBuilder,
+      OffsetIndexBuilder offsetIndexBuilder,
+      BloomFilter bloomFilter,
+      Set<Encoding> rlEncodings,
+      Set<Encoding> dlEncodings,
+      List<Encoding> dataEncodings,
+      BlockCipher.Encryptor headerBlockEncryptor,
+      int rowGroupOrdinal, 
+      int columnOrdinal,
+      byte[] fileAAD) throws IOException {
     startColumn(descriptor, valueCount, compressionCodecName);
 
     state = state.write();
     if (dictionaryPage != null) {
-      writeDictionaryPage(dictionaryPage);
+      byte[] dictionaryPageHeaderAAD = null;
+      if (null != headerBlockEncryptor) {
+        dictionaryPageHeaderAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DictionaryPageHeader, 
+            rowGroupOrdinal, columnOrdinal, -1);
+      }
+      writeDictionaryPage(dictionaryPage, headerBlockEncryptor, dictionaryPageHeaderAAD);
     }
 
     if (bloomFilter != null) {
@@ -772,6 +860,7 @@ public class ParquetFileWriter {
     state = state.endBlock();
     LOG.debug("{}: end block", out.getPos());
     currentBlock.setRowCount(currentRecordCount);
+    currentBlock.setOrdinal(blocks.size());
     blocks.add(currentBlock);
     columnIndexes.add(currentColumnIndexes);
     offsetIndexes.add(currentOffsetIndexes);
@@ -958,22 +1047,24 @@ public class ParquetFileWriter {
    */
   public void end(Map<String, String> extraMetaData) throws IOException {
     state = state.end();
-    serializeColumnIndexes(columnIndexes, blocks, out);
-    serializeOffsetIndexes(offsetIndexes, blocks, out);
-    serializeBloomFilters(bloomFilters, blocks, out);
+    serializeColumnIndexes(columnIndexes, blocks, out, fileEncryptor);
+    serializeOffsetIndexes(offsetIndexes, blocks, out, fileEncryptor);
+    serializeBloomFilters(bloomFilters, blocks, out, fileEncryptor);
     LOG.debug("{}: end", out.getPos());
     this.footer = new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks);
-    serializeFooter(footer, out);
+    serializeFooter(footer, out, fileEncryptor);
     out.close();
   }
 
   private static void serializeColumnIndexes(
       List<List<ColumnIndex>> columnIndexes,
       List<BlockMetaData> blocks,
-      PositionOutputStream out) throws IOException {
+      PositionOutputStream out,
+      InternalFileEncryptor fileEncryptor) throws IOException {
     LOG.debug("{}: column indexes", out.getPos());
     for (int bIndex = 0, bSize = blocks.size(); bIndex < bSize; ++bIndex) {
-      List<ColumnChunkMetaData> columns = blocks.get(bIndex).getColumns();
+      BlockMetaData block = blocks.get(bIndex);
+      List<ColumnChunkMetaData> columns = block.getColumns();
       List<ColumnIndex> blockColumnIndexes = columnIndexes.get(bIndex);
       for (int cIndex = 0, cSize = columns.size(); cIndex < cSize; ++cIndex) {
         ColumnChunkMetaData column = columns.get(cIndex);
@@ -982,8 +1073,18 @@ public class ParquetFileWriter {
         if (columnIndex == null) {
           continue;
         }
+        BlockCipher.Encryptor columnIndexEncryptor = null;
+        byte[] columnIndexAAD = null;
+        if (null != fileEncryptor) {
+          InternalColumnEncryptionSetup columnEncryptionSetup = fileEncryptor.getColumnSetup(column.getPath(), false, cIndex);
+          if (columnEncryptionSetup.isEncrypted()) {
+            columnIndexEncryptor = columnEncryptionSetup.getMetaDataEncryptor();
+            columnIndexAAD = AesCipher.createModuleAAD(fileEncryptor.getFileAAD(), ModuleType.ColumnIndex, 
+                block.getOrdinal(), columnEncryptionSetup.getOrdinal(), -1);
+          }
+        }
         long offset = out.getPos();
-        Util.writeColumnIndex(columnIndex, out);
+        Util.writeColumnIndex(columnIndex, out, columnIndexEncryptor, columnIndexAAD);
         column.setColumnIndexReference(new IndexReference(offset, (int) (out.getPos() - offset)));
       }
     }
@@ -999,10 +1100,12 @@ public class ParquetFileWriter {
   private static void serializeOffsetIndexes(
       List<List<OffsetIndex>> offsetIndexes,
       List<BlockMetaData> blocks,
-      PositionOutputStream out) throws IOException {
+      PositionOutputStream out,
+      InternalFileEncryptor fileEncryptor) throws IOException {
     LOG.debug("{}: offset indexes", out.getPos());
     for (int bIndex = 0, bSize = blocks.size(); bIndex < bSize; ++bIndex) {
-      List<ColumnChunkMetaData> columns = blocks.get(bIndex).getColumns();
+      BlockMetaData block = blocks.get(bIndex);
+      List<ColumnChunkMetaData> columns = block.getColumns();
       List<OffsetIndex> blockOffsetIndexes = offsetIndexes.get(bIndex);
       for (int cIndex = 0, cSize = columns.size(); cIndex < cSize; ++cIndex) {
         OffsetIndex offsetIndex = blockOffsetIndexes.get(cIndex);
@@ -1010,8 +1113,18 @@ public class ParquetFileWriter {
           continue;
         }
         ColumnChunkMetaData column = columns.get(cIndex);
+        BlockCipher.Encryptor offsetIndexEncryptor = null;
+        byte[] offsetIndexAAD = null;
+        if (null != fileEncryptor) {
+          InternalColumnEncryptionSetup columnEncryptionSetup = fileEncryptor.getColumnSetup(column.getPath(), false, cIndex);
+          if (columnEncryptionSetup.isEncrypted()) {
+            offsetIndexEncryptor = columnEncryptionSetup.getMetaDataEncryptor();
+            offsetIndexAAD = AesCipher.createModuleAAD(fileEncryptor.getFileAAD(), ModuleType.OffsetIndex, 
+                block.getOrdinal(), columnEncryptionSetup.getOrdinal(), -1);
+          }
+        }
         long offset = out.getPos();
-        Util.writeOffsetIndex(ParquetMetadataConverter.toParquetOffsetIndex(offsetIndex), out);
+        Util.writeOffsetIndex(ParquetMetadataConverter.toParquetOffsetIndex(offsetIndex), out, offsetIndexEncryptor, offsetIndexAAD);
         column.setOffsetIndexReference(new IndexReference(offset, (int) (out.getPos() - offset)));
       }
     }
@@ -1020,10 +1133,12 @@ public class ParquetFileWriter {
   private static void serializeBloomFilters(
     List<Map<String, BloomFilter>> bloomFilters,
     List<BlockMetaData> blocks,
-    PositionOutputStream out) throws IOException {
+    PositionOutputStream out,
+    InternalFileEncryptor fileEncryptor) throws IOException {
     LOG.debug("{}: bloom filters", out.getPos());
     for (int bIndex = 0, bSize = blocks.size(); bIndex < bSize; ++bIndex) {
-      List<ColumnChunkMetaData> columns = blocks.get(bIndex).getColumns();
+      BlockMetaData block = blocks.get(bIndex);
+      List<ColumnChunkMetaData> columns = block.getColumns();
       Map<String, BloomFilter> blockBloomFilters = bloomFilters.get(bIndex);
       if (blockBloomFilters.isEmpty()) continue;
       for (int cIndex = 0, cSize = columns.size(); cIndex < cSize; ++cIndex) {
@@ -1035,20 +1150,90 @@ public class ParquetFileWriter {
 
         long offset = out.getPos();
         column.setBloomFilterOffset(offset);
-        Util.writeBloomFilterHeader(ParquetMetadataConverter.toBloomFilterHeader(bloomFilter), out);
-        bloomFilter.writeTo(out);
+        
+        BlockCipher.Encryptor bloomFilterEncryptor = null;
+        byte[] bloomFilterHeaderAAD = null;
+        byte[] bloomFilterBitsetAAD = null;
+        if (null != fileEncryptor) {
+          InternalColumnEncryptionSetup columnEncryptionSetup = fileEncryptor.getColumnSetup(column.getPath(), false, cIndex);
+          if (columnEncryptionSetup.isEncrypted()) {
+            bloomFilterEncryptor = columnEncryptionSetup.getMetaDataEncryptor();
+            int columnOrdinal = columnEncryptionSetup.getOrdinal();
+            bloomFilterHeaderAAD = AesCipher.createModuleAAD(fileEncryptor.getFileAAD(), ModuleType.BloomFilterHeader, 
+                block.getOrdinal(), columnOrdinal, -1);
+            bloomFilterBitsetAAD = AesCipher.createModuleAAD(fileEncryptor.getFileAAD(), ModuleType.BloomFilterBitset, 
+                block.getOrdinal(), columnOrdinal, -1);
+          }
+        }
+        
+        Util.writeBloomFilterHeader(ParquetMetadataConverter.toBloomFilterHeader(bloomFilter), out, 
+            bloomFilterEncryptor, bloomFilterHeaderAAD);
+        
+        ByteArrayOutputStream tempOutStream = new ByteArrayOutputStream();
+        bloomFilter.writeTo(tempOutStream);
+        byte[] serializedBitset = tempOutStream.toByteArray();
+        if (null != bloomFilterEncryptor) {
+          serializedBitset = bloomFilterEncryptor.encrypt(serializedBitset, bloomFilterBitsetAAD);
+        }
+        out.write(serializedBitset);
       }
     }
   }
-
-  private static void serializeFooter(ParquetMetadata footer, PositionOutputStream out) throws IOException {
-    long footerIndex = out.getPos();
+  
+  private static void serializeFooter(ParquetMetadata footer, PositionOutputStream out,
+      InternalFileEncryptor fileEncryptor) throws IOException {
+    
     ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter();
-    org.apache.parquet.format.FileMetaData parquetMetadata = metadataConverter.toParquetMetadata(CURRENT_VERSION, footer);
-    writeFileMetaData(parquetMetadata, out);
-    LOG.debug("{}: footer length = {}" , out.getPos(), (out.getPos() - footerIndex));
-    BytesUtils.writeIntLittleEndian(out, (int) (out.getPos() - footerIndex));
-    out.write(MAGIC);
+    
+    // Unencrypted file
+    if (null == fileEncryptor) {
+      long footerIndex = out.getPos();
+      org.apache.parquet.format.FileMetaData parquetMetadata = metadataConverter.toParquetMetadata(CURRENT_VERSION, footer);
+      writeFileMetaData(parquetMetadata, out);
+      LOG.debug("{}: footer length = {}" , out.getPos(), (out.getPos() - footerIndex));
+      BytesUtils.writeIntLittleEndian(out, (int) (out.getPos() - footerIndex));
+      out.write(MAGIC);
+      return;
+    }
+    
+    org.apache.parquet.format.FileMetaData parquetMetadata =
+        metadataConverter.toParquetMetadata(CURRENT_VERSION, footer, fileEncryptor);
+    
+    // Encrypted file with plaintext footer 
+    if (!fileEncryptor.isFooterEncrypted()) {
+      long footerIndex = out.getPos();
+      parquetMetadata.setEncryption_algorithm(fileEncryptor.getEncryptionAlgorithm());
+      // create footer signature (nonce + tag of encrypted footer)
+      byte[] footerSigningKeyMetaData = fileEncryptor.getFooterSigningKeyMetaData();
+      if (null != footerSigningKeyMetaData) {
+        parquetMetadata.setFooter_signing_key_metadata(footerSigningKeyMetaData);
+      }
+      ByteArrayOutputStream tempOutStream = new ByteArrayOutputStream();
+      writeFileMetaData(parquetMetadata, tempOutStream);
+      byte[] serializedFooter = tempOutStream.toByteArray();
+      byte[] footerAAD = AesCipher.createFooterAAD(fileEncryptor.getFileAAD());
+      byte[] encryptedFooter = fileEncryptor.getSignedFooterEncryptor().encrypt(serializedFooter, footerAAD);
+      byte[] signature = new byte[AesCipher.NONCE_LENGTH + AesCipher.GCM_TAG_LENGTH];
+      System.arraycopy(encryptedFooter, ModuleCipherFactory.SIZE_LENGTH, signature, 0, AesCipher.NONCE_LENGTH); // copy Nonce
+      System.arraycopy(encryptedFooter, encryptedFooter.length - AesCipher.GCM_TAG_LENGTH, 
+          signature, AesCipher.NONCE_LENGTH, AesCipher.GCM_TAG_LENGTH); // copy GCM Tag
+      out.write(serializedFooter);
+      out.write(signature);
+      LOG.debug("{}: footer and signature length = {}" , out.getPos(), (out.getPos() - footerIndex));
+      BytesUtils.writeIntLittleEndian(out, (int) (out.getPos() - footerIndex));
+      out.write(MAGIC);
+      return;
+    }
+
+    // Encrypted file with encrypted footer
+    long cryptoFooterIndex = out.getPos();
+    writeFileCryptoMetaData(fileEncryptor.getFileCryptoMetaData(), out);
+    byte[] footerAAD = AesCipher.createFooterAAD(fileEncryptor.getFileAAD());
+    writeFileMetaData(parquetMetadata, out, fileEncryptor.getFooterEncryptor(), footerAAD);
+    int combinedMetaDataLength = (int)(out.getPos() - cryptoFooterIndex);
+    LOG.debug("{}: crypto metadata and footer length = {}" , out.getPos(), combinedMetaDataLength);
+    BytesUtils.writeIntLittleEndian(out, combinedMetaDataLength);
+    out.write(EFMAGIC);
   }
 
   public ParquetMetadata getFooter() {
@@ -1157,7 +1342,7 @@ public class ParquetFileWriter {
       throws IOException {
     PositionOutputStream metadata = HadoopStreams.wrap(fs.create(outputPath));
     metadata.write(MAGIC);
-    serializeFooter(metadataFooter, metadata);
+    serializeFooter(metadataFooter, metadata, null);
     metadata.close();
   }
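
    For completeness, a hedged sketch of creating an encrypted ParquetFileWriter via the new
    constructor overload above. The block-size and padding constants come from the imports in this
    file; the FileEncryptionProperties.builder(footerKey) call is an assumption about the crypto
    package, and the key, path and truncate length are placeholders.

    import java.util.HashMap;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.column.ParquetProperties;
    import org.apache.parquet.crypto.FileEncryptionProperties;
    import org.apache.parquet.hadoop.ParquetFileWriter;
    import org.apache.parquet.hadoop.ParquetWriter;
    import org.apache.parquet.hadoop.util.HadoopOutputFile;
    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.MessageTypeParser;

    public class EncryptedWriteSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path file = new Path("/tmp/encrypted.parquet");   // hypothetical output path
        MessageType schema = MessageTypeParser.parseMessageType(
            "message test { required int32 id; required binary name; }");

        byte[] footerKey = new byte[16];                  // placeholder AES-128 key
        // Assumed builder API; with no per-column properties, all columns use the footer key
        FileEncryptionProperties encryption = FileEncryptionProperties.builder(footerKey).build();

        ParquetFileWriter writer = new ParquetFileWriter(
            HadoopOutputFile.fromPath(file, conf), schema, ParquetFileWriter.Mode.CREATE,
            ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.MAX_PADDING_SIZE_DEFAULT,
            64 /* columnIndexTruncateLength */,
            ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH,
            true /* pageWriteChecksumEnabled */,
            encryption);
        writer.start();                                   // writes the PARE magic when the footer is encrypted
        // ... write row groups, e.g. through InternalParquetRecordWriter ...
        writer.end(new HashMap<String, String>());        // writes indexes, bloom filters and the (encrypted) footer
      }
    }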
 
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
index a4c8e45..4eb0408 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
@@ -37,6 +37,8 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.ParquetProperties.WriterVersion;
+import org.apache.parquet.crypto.EncryptionPropertiesFactory;
+import org.apache.parquet.crypto.FileEncryptionProperties;
 import org.apache.parquet.hadoop.ParquetFileWriter.Mode;
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.hadoop.api.WriteSupport.WriteContext;
@@ -471,10 +473,13 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
       LOG.info("Parquet properties are:\n{}", props);
     }
 
-    WriteContext init = writeSupport.init(conf);
+    WriteContext fileWriteContext = writeSupport.init(conf);
+    
+    FileEncryptionProperties encryptionProperties = createEncryptionProperties(conf, file, fileWriteContext);
+    
     ParquetFileWriter w = new ParquetFileWriter(HadoopOutputFile.fromPath(file, conf),
-        init.getSchema(), mode, blockSize, maxPaddingSize, props.getColumnIndexTruncateLength(),
-        props.getStatisticsTruncateLength(), props.getPageWriteChecksumEnabled());
+        fileWriteContext.getSchema(), mode, blockSize, maxPaddingSize, props.getColumnIndexTruncateLength(),
+        props.getStatisticsTruncateLength(), props.getPageWriteChecksumEnabled(), encryptionProperties);
     w.start();
 
     float maxLoad = conf.getFloat(ParquetOutputFormat.MEMORY_POOL_RATIO,
@@ -494,8 +499,8 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
     return new ParquetRecordWriter<T>(
         w,
         writeSupport,
-        init.getSchema(),
-        init.getExtraMetaData(),
+        fileWriteContext.getSchema(),
+        fileWriteContext.getExtraMetaData(),
         blockSize,
         codec,
         validating,
@@ -539,4 +544,13 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
   public synchronized static MemoryManager getMemoryManager() {
     return memoryManager;
   }
+  
+  private static FileEncryptionProperties createEncryptionProperties(Configuration fileHadoopConfig, Path tempFilePath, 
+      WriteContext fileWriteContext) {
+    EncryptionPropertiesFactory cryptoFactory = EncryptionPropertiesFactory.loadFactory(fileHadoopConfig);
+    if (null == cryptoFactory) {
+      return null;
+    }
+    return cryptoFactory.getFileEncryptionProperties(fileHadoopConfig, tempFilePath, fileWriteContext);
+  }
 }
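The createEncryptionProperties hook above delegates to a pluggable EncryptionPropertiesFactory. A minimal sketch of such a factory, for illustration only (the fixed key and class name are placeholders; a real deployment would fetch keys from a KMS and register its factory class in the Hadoop Configuration):

    import java.nio.charset.StandardCharsets;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.crypto.EncryptionPropertiesFactory;
    import org.apache.parquet.crypto.FileEncryptionProperties;
    import org.apache.parquet.hadoop.api.WriteSupport.WriteContext;

    public class FixedKeyEncryptionPropertiesFactory implements EncryptionPropertiesFactory {
      // Placeholder 128-bit footer key; a real factory would retrieve keys from a key management service.
      private static final byte[] FOOTER_KEY = "0123456789012345".getBytes(StandardCharsets.UTF_8);

      @Override
      public FileEncryptionProperties getFileEncryptionProperties(Configuration fileHadoopConfig,
          Path tempFilePath, WriteContext fileWriteContext) {
        // Returning null here would leave the file unencrypted, mirroring the null check above.
        return FileEncryptionProperties.builder(FOOTER_KEY).build();
      }
    }
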
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
index 71fde69..c215f5e 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.Preconditions;
 import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.crypto.FileDecryptionProperties;
 import org.apache.parquet.filter.UnboundRecordFilter;
 import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.filter2.compat.FilterCompat.Filter;
@@ -108,7 +109,7 @@ public class ParquetReader<T> implements Closeable {
                         ReadSupport<T> readSupport,
                         FilterCompat.Filter filter) throws IOException {
     this(Collections.singletonList((InputFile) HadoopInputFile.fromPath(file, conf)),
-        HadoopReadOptions.builder(conf)
+        HadoopReadOptions.builder(conf, file)
             .withRecordFilter(Objects.requireNonNull(filter, "filter cannot be null"))
             .build(),
         readSupport);
@@ -185,7 +186,7 @@ public class ParquetReader<T> implements Closeable {
       this.file = null;
       this.path = Objects.requireNonNull(path, "path cannot be null");
       this.conf = new Configuration();
-      this.optionsBuilder = HadoopReadOptions.builder(conf);
+      this.optionsBuilder = HadoopReadOptions.builder(conf, path);
     }
 
     @Deprecated
@@ -194,7 +195,7 @@ public class ParquetReader<T> implements Closeable {
       this.file = null;
       this.path = Objects.requireNonNull(path, "path cannot be null");
       this.conf = new Configuration();
-      this.optionsBuilder = HadoopReadOptions.builder(conf);
+      this.optionsBuilder = HadoopReadOptions.builder(conf, path);
     }
 
     protected Builder(InputFile file) {
@@ -202,11 +203,13 @@ public class ParquetReader<T> implements Closeable {
       this.file = Objects.requireNonNull(file, "file cannot be null");
       this.path = null;
       if (file instanceof HadoopInputFile) {
-        this.conf = ((HadoopInputFile) file).getConfiguration();
+        HadoopInputFile hadoopFile = (HadoopInputFile) file;
+        this.conf = hadoopFile.getConfiguration();
+        optionsBuilder = HadoopReadOptions.builder(conf, hadoopFile.getPath());
       } else {
         this.conf = new Configuration();
+        optionsBuilder = HadoopReadOptions.builder(conf);
       }
-      optionsBuilder = HadoopReadOptions.builder(conf);
     }
 
     // when called, resets options to the defaults from conf
@@ -308,6 +311,11 @@ public class ParquetReader<T> implements Closeable {
       optionsBuilder.withCodecFactory(codecFactory);
       return this;
     }
+    
+    public Builder<T> withDecryption(FileDecryptionProperties fileDecryptionProperties) {
+      optionsBuilder.withDecryption(fileDecryptionProperties);
+      return this;
+    }
 
     public Builder<T> set(String key, String value) {
       optionsBuilder.set(key, value);
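The new withDecryption builder method is the read-side entry point for the feature; a condensed usage sketch, mirroring the tests added later in this commit (the footer key bytes are a placeholder and must match the key the file was written with):

    import java.io.IOException;

    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.crypto.FileDecryptionProperties;
    import org.apache.parquet.example.data.Group;
    import org.apache.parquet.hadoop.ParquetReader;
    import org.apache.parquet.hadoop.example.GroupReadSupport;

    class EncryptedReadExample {
      static void readAll(Path file, byte[] footerKey) throws IOException {
        FileDecryptionProperties decryptionProperties = FileDecryptionProperties.builder()
            .withFooterKey(footerKey)
            .build();
        try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file)
            .withDecryption(decryptionProperties)
            .build()) {
          for (Group record = reader.read(); record != null; record = reader.read()) {
            // process the record
          }
        }
      }
    }
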
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
index 492d917..4653410 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
@@ -151,7 +151,7 @@ public class ParquetRecordReader<T> extends RecordReader<Void, T> {
     long[] rowGroupOffsets = split.getRowGroupOffsets();
 
     // if task.side.metadata is set, rowGroupOffsets is null
-    ParquetReadOptions.Builder optionsBuilder = HadoopReadOptions.builder(configuration);
+    ParquetReadOptions.Builder optionsBuilder = HadoopReadOptions.builder(configuration, path);
     if (rowGroupOffsets != null) {
       optionsBuilder.withOffsets(rowGroupOffsets);
     } else {
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index c6b2828..ecc12de 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path;
 
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.ParquetProperties.WriterVersion;
+import org.apache.parquet.crypto.FileEncryptionProperties;
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
@@ -193,7 +194,7 @@ public class ParquetWriter<T> implements Closeable {
         compressionCodecName, blockSize, pageSize, dictionaryPageSize,
         enableDictionary, validating, writerVersion, conf);
   }
-
+  
   /**
    * Create a new ParquetWriter.
    *
@@ -232,7 +233,7 @@ public class ParquetWriter<T> implements Closeable {
             .withDictionaryPageSize(dictionaryPageSize)
             .withDictionaryEncoding(enableDictionary)
             .withWriterVersion(writerVersion)
-            .build());
+            .build(), null);
   }
 
   /**
@@ -272,7 +273,8 @@ public class ParquetWriter<T> implements Closeable {
       boolean validating,
       Configuration conf,
       int maxPaddingSize,
-      ParquetProperties encodingProps) throws IOException {
+      ParquetProperties encodingProps,
+      FileEncryptionProperties encryptionProperties) throws IOException {
 
     WriteSupport.WriteContext writeContext = writeSupport.init(conf);
     MessageType schema = writeContext.getSchema();
@@ -280,7 +282,7 @@ public class ParquetWriter<T> implements Closeable {
     ParquetFileWriter fileWriter = new ParquetFileWriter(
       file, schema, mode, rowGroupSize, maxPaddingSize,
       encodingProps.getColumnIndexTruncateLength(), encodingProps.getStatisticsTruncateLength(),
-      encodingProps.getPageWriteChecksumEnabled());
+      encodingProps.getPageWriteChecksumEnabled(), encryptionProperties);
     fileWriter.start();
 
     this.codecFactory = new CodecFactory(conf, encodingProps.getPageSizeThreshold());
@@ -342,6 +344,7 @@ public class ParquetWriter<T> implements Closeable {
   public abstract static class Builder<T, SELF extends Builder<T, SELF>> {
     private OutputFile file = null;
     private Path path = null;
+    private FileEncryptionProperties encryptionProperties = null;
     private Configuration conf = new Configuration();
     private ParquetFileWriter.Mode mode;
     private CompressionCodecName codecName = DEFAULT_COMPRESSION_CODEC_NAME;
@@ -406,6 +409,18 @@ public class ParquetWriter<T> implements Closeable {
     }
 
     /**
+     * Set the {@link FileEncryptionProperties file encryption properties} used by the
+     * constructed writer.
+     *
+     * @param encryptionProperties a {@code FileEncryptionProperties}
+     * @return this builder for method chaining.
+     */
+    public SELF withEncryption(FileEncryptionProperties encryptionProperties) {
+      this.encryptionProperties = encryptionProperties;
+      return self();
+    }
+
+    /**
      * Set the Parquet format row group size used by the constructed writer.
      *
      * @param rowGroupSize an integer size in bytes
@@ -565,6 +580,7 @@ public class ParquetWriter<T> implements Closeable {
      */
     public SELF withBloomFilterNDV(String columnPath, long ndv) {
       encodingPropsBuilder.withBloomFilterNDV(columnPath, ndv);
+
       return self();
     }
 
@@ -615,13 +631,13 @@ public class ParquetWriter<T> implements Closeable {
       if (file != null) {
         return new ParquetWriter<>(file,
             mode, getWriteSupport(conf), codecName, rowGroupSize, enableValidation, conf,
-            maxPaddingSize, encodingPropsBuilder.build());
+            maxPaddingSize, encodingPropsBuilder.build(), encryptionProperties);
       } else {
         return new ParquetWriter<>(HadoopOutputFile.fromPath(path, conf),
             mode, getWriteSupport(conf), codecName,
             rowGroupSize, enableValidation, conf, maxPaddingSize,
-            encodingPropsBuilder.build());
+            encodingPropsBuilder.build(), encryptionProperties);
       }
     }
   }
-}
+}
\ No newline at end of file
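On the write side, withEncryption is the counterpart; a condensed sketch based on the new tests (the column name "ssn" and the key identifiers are illustrative placeholders):

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.crypto.ColumnEncryptionProperties;
    import org.apache.parquet.crypto.FileEncryptionProperties;
    import org.apache.parquet.example.data.Group;
    import org.apache.parquet.hadoop.ParquetWriter;
    import org.apache.parquet.hadoop.example.ExampleParquetWriter;
    import org.apache.parquet.hadoop.metadata.ColumnPath;
    import org.apache.parquet.schema.MessageType;

    class EncryptedWriteExample {
      static ParquetWriter<Group> open(Path file, MessageType schema,
          byte[] footerKey, byte[] columnKey) throws IOException {
        // Encrypt one sensitive column with its own key...
        ColumnEncryptionProperties columnProps = ColumnEncryptionProperties.builder("ssn")
            .withKey(columnKey)
            .withKeyID("kc1")
            .build();
        Map<ColumnPath, ColumnEncryptionProperties> columnMap = new HashMap<>();
        columnMap.put(columnProps.getPath(), columnProps);

        // ...and the footer with a separate key.
        FileEncryptionProperties encryptionProperties = FileEncryptionProperties.builder(footerKey)
            .withFooterKeyID("kf")
            .withEncryptedColumns(columnMap)
            .build();

        return ExampleParquetWriter.builder(file)
            .withType(schema)
            .withEncryption(encryptionProperties)
            .build();
      }
    }
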
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/BlockMetaData.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/BlockMetaData.java
index 0cad0a5..ce204dc 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/BlockMetaData.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/BlockMetaData.java
@@ -32,11 +32,11 @@ public class BlockMetaData {
   private long rowCount;
   private long totalByteSize;
   private String path;
+  private int ordinal;
 
   public BlockMetaData() {
   }
-
-
+  
   /**
    * @param path the path to the file containing the data. Or null if same file the metadata was found
    */
@@ -102,6 +102,7 @@ public class BlockMetaData {
   public long getStartingPos() {
     return getColumns().get(0).getStartingPos();
   }
+  
   @Override
   public String toString() {
     return "BlockMetaData{" + rowCount + ", " + totalByteSize + " " + columns + "}";
@@ -117,4 +118,19 @@ public class BlockMetaData {
     }
     return totalSize;
   }
+  
+  /**
+   * @return row group ordinal
+   */
+  public int getOrdinal() {
+    return ordinal;
+  }
+
+  /**
+   *
+   * @param ordinal row group ordinal
+   */
+  public void setOrdinal(int ordinal) {
+    this.ordinal = ordinal;
+  }
 }
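The new ordinal on BlockMetaData feeds the module AAD computation used by the cipher code elsewhere in this commit; a brief illustrative sketch (columnOrdinal is an assumed caller-supplied index):

    import org.apache.parquet.crypto.AesCipher;
    import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
    import org.apache.parquet.hadoop.metadata.BlockMetaData;

    class AadExample {
      static byte[] columnMetaDataAAD(byte[] fileAAD, BlockMetaData block, int columnOrdinal) {
        // The AAD binds each ciphertext to its position in the file: file AAD, module type,
        // row-group ordinal, column ordinal, and page ordinal (-1 when not applicable).
        return AesCipher.createModuleAAD(fileAAD, ModuleType.ColumnMetaData,
            block.getOrdinal(), columnOrdinal, -1);
      }
    }
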
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
index 2c24356..e816b27 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
@@ -18,12 +18,25 @@
  */
 package org.apache.parquet.hadoop.metadata;
 
+import static org.apache.parquet.column.Encoding.PLAIN_DICTIONARY;
+import static org.apache.parquet.column.Encoding.RLE_DICTIONARY;
+import static org.apache.parquet.format.Util.readColumnMetaData;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
 import java.util.Set;
 
 import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.EncodingStats;
 import org.apache.parquet.column.statistics.BooleanStatistics;
 import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.crypto.AesCipher;
+import org.apache.parquet.crypto.InternalColumnDecryptionSetup;
+import org.apache.parquet.crypto.InternalFileDecryptor;
+import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
+import org.apache.parquet.format.ColumnMetaData;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.internal.hadoop.metadata.IndexReference;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
@@ -34,6 +47,7 @@ import org.apache.yetus.audience.InterfaceAudience.Private;
  * Column meta data for a block stored in the file footer and passed in the InputSplit
  */
 abstract public class ColumnChunkMetaData {
+  protected int rowGroupOrdinal = -1;
 
   @Deprecated
   public static ColumnChunkMetaData get(
@@ -98,6 +112,7 @@ abstract public class ColumnChunkMetaData {
       long valueCount,
       long totalSize,
       long totalUncompressedSize) {
+    
     return get(path, Types.optional(type).named("fake_type"), codec, encodingStats, encodings, statistics,
         firstDataPage, dictionaryPageOffset, valueCount, totalSize, totalUncompressedSize);
   }
@@ -114,6 +129,7 @@ abstract public class ColumnChunkMetaData {
       long valueCount,
       long totalSize,
       long totalUncompressedSize) {
+
     // to save space we store those always positive longs in ints when they fit.
     if (positiveLongFitsInAnInt(firstDataPage)
         && positiveLongFitsInAnInt(dictionaryPageOffset)
@@ -141,11 +157,32 @@ abstract public class ColumnChunkMetaData {
           totalUncompressedSize);
     }
   }
+  
+  // In sensitive columns, the ColumnMetaData structure is encrypted (with column-specific keys), making fields such as Statistics invisible.
+  // Decryption is not performed proactively, for performance and authorization reasons.
+  // This method creates a shell ColumnChunkMetaData object that keeps the encrypted metadata and the decryption tools.
+  // These tools will be activated later - when/if the column is projected.
+  public static ColumnChunkMetaData getWithEncryptedMetadata(ParquetMetadataConverter parquetMetadataConverter, ColumnPath path, 
+      PrimitiveType type, byte[] encryptedMetadata, byte[] columnKeyMetadata,
+      InternalFileDecryptor fileDecryptor, int rowGroupOrdinal, int columnOrdinal, 
+      String createdBy) {
+    return new EncryptedColumnChunkMetaData(parquetMetadataConverter, path, type, encryptedMetadata, columnKeyMetadata,
+        fileDecryptor, rowGroupOrdinal, columnOrdinal, createdBy);
+  }
+
+  public void setRowGroupOrdinal(int rowGroupOrdinal) {
+    this.rowGroupOrdinal = rowGroupOrdinal;
+  }
+
+  public int getRowGroupOrdinal() {
+    return rowGroupOrdinal;
+  }
 
   /**
    * @return the offset of the first byte in the chunk
    */
   public long getStartingPos() {
+    decryptIfNeeded();
     long dictionaryPageOffset = getDictionaryPageOffset();
     long firstDataPageOffset = getFirstDataPageOffset();
     if (dictionaryPageOffset > 0 && dictionaryPageOffset < firstDataPageOffset) {
@@ -165,10 +202,10 @@ abstract public class ColumnChunkMetaData {
     return (value >= 0) && (value + Integer.MIN_VALUE <= Integer.MAX_VALUE);
   }
 
-  private final EncodingStats encodingStats;
+  EncodingStats encodingStats;
 
   // we save 3 references by storing together the column properties that have few distinct values
-  private final ColumnChunkProperties properties;
+  ColumnChunkProperties properties;
 
   private IndexReference columnIndexReference;
   private IndexReference offsetIndexReference;
@@ -184,7 +221,12 @@ abstract public class ColumnChunkMetaData {
     this.properties = columnChunkProperties;
   }
 
+  protected void decryptIfNeeded() {
+    // no-op for plaintext column chunks; overridden by EncryptedColumnChunkMetaData
+  }
+
   public CompressionCodecName getCodec() {
+    decryptIfNeeded();
     return properties.getCodec();
   }
 
@@ -202,6 +244,7 @@ abstract public class ColumnChunkMetaData {
    */
   @Deprecated
   public PrimitiveTypeName getType() {
+    decryptIfNeeded();
     return properties.getType();
   }
 
@@ -209,6 +252,7 @@ abstract public class ColumnChunkMetaData {
    * @return the primitive type object of the column
    */
   public PrimitiveType getPrimitiveType() {
+    decryptIfNeeded();
     return properties.getPrimitiveType();
   }
 
@@ -247,6 +291,7 @@ abstract public class ColumnChunkMetaData {
    */
   @Private
   public IndexReference getColumnIndexReference() {
+    decryptIfNeeded();
     return columnIndexReference;
   }
 
@@ -264,6 +309,7 @@ abstract public class ColumnChunkMetaData {
    */
   @Private
   public IndexReference getOffsetIndexReference() {
+    decryptIfNeeded();
     return offsetIndexReference;
   }
 
@@ -290,6 +336,7 @@ abstract public class ColumnChunkMetaData {
    */
   @Private
   public long getBloomFilterOffset() {
+    decryptIfNeeded();
     return bloomFilterOffset;
   }
 
@@ -297,17 +344,31 @@ abstract public class ColumnChunkMetaData {
    * @return all the encodings used in this column
    */
   public Set<Encoding> getEncodings() {
+    decryptIfNeeded();
     return properties.getEncodings();
   }
 
   public EncodingStats getEncodingStats() {
+    decryptIfNeeded();
     return encodingStats;
   }
 
   @Override
   public String toString() {
+    decryptIfNeeded();
     return "ColumnMetaData{" + properties.toString() + ", " + getFirstDataPageOffset() + "}";
   }
+  
+  public boolean hasDictionaryPage() { 
+    EncodingStats stats = getEncodingStats();
+    if (stats != null) {
+      // ensure there is a dictionary page and that it is used to encode data pages
+      return stats.hasDictionaryPages() && stats.hasDictionaryEncodedPages();
+    }
+
+    Set<Encoding> encodings = getEncodings();
+    return (encodings.contains(PLAIN_DICTIONARY) || encodings.contains(RLE_DICTIONARY));
+  }
 }
 
 class IntColumnChunkMetaData extends ColumnChunkMetaData {
@@ -415,6 +476,7 @@ class IntColumnChunkMetaData extends ColumnChunkMetaData {
    return statistics;
   }
 }
+
 class LongColumnChunkMetaData extends ColumnChunkMetaData {
 
   private final long firstDataPageOffset;
@@ -500,3 +562,103 @@ class LongColumnChunkMetaData extends ColumnChunkMetaData {
   }
 }
 
+class EncryptedColumnChunkMetaData extends ColumnChunkMetaData {
+  private final ParquetMetadataConverter parquetMetadataConverter;
+  private final byte[] encryptedMetadata;
+  private final byte[] columnKeyMetadata;
+  private final InternalFileDecryptor fileDecryptor; 
+
+  private final int columnOrdinal;
+  private final PrimitiveType primitiveType;
+  private final String createdBy;
+  private ColumnPath path;
+
+  private boolean decrypted;
+  private ColumnChunkMetaData shadowColumnChunkMetaData;
+
+  EncryptedColumnChunkMetaData(ParquetMetadataConverter parquetMetadataConverter, ColumnPath path, PrimitiveType type, 
+      byte[] encryptedMetadata, byte[] columnKeyMetadata,
+      InternalFileDecryptor fileDecryptor, int rowGroupOrdinal, int columnOrdinal, String createdBy) {
+    super((EncodingStats) null, (ColumnChunkProperties) null);
+    this.parquetMetadataConverter = parquetMetadataConverter;
+    this.path = path;
+    this.encryptedMetadata = encryptedMetadata;
+    this.columnKeyMetadata = columnKeyMetadata;
+    this.fileDecryptor = fileDecryptor;
+    this.rowGroupOrdinal = rowGroupOrdinal;
+    this.columnOrdinal = columnOrdinal;
+    this.primitiveType = type;
+    this.createdBy = createdBy;
+
+    this.decrypted = false;
+  }
+
+  @Override
+  protected void decryptIfNeeded() {
+    if (decrypted) return;
+
+    if (null == fileDecryptor) {
+      throw new ParquetCryptoRuntimeException(path + ". Null File Decryptor");
+    }
+
+    // Decrypt the ColumnMetaData
+    InternalColumnDecryptionSetup columnDecryptionSetup = fileDecryptor.setColumnCryptoMetadata(path, true, false, 
+        columnKeyMetadata, columnOrdinal);
+    
+    ColumnMetaData metaData;
+    ByteArrayInputStream tempInputStream = new ByteArrayInputStream(encryptedMetadata);
+    byte[] columnMetaDataAAD = AesCipher.createModuleAAD(fileDecryptor.getFileAAD(), ModuleType.ColumnMetaData, 
+        rowGroupOrdinal, columnOrdinal, -1);
+    try {
+      metaData = readColumnMetaData(tempInputStream, columnDecryptionSetup.getMetaDataDecryptor(), columnMetaDataAAD);
+    } catch (IOException e) {
+      throw new ParquetCryptoRuntimeException(path + ". Failed to decrypt column metadata", e);
+    }
+    decrypted = true;
+    shadowColumnChunkMetaData = parquetMetadataConverter.buildColumnChunkMetaData(metaData, path, primitiveType, createdBy);
+    this.encodingStats = shadowColumnChunkMetaData.encodingStats;
+    this.properties = shadowColumnChunkMetaData.properties;
+    setBloomFilterOffset(metaData.bloom_filter_offset);
+  }
+
+  @Override
+  public ColumnPath getPath() {
+    return path;
+  }
+
+  @Override
+  public long getFirstDataPageOffset() {
+    decryptIfNeeded();
+    return shadowColumnChunkMetaData.getFirstDataPageOffset();
+  }
+
+  @Override
+  public long getDictionaryPageOffset() {
+    decryptIfNeeded();
+    return shadowColumnChunkMetaData.getDictionaryPageOffset();
+  }
+
+  @Override
+  public long getValueCount() {
+    decryptIfNeeded();
+    return shadowColumnChunkMetaData.getValueCount();
+  }
+
+  @Override
+  public long getTotalUncompressedSize() {
+    decryptIfNeeded();
+    return shadowColumnChunkMetaData.getTotalUncompressedSize();
+  }
+
+  @Override
+  public long getTotalSize() {
+    decryptIfNeeded();
+    return shadowColumnChunkMetaData.getTotalSize();
+  }
+
+  @Override
+  public Statistics getStatistics() {
+    decryptIfNeeded();
+    return shadowColumnChunkMetaData.getStatistics();
+  }
+}
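As the comment near getWithEncryptedMetadata above notes, column metadata is decrypted lazily, on first access by a metadata getter; a short sketch of what that contract means for callers (projectedPaths is an assumed set of projected columns):

    import java.util.Set;

    import org.apache.parquet.hadoop.metadata.BlockMetaData;
    import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
    import org.apache.parquet.hadoop.metadata.ColumnPath;
    import org.apache.parquet.hadoop.metadata.ParquetMetadata;

    class LazyDecryptionExample {
      static void touchProjectedColumns(ParquetMetadata footer, Set<ColumnPath> projectedPaths) {
        for (BlockMetaData block : footer.getBlocks()) {
          for (ColumnChunkMetaData chunk : block.getColumns()) {
            // getPath() never triggers decryption, so skipping a column needs no key.
            if (!projectedPaths.contains(chunk.getPath())) {
              continue;
            }
            // Any other metadata getter decrypts an encrypted column chunk on first access.
            long startingPos = chunk.getStartingPos();
          }
        }
      }
    }
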
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/FileMetaData.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/FileMetaData.java
index ca9488f..1668a7f 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/FileMetaData.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/FileMetaData.java
@@ -24,6 +24,7 @@ import java.io.Serializable;
 import java.util.Map;
 import java.util.Objects;
 
+import org.apache.parquet.crypto.InternalFileDecryptor;
 import org.apache.parquet.schema.MessageType;
 
 
@@ -38,6 +39,8 @@ public final class FileMetaData implements Serializable {
   private final Map<String, String> keyValueMetaData;
 
   private final String createdBy;
+  
+  private final InternalFileDecryptor fileDecryptor;
 
   /**
    * @param schema the schema for the file
@@ -47,11 +50,16 @@ public final class FileMetaData implements Serializable {
    * @throws NullPointerException if schema or keyValueMetaData is {@code null}
    */
   public FileMetaData(MessageType schema, Map<String, String> keyValueMetaData, String createdBy) {
+    this(schema, keyValueMetaData, createdBy, null);
+  }
+  
+  public FileMetaData(MessageType schema, Map<String, String> keyValueMetaData, String createdBy, InternalFileDecryptor fileDecryptor) {
     super();
     this.schema = Objects.requireNonNull(schema, "schema cannot be null");
     this.keyValueMetaData = unmodifiableMap(Objects
         .requireNonNull(keyValueMetaData, "keyValueMetaData cannot be null"));
     this.createdBy = createdBy;
+    this.fileDecryptor = fileDecryptor;
   }
 
   /**
@@ -80,4 +88,7 @@ public final class FileMetaData implements Serializable {
     return createdBy;
   }
 
+  public InternalFileDecryptor getFileDecryptor() {
+    return fileDecryptor;
+  }
 }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java
index fb876a8..ad26430 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java
@@ -54,6 +54,10 @@ public class HadoopInputFile implements InputFile {
   public Configuration getConfiguration() {
     return conf;
   }
+  
+  public Path getPath() {
+    return stat.getPath();
+  }
 
   @Override
   public long getLength() {
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomEncryption.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomEncryption.java
new file mode 100644
index 0000000..e4cc550
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomEncryption.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.crypto.ColumnEncryptionProperties;
+import org.apache.parquet.crypto.FileDecryptionProperties;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.filter2.recordlevel.PhoneBookWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.hadoop.TestColumnIndexEncryption.StringKeyIdRetriever;
+import org.apache.parquet.io.api.Binary;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.parquet.filter2.predicate.FilterApi.*;
+import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
+import static org.junit.Assert.*;
+
+@RunWith(Parameterized.class)
+public class TestBloomEncryption {
+  private static final Path FILE_V1 = createTempFile();
+  private static final Path FILE_V2 = createTempFile();
+  private static final Logger LOGGER = LoggerFactory.getLogger(TestBloomEncryption.class);
+  private static final Random RANDOM = new Random(42);
+  private static final String[] PHONE_KINDS = { null, "mobile", "home", "work" };
+  private static final List<PhoneBookWriter.User> DATA = Collections.unmodifiableList(generateData(10000));
+  
+  private static final byte[] FOOTER_ENCRYPTION_KEY = new String("0123456789012345").getBytes();
+  private static final byte[] COLUMN_ENCRYPTION_KEY1 = new String("1234567890123450").getBytes();
+  private static final byte[] COLUMN_ENCRYPTION_KEY2 = new String("1234567890123451").getBytes();
+
+  private final Path file;
+  public TestBloomEncryption(Path file) {
+    this.file = file;
+  }
+
+  private static Path createTempFile() {
+    try {
+      return new Path(Files.createTempFile("test-bloom-filter_", ".parquet").toAbsolutePath().toString());
+    } catch (IOException e) {
+      throw new AssertionError("Unable to create temporary file", e);
+    }
+  }
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> params() {
+    return Arrays.asList(new Object[] { FILE_V1 }, new Object[] { FILE_V2 });
+  }
+
+  private static List<PhoneBookWriter.User> generateData(int rowCount) {
+    List<PhoneBookWriter.User> users = new ArrayList<>();
+    List<String> names = generateNames(rowCount);
+    for (int i = 0; i < rowCount; ++i) {
+      users.add(new PhoneBookWriter.User(i, names.get(i), generatePhoneNumbers(), generateLocation(i, rowCount)));
+    }
+    return users;
+  }
+
+  private static List<String> generateNames(int rowCount) {
+    List<String> list = new ArrayList<>();
+
+    // Adding fixed values for filtering
+    list.add("anderson");
+    list.add("anderson");
+    list.add("miller");
+    list.add("miller");
+    list.add("miller");
+    list.add("thomas");
+    list.add("thomas");
+    list.add("williams");
+
+    int nullCount = rowCount / 100;
+
+    String alphabet = "aabcdeefghiijklmnoopqrstuuvwxyz";
+    int maxLength = 8;
+    for (int i = rowCount - list.size() - nullCount; i >= 0; --i) {
+      int l = RANDOM.nextInt(maxLength);
+      StringBuilder builder = new StringBuilder(l);
+      for (int j = 0; j < l; ++j) {
+        builder.append(alphabet.charAt(RANDOM.nextInt(alphabet.length())));
+      }
+      list.add(builder.toString());
+    }
+    list.sort((str1, str2) -> -str1.compareTo(str2));
+
+    // Adding nulls to random places
+    for (int i = 0; i < nullCount; ++i) {
+      list.add(RANDOM.nextInt(list.size()), null);
+    }
+
+    return list;
+  }
+
+  private static List<PhoneBookWriter.PhoneNumber> generatePhoneNumbers() {
+    int length = RANDOM.nextInt(5) - 1;
+    if (length < 0) {
+      return null;
+    }
+    List<PhoneBookWriter.PhoneNumber> phoneNumbers = new ArrayList<>(length);
+    for (int i = 0; i < length; ++i) {
+      // 6-digit numbers
+      long number = Math.abs(RANDOM.nextLong() % 900000) + 100000;
+      phoneNumbers.add(new PhoneBookWriter.PhoneNumber(number, PHONE_KINDS[RANDOM.nextInt(PHONE_KINDS.length)]));
+    }
+    return phoneNumbers;
+  }
+
+  private static PhoneBookWriter.Location generateLocation(int id, int rowCount) {
+    if (RANDOM.nextDouble() < 0.01) {
+      return null;
+    }
+
+    if (RANDOM.nextDouble() < 0.001) {
+      return new PhoneBookWriter.Location(99.9, 99.9);
+    }
+
+    double lat = RANDOM.nextDouble() * 90.0 - (id < rowCount / 2 ? 90.0 : 0.0);
+    double lon = RANDOM.nextDouble() * 90.0 - (id < rowCount / 4 || id >= 3 * rowCount / 4 ? 90.0 : 0.0);
+
+    return new PhoneBookWriter.Location(RANDOM.nextDouble() < 0.01 ? null : lat, RANDOM.nextDouble() < 0.01 ? null : lon);
+  }
+
+  private List<PhoneBookWriter.User> readUsers(FilterPredicate filter, boolean useOtherFiltering,
+                                               boolean useBloomFilter) throws IOException {
+    /*
+    byte[] keyBytes = {0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15};
+    FileDecryptionProperties fileDecryptionProperties = FileDecryptionProperties.builder()
+        .withFooterKey(keyBytes)
+        .build();
+        */
+    
+    StringKeyIdRetriever kr1 = new StringKeyIdRetriever();
+    kr1.putKey("kf", FOOTER_ENCRYPTION_KEY);
+    kr1.putKey("kc1", COLUMN_ENCRYPTION_KEY1);
+    kr1.putKey("kc2", COLUMN_ENCRYPTION_KEY2);
+
+    FileDecryptionProperties fileDecryptionProperties = FileDecryptionProperties.builder()
+        .withKeyRetriever(kr1)
+        .build();
+    
+    return PhoneBookWriter.readUsers(ParquetReader.builder(new GroupReadSupport(), file)
+      .withFilter(FilterCompat.get(filter))
+      .withDecryption(fileDecryptionProperties)
+      .useDictionaryFilter(useOtherFiltering)
+      .useStatsFilter(useOtherFiltering)
+      .useRecordFilter(useOtherFiltering)
+      .useBloomFilter(useBloomFilter)
+      .useColumnIndexFilter(useOtherFiltering));
+  }
+
+  // Assumes that both lists are in the same order
+  private static void assertContains(Stream<PhoneBookWriter.User> expected, List<PhoneBookWriter.User> actual) {
+    Iterator<PhoneBookWriter.User> expIt = expected.iterator();
+    if (!expIt.hasNext()) {
+      return;
+    }
+    PhoneBookWriter.User exp = expIt.next();
+    for (PhoneBookWriter.User act : actual) {
+      if (act.equals(exp)) {
+        if (!expIt.hasNext()) {
+          break;
+        }
+        exp = expIt.next();
+      }
+    }
+    assertFalse("Not all expected elements are in the actual list. E.g.: " + exp, expIt.hasNext());
+  }
+
+  private void assertCorrectFiltering(Predicate<PhoneBookWriter.User> expectedFilter, FilterPredicate actualFilter)
+    throws IOException {
+    // Check with only bloom filter based filtering
+    List<PhoneBookWriter.User> result = readUsers(actualFilter, false, true);
+
+    assertTrue("Bloom filtering should drop some row groups", result.size() < DATA.size());
+    LOGGER.info("{}/{} records read; filtering ratio: {}%", result.size(), DATA.size(),
+      100 * result.size() / DATA.size());
+    // Asserts that all the required records are in the result
+    assertContains(DATA.stream().filter(expectedFilter), result);
+    // Asserts that all the retrieved records are in the file (validating non-matching records)
+    assertContains(result.stream(), DATA);
+
+    // Check with all the other filters (bloom filtering off) to ensure the result contains exactly the required values
+    result = readUsers(actualFilter, true, false);
+    assertEquals(DATA.stream().filter(expectedFilter).collect(Collectors.toList()), result);
+  }
+
+
+  @BeforeClass
+  public static void createFile() throws IOException {
+    int pageSize = DATA.size() / 100;     // Ensure that several pages will be created
+    int rowGroupSize = pageSize * 4;    // Ensure that multiple row groups are created
+/*    
+    byte[] keyBytes = {0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15};
+    FileEncryptionProperties encryptionProperties = FileEncryptionProperties.builder(keyBytes)
+        .build();
+    */
+    // Encryption configuration: Encrypt two columns and the footer, with different keys.
+    ColumnEncryptionProperties columnProperties1 = ColumnEncryptionProperties
+        .builder("id")
+        .withKey(COLUMN_ENCRYPTION_KEY1)
+        .withKeyID("kc1")
+        .build();
+
+    ColumnEncryptionProperties columnProperties2 = ColumnEncryptionProperties
+        .builder("name")
+        .withKey(COLUMN_ENCRYPTION_KEY2)
+        .withKeyID("kc2")
+        .build();
+    Map<ColumnPath, ColumnEncryptionProperties> columnPropertiesMap = new HashMap<>();
+
+    columnPropertiesMap.put(columnProperties1.getPath(), columnProperties1);
+    columnPropertiesMap.put(columnProperties2.getPath(), columnProperties2);
+
+    FileEncryptionProperties encryptionProperties = FileEncryptionProperties.builder(FOOTER_ENCRYPTION_KEY)
+        .withFooterKeyID("kf")
+        .withEncryptedColumns(columnPropertiesMap)
+        .build();
+    
+    PhoneBookWriter.write(ExampleParquetWriter.builder(FILE_V1)
+        .withWriteMode(OVERWRITE)
+        .withRowGroupSize(rowGroupSize)
+        .withPageSize(pageSize)
+        .withBloomFilterNDV("location.lat", 10000L)
+        .withBloomFilterNDV("name", 10000L)
+        .withBloomFilterNDV("id", 10000L)
+        .withEncryption(encryptionProperties)
+        .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0),
+      DATA);
+    PhoneBookWriter.write(ExampleParquetWriter.builder(FILE_V2)
+        .withWriteMode(OVERWRITE)
+        .withRowGroupSize(rowGroupSize)
+        .withPageSize(pageSize)
+        .withBloomFilterNDV("location.lat", 10000L)
+        .withBloomFilterNDV("name", 10000L)
+        .withBloomFilterNDV("id", 10000L)
+        .withEncryption(encryptionProperties)
+        .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0),
+      DATA);
+  }
+
+  @AfterClass
+  public static void deleteFile() throws IOException {
+    FILE_V1.getFileSystem(new Configuration()).delete(FILE_V1, false);
+    FILE_V2.getFileSystem(new Configuration()).delete(FILE_V2, false);
+  }
+
+
+  @Test
+  public void testSimpleFiltering() throws IOException {
+    assertCorrectFiltering(
+      record -> record.getId() == 1234L,
+      eq(longColumn("id"), 1234L));
+
+    assertCorrectFiltering(
+      record -> "miller".equals(record.getName()),
+      eq(binaryColumn("name"), Binary.fromString("miller")));
+  }
+
+  @Test
+  public void testNestedFiltering() throws IOException {
+    assertCorrectFiltering(
+      record -> {
+        PhoneBookWriter.Location location = record.getLocation();
+        return location != null && location.getLat() != null && location.getLat() == 99.9;
+      },
+      eq(doubleColumn("location.lat"), 99.9));
+  }
+}
+
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexEncryption.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexEncryption.java
new file mode 100644
index 0000000..2fd2207
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexEncryption.java
@@ -0,0 +1,571 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import static java.util.Collections.emptyList;
+import static java.util.stream.Collectors.toList;
+import static org.apache.parquet.filter2.predicate.FilterApi.and;
+import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.doubleColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.eq;
+import static org.apache.parquet.filter2.predicate.FilterApi.gtEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.longColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.lt;
+import static org.apache.parquet.filter2.predicate.FilterApi.ltEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.not;
+import static org.apache.parquet.filter2.predicate.FilterApi.notEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.or;
+import static org.apache.parquet.filter2.predicate.FilterApi.userDefined;
+import static org.apache.parquet.filter2.predicate.LogicalInverter.invert;
+import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+import static org.apache.parquet.schema.Types.optional;
+import static org.apache.parquet.schema.Types.required;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.ParquetProperties.WriterVersion;
+import org.apache.parquet.crypto.ColumnEncryptionProperties;
+import org.apache.parquet.crypto.DecryptionKeyRetriever;
+import org.apache.parquet.crypto.FileDecryptionProperties;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.filter2.predicate.Statistics;
+import org.apache.parquet.filter2.predicate.UserDefinedPredicate;
+import org.apache.parquet.filter2.recordlevel.PhoneBookWriter;
+import org.apache.parquet.filter2.recordlevel.PhoneBookWriter.Location;
+import org.apache.parquet.filter2.recordlevel.PhoneBookWriter.PhoneNumber;
+import org.apache.parquet.filter2.recordlevel.PhoneBookWriter.User;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Types;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Unit tests for high level column index based filtering.
+ */
+@RunWith(Parameterized.class)
+public class TestColumnIndexEncryption {
+  private static final Logger LOGGER = LoggerFactory.getLogger(TestColumnIndexEncryption.class);
+  private static final Random RANDOM = new Random(42);
+  private static final String[] PHONE_KINDS = { null, "mobile", "home", "work" };
+  private static final List<User> DATA = Collections.unmodifiableList(generateData(10000));
+  private static final Path FILE_V1 = createTempFile();
+  private static final Path FILE_V2 = createTempFile();
+  private static final MessageType SCHEMA_WITHOUT_NAME = Types.buildMessage()
+      .required(INT64).named("id")
+      .optionalGroup()
+        .addField(optional(DOUBLE).named("lon"))
+        .addField(optional(DOUBLE).named("lat"))
+        .named("location")
+      .optionalGroup()
+        .repeatedGroup()
+          .addField(required(INT64).named("number"))
+          .addField(optional(BINARY).as(stringType()).named("kind"))
+          .named("phone")
+        .named("phoneNumbers")
+      .named("user_without_name");
+  private static final byte[] FOOTER_ENCRYPTION_KEY = new String("0123456789012345").getBytes();
+  private static final byte[] COLUMN_ENCRYPTION_KEY1 = new String("1234567890123450").getBytes();
+  private static final byte[] COLUMN_ENCRYPTION_KEY2 = new String("1234567890123451").getBytes();
+  
+  // Simple key retriever, based on UTF8 strings as key identifiers
+  static class StringKeyIdRetriever implements DecryptionKeyRetriever {
+
+    private final Hashtable<String, byte[]> keyMap = new Hashtable<String, byte[]>();
+
+    public void putKey(String keyId, byte[] keyBytes) {
+      keyMap.put(keyId, keyBytes);
+    }
+
+    @Override
+    public byte[] getKey(byte[] keyMetaData) {
+      String keyId = new String(keyMetaData, StandardCharsets.UTF_8);
+      return keyMap.get(keyId);
+    }
+  }
+
+
+  @Parameters
+  public static Collection<Object[]> params() {
+    return Arrays.asList(new Object[] { FILE_V1 }, new Object[] { FILE_V2 });
+  }
+
+  private final Path file;
+
+  public TestColumnIndexEncryption(Path file) {
+    this.file = file;
+  }
+
+  private static List<User> generateData(int rowCount) {
+    List<User> users = new ArrayList<>();
+    List<String> names = generateNames(rowCount);
+    for (int i = 0; i < rowCount; ++i) {
+      users.add(new User(i, names.get(i), generatePhoneNumbers(), generateLocation(i, rowCount)));
+    }
+    return users;
+  }
+
+  private static List<String> generateNames(int rowCount) {
+    List<String> list = new ArrayList<>();
+
+    // Adding fixed values for filtering
+    list.add("anderson");
+    list.add("anderson");
+    list.add("miller");
+    list.add("miller");
+    list.add("miller");
+    list.add("thomas");
+    list.add("thomas");
+    list.add("williams");
+
+    int nullCount = rowCount / 100;
+
+    String alphabet = "aabcdeefghiijklmnoopqrstuuvwxyz";
+    int maxLength = 8;
+    for (int i = rowCount - list.size() - nullCount; i >= 0; --i) {
+      int l = RANDOM.nextInt(maxLength);
+      StringBuilder builder = new StringBuilder(l);
+      for (int j = 0; j < l; ++j) {
+        builder.append(alphabet.charAt(RANDOM.nextInt(alphabet.length())));
+      }
+      list.add(builder.toString());
+    }
+    Collections.sort(list, (str1, str2) -> -str1.compareTo(str2));
+
+    // Adding nulls to random places
+    for (int i = 0; i < nullCount; ++i) {
+      list.add(RANDOM.nextInt(list.size()), null);
+    }
+
+    return list;
+  }
+
+  private static List<PhoneNumber> generatePhoneNumbers() {
+    int length = RANDOM.nextInt(5) - 1;
+    if (length < 0) {
+      return null;
+    }
+    List<PhoneNumber> phoneNumbers = new ArrayList<>(length);
+    for (int i = 0; i < length; ++i) {
+      // 6-digit numbers
+      long number = Math.abs(RANDOM.nextLong() % 900000) + 100000;
+      phoneNumbers.add(new PhoneNumber(number, PHONE_KINDS[RANDOM.nextInt(PHONE_KINDS.length)]));
+    }
+    return phoneNumbers;
+  }
+
+  private static Location generateLocation(int id, int rowCount) {
+    if (RANDOM.nextDouble() < 0.01) {
+      return null;
+    }
+
+    double lat = RANDOM.nextDouble() * 90.0 - (id < rowCount / 2 ? 90.0 : 0.0);
+    double lon = RANDOM.nextDouble() * 90.0 - (id < rowCount / 4 || id >= 3 * rowCount / 4 ? 90.0 : 0.0);
+
+    return new Location(RANDOM.nextDouble() < 0.01 ? null : lat, RANDOM.nextDouble() < 0.01 ? null : lon);
+  }
+
+  private static Path createTempFile() {
+    try {
+      return new Path(Files.createTempFile("test-ci_", ".parquet").toAbsolutePath().toString());
+    } catch (IOException e) {
+      throw new AssertionError("Unable to create temporary file", e);
+    }
+  }
+
+  private List<User> readUsers(FilterPredicate filter, boolean useOtherFiltering) throws IOException {
+    return readUsers(FilterCompat.get(filter), useOtherFiltering, true);
+  }
+
+  private List<User> readUsers(FilterPredicate filter, boolean useOtherFiltering, boolean useColumnIndexFilter)
+      throws IOException {
+    return readUsers(FilterCompat.get(filter), useOtherFiltering, useColumnIndexFilter);
+  }
+
+  private List<User> readUsers(Filter filter, boolean useOtherFiltering) throws IOException {
+    return readUsers(filter, useOtherFiltering, true);
+  }
+
+  private List<User> readUsers(Filter filter, boolean useOtherFiltering, boolean useColumnIndexFilter)
+      throws IOException {
+    
+    StringKeyIdRetriever kr1 = new StringKeyIdRetriever();
+    kr1.putKey("kf", FOOTER_ENCRYPTION_KEY);
+    kr1.putKey("kc1", COLUMN_ENCRYPTION_KEY1);
+    kr1.putKey("kc2", COLUMN_ENCRYPTION_KEY2);
+
+    FileDecryptionProperties fileDecryptionProperties = FileDecryptionProperties.builder()
+        .withKeyRetriever(kr1)
+        .build();
+    
+    return PhoneBookWriter.readUsers(ParquetReader.builder(new GroupReadSupport(), file)
+        .withFilter(filter)
+        .withDecryption(fileDecryptionProperties)
+        .useDictionaryFilter(useOtherFiltering)
+        .useStatsFilter(useOtherFiltering)
+        .useRecordFilter(useOtherFiltering)
+        .useColumnIndexFilter(useColumnIndexFilter));
+  }
+
+  private List<User> readUsersWithProjection(Filter filter, MessageType schema, boolean useOtherFiltering, boolean useColumnIndexFilter) throws IOException {    
+    StringKeyIdRetriever kr1 = new StringKeyIdRetriever();
+    kr1.putKey("kf", FOOTER_ENCRYPTION_KEY);
+    kr1.putKey("kc1", COLUMN_ENCRYPTION_KEY1);
+    kr1.putKey("kc2", COLUMN_ENCRYPTION_KEY2);
+
+    FileDecryptionProperties fileDecryptionProperties = FileDecryptionProperties.builder()
+        .withKeyRetriever(kr1)
+        .build();
+    
+    return PhoneBookWriter.readUsers(ParquetReader.builder(new GroupReadSupport(), file)
+        .withFilter(filter)
+        .withDecryption(fileDecryptionProperties)
+        .useDictionaryFilter(useOtherFiltering)
+        .useStatsFilter(useOtherFiltering)
+        .useRecordFilter(useOtherFiltering)
+        .useColumnIndexFilter(useColumnIndexFilter)
+        .set(ReadSupport.PARQUET_READ_SCHEMA, schema.toString()));
+  }
+
+  // Assumes that both lists are in the same order
+  private static void assertContains(Stream<User> expected, List<User> actual) {
+    Iterator<User> expIt = expected.iterator();
+    if (!expIt.hasNext()) {
+      return;
+    }
+    User exp = expIt.next();
+    for (User act : actual) {
+      if (act.equals(exp)) {
+        if (!expIt.hasNext()) {
+          break;
+        }
+        exp = expIt.next();
+      }
+    }
+    assertFalse("Not all expected elements are in the actual list. E.g.: " + exp, expIt.hasNext());
+  }
+
+  private void assertCorrectFiltering(Predicate<User> expectedFilter, FilterPredicate actualFilter)
+      throws IOException {
+    // Check with only column index based filtering
+    List<User> result = readUsers(actualFilter, false);
+
+    assertTrue("Column-index filtering should drop some pages", result.size() < DATA.size());
+    LOGGER.info("{}/{} records read; filtering ratio: {}%", result.size(), DATA.size(),
+        100 * result.size() / DATA.size());
+    // Asserts that all the required records are in the result
+    assertContains(DATA.stream().filter(expectedFilter), result);
+    // Asserts that all the retrieved records are in the file (validating non-matching records)
+    assertContains(result.stream(), DATA);
+
+    // Check with all filtering mechanisms enabled to ensure the result contains exactly the required values
+    result = readUsers(actualFilter, true);
+    assertEquals(DATA.stream().filter(expectedFilter).collect(Collectors.toList()), result);
+  }
+
+  @BeforeClass
+  public static void createFile() throws IOException {
+    int pageSize = DATA.size() / 10;     // Ensure that several pages will be created
+    int rowGroupSize = pageSize * 6 * 5; // Ensure that multiple row groups are created
+    
+    // Encryption configuration: Encrypt two columns and the footer, with different keys.
+    ColumnEncryptionProperties columnProperties1 = ColumnEncryptionProperties
+        .builder("id")
+        .withKey(COLUMN_ENCRYPTION_KEY1)
+        .withKeyID("kc1")
+        .build();
+
+    ColumnEncryptionProperties columnProperties2 = ColumnEncryptionProperties
+        .builder("name")
+        .withKey(COLUMN_ENCRYPTION_KEY2)
+        .withKeyID("kc2")
+        .build();
+    Map<ColumnPath, ColumnEncryptionProperties> columnPropertiesMap = new HashMap<>();
+
+    columnPropertiesMap.put(columnProperties1.getPath(), columnProperties1);
+    columnPropertiesMap.put(columnProperties2.getPath(), columnProperties2);
+
+    FileEncryptionProperties encryptionProperties = FileEncryptionProperties.builder(FOOTER_ENCRYPTION_KEY)
+        .withFooterKeyID("kf")
+        .withEncryptedColumns(columnPropertiesMap)
+        .build();
+    
+    PhoneBookWriter.write(ExampleParquetWriter.builder(FILE_V1)
+        .withWriteMode(OVERWRITE)
+        .withRowGroupSize(rowGroupSize)
+        .withPageSize(pageSize)
+        .withEncryption(encryptionProperties)
+        .withWriterVersion(WriterVersion.PARQUET_1_0),
+        DATA);
+    
+    PhoneBookWriter.write(ExampleParquetWriter.builder(FILE_V2)
+        .withWriteMode(OVERWRITE)
+        .withRowGroupSize(rowGroupSize)
+        .withPageSize(pageSize)
+        .withEncryption(encryptionProperties)
+        .withWriterVersion(WriterVersion.PARQUET_2_0),
+        DATA);
+  }
+
+  @AfterClass
+  public static void deleteFile() throws IOException {
+    FILE_V1.getFileSystem(new Configuration()).delete(FILE_V1, false);
+    FILE_V2.getFileSystem(new Configuration()).delete(FILE_V2, false);
+  }
+
+  @Test
+  public void testSimpleFiltering() throws IOException {
+    assertCorrectFiltering(
+        record -> record.getId() == 1234,
+        eq(longColumn("id"), 1234L));
+    assertCorrectFiltering(
+        record -> "miller".equals(record.getName()),
+        eq(binaryColumn("name"), Binary.fromString("miller")));
+    assertCorrectFiltering(
+        record -> record.getName() == null,
+        eq(binaryColumn("name"), null));
+  }
+
+  @Test
+  public void testNoFiltering() throws IOException {
+    // Column index filtering with no-op filter
+    assertEquals(DATA, readUsers(FilterCompat.NOOP, false));
+    assertEquals(DATA, readUsers(FilterCompat.NOOP, true));
+
+    // Column index filtering turned off
+    assertEquals(DATA.stream().filter(user -> user.getId() == 1234).collect(Collectors.toList()),
+        readUsers(eq(longColumn("id"), 1234L), true, false));
+    assertEquals(DATA.stream().filter(user -> "miller".equals(user.getName())).collect(Collectors.toList()),
+        readUsers(eq(binaryColumn("name"), Binary.fromString("miller")), true, false));
+    assertEquals(DATA.stream().filter(user -> user.getName() == null).collect(Collectors.toList()),
+        readUsers(eq(binaryColumn("name"), null), true, false));
+
+    // Every filtering mechanism turned off
+    assertEquals(DATA, readUsers(eq(longColumn("id"), 1234L), false, false));
+    assertEquals(DATA, readUsers(eq(binaryColumn("name"), Binary.fromString("miller")), false, false));
+    assertEquals(DATA, readUsers(eq(binaryColumn("name"), null), false, false));
+  }
+
+  @Test
+  public void testComplexFiltering() throws IOException {
+    assertCorrectFiltering(
+        record -> {
+          Location loc = record.getLocation();
+          Double lat = loc == null ? null : loc.getLat();
+          Double lon = loc == null ? null : loc.getLon();
+          return lat != null && lon != null && 37 <= lat && lat <= 70 && -21 <= lon && lon <= 35;
+        },
+        and(and(gtEq(doubleColumn("location.lat"), 37.0), ltEq(doubleColumn("location.lat"), 70.0)),
+            and(gtEq(doubleColumn("location.lon"), -21.0), ltEq(doubleColumn("location.lon"), 35.0))));
+    assertCorrectFiltering(
+        record -> {
+          Location loc = record.getLocation();
+          return loc == null || (loc.getLat() == null && loc.getLon() == null);
+        },
+        and(eq(doubleColumn("location.lat"), null), eq(doubleColumn("location.lon"), null)));
+    assertCorrectFiltering(
+        record -> {
+          String name = record.getName();
+          return name != null && name.compareTo("thomas") < 0 && record.getId() <= 3 * DATA.size() / 4;
+        },
+        and(lt(binaryColumn("name"), Binary.fromString("thomas")), ltEq(longColumn("id"), 3L * DATA.size() / 4)));
+  }
+
+  public static class NameStartsWithVowel extends UserDefinedPredicate<Binary> {
+    private static final Binary A = Binary.fromString("a");
+    private static final Binary B = Binary.fromString("b");
+    private static final Binary E = Binary.fromString("e");
+    private static final Binary F = Binary.fromString("f");
+    private static final Binary I = Binary.fromString("i");
+    private static final Binary J = Binary.fromString("j");
+    private static final Binary O = Binary.fromString("o");
+    private static final Binary P = Binary.fromString("p");
+    private static final Binary U = Binary.fromString("u");
+    private static final Binary V = Binary.fromString("v");
+
+    private static boolean isStartingWithVowel(String str) {
+      if (str == null || str.isEmpty()) {
+        return false;
+      }
+      switch (str.charAt(0)) {
+        case 'a':
+        case 'e':
+        case 'i':
+        case 'o':
+        case 'u':
+          return true;
+        default:
+          return false;
+      }
+    }
+
+    @Override
+    public boolean keep(Binary value) {
+      return value != null && isStartingWithVowel(value.toStringUsingUTF8());
+    }
+
+    @Override
+    public boolean canDrop(Statistics<Binary> statistics) {
+      Comparator<Binary> cmp = statistics.getComparator();
+      Binary min = statistics.getMin();
+      Binary max = statistics.getMax();
+      return cmp.compare(max, A) < 0
+          || (cmp.compare(min, B) >= 0 && cmp.compare(max, E) < 0)
+          || (cmp.compare(min, F) >= 0 && cmp.compare(max, I) < 0)
+          || (cmp.compare(min, J) >= 0 && cmp.compare(max, O) < 0)
+          || (cmp.compare(min, P) >= 0 && cmp.compare(max, U) < 0)
+          || cmp.compare(min, V) >= 0;
+    }
+
+    @Override
+    public boolean inverseCanDrop(Statistics<Binary> statistics) {
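+      // The inverted predicate can drop the page only if the whole [min, max] range lies within a single vowel-initial range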
+      Comparator<Binary> cmp = statistics.getComparator();
+      Binary min = statistics.getMin();
+      Binary max = statistics.getMax();
+      return (cmp.compare(min, A) >= 0 && cmp.compare(max, B) < 0)
+          || (cmp.compare(min, E) >= 0 && cmp.compare(max, F) < 0)
+          || (cmp.compare(min, I) >= 0 && cmp.compare(max, J) < 0)
+          || (cmp.compare(min, O) >= 0 && cmp.compare(max, P) < 0)
+          || (cmp.compare(min, U) >= 0 && cmp.compare(max, V) < 0);
+    }
+  }
+
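+  /**
+   * UDP keeping the values divisible by the given divisor; Serializable so that an instance can be
+   * bound directly via userDefined(column, udp).
+   */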
+  public static class IsDivisibleBy extends UserDefinedPredicate<Long> implements Serializable {
+    private long divisor;
+
+    IsDivisibleBy(long divisor) {
+      this.divisor = divisor;
+    }
+
+    @Override
+    public boolean keep(Long value) {
+      // Deliberately not checking for null, to verify that the resulting NPE is handled
+      // Implementors should always check the value for null and return accordingly
+      return value % divisor == 0;
+    }
+
+    @Override
+    public boolean canDrop(Statistics<Long> statistics) {
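+      // Assuming non-negative values: no multiple of the divisor lies in [min, max] when neither bound
+      // is divisible and both bounds share the same quotient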
+      long min = statistics.getMin();
+      long max = statistics.getMax();
+      return min % divisor != 0 && max % divisor != 0 && min / divisor == max / divisor;
+    }
+
+    @Override
+    public boolean inverseCanDrop(Statistics<Long> statistics) {
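+      // The inverted predicate can drop the page only if all values are equal and that value is divisible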
+      long min = statistics.getMin();
+      long max = statistics.getMax();
+      return min == max && min % divisor == 0;
+    }
+  }
+
+  @Test
+  public void testUDF() throws IOException {
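+    // Exercises both the class-based and the instance-based userDefined() variants, and their negation via not()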
+    assertCorrectFiltering(
+        record -> NameStartsWithVowel.isStartingWithVowel(record.getName()) || record.getId() % 234 == 0,
+        or(userDefined(binaryColumn("name"), NameStartsWithVowel.class),
+            userDefined(longColumn("id"), new IsDivisibleBy(234))));
+    assertCorrectFiltering(
+        record -> !(NameStartsWithVowel.isStartingWithVowel(record.getName()) || record.getId() % 234 == 0),
+            not(or(userDefined(binaryColumn("name"), NameStartsWithVowel.class),
+                userDefined(longColumn("id"), new IsDivisibleBy(234)))));
+  }
+
+  @Test
+  public void testFilteringWithMissingColumns() throws IOException {
+    // Filters where the condition on the missing column always evaluates to true (missing columns are read as all nulls)
+    assertEquals(DATA, readUsers(notEq(binaryColumn("not-existing-binary"), Binary.EMPTY), true));
+    assertCorrectFiltering(
+        record -> record.getId() == 1234,
+        and(eq(longColumn("id"), 1234L),
+            eq(longColumn("not-existing-long"), null)));
+    assertCorrectFiltering(
+        record -> "miller".equals(record.getName()),
+        and(eq(binaryColumn("name"), Binary.fromString("miller")),
+            invert(userDefined(binaryColumn("not-existing-binary"), NameStartsWithVowel.class))));
+
+    // Filters where the condition on the missing column always evaluates to false
+    assertEquals(emptyList(), readUsers(lt(longColumn("not-existing-long"), 0L), true));
+    assertCorrectFiltering(
+        record -> "miller".equals(record.getName()),
+        or(eq(binaryColumn("name"), Binary.fromString("miller")),
+            gtEq(binaryColumn("not-existing-binary"), Binary.EMPTY)));
+    assertCorrectFiltering(
+        record -> record.getId() == 1234,
+        or(eq(longColumn("id"), 1234L),
+            userDefined(longColumn("not-existing-long"), new IsDivisibleBy(1))));
+  }
+
+  @Test
+  public void testFilteringWithProjection() throws IOException {
+    // All rows shall be retrieved because, under this projection, every value in column 'name' is handled as null
+    assertEquals(
+        DATA.stream().map(user -> user.cloneWithName(null)).collect(toList()),
+        readUsersWithProjection(FilterCompat.get(eq(binaryColumn("name"), null)), SCHEMA_WITHOUT_NAME, true, true));
+
+    // The column index filter shall drop every page because, under this projection, every value in column 'name' is handled as null
+    assertEquals(
+        emptyList(),
+        readUsersWithProjection(FilterCompat.get(notEq(binaryColumn("name"), null)), SCHEMA_WITHOUT_NAME, false, true));
+    assertEquals(
+        emptyList(),
+        readUsersWithProjection(FilterCompat.get(userDefined(binaryColumn("name"), NameStartsWithVowel.class)),
+            SCHEMA_WITHOUT_NAME, false, true));
+  }
+}
+