You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by ga...@apache.org on 2020/02/27 11:43:58 UTC

[parquet-mr] 02/02: PARQUET-1286: Crypto package (#614)

This is an automated email from the ASF dual-hosted git repository.

gabor pushed a commit to branch encryption
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git

commit 8b306bf5716aaa271e7046bc354f49fc80205eb1
Author: ggershinsky <gg...@users.noreply.github.com>
AuthorDate: Wed Feb 12 13:51:56 2020 +0200

    PARQUET-1286: Crypto package (#614)
---
 .../apache/parquet/crypto/AADPrefixVerifier.java   |  34 +++
 .../java/org/apache/parquet/crypto/AesCipher.java  | 120 +++++++++
 .../org/apache/parquet/crypto/AesCtrDecryptor.java | 130 +++++++++
 .../org/apache/parquet/crypto/AesCtrEncryptor.java | 100 +++++++
 .../org/apache/parquet/crypto/AesGcmDecryptor.java | 118 ++++++++
 .../org/apache/parquet/crypto/AesGcmEncryptor.java |  86 ++++++
 .../java/org/apache/parquet/crypto/AesMode.java    |  35 +++
 .../parquet/crypto/ColumnDecryptionProperties.java | 104 +++++++
 .../parquet/crypto/ColumnEncryptionProperties.java | 186 +++++++++++++
 .../parquet/crypto/DecryptionKeyRetriever.java     |  41 +++
 .../parquet/crypto/FileDecryptionProperties.java   | 254 +++++++++++++++++
 .../parquet/crypto/FileEncryptionProperties.java   | 278 +++++++++++++++++++
 .../parquet/crypto/HiddenColumnException.java      |  38 +++
 .../crypto/InternalColumnDecryptionSetup.java      |  74 +++++
 .../crypto/InternalColumnEncryptionSetup.java      |  82 ++++++
 .../parquet/crypto/InternalFileDecryptor.java      | 299 +++++++++++++++++++++
 .../parquet/crypto/InternalFileEncryptor.java      | 175 ++++++++++++
 .../parquet/crypto/KeyAccessDeniedException.java   |  30 +++
 .../apache/parquet/crypto/ModuleCipherFactory.java |  75 ++++++
 .../org/apache/parquet/crypto/ParquetCipher.java   |  42 +++
 .../parquet/crypto/StringKeyIdRetriever.java       |  39 +++
 21 files changed, 2340 insertions(+)

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AADPrefixVerifier.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AADPrefixVerifier.java
new file mode 100755
index 0000000..a0a8029
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AADPrefixVerifier.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.crypto;
+
+import java.io.IOException;
+
+public interface AADPrefixVerifier {
+
+  /**
+   * Verifies identity (AAD Prefix) of individual file, or of file collection in a data set.
+   * Must be thread-safe.
+   * 
+   * @param aadPrefix AAD Prefix
+   * @throws IOException Throw exception if AAD prefix is wrong.
+   */
+  public void verify(byte[] aadPrefix) throws IOException;
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCipher.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCipher.java
new file mode 100755
index 0000000..fb9588d
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCipher.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.crypto;
+
+import javax.crypto.Cipher;
+import javax.crypto.spec.SecretKeySpec;
+
+import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
+
+import java.io.IOException;
+import java.security.SecureRandom;
+
+public class AesCipher {
+
+  public static final int NONCE_LENGTH = 12;
+  public static final int GCM_TAG_LENGTH = 16;
+
+  static final int AAD_FILE_UNIQUE_LENGTH = 8;
+
+  protected static final int CTR_IV_LENGTH = 16;
+  protected static final int GCM_TAG_LENGTH_BITS = 8 * GCM_TAG_LENGTH;
+  protected static final int CHUNK_LENGTH = 4 * 1024;
+  protected static final int SIZE_LENGTH = ModuleCipherFactory.SIZE_LENGTH;
+
+  protected SecretKeySpec aesKey;
+  protected final SecureRandom randomGenerator;
+  protected Cipher cipher;
+  protected final byte[] localNonce;
+
+  AesCipher(AesMode mode, byte[] keyBytes) throws IllegalArgumentException, IOException {
+    if (null == keyBytes) {
+      throw new IllegalArgumentException("Null key bytes");
+    }
+
+    boolean allZeroKey = true;
+    for (byte kb : keyBytes) {
+      if (kb != 0) {
+        allZeroKey = false;
+        break;
+      }
+    }
+
+    if (allZeroKey) {
+      throw new IllegalArgumentException("All key bytes are zero");
+    }
+
+    aesKey = new SecretKeySpec(keyBytes, "AES");
+    randomGenerator = new SecureRandom();
+    localNonce = new byte[NONCE_LENGTH];
+  }
+
+  public static byte[] createModuleAAD(byte[] fileAAD, ModuleType moduleType, 
+      short rowGroupOrdinal, short columnOrdinal, short pageOrdinal) {
+    byte[] typeOrdinalBytes = new byte[1];
+    typeOrdinalBytes[0] = moduleType.getValue();
+    if (ModuleType.Footer == moduleType) {
+      return concatByteArrays(fileAAD, typeOrdinalBytes);      
+    }
+
+    byte[] rowGroupOrdinalBytes = shortToBytesLE(rowGroupOrdinal);
+    byte[] columnOrdinalBytes = shortToBytesLE(columnOrdinal);
+    if (ModuleType.DataPage != moduleType && ModuleType.DataPageHeader != moduleType) {
+      return concatByteArrays(fileAAD, typeOrdinalBytes, rowGroupOrdinalBytes, columnOrdinalBytes); 
+    }
+
+    byte[] pageOrdinalBytes = shortToBytesLE(pageOrdinal);
+    return concatByteArrays(fileAAD, typeOrdinalBytes, rowGroupOrdinalBytes, columnOrdinalBytes, pageOrdinalBytes);
+  }
+
+  public static byte[] createFooterAAD(byte[] aadPrefixBytes) {
+    return createModuleAAD(aadPrefixBytes, ModuleType.Footer, (short) -1, (short) -1, (short) -1);
+  }
+
+  // Update last two bytes with new page ordinal (instead of creating new page AAD from scratch)
+  public static void quickUpdatePageAAD(byte[] pageAAD, short newPageOrdinal) {
+    byte[] pageOrdinalBytes = shortToBytesLE(newPageOrdinal);
+    System.arraycopy(pageOrdinalBytes, 0, pageAAD, pageAAD.length - 2, 2);
+  }
+
+  static byte[] concatByteArrays(byte[]... arrays) {
+    int totalLength = 0;
+    for (byte[] array : arrays) {
+      totalLength += array.length;
+    }
+
+    byte[] output = new byte[totalLength];
+    int offset = 0;
+    for (byte[] array : arrays) {
+      System.arraycopy(array, 0, output, offset, array.length);
+      offset += array.length;
+    }
+
+    return output;
+  }
+
+  private static byte[] shortToBytesLE(short input) {
+    byte[] output  = new byte[2];
+    output[1] = (byte)(0xff & (input >> 8));
+    output[0] = (byte)(0xff & input);
+
+    return output;
+  }
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCtrDecryptor.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCtrDecryptor.java
new file mode 100755
index 0000000..bf17b8c
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCtrDecryptor.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.crypto;
+
+import javax.crypto.Cipher;
+import javax.crypto.spec.IvParameterSpec;
+
+import org.apache.parquet.format.BlockCipher;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.GeneralSecurityException;
+
+public class AesCtrDecryptor extends AesCipher implements BlockCipher.Decryptor{
+
+  private final byte[] ctrIV;
+
+  AesCtrDecryptor(byte[] keyBytes) throws IllegalArgumentException, IOException {
+    super(AesMode.CTR, keyBytes);
+
+    try {
+      cipher = Cipher.getInstance(AesMode.CTR.getCipherName());
+    } catch (GeneralSecurityException e) {
+      throw new IOException("Failed to create CTR cipher", e);
+    }
+    ctrIV = new byte[CTR_IV_LENGTH];
+    // Setting last bit of initial CTR counter to 1
+    ctrIV[CTR_IV_LENGTH - 1] = (byte) 1;
+  }
+
+  @Override
+  public byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD)  throws IOException {
+    int cipherTextOffset = SIZE_LENGTH;
+    int cipherTextLength = lengthAndCiphertext.length - SIZE_LENGTH;
+
+    return decrypt(lengthAndCiphertext, cipherTextOffset, cipherTextLength, AAD);
+  }
+
+  public byte[] decrypt(byte[] ciphertext, int cipherTextOffset, int cipherTextLength, 
+      byte[] AAD)  throws IOException {
+
+    int plainTextLength = cipherTextLength - NONCE_LENGTH;
+    if (plainTextLength < 1) {
+      throw new IOException("Wrong input length " + plainTextLength);
+    }
+
+    // Get the nonce from ciphertext
+    System.arraycopy(ciphertext, cipherTextOffset, ctrIV, 0, NONCE_LENGTH);
+
+    byte[] plainText = new byte[plainTextLength];
+    int inputLength = cipherTextLength - NONCE_LENGTH;
+    int inputOffset = cipherTextOffset + NONCE_LENGTH;
+    int outputOffset = 0;
+    try {
+      IvParameterSpec spec = new IvParameterSpec(ctrIV);
+      cipher.init(Cipher.DECRYPT_MODE, aesKey, spec);
+
+      // Breaking decryption into multiple updates, to trigger h/w acceleration in Java 9+
+      while (inputLength > CHUNK_LENGTH) {
+        int written = cipher.update(ciphertext, inputOffset, CHUNK_LENGTH, plainText, outputOffset);
+        inputOffset += CHUNK_LENGTH;
+        outputOffset += written;
+        inputLength -= CHUNK_LENGTH;
+      } 
+
+      cipher.doFinal(ciphertext, inputOffset, inputLength, plainText, outputOffset);
+    } catch (GeneralSecurityException e) {
+      throw new IOException("Failed to decrypt", e);
+    }
+
+    return plainText;
+  }
+
+  @Override
+  public byte[] decrypt(InputStream from, byte[] AAD) throws IOException {
+    byte[] lengthBuffer = new byte[SIZE_LENGTH];
+    int gotBytes = 0;
+
+    // Read the length of encrypted Thrift structure
+    while (gotBytes < SIZE_LENGTH) {
+      int n = from.read(lengthBuffer, gotBytes, SIZE_LENGTH - gotBytes);
+      if (n <= 0) {
+        throw new IOException("Tried to read int (4 bytes), but only got " + gotBytes + " bytes.");
+      }
+      gotBytes += n;
+    }
+
+    final int ciphertextLength =
+        ((lengthBuffer[3] & 0xff) << 24) |
+        ((lengthBuffer[2] & 0xff) << 16) |
+        ((lengthBuffer[1] & 0xff) << 8)  |
+        ((lengthBuffer[0] & 0xff));
+
+    if (ciphertextLength < 1) {
+      throw new IOException("Wrong length of encrypted metadata: " + ciphertextLength);
+    }
+
+    // Read the encrypted structure contents
+    byte[] ciphertextBuffer = new byte[ciphertextLength];
+    gotBytes = 0;
+    while (gotBytes < ciphertextLength) {
+      int n = from.read(ciphertextBuffer, gotBytes, ciphertextLength - gotBytes);
+      if (n <= 0) {
+        throw new IOException("Tried to read " + ciphertextLength + 
+            " bytes, but only got " + gotBytes + " bytes.");
+      }
+      gotBytes += n;
+    }
+
+    // Decrypt the structure contents
+    return decrypt(ciphertextBuffer, 0, ciphertextLength, AAD);
+  }
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCtrEncryptor.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCtrEncryptor.java
new file mode 100755
index 0000000..ec1f8d4
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCtrEncryptor.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.crypto;
+
+import javax.crypto.Cipher;
+import javax.crypto.spec.IvParameterSpec;
+
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.format.BlockCipher;
+
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+
+public class AesCtrEncryptor extends AesCipher implements BlockCipher.Encryptor{
+
+  private final byte[] ctrIV;
+
+  AesCtrEncryptor(byte[] keyBytes) throws IllegalArgumentException, IOException {
+    super(AesMode.CTR, keyBytes);
+
+    try {
+      cipher = Cipher.getInstance(AesMode.CTR.getCipherName());
+    } catch (GeneralSecurityException e) {
+      throw new IOException("Failed to create CTR cipher", e);
+    }
+
+    ctrIV = new byte[CTR_IV_LENGTH];
+    // Setting last bit of initial CTR counter to 1
+    ctrIV[CTR_IV_LENGTH - 1] = (byte) 1;
+  }
+
+  @Override
+  public byte[] encrypt(byte[] plainText, byte[] AAD)  throws IOException {
+    return encrypt(true, plainText, AAD);
+  }
+
+  public byte[] encrypt(boolean writeLength, byte[] plainText, byte[] AAD)  throws IOException {
+    randomGenerator.nextBytes(localNonce);
+    return encrypt(writeLength, plainText, localNonce, AAD);
+  }
+
+  public byte[] encrypt(boolean writeLength, byte[] plainText, byte[] nonce, byte[] AAD)  
+      throws IOException {
+
+    if (nonce.length != NONCE_LENGTH) {
+      throw new IOException("Wrong nonce length " + nonce.length);
+    }
+    int plainTextLength = plainText.length;
+    int cipherTextLength = NONCE_LENGTH + plainTextLength;
+    int lengthBufferLength = writeLength? SIZE_LENGTH : 0;
+    byte[] cipherText = new byte[lengthBufferLength + cipherTextLength];
+    int inputLength = plainTextLength;
+    int inputOffset = 0;
+    int outputOffset = lengthBufferLength + NONCE_LENGTH;
+    try {
+      System.arraycopy(nonce, 0, ctrIV, 0, NONCE_LENGTH);
+      IvParameterSpec spec = new IvParameterSpec(ctrIV);
+      cipher.init(Cipher.ENCRYPT_MODE, aesKey, spec);
+
+      // Breaking encryption into multiple updates, to trigger h/w acceleration in Java 9+
+      while (inputLength > CHUNK_LENGTH) {
+        int written = cipher.update(plainText, inputOffset, CHUNK_LENGTH, cipherText, outputOffset);
+        inputOffset += CHUNK_LENGTH;
+        outputOffset += written;
+        inputLength -= CHUNK_LENGTH;
+      }
+
+      cipher.doFinal(plainText, inputOffset, inputLength, cipherText, outputOffset);
+    }  catch (GeneralSecurityException e) {
+      throw new IOException("Failed to encrypt", e);
+    }
+
+    // Add ciphertext length
+    if (writeLength) {
+      System.arraycopy(BytesUtils.intToBytes(cipherTextLength), 0, cipherText, 0, lengthBufferLength);
+    }
+    // Add the nonce
+    System.arraycopy(nonce, 0, cipherText, lengthBufferLength, NONCE_LENGTH);
+
+    return cipherText;
+  }
+}
+
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesGcmDecryptor.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesGcmDecryptor.java
new file mode 100755
index 0000000..d7f1486
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesGcmDecryptor.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.crypto;
+
+import javax.crypto.Cipher;
+import javax.crypto.spec.GCMParameterSpec;
+
+import org.apache.parquet.format.BlockCipher;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.GeneralSecurityException;
+
+public class AesGcmDecryptor extends AesCipher implements BlockCipher.Decryptor{
+
+
+  AesGcmDecryptor(byte[] keyBytes) throws IllegalArgumentException, IOException {
+    super(AesMode.GCM, keyBytes);
+
+    try {
+      cipher = Cipher.getInstance(AesMode.GCM.getCipherName());
+    } catch (GeneralSecurityException e) {
+      throw new IOException("Failed to create GCM cipher", e);
+    }
+  }
+
+  @Override
+  public byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD)  throws IOException {
+    int cipherTextOffset = SIZE_LENGTH;
+    int cipherTextLength = lengthAndCiphertext.length - SIZE_LENGTH;
+
+    return decrypt(lengthAndCiphertext, cipherTextOffset, cipherTextLength, AAD);
+  }
+
+  public byte[] decrypt(byte[] ciphertext, int cipherTextOffset, int cipherTextLength, byte[] AAD)  
+      throws IOException {
+
+    int plainTextLength = cipherTextLength - GCM_TAG_LENGTH - NONCE_LENGTH;
+    if (plainTextLength < 1) {
+      throw new IOException("Wrong input length " + plainTextLength);
+    }
+
+    // Get the nonce from ciphertext
+    System.arraycopy(ciphertext, cipherTextOffset, localNonce, 0, NONCE_LENGTH);
+
+    byte[] plainText = new byte[plainTextLength];
+    int inputLength = cipherTextLength - NONCE_LENGTH;
+    int inputOffset = cipherTextOffset + NONCE_LENGTH;
+    int outputOffset = 0;
+    try {
+      GCMParameterSpec spec = new GCMParameterSpec(GCM_TAG_LENGTH_BITS, localNonce);
+      cipher.init(Cipher.DECRYPT_MODE, aesKey, spec);
+      if (null != AAD) cipher.updateAAD(AAD);
+
+      cipher.doFinal(ciphertext, inputOffset, inputLength, plainText, outputOffset);
+    }  catch (GeneralSecurityException e) {
+      throw new IOException("Failed to decrypt", e);
+    }
+
+    return plainText;
+  }
+
+  @Override
+  public byte[] decrypt(InputStream from, byte[] AAD) throws IOException {
+    byte[] lengthBuffer = new byte[SIZE_LENGTH];
+    int gotBytes = 0;
+
+    // Read the length of encrypted Thrift structure
+    while (gotBytes < SIZE_LENGTH) {
+      int n = from.read(lengthBuffer, gotBytes, SIZE_LENGTH - gotBytes);
+      if (n <= 0) {
+        throw new IOException("Tried to read int (4 bytes), but only got " + gotBytes + " bytes.");
+      }
+      gotBytes += n;
+    }
+
+    final int ciphertextLength =
+        ((lengthBuffer[3] & 0xff) << 24) |
+        ((lengthBuffer[2] & 0xff) << 16) |
+        ((lengthBuffer[1] & 0xff) << 8)  |
+        ((lengthBuffer[0] & 0xff));
+
+    if (ciphertextLength < 1) {
+      throw new IOException("Wrong length of encrypted metadata: " + ciphertextLength);
+    }
+
+    byte[] ciphertextBuffer = new byte[ciphertextLength];
+    gotBytes = 0;
+    // Read the encrypted structure contents
+    while (gotBytes < ciphertextLength) {
+      int n = from.read(ciphertextBuffer, gotBytes, ciphertextLength - gotBytes);
+      if (n <= 0) {
+        throw new IOException("Tried to read " + ciphertextLength + " bytes, but only got " + gotBytes + " bytes.");
+      }
+      gotBytes += n;
+    }
+
+    // Decrypt the structure contents
+    return decrypt(ciphertextBuffer, 0, ciphertextLength, AAD);
+  }
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesGcmEncryptor.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesGcmEncryptor.java
new file mode 100755
index 0000000..bdfed7f
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesGcmEncryptor.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.crypto;
+
+import javax.crypto.Cipher;
+import javax.crypto.spec.GCMParameterSpec;
+
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.format.BlockCipher;
+
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+
+public class AesGcmEncryptor extends AesCipher implements BlockCipher.Encryptor{
+
+  AesGcmEncryptor(byte[] keyBytes) throws IllegalArgumentException, IOException {
+    super(AesMode.GCM, keyBytes);
+
+    try {
+      cipher = Cipher.getInstance(AesMode.GCM.getCipherName());
+    } catch (GeneralSecurityException e) {
+      throw new IOException("Failed to create GCM cipher", e);
+    }
+  }
+
+  @Override
+  public byte[] encrypt(byte[] plainText, byte[] AAD)  throws IOException {
+    return encrypt(true, plainText, AAD);
+  }
+
+  public byte[] encrypt(boolean writeLength, byte[] plainText, byte[] AAD)  throws IOException {
+    randomGenerator.nextBytes(localNonce);
+    return encrypt(writeLength, plainText, localNonce, AAD);
+  }
+
+  public byte[] encrypt(boolean writeLength, byte[] plainText, byte[] nonce, byte[] AAD)  
+      throws IOException {
+
+    if (nonce.length != NONCE_LENGTH) {
+      throw new IOException("Wrong nonce length " + nonce.length);
+    }
+    int plainTextLength = plainText.length;
+    int cipherTextLength = NONCE_LENGTH + plainTextLength + GCM_TAG_LENGTH;
+    int lengthBufferLength = writeLength? SIZE_LENGTH : 0;
+    byte[] cipherText = new byte[lengthBufferLength + cipherTextLength];
+    int inputLength = plainTextLength;
+    int inputOffset = 0;
+    int outputOffset = lengthBufferLength + NONCE_LENGTH;
+
+    try {
+      GCMParameterSpec spec = new GCMParameterSpec(GCM_TAG_LENGTH_BITS, nonce);
+      cipher.init(Cipher.ENCRYPT_MODE, aesKey, spec);
+      if (null != AAD) cipher.updateAAD(AAD);
+
+      cipher.doFinal(plainText, inputOffset, inputLength, cipherText, outputOffset);
+    } catch (GeneralSecurityException e) {
+      throw new IOException("Failed to encrypt", e);
+    }
+
+    // Add ciphertext length
+    if (writeLength) {
+      System.arraycopy(BytesUtils.intToBytes(cipherTextLength), 0, cipherText, 0, lengthBufferLength);
+    }
+    // Add the nonce
+    System.arraycopy(nonce, 0, cipherText, lengthBufferLength, NONCE_LENGTH);
+
+    return cipherText;
+  }
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesMode.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesMode.java
new file mode 100755
index 0000000..6f3c8a0
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesMode.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.crypto;
+
+public enum AesMode {
+  GCM("AES/GCM/NoPadding"),
+  CTR("AES/CTR/NoPadding");
+
+  private final String cipherName;
+
+  private AesMode(String cipherName) {
+    this.cipherName = cipherName;
+  }
+
+  public String getCipherName() {
+    return cipherName;
+  }
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/ColumnDecryptionProperties.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/ColumnDecryptionProperties.java
new file mode 100755
index 0000000..b4cf98f
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/ColumnDecryptionProperties.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.crypto;
+
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+
+/**
+ * This class is only required for setting explicit column decryption keys -
+ * to override key retriever (or to provide keys when key metadata and/or
+ * key retriever are not available)
+ */
+public class ColumnDecryptionProperties {
+
+  private final ColumnPath columnPath;
+  private final byte[] keyBytes;
+
+  private ColumnDecryptionProperties(ColumnPath columnPath, byte[] keyBytes) {
+    if (null == columnPath) {
+      throw new IllegalArgumentException("Null column path");
+    }
+    if (null == keyBytes) {
+      throw new IllegalArgumentException("Null key for column " + columnPath);
+    }
+    if (!(keyBytes.length == 16 || keyBytes.length == 24 || keyBytes.length == 32)) {
+      throw new IllegalArgumentException("Wrong key length: " + keyBytes.length + 
+          " on column: " + columnPath);
+    }
+
+    this.columnPath = columnPath;
+    this.keyBytes = keyBytes;
+  }
+
+  /**
+   * Convenience builder for regular (not nested) columns.
+   * 
+   * @param name Flat column name
+   * @return Builder
+   */
+  public static Builder builder(String name) {
+    return builder(ColumnPath.get(name));
+  }
+
+  public static Builder builder(ColumnPath path) {
+    return new Builder(path);
+  }
+
+  public static class Builder {
+    private final ColumnPath columnPath;
+    private byte[] keyBytes;
+
+    private Builder(ColumnPath path) {
+      this.columnPath = path;
+    }
+
+    /**
+     * Set an explicit column key. 
+     * If applied on a file that contains key metadata for this column - 
+     * the metadata will be ignored, the column will be decrypted with this key.
+     * However, if the column was encrypted with the footer key, it will also be decrypted with the
+     * footer key, and the column key passed in this method will be ignored.
+     * 
+     * @param columnKey Key length must be either 16, 24 or 32 bytes.
+     * @return Builder
+     */
+    public Builder withKey(byte[] columnKey) {
+      if (null != this.keyBytes) {
+        throw new IllegalStateException("Key already set on column: " + columnPath);
+      }
+      this.keyBytes = new byte[columnKey.length];
+      System.arraycopy(columnKey, 0, this.keyBytes, 0, columnKey.length);
+
+      return this;
+    }
+
+    public ColumnDecryptionProperties build() {
+      return new ColumnDecryptionProperties(columnPath, keyBytes);
+    }
+  }
+
+  public ColumnPath getPath() {
+    return columnPath;
+  }
+
+  public byte[] getKeyBytes() {
+    return keyBytes;
+  }
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/ColumnEncryptionProperties.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/ColumnEncryptionProperties.java
new file mode 100755
index 0000000..19419fc
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/ColumnEncryptionProperties.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.crypto;
+
+import java.nio.charset.StandardCharsets;
+
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+
+public class ColumnEncryptionProperties {
+
+  private final boolean encrypted;
+  private final ColumnPath columnPath;
+  private final boolean encryptedWithFooterKey;
+  private final byte[] keyBytes;
+  private final byte[] keyMetaData;
+
+  private ColumnEncryptionProperties(boolean encrypted, ColumnPath columnPath, 
+      byte[] keyBytes, byte[] keyMetaData) {
+
+    if (null == columnPath) {
+      throw new IllegalArgumentException("Null column path");
+    }
+    if (!encrypted) {
+      if (null != keyBytes) {
+        throw new IllegalArgumentException("Setting key on unencrypted column: " + columnPath);
+      }
+      if (null != keyMetaData) {
+        throw new IllegalArgumentException("Setting key metadata on unencrypted column: " + columnPath);
+      }
+    }
+    if ((null != keyBytes) && 
+        !(keyBytes.length == 16 || keyBytes.length == 24 || keyBytes.length == 32)) {
+      throw new IllegalArgumentException("Wrong key length: " + keyBytes.length + 
+          ". Column: " + columnPath);
+    }
+    encryptedWithFooterKey = (encrypted && (null == keyBytes));
+    if (encryptedWithFooterKey && (null != keyMetaData)) {
+      throw new IllegalArgumentException("Setting key metadata on column encrypted with footer key:  " +
+          columnPath);
+    }
+
+    this.encrypted = encrypted;
+    this.columnPath = columnPath;
+    this.keyBytes = keyBytes;
+    this.keyMetaData = keyMetaData;
+  }
+
+  /**
+   * Convenience builder for regular (not nested) columns.
+   * To make sure column name is not misspelled or misplaced, 
+   * file writer will verify that column is in file schema.
+   *
+   * @param name Flat column name
+   * @return Builder
+   */
+  public static Builder builder(String name) {
+    return builder(ColumnPath.get(name), true);
+  }
+
+  /**
+   * Builder for encrypted columns.
+   * To make sure column path is not misspelled or misplaced, 
+   * file writer will verify this column is in file schema.
+   * 
+   * @param path Column path
+   * @return Builder
+   */
+  public static Builder builder(ColumnPath path) {
+    return builder(path, true);
+  }
+
+  static Builder builder(ColumnPath path, boolean encrypt) {
+    return new Builder(path, encrypt);
+  }
+
+  public static class Builder {
+    private final boolean encrypted;
+    private final ColumnPath columnPath;
+
+    private byte[] keyBytes;
+    private byte[] keyMetaData;
+
+    private Builder(ColumnPath path, boolean encrypted) {
+      this.encrypted = encrypted;
+      this.columnPath = path;
+    }
+
+    /**
+     * Set a column-specific key.
+     * If key is not set on an encrypted column, the column will
+     * be encrypted with the footer key.
+     * 
+     * @param columnKey Key length must be either 16, 24 or 32 bytes.
+     * @return Builder
+     */
+    public Builder withKey(byte[] columnKey) {
+      if (null == columnKey) {
+        return this;
+      }
+      if (null != this.keyBytes) {
+        throw new IllegalStateException("Key already set on column: " + columnPath);
+      }
+      this.keyBytes = new byte[columnKey.length];
+      System.arraycopy(columnKey, 0, this.keyBytes, 0, columnKey.length);
+
+      return this;
+    }
+
+    /**
+     * Set a key retrieval metadata.
+     * use either withKeyMetaData or withKeyID, not both.
+     * 
+     * @param keyMetaData arbitrary byte array with encryption key metadata
+     * @return Builder
+     */
+    public Builder withKeyMetaData(byte[] keyMetaData) {
+      if (null == keyMetaData) {
+        return this;
+      }
+      if (null != this.keyMetaData) {
+        throw new IllegalStateException("Key metadata already set on column: " + columnPath);
+      }
+      this.keyMetaData = keyMetaData;
+
+      return this;
+    }
+
+    /**
+     * Set a key retrieval metadata (converted from String).
+     * use either withKeyMetaData or withKeyID, not both.
+     * 
+     * @param keyId will be converted to metadata (UTF-8 array).
+     * @return Builder
+     */
+    public Builder withKeyID(String keyId) {
+      if (null == keyId) {
+        return this;
+      }
+      byte[] metaData = keyId.getBytes(StandardCharsets.UTF_8);
+
+      return withKeyMetaData(metaData);
+    }
+
+    public ColumnEncryptionProperties build() {
+      return new ColumnEncryptionProperties(encrypted, columnPath, keyBytes, keyMetaData);
+    }
+  }
+
+  public ColumnPath getPath() {
+    return columnPath;
+  }
+
+  public boolean isEncrypted() {
+    return encrypted;
+  }
+
+  public byte[] getKeyBytes() {
+    return keyBytes;
+  }
+
+  public boolean isEncryptedWithFooterKey() {
+    if (!encrypted) return false;
+    return encryptedWithFooterKey;
+  }
+
+  public byte[] getKeyMetaData() {
+    return keyMetaData;
+  }
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/DecryptionKeyRetriever.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/DecryptionKeyRetriever.java
new file mode 100755
index 0000000..f134bb8
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/DecryptionKeyRetriever.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.crypto;
+
+import java.io.IOException;
+
+/**
+ * Interface for classes retrieving encryption keys using the key metadata.
+ * Implementations must be thread-safe, if same KeyRetriever object is passed to multiple file readers.
+ */
+public interface DecryptionKeyRetriever {
+
+  /**
+   * Returns encryption key using the key metadata.
+   * If your key retrieval code throws runtime exceptions related to access control (permission) problems
+   * (such as Hadoop AccessControlException), catch them and throw the KeyAccessDeniedException.
+   * 
+   * @param keyMetaData arbitrary byte array with encryption key metadata
+   * @return encryption key. Key length can be either 16, 24 or 32 bytes.
+   * @throws KeyAccessDeniedException thrown upon access control problems (authentication or authorization)
+   * @throws IOException thrown upon key retrieval problems unrelated to access control
+   */
+  public byte[] getKey(byte[] keyMetaData) throws KeyAccessDeniedException, IOException;
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/FileDecryptionProperties.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/FileDecryptionProperties.java
new file mode 100755
index 0000000..37bfa0d
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/FileDecryptionProperties.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.crypto;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+
+public class FileDecryptionProperties {
+
+  private static final boolean CHECK_SIGNATURE = true;
+  private static final boolean ALLOW_PLAINTEXT_FILES = false;
+
+  private final byte[] footerKey;
+  private final DecryptionKeyRetriever keyRetriever;
+  private final byte[] aadPrefix;
+  private final AADPrefixVerifier aadPrefixVerifier;
+  private final Map<ColumnPath, ColumnDecryptionProperties> columnPropertyMap;
+  private final boolean checkPlaintextFooterIntegrity;
+  private final boolean allowPlaintextFiles;
+
+  private FileDecryptionProperties(byte[] footerKey, DecryptionKeyRetriever keyRetriever,
+      boolean checkPlaintextFooterIntegrity,  byte[] aadPrefix, AADPrefixVerifier aadPrefixVerifier,
+      Map<ColumnPath, ColumnDecryptionProperties> columnPropertyMap, boolean allowPlaintextFiles) {
+
+    if ((null == footerKey) && (null == keyRetriever) && (null == columnPropertyMap)) {
+      throw new IllegalArgumentException("No decryption properties are specified");
+    }
+    if ((null != footerKey) && 
+        !(footerKey.length == 16 || footerKey.length == 24 || footerKey.length == 32)) {
+      throw new IllegalArgumentException("Wrong footer key length " + footerKey.length);
+    }
+    if ((null == footerKey) && checkPlaintextFooterIntegrity && (null == keyRetriever)) {
+      throw new IllegalArgumentException("Can't check footer integrity with null footer key and null key retriever");
+    }
+
+    this.footerKey = footerKey;
+    this.checkPlaintextFooterIntegrity = checkPlaintextFooterIntegrity;
+    this.keyRetriever = keyRetriever;
+    this.aadPrefix = aadPrefix;
+    this.columnPropertyMap = columnPropertyMap;
+    this.aadPrefixVerifier = aadPrefixVerifier;
+    this.allowPlaintextFiles = allowPlaintextFiles;
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static class Builder {
+    private byte[] footerKeyBytes;
+    private DecryptionKeyRetriever keyRetriever;
+    private byte[] aadPrefixBytes;
+    private AADPrefixVerifier aadPrefixVerifier;
+    private Map<ColumnPath, ColumnDecryptionProperties> columnPropertyMap;
+    private boolean checkPlaintextFooterIntegrity;
+    private boolean plaintextFilesAllowed;
+
+    private Builder() {
+      this.checkPlaintextFooterIntegrity = CHECK_SIGNATURE;
+      this.plaintextFilesAllowed = ALLOW_PLAINTEXT_FILES;
+    }
+
+    /**
+     * Set an explicit footer key. If applied on a file that contains footer key metadata - 
+     * the metadata will be ignored, the footer will be decrypted/verified with this key.
+     * If explicit key is not set, footer key will be fetched from key retriever.
+     * 
+     * @param footerKey Key length must be either 16, 24 or 32 bytes.
+     * @return Builder 
+     */
+    public Builder withFooterKey(byte[] footerKey) {
+      if (null == footerKey) {
+        return this;
+      }
+      if (null != this.footerKeyBytes) {
+        throw new IllegalStateException("Footer key already set");
+      }
+      this.footerKeyBytes = new byte[footerKey.length];
+      System.arraycopy(footerKey, 0, this.footerKeyBytes, 0, footerKey.length);
+
+      return this;
+    }
+
+    /**
+     * Set explicit column keys (decryption properties).
+     * Its also possible to set a key retriever on this file decryption properties object. 
+     * Upon reading, availability of explicit keys is checked before invocation of the retriever callback.
+     * If an explicit key is available for a footer or a column, its key metadata will be ignored.
+     * 
+     * @param columnProperties Explicit column decryption keys
+     * @return Builder
+     */
+    public Builder withColumnKeys(Map<ColumnPath, ColumnDecryptionProperties> columnProperties) {
+      if (null == columnProperties) {
+        return this;
+      }
+      if (null != this.columnPropertyMap) {
+        throw new IllegalStateException("Column properties already set");
+      }
+      // Copy the map to make column properties immutable
+      this.columnPropertyMap = new HashMap<ColumnPath, ColumnDecryptionProperties>(columnProperties);
+
+      return this;
+    }
+
+    /**
+     * Set a key retriever callback. It is also possible to
+     * set explicit footer or column keys on this file property object. Upon file decryption, 
+     * availability of explicit keys is checked before invocation of the retriever callback.
+     * If an explicit key is available for a footer or a column, its key metadata will
+     * be ignored. 
+     * 
+     * @param keyRetriever Key retriever object
+     * @return Builder
+     */
+    public Builder withKeyRetriever(DecryptionKeyRetriever keyRetriever) {
+      if (null == keyRetriever) {
+        return this;
+      }
+      if (null != this.keyRetriever) {
+        throw new IllegalStateException("Key retriever already set");
+      }
+      this.keyRetriever = keyRetriever;
+
+      return this;
+    }
+
+    /**
+     * Skip integrity verification of plaintext footers.
+     * If not called, integrity of plaintext footers will be checked in runtime, and an exception will 
+     * be thrown in the following situations:
+     * - footer signing key is not available (not passed, or not found by key retriever)
+     * - footer content doesn't match the signature
+     * 
+     * @return Builder
+     */
+    public Builder withoutFooterSignatureVerification() {
+      this.checkPlaintextFooterIntegrity = false;
+      return this;
+    }
+
+    /**
+     * Explicitly supply the file AAD prefix.
+     * A must when a prefix is used for file encryption, but not stored in file.
+     * If AAD prefix is stored in file, it will be compared to the explicitly supplied value 
+     * and an exception will be thrown if they differ.
+     * 
+     * @param aadPrefixBytes AAD Prefix
+     * @return Builder
+     */
+    public Builder withAADPrefix(byte[] aadPrefixBytes) {
+      if (null == aadPrefixBytes) {
+        return this;
+      }
+      if (null != this.aadPrefixBytes) {
+        throw new IllegalStateException("AAD Prefix already set");
+      }
+      this.aadPrefixBytes = aadPrefixBytes;
+
+      return this;
+    }
+
+    /**
+     * Set callback for verification of AAD Prefixes stored in file.
+     * 
+     * @param aadPrefixVerifier AAD prefix verification object
+     * @return Builder
+     */
+    public Builder withAADPrefixVerifier(AADPrefixVerifier aadPrefixVerifier) {
+      if (null == aadPrefixVerifier) {
+        return this;
+      }
+      if (null != this.aadPrefixVerifier) {
+        throw new IllegalStateException("AAD Prefix verifier already set");
+      }
+      this.aadPrefixVerifier = aadPrefixVerifier;
+
+      return this;
+    }
+
+    /**
+     * By default, reading plaintext (unencrypted) files is not allowed when using a decryptor 
+     * - in order to detect files that were not encrypted by mistake. 
+     * However, the default behavior can be overriden by calling this method.
+     * The caller should use then a different method to ensure encryption of files with sensitive data.
+     * 
+     * @return Builder
+     */
+    public Builder withPlaintextFilesAllowed() {
+      this.plaintextFilesAllowed  = true;
+      return this;
+    }
+
+    public FileDecryptionProperties build() {
+      return new FileDecryptionProperties(footerKeyBytes, keyRetriever, checkPlaintextFooterIntegrity, 
+          aadPrefixBytes, aadPrefixVerifier, columnPropertyMap, plaintextFilesAllowed);
+    }
+  }
+
+  public byte[] getFooterKey() {
+    return footerKey;
+  }
+
+  public byte[] getColumnKey(ColumnPath path) {
+    if (null == columnPropertyMap) {
+      return null;
+    }
+    ColumnDecryptionProperties columnDecryptionProperties = columnPropertyMap.get(path);
+    if (null == columnDecryptionProperties) {
+      return null;
+    }
+
+    return columnDecryptionProperties.getKeyBytes();
+  }
+
+  public DecryptionKeyRetriever getKeyRetriever() {
+    return keyRetriever;
+  }
+
+  public byte[] getAADPrefix() {
+    return aadPrefix;
+  }
+
+  public boolean checkFooterIntegrity() {
+    return checkPlaintextFooterIntegrity;
+  }
+
+  boolean plaintextFilesAllowed() {
+    return allowPlaintextFiles;
+  }
+
+  AADPrefixVerifier getAADPrefixVerifier() {
+    return aadPrefixVerifier;
+  }
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/FileEncryptionProperties.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/FileEncryptionProperties.java
new file mode 100755
index 0000000..96a570a
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/FileEncryptionProperties.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.crypto;
+
+import java.nio.charset.StandardCharsets;
+import java.security.SecureRandom;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.parquet.format.EncryptionAlgorithm;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+
+import static org.apache.parquet.crypto.AesCipher.AAD_FILE_UNIQUE_LENGTH;
+
+public class FileEncryptionProperties {
+
+  private static final ParquetCipher ALGORITHM_DEFAULT = ParquetCipher.AES_GCM_V1;
+  private static final boolean ENCRYPTED_FOOTER_DEFAULT = true;
+
+  private final EncryptionAlgorithm algorithm;
+  private final boolean encryptedFooter;
+  private final byte[] footerKey;
+  private final byte[] footerKeyMetadata;
+  private final byte[] fileAAD;
+  private final Map<ColumnPath, ColumnEncryptionProperties> columnPropertyMap;
+
+  private FileEncryptionProperties(ParquetCipher cipher, 
+      byte[] footerKey, byte[] footerKeyMetadata, boolean encryptedFooter,
+      byte[] aadPrefix, boolean storeAadPrefixInFile,
+      Map<ColumnPath, ColumnEncryptionProperties> columnPropertyMap) {
+
+    if (null == footerKey) {
+      throw new IllegalArgumentException("Footer key is null");
+    }
+    if (! (footerKey.length == 16 || footerKey.length == 24 || footerKey.length == 32)) {
+      throw new IllegalArgumentException("Wrong footer key length " + footerKey.length);
+    }
+    if (null != columnPropertyMap && columnPropertyMap.size() == 0) {
+      throw new IllegalArgumentException("No encrypted columns");
+    }
+
+    SecureRandom random = new SecureRandom();
+    byte[] aadFileUnique = new byte[AAD_FILE_UNIQUE_LENGTH];
+    random.nextBytes(aadFileUnique);
+
+    boolean supplyAadPrefix = false;
+    if (null == aadPrefix) {
+      this.fileAAD = aadFileUnique;
+    } else {
+      this.fileAAD = AesCipher.concatByteArrays(aadPrefix, aadFileUnique);
+      if (!storeAadPrefixInFile) supplyAadPrefix = true;
+    }
+
+    this.algorithm = cipher.getEncryptionAlgorithm();
+
+    if (algorithm.isSetAES_GCM_V1()) {
+      algorithm.getAES_GCM_V1().setAad_file_unique(aadFileUnique);
+      algorithm.getAES_GCM_V1().setSupply_aad_prefix(supplyAadPrefix);
+      if (null != aadPrefix && storeAadPrefixInFile) {
+        algorithm.getAES_GCM_V1().setAad_prefix(aadPrefix);
+      }
+    } else {
+      algorithm.getAES_GCM_CTR_V1().setAad_file_unique(aadFileUnique);
+      algorithm.getAES_GCM_CTR_V1().setSupply_aad_prefix(supplyAadPrefix);
+      if (null != aadPrefix && storeAadPrefixInFile) {
+        algorithm.getAES_GCM_CTR_V1().setAad_prefix(aadPrefix);
+      }
+    }
+
+    this.footerKey = footerKey;
+    this.footerKeyMetadata = footerKeyMetadata;
+    this.encryptedFooter = encryptedFooter;
+    this.columnPropertyMap = columnPropertyMap;
+  }
+
+  /**
+   * 
+   * @param footerKey Encryption key for file footer and some (or all) columns. 
+   * Key length must be either 16, 24 or 32 bytes.
+   * If null, footer won't be encrypted. At least one column must be encrypted then.
+   * @return Builder
+   */
+  public static Builder builder(byte[] footerKey) {
+    return new Builder(footerKey);
+  }
+
+  public static class Builder {
+    private byte[] footerKeyBytes;
+    private boolean encryptedFooter;
+    private ParquetCipher parquetCipher;
+    private byte[] footerKeyMetadata;
+    private byte[] aadPrefix;
+    private Map<ColumnPath, ColumnEncryptionProperties> columnPropertyMap;
+    private boolean storeAadPrefixInFile;
+
+    private Builder(byte[] footerKey) {
+      this.parquetCipher = ALGORITHM_DEFAULT;
+      this.encryptedFooter = ENCRYPTED_FOOTER_DEFAULT;
+      this.footerKeyBytes = new byte[footerKey.length];
+      System.arraycopy(footerKey, 0, this.footerKeyBytes, 0, footerKey.length);
+    }
+
+    /**
+     * Create files with plaintext footer.
+     * If not called, the files will be created with encrypted footer (default).
+     * 
+     * @return Builder
+     */
+    public Builder withPlaintextFooter() {
+      this.encryptedFooter = false;
+      return this;
+    }
+
+    /**
+     * Set encryption algorithm.
+     * If not called, files will be encrypted with AES_GCM_V1 (default).
+     * 
+     * @param parquetCipher Encryption algorithm
+     * @return Builder
+     */
+    public Builder withAlgorithm(ParquetCipher parquetCipher) {
+      this.parquetCipher = parquetCipher;
+      return this;
+    }
+
+    /**
+     * Set a key retrieval metadata (converted from String).
+     * Use either withFooterKeyMetaData or withFooterKeyID, not both.
+     * 
+     * @param keyID will be converted to metadata (UTF-8 array).
+     * @return Builder
+     */
+    public Builder withFooterKeyID(String keyID) {
+      if (null == keyID) {
+        return this;
+      }
+
+      return withFooterKeyMetadata(keyID.getBytes(StandardCharsets.UTF_8));
+    }
+
+    /**
+     * Set a key retrieval metadata.
+     * Use either withFooterKeyMetaData or withFooterKeyID, not both.
+     * 
+     * @param footerKeyMetadata Key metadata
+     * @return Builder
+     */
+    public Builder withFooterKeyMetadata(byte[] footerKeyMetadata) {
+      if (null == footerKeyMetadata) {
+        return this;
+      }
+      if (null != this.footerKeyMetadata) {
+        throw new IllegalStateException("Footer key metadata already set");
+      }
+      this.footerKeyMetadata = footerKeyMetadata;
+
+      return this;
+    }
+
+    /**
+     * Set the file AAD Prefix.
+     * 
+     * @param aadPrefixBytes AAD Prefix
+     * @return Builder
+     */
+    public Builder withAADPrefix(byte[] aadPrefixBytes) {
+      if (null == aadPrefixBytes) {
+        return this;
+      }
+      if (null != this.aadPrefix) {
+        throw new IllegalStateException("AAD Prefix already set");
+      }
+      this.aadPrefix = aadPrefixBytes;
+      this.storeAadPrefixInFile = true;
+
+      return this;
+    }
+
+    /**
+     * Skip storing AAD Prefix in file metadata.
+     * If not called, and if AAD Prefix is set, it will be stored.
+     * 
+     * @return Builder
+     */
+    public Builder withoutAADPrefixStorage() {
+      if (null == this.aadPrefix) {
+        throw new IllegalStateException("AAD Prefix not yet set");
+      }
+      this.storeAadPrefixInFile = false;
+
+      return this;
+    }
+
+    /**
+     * Set the list of encrypted columns and their properties (keys etc).
+     * If not called, all columns will be encrypted with the footer key. 
+     * If called, the file columns not in the list will be left unencrypted.
+     * 
+     * @param encryptedColumns Columns to be encrypted
+     * @return Builder
+     */
+    public Builder withEncryptedColumns(Map<ColumnPath, ColumnEncryptionProperties> encryptedColumns)  {
+      if (null == encryptedColumns) {
+        return this;
+      }
+      if (null != this.columnPropertyMap) {
+        throw new IllegalStateException("Column properties already set");
+      }
+      // Copy the map to make column properties immutable
+      this.columnPropertyMap = new HashMap<ColumnPath, ColumnEncryptionProperties>(encryptedColumns);
+
+      return this;
+    }
+
+    public FileEncryptionProperties build() {
+      return new FileEncryptionProperties(parquetCipher, 
+          footerKeyBytes, footerKeyMetadata, encryptedFooter,
+          aadPrefix, storeAadPrefixInFile, 
+          columnPropertyMap);
+    }
+  }
+
+  public EncryptionAlgorithm getAlgorithm() {
+    return algorithm;
+  }
+
+  public byte[] getFooterKey() {
+    return footerKey;
+  }
+
+  public byte[] getFooterKeyMetadata() {
+    return footerKeyMetadata;
+  }
+
+  public Map<ColumnPath, ColumnEncryptionProperties> getEncryptedColumns() {
+    return columnPropertyMap;
+  }
+
+  public ColumnEncryptionProperties getColumnProperties(ColumnPath columnPath) {
+    if (null == columnPropertyMap) {
+      // encrypted, with footer key
+      return ColumnEncryptionProperties.builder(columnPath, true).build();
+    } else {
+      ColumnEncryptionProperties columnProperties = columnPropertyMap.get(columnPath);
+      if (null != columnProperties) {
+        return columnProperties;
+      } else {
+        // plaintext column
+        return ColumnEncryptionProperties.builder(columnPath, false).build();
+      }
+    }
+  }
+
+  public byte[] getFileAAD() {
+    return fileAAD;
+  }
+
+  public boolean encryptedFooter() {
+    return encryptedFooter;
+  }
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/HiddenColumnException.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/HiddenColumnException.java
new file mode 100755
index 0000000..e4c3dca
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/HiddenColumnException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.crypto;
+
+import org.apache.parquet.ParquetRuntimeException;
+
+/**
+ * Reader doesn't have key for encrypted column,
+ * but tries to access its contents
+ */
+public class HiddenColumnException extends ParquetRuntimeException {
+  private static final long serialVersionUID = 1L;
+
+  public HiddenColumnException(String string) {
+    super(string);
+  }
+
+  public HiddenColumnException(String string, Exception e) {
+    super(string, e);
+  }
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalColumnDecryptionSetup.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalColumnDecryptionSetup.java
new file mode 100755
index 0000000..769c8d2
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalColumnDecryptionSetup.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.crypto;
+
+import org.apache.parquet.format.BlockCipher;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+
+public class InternalColumnDecryptionSetup {
+
+  private final ColumnPath columnPath;
+  private final boolean isEncrypted;
+  private final boolean isEncryptedWithFooterKey;
+  private final BlockCipher.Decryptor dataDecryptor;
+  private final BlockCipher.Decryptor metaDataDecryptor;
+  private final short columnOrdinal;
+  private final byte[] keyMetadata;
+
+  InternalColumnDecryptionSetup(ColumnPath path, boolean encrypted, 
+      boolean isEncryptedWithFooterKey, BlockCipher.Decryptor dataDecryptor, 
+      BlockCipher.Decryptor metaDataDecryptor, short columnOrdinal, byte[] keyMetadata) {
+    this.columnPath = path;
+    this.isEncrypted = encrypted;
+    this.isEncryptedWithFooterKey = isEncryptedWithFooterKey;
+    this.dataDecryptor = dataDecryptor;
+    this.metaDataDecryptor = metaDataDecryptor;
+    this.columnOrdinal = columnOrdinal;
+    this.keyMetadata = keyMetadata;
+  }
+
+  public boolean isEncrypted() {
+    return isEncrypted;
+  }
+
+  public BlockCipher.Decryptor getDataDecryptor() {
+    return dataDecryptor;
+  }
+
+  public BlockCipher.Decryptor getMetaDataDecryptor() {
+    return metaDataDecryptor;
+  }
+
+  boolean isEncryptedWithFooterKey() {
+    return isEncryptedWithFooterKey;
+  }
+
+  ColumnPath getPath() {
+    return columnPath;
+  }
+
+  public short getOrdinal() {
+    return columnOrdinal;
+  }
+
+  byte[] getKeyMetadata() {
+    return keyMetadata;
+  }
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalColumnEncryptionSetup.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalColumnEncryptionSetup.java
new file mode 100755
index 0000000..d4d2058
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalColumnEncryptionSetup.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.crypto;
+
+import java.util.Arrays;
+
+import org.apache.parquet.format.BlockCipher;
+import org.apache.parquet.format.ColumnCryptoMetaData;
+import org.apache.parquet.format.EncryptionWithColumnKey;
+import org.apache.parquet.format.EncryptionWithFooterKey;
+
+public class InternalColumnEncryptionSetup {
+
+  private final ColumnEncryptionProperties encryptionProperties;
+  private final BlockCipher.Encryptor metadataEncryptor;
+  private final BlockCipher.Encryptor dataEncryptor;
+  private final ColumnCryptoMetaData columnCryptoMetaData;
+  private final short ordinal;
+
+  InternalColumnEncryptionSetup(ColumnEncryptionProperties encryptionProperties, short ordinal,
+      BlockCipher.Encryptor dataEncryptor, BlockCipher.Encryptor metaDataEncryptor) {
+    this.encryptionProperties = encryptionProperties;
+    this.dataEncryptor = dataEncryptor;
+    this.metadataEncryptor = metaDataEncryptor;
+    this.ordinal = ordinal;
+
+    if (encryptionProperties.isEncrypted()) {
+      if (encryptionProperties.isEncryptedWithFooterKey()) {
+        columnCryptoMetaData = ColumnCryptoMetaData.ENCRYPTION_WITH_FOOTER_KEY(new EncryptionWithFooterKey());
+      } else {
+        EncryptionWithColumnKey withColumnKeyStruct = new EncryptionWithColumnKey(Arrays.asList(encryptionProperties.getPath().toArray()));
+        if (null != encryptionProperties.getKeyMetaData()) {
+          withColumnKeyStruct.setKey_metadata(encryptionProperties.getKeyMetaData());
+        }
+        columnCryptoMetaData =  ColumnCryptoMetaData.ENCRYPTION_WITH_COLUMN_KEY(withColumnKeyStruct);
+      }
+    } else {
+      columnCryptoMetaData = null;
+    }
+  }
+
+  public boolean isEncrypted() {
+    return encryptionProperties.isEncrypted();
+  }
+
+  public BlockCipher.Encryptor getMetaDataEncryptor() {
+    return metadataEncryptor;
+  }
+
+  public BlockCipher.Encryptor getDataEncryptor() {
+    return dataEncryptor;
+  }
+
+  public ColumnCryptoMetaData getColumnCryptoMetaData() {
+    return columnCryptoMetaData;
+  }
+
+  public short getOrdinal() {
+    return ordinal;
+  }
+
+  public boolean isEncryptedWithFooterKey() {
+    return encryptionProperties.isEncryptedWithFooterKey();
+  }
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalFileDecryptor.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalFileDecryptor.java
new file mode 100755
index 0000000..d104401
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalFileDecryptor.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.crypto;
+
+import org.apache.parquet.format.BlockCipher;
+import org.apache.parquet.format.EncryptionAlgorithm;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+
+public class InternalFileDecryptor {
+
+  private final FileDecryptionProperties fileDecryptionProperties;
+  private final DecryptionKeyRetriever keyRetriever;
+  private final boolean checkPlaintextFooterIntegrity;
+  private final byte[] aadPrefixInProperties;
+  private final AADPrefixVerifier aadPrefixVerifier;
+
+  private byte[] footerKey;
+  private HashMap<ColumnPath, InternalColumnDecryptionSetup> columnMap;
+  private EncryptionAlgorithm algorithm;
+  private byte[] fileAAD;
+  private boolean encryptedFooter;
+  private byte[] footerKeyMetaData;
+  private boolean fileCryptoMetaDataProcessed = false;
+  private BlockCipher.Decryptor aesGcmDecryptorWithFooterKey;
+  private BlockCipher.Decryptor aesCtrDecryptorWithFooterKey;
+  private boolean plaintextFile;
+
+  public InternalFileDecryptor(FileDecryptionProperties fileDecryptionProperties) throws IOException {
+    this.fileDecryptionProperties= fileDecryptionProperties;
+    checkPlaintextFooterIntegrity = fileDecryptionProperties.checkFooterIntegrity();
+    footerKey = fileDecryptionProperties.getFooterKey();
+    keyRetriever = fileDecryptionProperties.getKeyRetriever();
+    aadPrefixInProperties = fileDecryptionProperties.getAADPrefix();
+    columnMap = new HashMap<ColumnPath, InternalColumnDecryptionSetup>();
+    this.aadPrefixVerifier = fileDecryptionProperties.getAADPrefixVerifier();
+    this.plaintextFile = false;
+  }
+
+  private BlockCipher.Decryptor getThriftModuleDecryptor(byte[] columnKey) throws IOException {
+    if (null == columnKey) { // Decryptor with footer key
+      if (null == aesGcmDecryptorWithFooterKey) {
+        aesGcmDecryptorWithFooterKey = ModuleCipherFactory.getDecryptor(AesMode.GCM, footerKey);
+      }
+      return aesGcmDecryptorWithFooterKey;
+    } else { // Decryptor with column key
+      return ModuleCipherFactory.getDecryptor(AesMode.GCM, columnKey);
+    }
+  }
+
+  private BlockCipher.Decryptor getDataModuleDecryptor(byte[] columnKey) throws IOException {
+    if (algorithm.isSetAES_GCM_V1()) {
+      return getThriftModuleDecryptor(columnKey);
+    }
+
+    // AES_GCM_CTR_V1
+    if (null == columnKey) { // Decryptor with footer key
+      if (null == aesCtrDecryptorWithFooterKey) {
+        aesCtrDecryptorWithFooterKey = ModuleCipherFactory.getDecryptor(AesMode.CTR, footerKey);
+      }
+      return aesCtrDecryptorWithFooterKey;
+    } else { // Decryptor with column key
+      return ModuleCipherFactory.getDecryptor(AesMode.CTR, columnKey);
+    }
+  }
+
+  public InternalColumnDecryptionSetup getColumnSetup(ColumnPath path) throws IOException {
+    if (!fileCryptoMetaDataProcessed) {
+      throw new IOException("Haven't parsed the file crypto metadata yet");
+    }
+    InternalColumnDecryptionSetup columnDecryptionSetup = columnMap.get(path);
+    if (null == columnDecryptionSetup) {
+      throw new IOException("Failed to find decryption setup for column " + path);
+    }
+
+    return columnDecryptionSetup;
+  }
+
+  public BlockCipher.Decryptor getFooterDecryptor() throws IOException {
+    if (!fileCryptoMetaDataProcessed) {
+      throw new IOException("Haven't parsed the file crypto metadata yet");
+    }
+    if (!encryptedFooter) {
+      return null;
+    }
+
+    return getThriftModuleDecryptor(null);
+  }
+
+  public void setFileCryptoMetaData(EncryptionAlgorithm algorithm, 
+      boolean encryptedFooter, byte[] footerKeyMetaData) throws IOException {
+
+    // first use of the decryptor
+    if (!fileCryptoMetaDataProcessed) {
+      fileCryptoMetaDataProcessed = true;
+      this.encryptedFooter = encryptedFooter;
+      this.algorithm = algorithm;
+      this.footerKeyMetaData = footerKeyMetaData;
+
+      byte[] aadFileUnique;
+      boolean mustSupplyAadPrefix;
+      boolean fileHasAadPrefix = false;
+      byte[] aadPrefixInFile = null;
+
+      // Process encryption algorithm metadata
+      if (algorithm.isSetAES_GCM_V1()) {
+        if (algorithm.getAES_GCM_V1().isSetAad_prefix()) {
+          fileHasAadPrefix = true;
+          aadPrefixInFile = algorithm.getAES_GCM_V1().getAad_prefix();
+        }
+        mustSupplyAadPrefix = algorithm.getAES_GCM_V1().isSupply_aad_prefix();
+        aadFileUnique = algorithm.getAES_GCM_V1().getAad_file_unique();
+      } else if (algorithm.isSetAES_GCM_CTR_V1()) {
+        if (algorithm.getAES_GCM_CTR_V1().isSetAad_prefix()) {
+          fileHasAadPrefix = true;
+          aadPrefixInFile = algorithm.getAES_GCM_CTR_V1().getAad_prefix();
+        }
+        mustSupplyAadPrefix = algorithm.getAES_GCM_CTR_V1().isSupply_aad_prefix();
+        aadFileUnique = algorithm.getAES_GCM_CTR_V1().getAad_file_unique();
+      } else {
+        throw new IOException("Unsupported algorithm: " + algorithm);
+      }
+
+      // Handle AAD prefix
+      byte[] aadPrefix = aadPrefixInProperties;
+      if (mustSupplyAadPrefix && (null == aadPrefixInProperties)) {
+        throw new IOException("AAD prefix used for file encryption, but not stored in file and not supplied in decryption properties");
+      }
+
+      if (fileHasAadPrefix) {
+        if (null != aadPrefixInProperties) {
+          if (!Arrays.equals(aadPrefixInProperties, aadPrefixInFile)) {
+            throw new IOException("AAD Prefix in file and in decryption properties is not the same");
+          }
+        }
+        if (null != aadPrefixVerifier) {
+          aadPrefixVerifier.verify(aadPrefixInFile);
+        }
+        aadPrefix = aadPrefixInFile;
+      }
+      else {
+        if (!mustSupplyAadPrefix && (null != aadPrefixInProperties)) {
+          throw new IOException("AAD Prefix set in decryption properties, but was not used for file encryption");
+        }
+        if (null != aadPrefixVerifier) {
+          throw new IOException("AAD Prefix Verifier is set, but AAD Prefix not found in file");
+        }
+      }
+
+      if (null == aadPrefix) {
+        this.fileAAD = aadFileUnique;
+      } else {
+        this.fileAAD = AesCipher.concatByteArrays(aadPrefix, aadFileUnique);
+      }
+
+      // Get footer key
+      if (null == footerKey) { // ignore footer key metadata if footer key is explicitly set via API
+        if (encryptedFooter || checkPlaintextFooterIntegrity) {
+          if (null == footerKeyMetaData) {
+            throw new IOException("No footer key or key metadata");
+          }
+          if (null == keyRetriever) {
+            throw new IOException("No footer key or key retriever");
+          }
+
+          try {
+            footerKey = keyRetriever.getKey(footerKeyMetaData);
+          } catch (KeyAccessDeniedException e) {
+            throw new IOException("Footer key: access denied", e);
+          }
+
+          if (null == footerKey) {
+            throw new IOException("Footer key unavailable");
+          }
+        }
+      }
+    } else {
+      // re-use of the decryptor 
+      // check the crypto metadata.
+      if (!this.algorithm.equals(algorithm)) {
+        throw new IOException("Decryptor re-use: Different algorithm");
+      }
+      if (encryptedFooter != this.encryptedFooter) {
+        throw new IOException("Decryptor re-use: Different footer encryption");
+      }
+      if (!Arrays.equals(this.footerKeyMetaData, footerKeyMetaData)) {
+        throw new IOException("Decryptor re-use: Different footer key metadata ");
+      }
+    }
+  }
+
+  public InternalColumnDecryptionSetup setColumnCryptoMetadata(ColumnPath path, boolean encrypted, 
+      boolean encryptedWithFooterKey, byte[] keyMetadata, short columnOrdinal) throws IOException {
+
+    if (!fileCryptoMetaDataProcessed) {
+      throw new IOException("Haven't parsed the file crypto metadata yet");
+    }
+
+    InternalColumnDecryptionSetup columnDecryptionSetup = columnMap.get(path);
+    if (null != columnDecryptionSetup) {
+      if (columnDecryptionSetup.isEncrypted() != encrypted) {
+        throw new IOException("Re-use: wrong encrypted flag. Column: " + path);
+      }
+      if (encrypted) {
+        if (encryptedWithFooterKey != columnDecryptionSetup.isEncryptedWithFooterKey()) {
+          throw new IOException("Re-use: wrong encryption key (column vs footer). Column: " + path);
+        }
+        if (!encryptedWithFooterKey && !Arrays.equals(columnDecryptionSetup.getKeyMetadata(), keyMetadata)) {
+          throw new IOException("Decryptor re-use: Different footer key metadata ");
+        }
+      }
+      return columnDecryptionSetup;
+    }
+
+    if (!encrypted) {
+      columnDecryptionSetup = new InternalColumnDecryptionSetup(path, false,  false, null, null, columnOrdinal, null);
+    } else {
+      if (encryptedWithFooterKey) {
+        if (null == footerKey) {
+          throw new IOException("Column " + path + " is encrypted with NULL footer key");
+        }
+        columnDecryptionSetup = new InternalColumnDecryptionSetup(path, true, true, 
+            getDataModuleDecryptor(null), getThriftModuleDecryptor(null), columnOrdinal, null);
+      } else {
+        // Column is encrypted with column-specific key
+        byte[] columnKeyBytes = fileDecryptionProperties.getColumnKey(path);
+        if ((null == columnKeyBytes) && (null != keyMetadata) && (null != keyRetriever)) {
+          // No explicit column key given via API. Retrieve via key metadata.
+          try {
+            columnKeyBytes = keyRetriever.getKey(keyMetadata);
+          } catch (KeyAccessDeniedException e) {
+            throw new IOException("Column " + path + ": key access denied", e);
+          }
+        }
+
+        if (null == columnKeyBytes) { // Hidden column: encrypted, but key unavailable
+          throw new IOException("Column " + path + ": key unavailable");
+        } else { // Key is available
+          columnDecryptionSetup = new InternalColumnDecryptionSetup(path, true, false, 
+              getDataModuleDecryptor(columnKeyBytes), getThriftModuleDecryptor(columnKeyBytes), columnOrdinal, keyMetadata);
+        }
+      }
+    }
+    columnMap.put(path, columnDecryptionSetup);
+
+    return columnDecryptionSetup;
+  }
+
+  public byte[] getFileAAD() {
+    return this.fileAAD;
+  }
+
+  public AesGcmEncryptor getSignedFooterEncryptor() throws IOException  {
+    if (!fileCryptoMetaDataProcessed) {
+      throw new IOException("Haven't parsed the file crypto metadata yet");
+    }
+    if (encryptedFooter) {
+      throw new IOException("Requesting signed footer encryptor in file with encrypted footer");
+    }
+
+    return (AesGcmEncryptor) ModuleCipherFactory.getEncryptor(AesMode.GCM, footerKey);
+  }
+
+  public boolean checkFooterIntegrity() {
+    return checkPlaintextFooterIntegrity;
+  }
+
+  public boolean plaintextFilesAllowed() {
+    return fileDecryptionProperties.plaintextFilesAllowed();
+  }
+
+  public void setPlaintextFile() {
+    plaintextFile = true;
+  }
+
+  public boolean plaintextFile() {
+    return plaintextFile;
+  }
+}
+
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalFileEncryptor.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalFileEncryptor.java
new file mode 100755
index 0000000..9d408ef
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalFileEncryptor.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.crypto;
+
+import org.apache.parquet.format.BlockCipher;
+import org.apache.parquet.format.FileCryptoMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.format.EncryptionAlgorithm;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+public class InternalFileEncryptor {
+
+  private final EncryptionAlgorithm algorithm;
+  private final FileEncryptionProperties fileEncryptionProperties;
+  private final byte[] footerKey;
+  private final byte[] footerKeyMetadata;
+  private final HashMap<ColumnPath, InternalColumnEncryptionSetup> columnMap;
+  private final byte[] fileAAD;
+  private final boolean encryptFooter;
+
+  private BlockCipher.Encryptor aesGcmEncryptorWithFooterKey;
+  private BlockCipher.Encryptor aesCtrEncryptorWithFooterKey;
+  private boolean fileCryptoMetaDataCreated;
+
+  public InternalFileEncryptor(FileEncryptionProperties fileEncryptionProperties) throws IOException {
+    this.fileEncryptionProperties = fileEncryptionProperties;
+    algorithm = fileEncryptionProperties.getAlgorithm();
+    footerKey = fileEncryptionProperties.getFooterKey();
+    encryptFooter =  fileEncryptionProperties.encryptedFooter();
+    footerKeyMetadata = fileEncryptionProperties.getFooterKeyMetadata();
+    fileAAD = fileEncryptionProperties.getFileAAD();
+    columnMap = new HashMap<ColumnPath, InternalColumnEncryptionSetup>();
+    fileCryptoMetaDataCreated = false;
+  }
+
+  private BlockCipher.Encryptor getThriftModuleEncryptor(byte[] columnKey) throws IOException {
+    if (null == columnKey) { // Encryptor with footer key
+      if (null == aesGcmEncryptorWithFooterKey) {
+        aesGcmEncryptorWithFooterKey = ModuleCipherFactory.getEncryptor(AesMode.GCM, footerKey);
+      }
+      return aesGcmEncryptorWithFooterKey;
+    } else { // Encryptor with column key
+      return ModuleCipherFactory.getEncryptor(AesMode.GCM, columnKey);
+    }
+  }
+
+  private BlockCipher.Encryptor getDataModuleEncryptor(byte[] columnKey) throws IOException {
+    if (algorithm.isSetAES_GCM_V1()) {
+      return getThriftModuleEncryptor(columnKey);
+    }
+    // AES_GCM_CTR_V1
+    if (null == columnKey) { // Encryptor with footer key
+      if (null == aesCtrEncryptorWithFooterKey) {
+        aesCtrEncryptorWithFooterKey = ModuleCipherFactory.getEncryptor(AesMode.CTR, footerKey);
+      }
+      return aesCtrEncryptorWithFooterKey;
+    } else { // Encryptor with column key
+      return ModuleCipherFactory.getEncryptor(AesMode.CTR, columnKey);
+    }
+  }
+
+  public InternalColumnEncryptionSetup getColumnSetup(ColumnPath columnPath, 
+      boolean createIfNull, short ordinal) throws IOException {
+    InternalColumnEncryptionSetup internalColumnProperties = columnMap.get(columnPath);
+
+    if (null != internalColumnProperties) {
+      if (ordinal != internalColumnProperties.getOrdinal()) {
+        throw new IOException("Column ordinal doesnt match " + columnPath + 
+            ": " + ordinal + ", "+internalColumnProperties.getOrdinal());
+      }
+      return internalColumnProperties;
+    }
+
+    if (!createIfNull) {
+      throw new IOException("No encryption setup found for column " + columnPath);
+    }
+    if (fileCryptoMetaDataCreated) {
+      throw new IOException("Re-use: No encryption setup for column " + columnPath);
+    }
+
+    ColumnEncryptionProperties columnProperties = fileEncryptionProperties.getColumnProperties(columnPath);
+    if (null == columnProperties) {
+      throw new IOException("No encryption properties for column " + columnPath);
+    }
+    if (columnProperties.isEncrypted()) {
+      if (columnProperties.isEncryptedWithFooterKey()) {
+        internalColumnProperties = new InternalColumnEncryptionSetup(columnProperties, ordinal,
+            getDataModuleEncryptor(null), getThriftModuleEncryptor(null));
+      } else {
+        internalColumnProperties = new InternalColumnEncryptionSetup(columnProperties, ordinal,
+            getDataModuleEncryptor(columnProperties.getKeyBytes()), 
+            getThriftModuleEncryptor(columnProperties.getKeyBytes()));
+      }
+    } else { // unencrypted column
+      internalColumnProperties = new InternalColumnEncryptionSetup(columnProperties, ordinal, null, null);
+    }
+    columnMap.put(columnPath, internalColumnProperties);
+
+    return internalColumnProperties;
+  }
+
+  public BlockCipher.Encryptor getFooterEncryptor() throws IOException  {
+    if (!encryptFooter) return null;
+    return getThriftModuleEncryptor(null);
+  }
+
+  public FileCryptoMetaData getFileCryptoMetaData() throws IOException {
+    if (!encryptFooter) {
+      throw new IOException("Requesting FileCryptoMetaData in file with unencrypted footer");
+    }
+    FileCryptoMetaData fileCryptoMetaData = new FileCryptoMetaData(algorithm);
+    if (null != footerKeyMetadata) {
+      fileCryptoMetaData.setKey_metadata(footerKeyMetadata);
+    }
+    fileCryptoMetaDataCreated = true;
+
+    return fileCryptoMetaData;
+  }
+
+  public boolean encryptColumnMetaData(InternalColumnEncryptionSetup columnSetup) {
+    if (!columnSetup.isEncrypted()) {
+      return false;
+    }
+    if (!encryptFooter) {
+      return true;
+    }
+
+    return !columnSetup.isEncryptedWithFooterKey();
+  }
+
+  public boolean isFooterEncrypted() {
+    return encryptFooter;
+  }
+
+  public EncryptionAlgorithm getEncryptionAlgorithm() {
+    return algorithm;
+  }
+
+  public byte[] getFileAAD() {
+    return this.fileAAD;
+  }
+
+  public byte[] getFooterSigningKeyMetaData()  throws IOException {
+    if (encryptFooter) {
+      throw new IOException("Requesting signing footer key metadata in file with encrypted footer");
+    }
+    return footerKeyMetadata;
+  }
+
+  public AesGcmEncryptor getSignedFooterEncryptor() throws IOException  {
+    if (encryptFooter) {
+      throw new IOException("Requesting signed footer encryptor in file with encrypted footer");
+    }
+    return (AesGcmEncryptor) ModuleCipherFactory.getEncryptor(AesMode.GCM, footerKey);
+  }
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/KeyAccessDeniedException.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/KeyAccessDeniedException.java
new file mode 100755
index 0000000..0d9d84f
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/KeyAccessDeniedException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.crypto;
+
+import java.security.KeyException;
+
+public class KeyAccessDeniedException extends KeyException {
+  private static final long serialVersionUID = 1L;
+
+  public KeyAccessDeniedException(String keyID) {
+    super(keyID);
+  }
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/ModuleCipherFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/ModuleCipherFactory.java
new file mode 100755
index 0000000..bbde66e
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/ModuleCipherFactory.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.crypto;
+
+import org.apache.parquet.format.BlockCipher;
+
+import java.io.IOException;
+
+public class ModuleCipherFactory {
+
+  // Parquet Module types
+  public enum ModuleType {
+    Footer((byte)0),
+    ColumnMetaData((byte)1),
+    DataPage((byte)2),
+    DictionaryPage((byte)3),
+    DataPageHeader((byte)4),
+    DictionaryPageHeader((byte)5),
+    ColumnIndex((byte)6),
+    OffsetIndex((byte)7);
+
+    private final byte value;
+
+    private ModuleType(byte value) {
+      this.value = value;
+    }
+
+    public byte getValue() {
+      return value;
+    }
+  }
+
+  public static final int SIZE_LENGTH = 4;
+
+  public static BlockCipher.Encryptor getEncryptor(AesMode mode, byte[] keyBytes) 
+      throws IllegalArgumentException, IOException {
+    switch (mode) {
+    case GCM:
+      return new AesGcmEncryptor(keyBytes);
+    case CTR:
+      return new AesCtrEncryptor(keyBytes);
+    default:
+      throw new IllegalArgumentException("AesMode not supported in ModuleCipherFactory: " + mode);
+    }
+  }
+
+  public static BlockCipher.Decryptor getDecryptor(AesMode mode, byte[] keyBytes) 
+      throws IllegalArgumentException, IOException {
+    switch (mode) {
+    case GCM:
+      return new AesGcmDecryptor(keyBytes);
+    case CTR:
+      return new AesCtrDecryptor(keyBytes);
+    default:
+      throw new IllegalArgumentException("AesMode not supported in ModuleCipherFactory: " + mode);
+    }
+  }
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/ParquetCipher.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/ParquetCipher.java
new file mode 100755
index 0000000..9a60402
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/ParquetCipher.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.crypto;
+
+import org.apache.parquet.format.AesGcmV1;
+import org.apache.parquet.format.AesGcmCtrV1;
+import org.apache.parquet.format.EncryptionAlgorithm;
+
+public enum ParquetCipher {
+
+  AES_GCM_V1 {
+    @Override
+    public EncryptionAlgorithm getEncryptionAlgorithm() {
+      return EncryptionAlgorithm.AES_GCM_V1(new AesGcmV1());
+    }
+  },
+  AES_GCM_CTR_V1 {
+    @Override
+    public EncryptionAlgorithm getEncryptionAlgorithm() {
+      return EncryptionAlgorithm.AES_GCM_CTR_V1(new AesGcmCtrV1());
+    }
+  };
+
+  public abstract EncryptionAlgorithm getEncryptionAlgorithm();
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/StringKeyIdRetriever.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/StringKeyIdRetriever.java
new file mode 100755
index 0000000..7b15089
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/StringKeyIdRetriever.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.crypto;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Hashtable;
+
+// Simple key retriever, based on UTF8 strings as key identifiers
+public class StringKeyIdRetriever implements DecryptionKeyRetriever{
+
+  private final Hashtable<String,byte[]> keyMap = new Hashtable<String,byte[]>();
+
+  public void putKey(String keyId, byte[] keyBytes) {
+    keyMap.put(keyId, keyBytes);
+  }
+
+  @Override
+  public byte[] getKey(byte[] keyMetaData) {
+    String keyId = new String(keyMetaData, StandardCharsets.UTF_8);
+    return keyMap.get(keyId);
+  }
+}