Posted to commits@parquet.apache.org by ga...@apache.org on 2020/06/10 11:38:18 UTC

[parquet-mr] branch encryption updated: PARQUET-1807: Encryption: Interop and Function test suite for Java version (#782)

This is an automated email from the ASF dual-hosted git repository.

gabor pushed a commit to branch encryption
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/encryption by this push:
     new cc5e268  PARQUET-1807: Encryption: Interop and Function test suite for Java version (#782)
cc5e268 is described below

commit cc5e2689eeb20d36d5360e26dd68a3a96a557a19
Author: andersonm-ibm <63...@users.noreply.github.com>
AuthorDate: Wed Jun 10 14:38:08 2020 +0300

    PARQUET-1807: Encryption: Interop and Function test suite for Java version (#782)
    
    Add tests for writing and reading Parquet files under a number of
    encryption and decryption configurations.
    Add an interop test that reads files, written by parquet-cpp, from the
    parquet-testing GitHub repository.
    This adds the parquet-testing repo as a git submodule.
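
    The configuration these tests exercise is "encrypt two columns and the
    footer, with different keys", as set up in getFileEncryptionProperties()
    in the diff below. A condensed sketch of that write-side setup (the key
    bytes are the 16-byte test constants from these tests, not production
    keys):

        ColumnEncryptionProperties idColumn = ColumnEncryptionProperties
            .builder("id")
            .withKey(COLUMN_ENCRYPTION_KEY1)   // 16-byte test key
            .withKeyID("kc1")                  // key id stored as key metadata
            .build();
        ColumnEncryptionProperties nameColumn = ColumnEncryptionProperties
            .builder("name")
            .withKey(COLUMN_ENCRYPTION_KEY2)
            .withKeyID("kc2")
            .build();

        Map<ColumnPath, ColumnEncryptionProperties> columns = new HashMap<>();
        columns.put(idColumn.getPath(), idColumn);
        columns.put(nameColumn.getPath(), nameColumn);

        FileEncryptionProperties encryptionProperties = FileEncryptionProperties
            .builder(FOOTER_ENCRYPTION_KEY)    // footer encrypted with its own key
            .withFooterKeyID("kf")
            .withEncryptedColumns(columns)
            .build();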
---
 .gitmodules                                        |   3 +
 .../parquet/crypto/DecryptionKeyRetrieverMock.java |  41 ++
 .../apache/parquet/hadoop/TestBloomEncryption.java | 313 ---------
 .../apache/parquet/hadoop/TestBloomFiltering.java  | 114 ++-
 .../parquet/hadoop/TestColumnIndexEncryption.java  | 571 ---------------
 .../parquet/hadoop/TestColumnIndexFiltering.java   | 122 +++-
 .../parquet/hadoop/TestEncryptionOptions.java      | 779 +++++++++++++++++++++
 pom.xml                                            |  26 +
 submodules/parquet-testing                         |   1 +
 9 files changed, 1042 insertions(+), 928 deletions(-)

diff --git a/.gitmodules b/.gitmodules
new file mode 100644
index 0000000..2708799
--- /dev/null
+++ b/.gitmodules
@@ -0,0 +1,3 @@
+[submodule "submodules/parquet-testing"]
+	path = submodules/parquet-testing
+	url = https://github.com/apache/parquet-testing.git
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/crypto/DecryptionKeyRetrieverMock.java b/parquet-hadoop/src/test/java/org/apache/parquet/crypto/DecryptionKeyRetrieverMock.java
new file mode 100644
index 0000000..0bb7fc5
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/crypto/DecryptionKeyRetrieverMock.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.crypto;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Simple key retriever, based on UTF-8 strings as key identifiers
+ */
+public class DecryptionKeyRetrieverMock implements DecryptionKeyRetriever {
+  private final Map<String, byte[]> keyMap = new HashMap<>();
+
+  public DecryptionKeyRetrieverMock putKey(String keyId, byte[] keyBytes) {
+    keyMap.put(keyId, keyBytes);
+    return this;
+  }
+
+  @Override
+  public byte[] getKey(byte[] keyMetaData) {
+    String keyId = new String(keyMetaData, StandardCharsets.UTF_8);
+    return keyMap.get(keyId);
+  }
+}
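
A minimal usage sketch, mirroring how the updated tests below wire the mock
up: register the test keys under their string IDs, then hand the retriever to
FileDecryptionProperties, which calls getKey() with the key metadata stored
in the file to resolve the footer and column keys:

    FileDecryptionProperties decryptionProperties = FileDecryptionProperties.builder()
        .withKeyRetriever(new DecryptionKeyRetrieverMock()
            .putKey("kf", FOOTER_ENCRYPTION_KEY)
            .putKey("kc1", COLUMN_ENCRYPTION_KEY1)
            .putKey("kc2", COLUMN_ENCRYPTION_KEY2))
        .build();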
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomEncryption.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomEncryption.java
deleted file mode 100644
index e4cc550..0000000
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomEncryption.java
+++ /dev/null
@@ -1,313 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.parquet.hadoop;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.parquet.column.ParquetProperties;
-import org.apache.parquet.crypto.ColumnEncryptionProperties;
-import org.apache.parquet.crypto.FileDecryptionProperties;
-import org.apache.parquet.crypto.FileEncryptionProperties;
-import org.apache.parquet.filter2.compat.FilterCompat;
-import org.apache.parquet.filter2.predicate.FilterPredicate;
-import org.apache.parquet.filter2.recordlevel.PhoneBookWriter;
-import org.apache.parquet.hadoop.example.ExampleParquetWriter;
-import org.apache.parquet.hadoop.example.GroupReadSupport;
-import org.apache.parquet.hadoop.metadata.ColumnPath;
-import org.apache.parquet.hadoop.TestColumnIndexEncryption.StringKeyIdRetriever;
-import org.apache.parquet.io.api.Binary;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import static org.apache.parquet.filter2.predicate.FilterApi.*;
-import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
-import static org.junit.Assert.*;
-
-@RunWith(Parameterized.class)
-public class TestBloomEncryption {
-  private static final Path FILE_V1 = createTempFile();
-  private static final Path FILE_V2 = createTempFile();
-  private static final Logger LOGGER = LoggerFactory.getLogger(TestBloomEncryption.class);
-  private static final Random RANDOM = new Random(42);
-  private static final String[] PHONE_KINDS = { null, "mobile", "home", "work" };
-  private static final List<PhoneBookWriter.User> DATA = Collections.unmodifiableList(generateData(10000));
-  
-  private static final byte[] FOOTER_ENCRYPTION_KEY = new String("0123456789012345").getBytes();
-  private static final byte[] COLUMN_ENCRYPTION_KEY1 = new String("1234567890123450").getBytes();
-  private static final byte[] COLUMN_ENCRYPTION_KEY2 = new String("1234567890123451").getBytes();
-
-  private final Path file;
-  public TestBloomEncryption(Path file) {
-    this.file = file;
-  }
-
-  private static Path createTempFile() {
-    try {
-      return new Path(Files.createTempFile("test-bloom-filter_", ".parquet").toAbsolutePath().toString());
-    } catch (IOException e) {
-      throw new AssertionError("Unable to create temporary file", e);
-    }
-  }
-
-  @Parameterized.Parameters
-  public static Collection<Object[]> params() {
-    return Arrays.asList(new Object[] { FILE_V1 }, new Object[] { FILE_V2 });
-  }
-
-  private static List<PhoneBookWriter.User> generateData(int rowCount) {
-    List<PhoneBookWriter.User> users = new ArrayList<>();
-    List<String> names = generateNames(rowCount);
-    for (int i = 0; i < rowCount; ++i) {
-      users.add(new PhoneBookWriter.User(i, names.get(i), generatePhoneNumbers(), generateLocation(i, rowCount)));
-    }
-    return users;
-  }
-
-  private static List<String> generateNames(int rowCount) {
-    List<String> list = new ArrayList<>();
-
-    // Adding fixed values for filtering
-    list.add("anderson");
-    list.add("anderson");
-    list.add("miller");
-    list.add("miller");
-    list.add("miller");
-    list.add("thomas");
-    list.add("thomas");
-    list.add("williams");
-
-    int nullCount = rowCount / 100;
-
-    String alphabet = "aabcdeefghiijklmnoopqrstuuvwxyz";
-    int maxLength = 8;
-    for (int i = rowCount - list.size() - nullCount; i >= 0; --i) {
-      int l = RANDOM.nextInt(maxLength);
-      StringBuilder builder = new StringBuilder(l);
-      for (int j = 0; j < l; ++j) {
-        builder.append(alphabet.charAt(RANDOM.nextInt(alphabet.length())));
-      }
-      list.add(builder.toString());
-    }
-    list.sort((str1, str2) -> -str1.compareTo(str2));
-
-    // Adding nulls to random places
-    for (int i = 0; i < nullCount; ++i) {
-      list.add(RANDOM.nextInt(list.size()), null);
-    }
-
-    return list;
-  }
-
-  private static List<PhoneBookWriter.PhoneNumber> generatePhoneNumbers() {
-    int length = RANDOM.nextInt(5) - 1;
-    if (length < 0) {
-      return null;
-    }
-    List<PhoneBookWriter.PhoneNumber> phoneNumbers = new ArrayList<>(length);
-    for (int i = 0; i < length; ++i) {
-      // 6-digit numbers
-      long number = Math.abs(RANDOM.nextLong() % 900000) + 100000;
-      phoneNumbers.add(new PhoneBookWriter.PhoneNumber(number, PHONE_KINDS[RANDOM.nextInt(PHONE_KINDS.length)]));
-    }
-    return phoneNumbers;
-  }
-
-  private static PhoneBookWriter.Location generateLocation(int id, int rowCount) {
-    if (RANDOM.nextDouble() < 0.01) {
-      return null;
-    }
-
-    if (RANDOM.nextDouble() < 0.001) {
-      return new PhoneBookWriter.Location(99.9, 99.9);
-    }
-
-    double lat = RANDOM.nextDouble() * 90.0 - (id < rowCount / 2 ? 90.0 : 0.0);
-    double lon = RANDOM.nextDouble() * 90.0 - (id < rowCount / 4 || id >= 3 * rowCount / 4 ? 90.0 : 0.0);
-
-    return new PhoneBookWriter.Location(RANDOM.nextDouble() < 0.01 ? null : lat, RANDOM.nextDouble() < 0.01 ? null : lon);
-  }
-
-  private List<PhoneBookWriter.User> readUsers(FilterPredicate filter, boolean useOtherFiltering,
-                                               boolean useBloomFilter) throws IOException {
-    /*
-    byte[] keyBytes = {0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15};
-    FileDecryptionProperties fileDecryptionProperties = FileDecryptionProperties.builder()
-        .withFooterKey(keyBytes)
-        .build();
-        */
-    
-    StringKeyIdRetriever kr1 = new StringKeyIdRetriever();
-    kr1.putKey("kf", FOOTER_ENCRYPTION_KEY);
-    kr1.putKey("kc1", COLUMN_ENCRYPTION_KEY1);
-    kr1.putKey("kc2", COLUMN_ENCRYPTION_KEY2);
-
-    FileDecryptionProperties fileDecryptionProperties = FileDecryptionProperties.builder()
-        .withKeyRetriever(kr1)
-        .build();
-    
-    return PhoneBookWriter.readUsers(ParquetReader.builder(new GroupReadSupport(), file)
-      .withFilter(FilterCompat.get(filter))
-      .withDecryption(fileDecryptionProperties)
-      .useDictionaryFilter(useOtherFiltering)
-      .useStatsFilter(useOtherFiltering)
-      .useRecordFilter(useOtherFiltering)
-      .useBloomFilter(useBloomFilter)
-      .useColumnIndexFilter(useOtherFiltering));
-  }
-
-  // Assumes that both lists are in the same order
-  private static void assertContains(Stream<PhoneBookWriter.User> expected, List<PhoneBookWriter.User> actual) {
-    Iterator<PhoneBookWriter.User> expIt = expected.iterator();
-    if (!expIt.hasNext()) {
-      return;
-    }
-    PhoneBookWriter.User exp = expIt.next();
-    for (PhoneBookWriter.User act : actual) {
-      if (act.equals(exp)) {
-        if (!expIt.hasNext()) {
-          break;
-        }
-        exp = expIt.next();
-      }
-    }
-    assertFalse("Not all expected elements are in the actual list. E.g.: " + exp, expIt.hasNext());
-  }
-
-  private void assertCorrectFiltering(Predicate<PhoneBookWriter.User> expectedFilter, FilterPredicate actualFilter)
-    throws IOException {
-    // Check with only bloom filter based filtering
-    List<PhoneBookWriter.User> result = readUsers(actualFilter, false, true);
-
-    assertTrue("Bloom filtering should drop some row groups", result.size() < DATA.size());
-    LOGGER.info("{}/{} records read; filtering ratio: {}%", result.size(), DATA.size(),
-      100 * result.size() / DATA.size());
-    // Asserts that all the required records are in the result
-    assertContains(DATA.stream().filter(expectedFilter), result);
-    // Asserts that all the retrieved records are in the file (validating non-matching records)
-    assertContains(result.stream(), DATA);
-
-    // Check with all the filtering mechanisms to ensure the result contains exactly the required values
-    result = readUsers(actualFilter, true, false);
-    assertEquals(DATA.stream().filter(expectedFilter).collect(Collectors.toList()), result);
-  }
-
-
-  @BeforeClass
-  public static void createFile() throws IOException {
-    int pageSize = DATA.size() / 100;     // Ensure that several pages will be created
-    int rowGroupSize = pageSize * 4;    // Ensure that there are more row-groups created
-/*    
-    byte[] keyBytes = {0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15};
-    FileEncryptionProperties encryptionProperties = FileEncryptionProperties.builder(keyBytes)
-        .build();
-    */
-    // Encryption configuration 2: Encrypt two columns and the footer, with different keys.
-    ColumnEncryptionProperties columnProperties1 = ColumnEncryptionProperties
-        .builder("id")
-        .withKey(COLUMN_ENCRYPTION_KEY1)
-        .withKeyID("kc1")
-        .build();
-
-    ColumnEncryptionProperties columnProperties2 = ColumnEncryptionProperties
-        .builder("name")
-        .withKey(COLUMN_ENCRYPTION_KEY2)
-        .withKeyID("kc2")
-        .build();
-    Map<ColumnPath, ColumnEncryptionProperties> columnPropertiesMap = new HashMap<>();
-
-    columnPropertiesMap.put(columnProperties1.getPath(), columnProperties1);
-    columnPropertiesMap.put(columnProperties2.getPath(), columnProperties2);
-
-    FileEncryptionProperties encryptionProperties = FileEncryptionProperties.builder(FOOTER_ENCRYPTION_KEY)
-        .withFooterKeyID("kf")
-        .withEncryptedColumns(columnPropertiesMap)
-        .build();
-    
-    PhoneBookWriter.write(ExampleParquetWriter.builder(FILE_V1)
-        .withWriteMode(OVERWRITE)
-        .withRowGroupSize(rowGroupSize)
-        .withPageSize(pageSize)
-        .withBloomFilterNDV("location.lat", 10000L)
-        .withBloomFilterNDV("name", 10000L)
-        .withBloomFilterNDV("id", 10000L)
-        .withEncryption(encryptionProperties)
-        .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0),
-      DATA);
-    PhoneBookWriter.write(ExampleParquetWriter.builder(FILE_V2)
-        .withWriteMode(OVERWRITE)
-        .withRowGroupSize(rowGroupSize)
-        .withPageSize(pageSize)
-        .withBloomFilterNDV("location.lat", 10000L)
-        .withBloomFilterNDV("name", 10000L)
-        .withBloomFilterNDV("id", 10000L)
-        .withEncryption(encryptionProperties)
-        .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0),
-      DATA);
-  }
-
-  @AfterClass
-  public static void deleteFile() throws IOException {
-    FILE_V1.getFileSystem(new Configuration()).delete(FILE_V1, false);
-    FILE_V2.getFileSystem(new Configuration()).delete(FILE_V2, false);
-  }
-
-
-  @Test
-  public void testSimpleFiltering() throws IOException {
-    assertCorrectFiltering(
-      record -> record.getId() == 1234L,
-      eq(longColumn("id"), 1234L));
-
-    assertCorrectFiltering(
-      record -> "miller".equals(record.getName()),
-      eq(binaryColumn("name"), Binary.fromString("miller")));
-  }
-
-  @Test
-  public void testNestedFiltering() throws IOException {
-    assertCorrectFiltering(
-      record -> {
-        PhoneBookWriter.Location location = record.getLocation();
-        return location != null && location.getLat() != null && location.getLat() == 99.9;
-      },
-      eq(doubleColumn("location.lat"), 99.9));
-  }
-}
-
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java
index 3a6a002..4ebe15a 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java
@@ -22,11 +22,16 @@ package org.apache.parquet.hadoop;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.crypto.ColumnEncryptionProperties;
+import org.apache.parquet.crypto.DecryptionKeyRetrieverMock;
+import org.apache.parquet.crypto.FileDecryptionProperties;
+import org.apache.parquet.crypto.FileEncryptionProperties;
 import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.filter2.predicate.FilterPredicate;
 import org.apache.parquet.filter2.recordlevel.PhoneBookWriter;
 import org.apache.parquet.hadoop.example.ExampleParquetWriter;
 import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.io.api.Binary;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -57,29 +62,46 @@ import static org.junit.Assert.*;
 
 @RunWith(Parameterized.class)
 public class TestBloomFiltering {
-  private static final Path FILE_V1 = createTempFile();
-  private static final Path FILE_V2 = createTempFile();
+  private static final Path FILE_V1 = createTempFile(false);
+  private static final Path FILE_V2 = createTempFile(false);
+  private static final Path FILE_V1_E = createTempFile(true);
+  private static final Path FILE_V2_E = createTempFile(true);
   private static final Logger LOGGER = LoggerFactory.getLogger(TestBloomFiltering.class);
   private static final Random RANDOM = new Random(42);
   private static final String[] PHONE_KINDS = { null, "mobile", "home", "work" };
   private static final List<PhoneBookWriter.User> DATA = Collections.unmodifiableList(generateData(10000));
 
+  private static final byte[] FOOTER_ENCRYPTION_KEY = "0123456789012345".getBytes();
+  private static final byte[] COLUMN_ENCRYPTION_KEY1 = "1234567890123450".getBytes();
+  private static final byte[] COLUMN_ENCRYPTION_KEY2 = "1234567890123451".getBytes();
+  private static final String FOOTER_ENCRYPTION_KEY_ID = "kf";
+  private static final String COLUMN_ENCRYPTION_KEY1_ID = "kc1";
+  private static final String COLUMN_ENCRYPTION_KEY2_ID = "kc2";
+
   private final Path file;
-  public TestBloomFiltering(Path file) {
+  private final boolean isEncrypted;
+
+  public TestBloomFiltering(Path file, boolean isEncrypted) {
     this.file = file;
+    this.isEncrypted = isEncrypted;
   }
 
-  private static Path createTempFile() {
+  private static Path createTempFile(boolean encrypted) {
+    String suffix = encrypted ? ".parquet.encrypted" : ".parquet";
     try {
-      return new Path(Files.createTempFile("test-bloom-filter_", ".parquet").toAbsolutePath().toString());
+      return new Path(Files.createTempFile("test-bloom-filter_", suffix).toAbsolutePath().toString());
     } catch (IOException e) {
       throw new AssertionError("Unable to create temporary file", e);
     }
   }
 
-  @Parameterized.Parameters
+  @Parameterized.Parameters(name = "Run {index}: isEncrypted={1}")
   public static Collection<Object[]> params() {
-    return Arrays.asList(new Object[] { FILE_V1 }, new Object[] { FILE_V2 });
+    return Arrays.asList(
+      new Object[] { FILE_V1, false /*isEncrypted*/ },
+      new Object[] { FILE_V2, false /*isEncrypted*/ },
+      new Object[] { FILE_V1_E, true /*isEncrypted*/ },
+      new Object[] { FILE_V2_E, true /*isEncrypted*/ });
   }
 
   private static List<PhoneBookWriter.User> generateData(int rowCount) {
@@ -157,8 +179,21 @@ public class TestBloomFiltering {
 
   private List<PhoneBookWriter.User> readUsers(FilterPredicate filter, boolean useOtherFiltering,
                                                boolean useBloomFilter) throws IOException {
+    FileDecryptionProperties fileDecryptionProperties = null;
+    if (isEncrypted) {
+      DecryptionKeyRetrieverMock decryptionKeyRetrieverMock = new DecryptionKeyRetrieverMock()
+        .putKey(FOOTER_ENCRYPTION_KEY_ID, FOOTER_ENCRYPTION_KEY)
+        .putKey(COLUMN_ENCRYPTION_KEY1_ID, COLUMN_ENCRYPTION_KEY1)
+        .putKey(COLUMN_ENCRYPTION_KEY2_ID, COLUMN_ENCRYPTION_KEY2);
+
+      fileDecryptionProperties = FileDecryptionProperties.builder()
+        .withKeyRetriever(decryptionKeyRetrieverMock)
+        .build();
+    }
+
     return PhoneBookWriter.readUsers(ParquetReader.builder(new GroupReadSupport(), file)
       .withFilter(FilterCompat.get(filter))
+      .withDecryption(fileDecryptionProperties)
       .useDictionaryFilter(useOtherFiltering)
       .useStatsFilter(useOtherFiltering)
       .useRecordFilter(useOtherFiltering)
@@ -202,35 +237,68 @@ public class TestBloomFiltering {
     assertEquals(DATA.stream().filter(expectedFilter).collect(Collectors.toList()), result);
   }
 
+  private static FileEncryptionProperties getFileEncryptionProperties() {
+    ColumnEncryptionProperties columnProperties1 = ColumnEncryptionProperties
+      .builder("id")
+      .withKey(COLUMN_ENCRYPTION_KEY1)
+      .withKeyID(COLUMN_ENCRYPTION_KEY1_ID)
+      .build();
+
+    ColumnEncryptionProperties columnProperties2 = ColumnEncryptionProperties
+      .builder("name")
+      .withKey(COLUMN_ENCRYPTION_KEY2)
+      .withKeyID(COLUMN_ENCRYPTION_KEY2_ID)
+      .build();
+    Map<ColumnPath, ColumnEncryptionProperties> columnPropertiesMap = new HashMap<>();
+
+    columnPropertiesMap.put(columnProperties1.getPath(), columnProperties1);
+    columnPropertiesMap.put(columnProperties2.getPath(), columnProperties2);
+
+    FileEncryptionProperties encryptionProperties = FileEncryptionProperties.builder(FOOTER_ENCRYPTION_KEY)
+      .withFooterKeyID(FOOTER_ENCRYPTION_KEY_ID)
+      .withEncryptedColumns(columnPropertiesMap)
+      .build();
+
+    return encryptionProperties;
+  }
 
-  @BeforeClass
-  public static void createFile() throws IOException {
+  private static void writePhoneBookToFile(Path file,
+                                           ParquetProperties.WriterVersion parquetVersion,
+                                           FileEncryptionProperties encryptionProperties) throws IOException {
     int pageSize = DATA.size() / 100;     // Ensure that several pages will be created
     int rowGroupSize = pageSize * 4;    // Ensure that there are more row-groups created
-    PhoneBookWriter.write(ExampleParquetWriter.builder(FILE_V1)
+    PhoneBookWriter.write(ExampleParquetWriter.builder(file)
         .withWriteMode(OVERWRITE)
         .withRowGroupSize(rowGroupSize)
         .withPageSize(pageSize)
         .withBloomFilterNDV("location.lat", 10000L)
         .withBloomFilterNDV("name", 10000L)
         .withBloomFilterNDV("id", 10000L)
-        .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0),
-      DATA);
-    PhoneBookWriter.write(ExampleParquetWriter.builder(FILE_V2)
-        .withWriteMode(OVERWRITE)
-        .withRowGroupSize(rowGroupSize)
-        .withPageSize(pageSize)
-        .withBloomFilterNDV("location.lat", 10000L)
-        .withBloomFilterNDV("name", 10000L)
-        .withBloomFilterNDV("id", 10000L)
-        .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0),
+        .withEncryption(encryptionProperties)
+        .withWriterVersion(parquetVersion),
       DATA);
   }
 
+  private static void deleteFile(Path file) throws IOException {
+    file.getFileSystem(new Configuration()).delete(file, false);
+  }
+
+  @BeforeClass
+  public static void createFiles() throws IOException {
+    writePhoneBookToFile(FILE_V1, ParquetProperties.WriterVersion.PARQUET_1_0, null);
+    writePhoneBookToFile(FILE_V2, ParquetProperties.WriterVersion.PARQUET_2_0, null);
+
+    FileEncryptionProperties encryptionProperties = getFileEncryptionProperties();
+    writePhoneBookToFile(FILE_V1_E, ParquetProperties.WriterVersion.PARQUET_1_0, encryptionProperties);
+    writePhoneBookToFile(FILE_V2_E, ParquetProperties.WriterVersion.PARQUET_2_0, encryptionProperties);
+  }
+
   @AfterClass
-  public static void deleteFile() throws IOException {
-    FILE_V1.getFileSystem(new Configuration()).delete(FILE_V1, false);
-    FILE_V2.getFileSystem(new Configuration()).delete(FILE_V2, false);
+  public static void deleteFiles() throws IOException {
+    deleteFile(FILE_V1);
+    deleteFile(FILE_V2);
+    deleteFile(FILE_V1_E);
+    deleteFile(FILE_V2_E);
   }
 
 
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexEncryption.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexEncryption.java
deleted file mode 100644
index 2fd2207..0000000
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexEncryption.java
+++ /dev/null
@@ -1,571 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.hadoop;
-
-import static java.util.Collections.emptyList;
-import static java.util.stream.Collectors.toList;
-import static org.apache.parquet.filter2.predicate.FilterApi.and;
-import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
-import static org.apache.parquet.filter2.predicate.FilterApi.doubleColumn;
-import static org.apache.parquet.filter2.predicate.FilterApi.eq;
-import static org.apache.parquet.filter2.predicate.FilterApi.gtEq;
-import static org.apache.parquet.filter2.predicate.FilterApi.longColumn;
-import static org.apache.parquet.filter2.predicate.FilterApi.lt;
-import static org.apache.parquet.filter2.predicate.FilterApi.ltEq;
-import static org.apache.parquet.filter2.predicate.FilterApi.not;
-import static org.apache.parquet.filter2.predicate.FilterApi.notEq;
-import static org.apache.parquet.filter2.predicate.FilterApi.or;
-import static org.apache.parquet.filter2.predicate.FilterApi.userDefined;
-import static org.apache.parquet.filter2.predicate.LogicalInverter.invert;
-import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
-import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
-import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
-import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
-import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
-import static org.apache.parquet.schema.Types.optional;
-import static org.apache.parquet.schema.Types.required;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Hashtable;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.parquet.column.ParquetProperties.WriterVersion;
-import org.apache.parquet.crypto.ColumnEncryptionProperties;
-import org.apache.parquet.crypto.DecryptionKeyRetriever;
-import org.apache.parquet.crypto.FileDecryptionProperties;
-import org.apache.parquet.crypto.FileEncryptionProperties;
-import org.apache.parquet.filter2.compat.FilterCompat;
-import org.apache.parquet.filter2.compat.FilterCompat.Filter;
-import org.apache.parquet.filter2.predicate.FilterPredicate;
-import org.apache.parquet.filter2.predicate.Statistics;
-import org.apache.parquet.filter2.predicate.UserDefinedPredicate;
-import org.apache.parquet.filter2.recordlevel.PhoneBookWriter;
-import org.apache.parquet.filter2.recordlevel.PhoneBookWriter.Location;
-import org.apache.parquet.filter2.recordlevel.PhoneBookWriter.PhoneNumber;
-import org.apache.parquet.filter2.recordlevel.PhoneBookWriter.User;
-import org.apache.parquet.hadoop.api.ReadSupport;
-import org.apache.parquet.hadoop.example.ExampleParquetWriter;
-import org.apache.parquet.hadoop.example.GroupReadSupport;
-import org.apache.parquet.hadoop.metadata.ColumnPath;
-import org.apache.parquet.io.api.Binary;
-import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.Types;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Unit tests for high level column index based filtering.
- */
-@RunWith(Parameterized.class)
-public class TestColumnIndexEncryption {
-  private static final Logger LOGGER = LoggerFactory.getLogger(TestColumnIndexEncryption.class);
-  private static final Random RANDOM = new Random(42);
-  private static final String[] PHONE_KINDS = { null, "mobile", "home", "work" };
-  private static final List<User> DATA = Collections.unmodifiableList(generateData(10000));
-  private static final Path FILE_V1 = createTempFile();
-  private static final Path FILE_V2 = createTempFile();
-  private static final MessageType SCHEMA_WITHOUT_NAME = Types.buildMessage()
-      .required(INT64).named("id")
-      .optionalGroup()
-        .addField(optional(DOUBLE).named("lon"))
-        .addField(optional(DOUBLE).named("lat"))
-        .named("location")
-      .optionalGroup()
-        .repeatedGroup()
-          .addField(required(INT64).named("number"))
-          .addField(optional(BINARY).as(stringType()).named("kind"))
-          .named("phone")
-        .named("phoneNumbers")
-      .named("user_without_name");
-  private static final byte[] FOOTER_ENCRYPTION_KEY = new String("0123456789012345").getBytes();
-  private static final byte[] COLUMN_ENCRYPTION_KEY1 = new String("1234567890123450").getBytes();
-  private static final byte[] COLUMN_ENCRYPTION_KEY2 = new String("1234567890123451").getBytes();
-  
-//Simple key retriever, based on UTF8 strings as key identifiers
- static class StringKeyIdRetriever implements DecryptionKeyRetriever{
-
-   private final Hashtable<String,byte[]> keyMap = new Hashtable<String,byte[]>();
-
-   public void putKey(String keyId, byte[] keyBytes) {
-     keyMap.put(keyId, keyBytes);
-   }
-
-   @Override
-   public byte[] getKey(byte[] keyMetaData) {
-     String keyId = new String(keyMetaData, StandardCharsets.UTF_8);
-     return keyMap.get(keyId);
-   }
- }
-
-
-  @Parameters
-  public static Collection<Object[]> params() {
-    return Arrays.asList(new Object[] { FILE_V1 }, new Object[] { FILE_V2 });
-  }
-
-  private final Path file;
-
-  public TestColumnIndexEncryption(Path file) {
-    this.file = file;
-  }
-
-  private static List<User> generateData(int rowCount) {
-    List<User> users = new ArrayList<>();
-    List<String> names = generateNames(rowCount);
-    for (int i = 0; i < rowCount; ++i) {
-      users.add(new User(i, names.get(i), generatePhoneNumbers(), generateLocation(i, rowCount)));
-    }
-    return users;
-  }
-
-  private static List<String> generateNames(int rowCount) {
-    List<String> list = new ArrayList<>();
-
-    // Adding fixed values for filtering
-    list.add("anderson");
-    list.add("anderson");
-    list.add("miller");
-    list.add("miller");
-    list.add("miller");
-    list.add("thomas");
-    list.add("thomas");
-    list.add("williams");
-
-    int nullCount = rowCount / 100;
-
-    String alphabet = "aabcdeefghiijklmnoopqrstuuvwxyz";
-    int maxLength = 8;
-    for (int i = rowCount - list.size() - nullCount; i >= 0; --i) {
-      int l = RANDOM.nextInt(maxLength);
-      StringBuilder builder = new StringBuilder(l);
-      for (int j = 0; j < l; ++j) {
-        builder.append(alphabet.charAt(RANDOM.nextInt(alphabet.length())));
-      }
-      list.add(builder.toString());
-    }
-    Collections.sort(list, (str1, str2) -> -str1.compareTo(str2));
-
-    // Adding nulls to random places
-    for (int i = 0; i < nullCount; ++i) {
-      list.add(RANDOM.nextInt(list.size()), null);
-    }
-
-    return list;
-  }
-
-  private static List<PhoneNumber> generatePhoneNumbers() {
-    int length = RANDOM.nextInt(5) - 1;
-    if (length < 0) {
-      return null;
-    }
-    List<PhoneNumber> phoneNumbers = new ArrayList<>(length);
-    for (int i = 0; i < length; ++i) {
-      // 6-digit numbers
-      long number = Math.abs(RANDOM.nextLong() % 900000) + 100000;
-      phoneNumbers.add(new PhoneNumber(number, PHONE_KINDS[RANDOM.nextInt(PHONE_KINDS.length)]));
-    }
-    return phoneNumbers;
-  }
-
-  private static Location generateLocation(int id, int rowCount) {
-    if (RANDOM.nextDouble() < 0.01) {
-      return null;
-    }
-
-    double lat = RANDOM.nextDouble() * 90.0 - (id < rowCount / 2 ? 90.0 : 0.0);
-    double lon = RANDOM.nextDouble() * 90.0 - (id < rowCount / 4 || id >= 3 * rowCount / 4 ? 90.0 : 0.0);
-
-    return new Location(RANDOM.nextDouble() < 0.01 ? null : lat, RANDOM.nextDouble() < 0.01 ? null : lon);
-  }
-
-  private static Path createTempFile() {
-    try {
-      return new Path(Files.createTempFile("test-ci_", ".parquet").toAbsolutePath().toString());
-    } catch (IOException e) {
-      throw new AssertionError("Unable to create temporary file", e);
-    }
-  }
-
-  private List<User> readUsers(FilterPredicate filter, boolean useOtherFiltering) throws IOException {
-    return readUsers(FilterCompat.get(filter), useOtherFiltering, true);
-  }
-
-  private List<User> readUsers(FilterPredicate filter, boolean useOtherFiltering, boolean useColumnIndexFilter)
-      throws IOException {
-    return readUsers(FilterCompat.get(filter), useOtherFiltering, useColumnIndexFilter);
-  }
-
-  private List<User> readUsers(Filter filter, boolean useOtherFiltering) throws IOException {
-    return readUsers(filter, useOtherFiltering, true);
-  }
-
-  private List<User> readUsers(Filter filter, boolean useOtherFiltering, boolean useColumnIndexFilter)
-      throws IOException {
-    
-    StringKeyIdRetriever kr1 = new StringKeyIdRetriever();
-    kr1.putKey("kf", FOOTER_ENCRYPTION_KEY);
-    kr1.putKey("kc1", COLUMN_ENCRYPTION_KEY1);
-    kr1.putKey("kc2", COLUMN_ENCRYPTION_KEY2);
-
-    FileDecryptionProperties fileDecryptionProperties = FileDecryptionProperties.builder()
-        .withKeyRetriever(kr1)
-        .build();
-    
-    return PhoneBookWriter.readUsers(ParquetReader.builder(new GroupReadSupport(), file)
-        .withFilter(filter)
-        .withDecryption(fileDecryptionProperties)
-        .useDictionaryFilter(useOtherFiltering)
-        .useStatsFilter(useOtherFiltering)
-        .useRecordFilter(useOtherFiltering)
-        .useColumnIndexFilter(useColumnIndexFilter));
-  }
-
-  private List<User> readUsersWithProjection(Filter filter, MessageType schema, boolean useOtherFiltering, boolean useColumnIndexFilter) throws IOException {    
-    StringKeyIdRetriever kr1 = new StringKeyIdRetriever();
-    kr1.putKey("kf", FOOTER_ENCRYPTION_KEY);
-    kr1.putKey("kc1", COLUMN_ENCRYPTION_KEY1);
-    kr1.putKey("kc2", COLUMN_ENCRYPTION_KEY2);
-
-    FileDecryptionProperties fileDecryptionProperties = FileDecryptionProperties.builder()
-        .withKeyRetriever(kr1)
-        .build();
-    
-    return PhoneBookWriter.readUsers(ParquetReader.builder(new GroupReadSupport(), file)
-        .withFilter(filter)
-        .withDecryption(fileDecryptionProperties)
-        .useDictionaryFilter(useOtherFiltering)
-        .useStatsFilter(useOtherFiltering)
-        .useRecordFilter(useOtherFiltering)
-        .useColumnIndexFilter(useColumnIndexFilter)
-        .set(ReadSupport.PARQUET_READ_SCHEMA, schema.toString()));
-  }
-
-  // Assumes that both lists are in the same order
-  private static void assertContains(Stream<User> expected, List<User> actual) {
-    Iterator<User> expIt = expected.iterator();
-    if (!expIt.hasNext()) {
-      return;
-    }
-    User exp = expIt.next();
-    for (User act : actual) {
-      if (act.equals(exp)) {
-        if (!expIt.hasNext()) {
-          break;
-        }
-        exp = expIt.next();
-      }
-    }
-    assertFalse("Not all expected elements are in the actual list. E.g.: " + exp, expIt.hasNext());
-  }
-
-  private void assertCorrectFiltering(Predicate<User> expectedFilter, FilterPredicate actualFilter)
-      throws IOException {
-    // Check with only column index based filtering
-    List<User> result = readUsers(actualFilter, false);
-
-    assertTrue("Column-index filtering should drop some pages", result.size() < DATA.size());
-    LOGGER.info("{}/{} records read; filtering ratio: {}%", result.size(), DATA.size(),
-        100 * result.size() / DATA.size());
-    // Asserts that all the required records are in the result
-    assertContains(DATA.stream().filter(expectedFilter), result);
-    // Asserts that all the retrieved records are in the file (validating non-matching records)
-    assertContains(result.stream(), DATA);
-
-    // Check with all the filtering mechanisms to ensure the result contains exactly the required values
-    result = readUsers(actualFilter, true);
-    assertEquals(DATA.stream().filter(expectedFilter).collect(Collectors.toList()), result);
-  }
-
-  @BeforeClass
-  public static void createFile() throws IOException {
-    int pageSize = DATA.size() / 10;     // Ensure that several pages will be created
-    int rowGroupSize = pageSize * 6 * 5; // Ensure that there are more row-groups created
-    
-    // Encryption configuration: Encrypt two columns and the footer, with different keys.
-    ColumnEncryptionProperties columnProperties1 = ColumnEncryptionProperties
-        .builder("id")
-        .withKey(COLUMN_ENCRYPTION_KEY1)
-        .withKeyID("kc1")
-        .build();
-
-    ColumnEncryptionProperties columnProperties2 = ColumnEncryptionProperties
-        .builder("name")
-        .withKey(COLUMN_ENCRYPTION_KEY2)
-        .withKeyID("kc2")
-        .build();
-    Map<ColumnPath, ColumnEncryptionProperties> columnPropertiesMap = new HashMap<>();
-
-    columnPropertiesMap.put(columnProperties1.getPath(), columnProperties1);
-    columnPropertiesMap.put(columnProperties2.getPath(), columnProperties2);
-
-    FileEncryptionProperties encryptionProperties = FileEncryptionProperties.builder(FOOTER_ENCRYPTION_KEY)
-        .withFooterKeyID("kf")
-        .withEncryptedColumns(columnPropertiesMap)
-        .build();
-    
-    PhoneBookWriter.write(ExampleParquetWriter.builder(FILE_V1)
-        .withWriteMode(OVERWRITE)
-        .withRowGroupSize(rowGroupSize)
-        .withPageSize(pageSize)
-        .withEncryption(encryptionProperties)
-        .withWriterVersion(WriterVersion.PARQUET_1_0),
-        DATA);
-    
-    PhoneBookWriter.write(ExampleParquetWriter.builder(FILE_V2)
-        .withWriteMode(OVERWRITE)
-        .withRowGroupSize(rowGroupSize)
-        .withPageSize(pageSize)
-        .withEncryption(encryptionProperties)
-        .withWriterVersion(WriterVersion.PARQUET_2_0),
-        DATA);
-  }
-
-  @AfterClass
-  public static void deleteFile() throws IOException {
-    FILE_V1.getFileSystem(new Configuration()).delete(FILE_V1, false);
-    FILE_V2.getFileSystem(new Configuration()).delete(FILE_V2, false);
-  }
-
-  @Test
-  public void testSimpleFiltering() throws IOException {
-    assertCorrectFiltering(
-        record -> record.getId() == 1234,
-        eq(longColumn("id"), 1234l));
-    assertCorrectFiltering(
-        record -> "miller".equals(record.getName()),
-        eq(binaryColumn("name"), Binary.fromString("miller")));
-    assertCorrectFiltering(
-        record -> record.getName() == null,
-        eq(binaryColumn("name"), null));
-  }
-
-  @Test
-  public void testNoFiltering() throws IOException {
-    // Column index filtering with no-op filter
-    assertEquals(DATA, readUsers(FilterCompat.NOOP, false));
-    assertEquals(DATA, readUsers(FilterCompat.NOOP, true));
-
-    // Column index filtering turned off
-    assertEquals(DATA.stream().filter(user -> user.getId() == 1234).collect(Collectors.toList()),
-        readUsers(eq(longColumn("id"), 1234l), true, false));
-    assertEquals(DATA.stream().filter(user -> "miller".equals(user.getName())).collect(Collectors.toList()),
-        readUsers(eq(binaryColumn("name"), Binary.fromString("miller")), true, false));
-    assertEquals(DATA.stream().filter(user -> user.getName() == null).collect(Collectors.toList()),
-        readUsers(eq(binaryColumn("name"), null), true, false));
-
-    // Every filtering mechanism turned off
-    assertEquals(DATA, readUsers(eq(longColumn("id"), 1234l), false, false));
-    assertEquals(DATA, readUsers(eq(binaryColumn("name"), Binary.fromString("miller")), false, false));
-    assertEquals(DATA, readUsers(eq(binaryColumn("name"), null), false, false));
-  }
-
-  @Test
-  public void testComplexFiltering() throws IOException {
-    assertCorrectFiltering(
-        record -> {
-          Location loc = record.getLocation();
-          Double lat = loc == null ? null : loc.getLat();
-          Double lon = loc == null ? null : loc.getLon();
-          return lat != null && lon != null && 37 <= lat && lat <= 70 && -21 <= lon && lon <= 35;
-        },
-        and(and(gtEq(doubleColumn("location.lat"), 37.0), ltEq(doubleColumn("location.lat"), 70.0)),
-            and(gtEq(doubleColumn("location.lon"), -21.0), ltEq(doubleColumn("location.lon"), 35.0))));
-    assertCorrectFiltering(
-        record -> {
-          Location loc = record.getLocation();
-          return loc == null || (loc.getLat() == null && loc.getLon() == null);
-        },
-        and(eq(doubleColumn("location.lat"), null), eq(doubleColumn("location.lon"), null)));
-    assertCorrectFiltering(
-        record -> {
-          String name = record.getName();
-          return name != null && name.compareTo("thomas") < 0 && record.getId() <= 3 * DATA.size() / 4;
-        },
-        and(lt(binaryColumn("name"), Binary.fromString("thomas")), ltEq(longColumn("id"), 3l * DATA.size() / 4)));
-  }
-
-  public static class NameStartsWithVowel extends UserDefinedPredicate<Binary> {
-    private static final Binary A = Binary.fromString("a");
-    private static final Binary B = Binary.fromString("b");
-    private static final Binary E = Binary.fromString("e");
-    private static final Binary F = Binary.fromString("f");
-    private static final Binary I = Binary.fromString("i");
-    private static final Binary J = Binary.fromString("j");
-    private static final Binary O = Binary.fromString("o");
-    private static final Binary P = Binary.fromString("p");
-    private static final Binary U = Binary.fromString("u");
-    private static final Binary V = Binary.fromString("v");
-
-    private static boolean isStartingWithVowel(String str) {
-      if (str == null || str.isEmpty()) {
-        return false;
-      }
-      switch (str.charAt(0)) {
-        case 'a':
-        case 'e':
-        case 'i':
-        case 'o':
-        case 'u':
-          return true;
-        default:
-          return false;
-      }
-    }
-
-    @Override
-    public boolean keep(Binary value) {
-      return value != null && isStartingWithVowel(value.toStringUsingUTF8());
-    }
-
-    @Override
-    public boolean canDrop(Statistics<Binary> statistics) {
-      Comparator<Binary> cmp = statistics.getComparator();
-      Binary min = statistics.getMin();
-      Binary max = statistics.getMax();
-      return cmp.compare(max, A) < 0
-          || (cmp.compare(min, B) >= 0 && cmp.compare(max, E) < 0)
-          || (cmp.compare(min, F) >= 0 && cmp.compare(max, I) < 0)
-          || (cmp.compare(min, J) >= 0 && cmp.compare(max, O) < 0)
-          || (cmp.compare(min, P) >= 0 && cmp.compare(max, U) < 0)
-          || cmp.compare(min, V) >= 0;
-    }
-
-    @Override
-    public boolean inverseCanDrop(Statistics<Binary> statistics) {
-      Comparator<Binary> cmp = statistics.getComparator();
-      Binary min = statistics.getMin();
-      Binary max = statistics.getMax();
-      return (cmp.compare(min, A) >= 0 && cmp.compare(max, B) < 0)
-          || (cmp.compare(min, E) >= 0 && cmp.compare(max, F) < 0)
-          || (cmp.compare(min, I) >= 0 && cmp.compare(max, J) < 0)
-          || (cmp.compare(min, O) >= 0 && cmp.compare(max, P) < 0)
-          || (cmp.compare(min, U) >= 0 && cmp.compare(max, V) < 0);
-    }
-  }
-
-  public static class IsDivisibleBy extends UserDefinedPredicate<Long> implements Serializable {
-    private long divisor;
-
-    IsDivisibleBy(long divisor) {
-      this.divisor = divisor;
-    }
-
-    @Override
-    public boolean keep(Long value) {
-      // Deliberately not checking for null to verify the handling of NPE
-      // Implementors shall always check the value for null and return accordingly
-      return value % divisor == 0;
-    }
-
-    @Override
-    public boolean canDrop(Statistics<Long> statistics) {
-      long min = statistics.getMin();
-      long max = statistics.getMax();
-      return min % divisor != 0 && max % divisor != 0 && min / divisor == max / divisor;
-    }
-
-    @Override
-    public boolean inverseCanDrop(Statistics<Long> statistics) {
-      long min = statistics.getMin();
-      long max = statistics.getMax();
-      return min == max && min % divisor == 0;
-    }
-  }
-
-  @Test
-  public void testUDF() throws IOException {
-    assertCorrectFiltering(
-        record -> NameStartsWithVowel.isStartingWithVowel(record.getName()) || record.getId() % 234 == 0,
-        or(userDefined(binaryColumn("name"), NameStartsWithVowel.class),
-            userDefined(longColumn("id"), new IsDivisibleBy(234))));
-    assertCorrectFiltering(
-        record -> !(NameStartsWithVowel.isStartingWithVowel(record.getName()) || record.getId() % 234 == 0),
-            not(or(userDefined(binaryColumn("name"), NameStartsWithVowel.class),
-                userDefined(longColumn("id"), new IsDivisibleBy(234)))));
-  }
-
-  @Test
-  public void testFilteringWithMissingColumns() throws IOException {
-    // Missing column filter is always true
-    assertEquals(DATA, readUsers(notEq(binaryColumn("not-existing-binary"), Binary.EMPTY), true));
-    assertCorrectFiltering(
-        record -> record.getId() == 1234,
-        and(eq(longColumn("id"), 1234l),
-            eq(longColumn("not-existing-long"), null)));
-    assertCorrectFiltering(
-        record -> "miller".equals(record.getName()),
-        and(eq(binaryColumn("name"), Binary.fromString("miller")),
-            invert(userDefined(binaryColumn("not-existing-binary"), NameStartsWithVowel.class))));
-
-    // Missing column filter is always false
-    assertEquals(emptyList(), readUsers(lt(longColumn("not-existing-long"), 0l), true));
-    assertCorrectFiltering(
-        record -> "miller".equals(record.getName()),
-        or(eq(binaryColumn("name"), Binary.fromString("miller")),
-            gtEq(binaryColumn("not-existing-binary"), Binary.EMPTY)));
-    assertCorrectFiltering(
-        record -> record.getId() == 1234,
-        or(eq(longColumn("id"), 1234l),
-            userDefined(longColumn("not-existing-long"), new IsDivisibleBy(1))));
-  }
-
-  @Test
-  public void testFilteringWithProjection() throws IOException {
-    // All rows shall be retrieved because all values in column 'name' shall be handled as null values
-    assertEquals(
-        DATA.stream().map(user -> user.cloneWithName(null)).collect(toList()),
-        readUsersWithProjection(FilterCompat.get(eq(binaryColumn("name"), null)), SCHEMA_WITHOUT_NAME, true, true));
-
-    // Column index filter shall drop all pages because all values in column 'name' shall be handled as null values
-    assertEquals(
-        emptyList(),
-        readUsersWithProjection(FilterCompat.get(notEq(binaryColumn("name"), null)), SCHEMA_WITHOUT_NAME, false, true));
-    assertEquals(
-        emptyList(),
-        readUsersWithProjection(FilterCompat.get(userDefined(binaryColumn("name"), NameStartsWithVowel.class)),
-            SCHEMA_WITHOUT_NAME, false, true));
-  }
-}
-
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java
index c18212e..d2a5395 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java
@@ -52,8 +52,10 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
@@ -61,7 +63,12 @@ import java.util.stream.Stream;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.ParquetProperties.WriterVersion;
+import org.apache.parquet.crypto.ColumnEncryptionProperties;
+import org.apache.parquet.crypto.DecryptionKeyRetrieverMock;
+import org.apache.parquet.crypto.FileDecryptionProperties;
+import org.apache.parquet.crypto.FileEncryptionProperties;
 import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.filter2.compat.FilterCompat.Filter;
 import org.apache.parquet.filter2.predicate.FilterPredicate;
@@ -74,6 +81,7 @@ import org.apache.parquet.filter2.recordlevel.PhoneBookWriter.User;
 import org.apache.parquet.hadoop.api.ReadSupport;
 import org.apache.parquet.hadoop.example.ExampleParquetWriter;
 import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.Types;
@@ -95,8 +103,10 @@ public class TestColumnIndexFiltering {
   private static final Random RANDOM = new Random(42);
   private static final String[] PHONE_KINDS = { null, "mobile", "home", "work" };
   private static final List<User> DATA = Collections.unmodifiableList(generateData(10000));
-  private static final Path FILE_V1 = createTempFile();
-  private static final Path FILE_V2 = createTempFile();
+  private static final Path FILE_V1 = createTempFile(false);
+  private static final Path FILE_V2 = createTempFile(false);
+  private static final Path FILE_V1_E = createTempFile(true);
+  private static final Path FILE_V2_E = createTempFile(true);
   private static final MessageType SCHEMA_WITHOUT_NAME = Types.buildMessage()
       .required(INT64).named("id")
       .optionalGroup()
@@ -111,15 +121,28 @@ public class TestColumnIndexFiltering {
         .named("phoneNumbers")
       .named("user_without_name");
 
-  @Parameters
+  private static final byte[] FOOTER_ENCRYPTION_KEY = "0123456789012345".getBytes();
+  private static final byte[] COLUMN_ENCRYPTION_KEY1 = "1234567890123450".getBytes();
+  private static final byte[] COLUMN_ENCRYPTION_KEY2 = "1234567890123451".getBytes();
+  private static final String FOOTER_ENCRYPTION_KEY_ID = "kf";
+  private static final String COLUMN_ENCRYPTION_KEY1_ID = "kc1";
+  private static final String COLUMN_ENCRYPTION_KEY2_ID = "kc2";
+
+  @Parameters(name = "Run {index}: isEncrypted={1}")
   public static Collection<Object[]> params() {
-    return Arrays.asList(new Object[] { FILE_V1 }, new Object[] { FILE_V2 });
+    return Arrays.asList(
+      new Object[] { FILE_V1, false /*isEncrypted*/ },
+      new Object[] { FILE_V2, false /*isEncrypted*/ },
+      new Object[] { FILE_V1_E, true /*isEncrypted*/ },
+      new Object[] { FILE_V2_E, true /*isEncrypted*/ });
   }
 
   private final Path file;
+  private final boolean isEncrypted;
 
-  public TestColumnIndexFiltering(Path file) {
+  public TestColumnIndexFiltering(Path file, boolean isEncrypted) {
     this.file = file;
+    this.isEncrypted = isEncrypted;
   }
 
   private static List<User> generateData(int rowCount) {
@@ -191,9 +214,10 @@ public class TestColumnIndexFiltering {
     return new Location(RANDOM.nextDouble() < 0.01 ? null : lat, RANDOM.nextDouble() < 0.01 ? null : lon);
   }
 
-  private static Path createTempFile() {
+  private static Path createTempFile(boolean encrypted) {
+    String suffix = encrypted ? ".parquet.encrypted" : ".parquet";
     try {
-      return new Path(Files.createTempFile("test-ci_", ".parquet").toAbsolutePath().toString());
+      return new Path(Files.createTempFile("test-ci_", suffix).toAbsolutePath().toString());
     } catch (IOException e) {
       throw new AssertionError("Unable to create temporary file", e);
     }
@@ -214,17 +238,22 @@ public class TestColumnIndexFiltering {
 
   private List<User> readUsers(Filter filter, boolean useOtherFiltering, boolean useColumnIndexFilter)
       throws IOException {
+    FileDecryptionProperties decryptionProperties = getFileDecryptionProperties();
     return PhoneBookWriter.readUsers(ParquetReader.builder(new GroupReadSupport(), file)
         .withFilter(filter)
+        .withDecryption(decryptionProperties)
         .useDictionaryFilter(useOtherFiltering)
         .useStatsFilter(useOtherFiltering)
         .useRecordFilter(useOtherFiltering)
         .useColumnIndexFilter(useColumnIndexFilter));
   }
 
-  private List<User> readUsersWithProjection(Filter filter, MessageType schema, boolean useOtherFiltering, boolean useColumnIndexFilter) throws IOException {
+  private List<User> readUsersWithProjection(Filter filter, MessageType schema, boolean useOtherFiltering,
+                                             boolean useColumnIndexFilter) throws IOException {
+    FileDecryptionProperties decryptionProperties = getFileDecryptionProperties();
     return PhoneBookWriter.readUsers(ParquetReader.builder(new GroupReadSupport(), file)
         .withFilter(filter)
+        .withDecryption(decryptionProperties)
         .useDictionaryFilter(useOtherFiltering)
         .useStatsFilter(useOtherFiltering)
         .useRecordFilter(useOtherFiltering)
@@ -232,6 +261,21 @@ public class TestColumnIndexFiltering {
         .set(ReadSupport.PARQUET_READ_SCHEMA, schema.toString()));
   }
 
+  private FileDecryptionProperties getFileDecryptionProperties() {
+    FileDecryptionProperties decryptionProperties = null;
+    if (isEncrypted) {
+      DecryptionKeyRetrieverMock decryptionKeyRetrieverMock = new DecryptionKeyRetrieverMock()
+        .putKey(FOOTER_ENCRYPTION_KEY_ID, FOOTER_ENCRYPTION_KEY)
+        .putKey(COLUMN_ENCRYPTION_KEY1_ID, COLUMN_ENCRYPTION_KEY1)
+        .putKey(COLUMN_ENCRYPTION_KEY2_ID, COLUMN_ENCRYPTION_KEY2);
+
+      decryptionProperties = FileDecryptionProperties.builder()
+        .withKeyRetriever(decryptionKeyRetrieverMock)
+        .build();
+    }
+    return decryptionProperties;
+  }
+
   // Assumes that both lists are in the same order
   private static void assertContains(Stream<User> expected, List<User> actual) {
     Iterator<User> expIt = expected.iterator();
@@ -269,27 +313,63 @@ public class TestColumnIndexFiltering {
   }
 
   @BeforeClass
-  public static void createFile() throws IOException {
+  public static void createFiles() throws IOException {
+    writePhoneBookToFile(FILE_V1, WriterVersion.PARQUET_1_0, null);
+    writePhoneBookToFile(FILE_V2, WriterVersion.PARQUET_2_0, null);
+    FileEncryptionProperties encryptionProperties = getFileEncryptionProperties();
+    writePhoneBookToFile(FILE_V1_E, WriterVersion.PARQUET_1_0, encryptionProperties);
+    writePhoneBookToFile(FILE_V2_E, WriterVersion.PARQUET_2_0, encryptionProperties);
+  }
+
+  private static void writePhoneBookToFile(Path file, WriterVersion parquetVersion,
+                                           FileEncryptionProperties encryptionProperties) throws IOException {
     int pageSize = DATA.size() / 10;     // Ensure that several pages will be created
     int rowGroupSize = pageSize * 6 * 5; // Ensure that there are more row-groups created
-    PhoneBookWriter.write(ExampleParquetWriter.builder(FILE_V1)
-        .withWriteMode(OVERWRITE)
-        .withRowGroupSize(rowGroupSize)
-        .withPageSize(pageSize)
-        .withWriterVersion(WriterVersion.PARQUET_1_0),
-        DATA);
-    PhoneBookWriter.write(ExampleParquetWriter.builder(FILE_V2)
+
+    PhoneBookWriter.write(ExampleParquetWriter.builder(file)
         .withWriteMode(OVERWRITE)
         .withRowGroupSize(rowGroupSize)
         .withPageSize(pageSize)
-        .withWriterVersion(WriterVersion.PARQUET_2_0),
-        DATA);
+        .withEncryption(encryptionProperties)
+        .withWriterVersion(parquetVersion),
+      DATA);
+  }
+
+  private static FileEncryptionProperties getFileEncryptionProperties() {
+    ColumnEncryptionProperties columnProperties1 = ColumnEncryptionProperties
+      .builder("id")
+      .withKey(COLUMN_ENCRYPTION_KEY1)
+      .withKeyID(COLUMN_ENCRYPTION_KEY1_ID)
+      .build();
+
+    ColumnEncryptionProperties columnProperties2 = ColumnEncryptionProperties
+      .builder("name")
+      .withKey(COLUMN_ENCRYPTION_KEY2)
+      .withKeyID(COLUMN_ENCRYPTION_KEY2_ID)
+      .build();
+    Map<ColumnPath, ColumnEncryptionProperties> columnPropertiesMap = new HashMap<>();
+
+    columnPropertiesMap.put(columnProperties1.getPath(), columnProperties1);
+    columnPropertiesMap.put(columnProperties2.getPath(), columnProperties2);
+
+    FileEncryptionProperties encryptionProperties = FileEncryptionProperties.builder(FOOTER_ENCRYPTION_KEY)
+      .withFooterKeyID(FOOTER_ENCRYPTION_KEY_ID)
+      .withEncryptedColumns(columnPropertiesMap)
+      .build();
+
+    return encryptionProperties;
+  }
+
+  private static void deleteFile(Path file) throws IOException {
+    file.getFileSystem(new Configuration()).delete(file, false);
   }
 
   @AfterClass
-  public static void deleteFile() throws IOException {
-    FILE_V1.getFileSystem(new Configuration()).delete(FILE_V1, false);
-    FILE_V2.getFileSystem(new Configuration()).delete(FILE_V2, false);
+  public static void deleteFiles() throws IOException {
+    deleteFile(FILE_V1);
+    deleteFile(FILE_V2);
+    deleteFile(FILE_V1_E);
+    deleteFile(FILE_V2_E);
   }
 
   @Test
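Note for readers of this change: the decryption plumbing added to TestColumnIndexFiltering reduces to a short pattern. The sketch below is distilled from the code above (DecryptionKeyRetrieverMock and the key constants come from this patch; it is an illustration, not a separate API):

    FileDecryptionProperties decryption = FileDecryptionProperties.builder()
      .withKeyRetriever(new DecryptionKeyRetrieverMock()
        .putKey(FOOTER_ENCRYPTION_KEY_ID, FOOTER_ENCRYPTION_KEY)
        .putKey(COLUMN_ENCRYPTION_KEY1_ID, COLUMN_ENCRYPTION_KEY1)
        .putKey(COLUMN_ENCRYPTION_KEY2_ID, COLUMN_ENCRYPTION_KEY2))
      .build();

    // Same read path the test uses; decryption may be null for plaintext files.
    List<User> users = PhoneBookWriter.readUsers(
      ParquetReader.builder(new GroupReadSupport(), file)
        .withDecryption(decryption));

Passing a null FileDecryptionProperties (the unencrypted case) leaves the reader behavior unchanged, which is why the same readUsers() path serves all four parameterized files.
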
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestEncryptionOptions.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestEncryptionOptions.java
new file mode 100644
index 0000000..7f0111d
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestEncryptionOptions.java
@@ -0,0 +1,779 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.crypto.ColumnDecryptionProperties;
+import org.apache.parquet.crypto.ColumnEncryptionProperties;
+import org.apache.parquet.crypto.DecryptionKeyRetrieverMock;
+import org.apache.parquet.crypto.FileDecryptionProperties;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.crypto.ParquetCipher;
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Types;
+import org.apache.parquet.statistics.RandomValues;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ErrorCollector;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
+
+/*
+ * This file contains samples for writing and reading encrypted Parquet files in different
+ * encryption and decryption configurations. The samples have the following goals:
+ * 1) Demonstrate usage of different options for data encryption and decryption.
+ * 2) Produce encrypted files for interoperability tests with other (e.g. parquet-cpp)
+ *    readers that support encryption.
+ * 3) Produce encrypted files with plaintext footer, for testing the ability of legacy
+ *    readers to parse the footer and read unencrypted columns.
+ * 4) Perform interoperability tests with other (e.g. parquet-cpp) writers, by reading
+ *    encrypted files produced by these writers.
+ *
+ * The write sample produces a number of parquet files, each encrypted with a different
+ * encryption configuration as described below.
+ * Each file is named after its encryption configuration, lower-cased, with the
+ * suffix .parquet.encrypted (e.g. uniform_encryption.parquet.encrypted).
+ *
+ * The read sample creates a set of decryption configurations and then uses each of them
+ * to read all encrypted files in the input directory.
+ *
+ * The different encryption and decryption configurations are listed below.
+ *
+ * A detailed description of the Parquet Modular Encryption specification can be found
+ * here:
+ * https://github.com/apache/parquet-format/blob/encryption/Encryption.md
+ *
+ * The write sample creates files with seven columns in the following
+ * encryption configurations:
+ *
+ *  UNIFORM_ENCRYPTION:             Encrypt all columns and the footer with the same key.
+ *  ENCRYPT_COLUMNS_AND_FOOTER:     Encrypt six columns and the footer, with different
+ *                                  keys.
+ *  ENCRYPT_COLUMNS_PLAINTEXT_FOOTER: Encrypt six columns, with different keys.
+ *                                  Do not encrypt footer (to enable legacy readers)
+ *                                  - plaintext footer mode.
+ *  ENCRYPT_COLUMNS_AND_FOOTER_AAD: Encrypt six columns and the footer, with different
+ *                                  keys. Supply aad_prefix for file identity
+ *                                  verification.
+ *  ENCRYPT_COLUMNS_AND_FOOTER_DISABLE_AAD_STORAGE:   Encrypt six columns and the footer,
+ *                                  with different keys. Supply aad_prefix, and call
+ *                                  disable_aad_prefix_storage to prevent file
+ *                                  identity storage in file metadata.
+ *  ENCRYPT_COLUMNS_AND_FOOTER_CTR: Encrypt six columns and the footer, with different
+ *                                  keys. Use the alternative (AES_GCM_CTR_V1) algorithm.
+ *  NO_ENCRYPTION:                  Do not encrypt anything
+ *
+ *
+ * The read sample uses each of the following decryption configurations to read every
+ * encrypted file in the input directory:
+ *
+ *  DECRYPT_WITH_KEY_RETRIEVER:     Decrypt using a key retriever that holds the keys of
+ *                                  the encrypted columns and the footer key.
+ *  DECRYPT_WITH_KEY_RETRIEVER_AAD: Decrypt using a key retriever that holds the keys of
+ *                                  the encrypted columns and the footer key. Supply
+ *                                  aad_prefix to verify file identity.
+ *  DECRYPT_WITH_EXPLICIT_KEYS:     Decrypt using explicit column and footer keys
+ *                                  (instead of key retrieval callback).
+ *  NO_DECRYPTION:                  Do not decrypt anything.
+ */
+public class TestEncryptionOptions {
+  private static final Logger LOG = LoggerFactory.getLogger(TestEncryptionOptions.class);
+
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  @Rule
+  public ErrorCollector errorCollector = new ErrorCollector();
+
+  private static final String PARQUET_TESTING_PATH = "../submodules/parquet-testing/data";
+  private static final int RANDOM_SEED = 42;
+  private static final int FIXED_LENGTH = 10;
+  private static final Random RANDOM = new Random(RANDOM_SEED);
+  private static final RandomValues.IntGenerator intGenerator
+    = new RandomValues.IntGenerator(RANDOM_SEED);
+  private static final RandomValues.FloatGenerator floatGenerator
+    = new RandomValues.FloatGenerator(RANDOM_SEED);
+  private static final RandomValues.DoubleGenerator doubleGenerator
+    = new RandomValues.DoubleGenerator(RANDOM_SEED);
+  private static final RandomValues.BinaryGenerator binaryGenerator
+    = new RandomValues.BinaryGenerator(RANDOM_SEED);
+  private static final RandomValues.FixedGenerator fixedBinaryGenerator
+    = new RandomValues.FixedGenerator(RANDOM_SEED, FIXED_LENGTH);
+
+  private static final byte[] FOOTER_ENCRYPTION_KEY = "0123456789012345".getBytes();
+  private static final byte[][] COLUMN_ENCRYPTION_KEYS = { "1234567890123450".getBytes(),
+    "1234567890123451".getBytes(), "1234567890123452".getBytes(), "1234567890123453".getBytes(),
+    "1234567890123454".getBytes(), "1234567890123455".getBytes()};
+  private static final String[] COLUMN_ENCRYPTION_KEY_IDS = { "kc1", "kc2", "kc3", "kc4", "kc5", "kc6"};
+  private static final String FOOTER_ENCRYPTION_KEY_ID = "kf";
+  private static final String AAD_PREFIX_STRING = "tester";
+  private static final String BOOLEAN_FIELD_NAME = "boolean_field";
+  private static final String INT32_FIELD_NAME = "int32_field";
+  private static final String FLOAT_FIELD_NAME = "float_field";
+  private static final String DOUBLE_FIELD_NAME = "double_field";
+  private static final String BINARY_FIELD_NAME = "ba_field";
+  private static final String FIXED_LENGTH_BINARY_FIELD_NAME = "flba_field";
+  private static final String PLAINTEXT_INT32_FIELD_NAME = "plain_int32_field";
+
+  private static final byte[] footerKeyMetadata = FOOTER_ENCRYPTION_KEY_ID.getBytes(StandardCharsets.UTF_8);
+  private static final byte[] AADPrefix = AAD_PREFIX_STRING.getBytes(StandardCharsets.UTF_8);
+
+  private static final int ROW_COUNT = 10000;
+  private static final List<SingleRow> DATA = Collections.unmodifiableList(generateRandomData(ROW_COUNT));
+  private static final List<SingleRow> LINEAR_DATA = Collections.unmodifiableList(generateLinearData(250));
+
+  private static final MessageType SCHEMA =
+    new MessageType("schema",
+      new PrimitiveType(REQUIRED, BOOLEAN, BOOLEAN_FIELD_NAME),
+      new PrimitiveType(REQUIRED, INT32, INT32_FIELD_NAME),
+      new PrimitiveType(REQUIRED, FLOAT, FLOAT_FIELD_NAME),
+      new PrimitiveType(REQUIRED, DOUBLE, DOUBLE_FIELD_NAME),
+      new PrimitiveType(OPTIONAL, BINARY, BINARY_FIELD_NAME),
+      Types.required(FIXED_LEN_BYTE_ARRAY).length(FIXED_LENGTH).named(FIXED_LENGTH_BINARY_FIELD_NAME),
+      new PrimitiveType(OPTIONAL, INT32, PLAINTEXT_INT32_FIELD_NAME));
+
+  private static final DecryptionKeyRetrieverMock decryptionKeyRetrieverMock = new DecryptionKeyRetrieverMock()
+    .putKey(FOOTER_ENCRYPTION_KEY_ID, FOOTER_ENCRYPTION_KEY)
+    .putKey(COLUMN_ENCRYPTION_KEY_IDS[0], COLUMN_ENCRYPTION_KEYS[0])
+    .putKey(COLUMN_ENCRYPTION_KEY_IDS[1], COLUMN_ENCRYPTION_KEYS[1])
+    .putKey(COLUMN_ENCRYPTION_KEY_IDS[2], COLUMN_ENCRYPTION_KEYS[2])
+    .putKey(COLUMN_ENCRYPTION_KEY_IDS[3], COLUMN_ENCRYPTION_KEYS[3])
+    .putKey(COLUMN_ENCRYPTION_KEY_IDS[4], COLUMN_ENCRYPTION_KEYS[4])
+    .putKey(COLUMN_ENCRYPTION_KEY_IDS[5], COLUMN_ENCRYPTION_KEYS[5]);
+
+  public enum EncryptionConfiguration {
+    UNIFORM_ENCRYPTION {
+      /**
+       * Encryption configuration 1: Encrypt all columns and the footer with the same key.
+       */
+      public FileEncryptionProperties getEncryptionProperties() {
+        return FileEncryptionProperties.builder(FOOTER_ENCRYPTION_KEY)
+          .withFooterKeyMetadata(footerKeyMetadata).build();
+      }
+    },
+    ENCRYPT_COLUMNS_AND_FOOTER {
+      /**
+       * Encryption configuration 2: Encrypt six columns and the footer, with different keys.
+       */
+      public FileEncryptionProperties getEncryptionProperties() {
+        Map<ColumnPath, ColumnEncryptionProperties> columnPropertiesMap = getColumnEncryptionPropertiesMap();
+        return FileEncryptionProperties.builder(FOOTER_ENCRYPTION_KEY)
+          .withFooterKeyMetadata(footerKeyMetadata)
+          .withEncryptedColumns(columnPropertiesMap)
+          .build();
+      }
+    },
+    ENCRYPT_COLUMNS_PLAINTEXT_FOOTER {
+      /**
+       * Encryption configuration 3: Encrypt six columns, with different keys.
+       * Don't encrypt footer.
+       * (plaintext footer mode, readable by legacy readers)
+       */
+      public FileEncryptionProperties getEncryptionProperties() {
+        Map<ColumnPath, ColumnEncryptionProperties> columnPropertiesMap = getColumnEncryptionPropertiesMap();
+        return FileEncryptionProperties.builder(FOOTER_ENCRYPTION_KEY)
+          .withFooterKeyMetadata(footerKeyMetadata)
+          .withEncryptedColumns(columnPropertiesMap)
+          .withPlaintextFooter()
+          .build();
+      }
+    },
+    ENCRYPT_COLUMNS_AND_FOOTER_AAD {
+      /**
+       * Encryption configuration 4: Encrypt six columns and the footer, with different keys.
+       * Use aad_prefix.
+       */
+      public FileEncryptionProperties getEncryptionProperties() {
+        Map<ColumnPath, ColumnEncryptionProperties> columnPropertiesMap = getColumnEncryptionPropertiesMap();
+        return FileEncryptionProperties.builder(FOOTER_ENCRYPTION_KEY)
+          .withFooterKeyMetadata(footerKeyMetadata)
+          .withEncryptedColumns(columnPropertiesMap)
+          .withAADPrefix(AADPrefix)
+          .build();
+      }
+    },
+    ENCRYPT_COLUMNS_AND_FOOTER_DISABLE_AAD_STORAGE {
+      /**
+       * Encryption configuration 5: Encrypt six columns and the footer, with different keys.
+       * Use aad_prefix and disable_aad_prefix_storage.
+       */
+      public FileEncryptionProperties getEncryptionProperties() {
+        Map<ColumnPath, ColumnEncryptionProperties> columnPropertiesMap = getColumnEncryptionPropertiesMap();
+        return FileEncryptionProperties.builder(FOOTER_ENCRYPTION_KEY)
+          .withFooterKeyMetadata(footerKeyMetadata)
+          .withEncryptedColumns(columnPropertiesMap)
+          .withAADPrefix(AADPrefix)
+          .withoutAADPrefixStorage()
+          .build();
+      }
+    },
+    ENCRYPT_COLUMNS_AND_FOOTER_CTR {
+      /**
+       *  Encryption configuration 6: Encrypt six columns and the footer, with different keys.
+       *  Use AES_GCM_CTR_V1 algorithm.
+       */
+      public FileEncryptionProperties getEncryptionProperties() {
+        Map<ColumnPath, ColumnEncryptionProperties> columnPropertiesMap = getColumnEncryptionPropertiesMap();
+        return FileEncryptionProperties.builder(FOOTER_ENCRYPTION_KEY)
+          .withFooterKeyMetadata(footerKeyMetadata)
+          .withEncryptedColumns(columnPropertiesMap)
+          .withAlgorithm(ParquetCipher.AES_GCM_CTR_V1)
+          .build();
+      }
+    },
+    NO_ENCRYPTION {
+      public FileEncryptionProperties getEncryptionProperties() {
+        return null;
+      }
+    };
+
+    public abstract FileEncryptionProperties getEncryptionProperties();
+  }
+
+  public enum DecryptionConfiguration {
+    DECRYPT_WITH_KEY_RETRIEVER {
+      /**
+       * Decryption configuration 1: Decrypt using key retriever callback that holds the keys
+       * of the encrypted columns and the footer key.
+       */
+      public FileDecryptionProperties getDecryptionProperties() {
+        return FileDecryptionProperties.builder()
+          .withKeyRetriever(decryptionKeyRetrieverMock)
+          .build();
+      }
+    },
+    DECRYPT_WITH_KEY_RETRIEVER_AAD {
+      /**
+       * Decryption configuration 2: Decrypt using key retriever callback that holds the keys
+       * of the encrypted columns and the footer key. Supply aad_prefix.
+       */
+      public FileDecryptionProperties getDecryptionProperties() {
+        return FileDecryptionProperties.builder()
+          .withKeyRetriever(decryptionKeyRetrieverMock)
+          .withAADPrefix(AADPrefix)
+          .build();
+      }
+    },
+    DECRYPT_WITH_EXPLICIT_KEYS {
+      /**
+       * Decryption configuration 3: Decrypt using explicit column and footer keys.
+       */
+      public FileDecryptionProperties getDecryptionProperties() {
+        Map<ColumnPath, ColumnDecryptionProperties> columnMap = getColumnDecryptionPropertiesMap();
+
+        return FileDecryptionProperties.builder()
+          .withColumnKeys(columnMap)
+          .withFooterKey(FOOTER_ENCRYPTION_KEY)
+          .build();
+      }
+    },
+    NO_DECRYPTION {
+      public FileDecryptionProperties getDecryptionProperties() {
+        return null;
+      }
+    };
+
+    public abstract FileDecryptionProperties getDecryptionProperties();
+  }
+
+  @Test
+  public void testWriteReadEncryptedParquetFiles() throws IOException {
+    Path rootPath = new Path(temporaryFolder.getRoot().getPath());
+    LOG.info("======== testWriteReadEncryptedParquetFiles {} ========", rootPath.toString());
+    // Write using various encryption configurations
+    testWriteEncryptedParquetFiles(rootPath, DATA);
+    // Read using various decryption configurations.
+    testReadEncryptedParquetFiles(rootPath, DATA);
+  }
+
+  @Test
+  public void testInteropReadEncryptedParquetFiles() throws IOException {
+    Path rootPath = new Path(PARQUET_TESTING_PATH);
+    LOG.info("======== testInteropReadEncryptedParquetFiles {} ========", rootPath.toString());
+    // Read using various decryption configurations.
+    testInteropReadEncryptedParquetFiles(rootPath, true/*readOnlyEncrypted*/, LINEAR_DATA);
+  }
+
+  private static List<SingleRow> generateRandomData(int rowCount) {
+    List<SingleRow> dataList = new ArrayList<>(rowCount);
+    for (int row = 0; row < rowCount; ++row) {
+      SingleRow newRow = new SingleRow(RANDOM.nextBoolean(),
+        intGenerator.nextValue(), floatGenerator.nextValue(),
+        doubleGenerator.nextValue(), binaryGenerator.nextValue().getBytes(),
+        fixedBinaryGenerator.nextValue().getBytes(), intGenerator.nextValue());
+      dataList.add(newRow);
+    }
+    return dataList;
+  }
+
+  private static List<SingleRow> generateLinearData(int rowCount) {
+    List<SingleRow> dataList = new ArrayList<>(rowCount);
+    String baseStr = "parquet";
+    for (int row = 0; row < rowCount; ++row) {
+      boolean boolean_val = (row % 2) == 0;
+      float float_val = (float) row * 1.1f;
+      double double_val = (row * 1.1111111);
+
+      byte[] binary_val = null;
+      if ((row % 2) == 0) {
+        char firstChar = (char) ((int) '0' + row / 100);
+        char secondChar = (char) ((int) '0' + (row / 10) % 10);
+        char thirdChar = (char) ((int) '0' + row % 10);
+        binary_val = (baseStr + firstChar + secondChar + thirdChar).getBytes(StandardCharsets.UTF_8);
+      }
+      char[] fixed = new char[FIXED_LENGTH];
+      char[] aChar = Character.toChars(row);
+      Arrays.fill(fixed, aChar[0]);
+
+      SingleRow newRow = new SingleRow(boolean_val,
+        row, float_val, double_val,
+        binary_val, new String(fixed).getBytes(StandardCharsets.UTF_8), null/*plaintext_int32_field*/);
+      dataList.add(newRow);
+    }
+    return dataList;
+  }
+
+  public static class SingleRow {
+    public final boolean boolean_field;
+    public final int int32_field;
+    public final float float_field;
+    public final double double_field;
+    public final byte[] ba_field;
+    public final byte[] flba_field;
+    public final Integer plaintext_int32_field; // Can be null, since it doesn't exist in C++-created files yet.
+
+    public SingleRow(boolean boolean_field,
+                     int int32_field,
+                     float float_field,
+                     double double_field,
+                     byte[] ba_field,
+                     byte[] flba_field,
+                     Integer plaintext_int32_field) {
+      this.boolean_field = boolean_field;
+      this.int32_field = int32_field;
+      this.float_field = float_field;
+      this.double_field = double_field;
+      this.ba_field = ba_field;
+      this.flba_field = flba_field;
+      this.plaintext_int32_field = plaintext_int32_field;
+    }
+  }
+
+  private void testWriteEncryptedParquetFiles(Path root, List<SingleRow> data) throws IOException {
+    Configuration conf = new Configuration();
+
+    int pageSize = data.size() / 10;     // Ensure that several pages will be created
+    int rowGroupSize = pageSize * 6 * 5; // Ensure that there are more row-groups created
+
+    SimpleGroupFactory f = new SimpleGroupFactory(SCHEMA);
+
+    EncryptionConfiguration[] encryptionConfigurations = EncryptionConfiguration.values();
+    for (EncryptionConfiguration encryptionConfiguration : encryptionConfigurations) {
+      Path file = new Path(root, getFileName(encryptionConfiguration));
+      FileEncryptionProperties encryptionProperties = encryptionConfiguration.getEncryptionProperties();
+      LOG.info("\nWrite " + file.toString());
+      try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(file)
+        .withWriteMode(OVERWRITE)
+        .withRowGroupSize(rowGroupSize)
+        .withPageSize(pageSize)
+        .withType(SCHEMA)
+        .withConf(conf)
+        .withEncryption(encryptionProperties)
+        .build()) {
+
+        for (SingleRow singleRow : data) {
+          writer.write(
+            f.newGroup()
+              .append(BOOLEAN_FIELD_NAME, singleRow.boolean_field)
+              .append(INT32_FIELD_NAME, singleRow.int32_field)
+              .append(FLOAT_FIELD_NAME, singleRow.float_field)
+              .append(DOUBLE_FIELD_NAME, singleRow.double_field)
+              .append(BINARY_FIELD_NAME, Binary.fromConstantByteArray(singleRow.ba_field))
+              .append(FIXED_LENGTH_BINARY_FIELD_NAME, Binary.fromConstantByteArray(singleRow.flba_field))
+              .append(PLAINTEXT_INT32_FIELD_NAME, singleRow.plaintext_int32_field));
+
+        }
+      }
+    }
+  }
+
+  private String getFileName(EncryptionConfiguration encryptionConfiguration) {
+    return encryptionConfiguration.toString().toLowerCase() + ".parquet.encrypted";
+  }
+
+  private void testReadEncryptedParquetFiles(Path root, List<SingleRow> data) {
+    Configuration conf = new Configuration();
+    DecryptionConfiguration[] decryptionConfigurations = DecryptionConfiguration.values();
+    for (DecryptionConfiguration decryptionConfiguration : decryptionConfigurations) {
+      EncryptionConfiguration[] encryptionConfigurations = EncryptionConfiguration.values();
+      for (EncryptionConfiguration encryptionConfiguration : encryptionConfigurations) {
+        Path file = new Path(root, getFileName(encryptionConfiguration));
+        LOG.info("==> Decryption configuration {}", decryptionConfiguration);
+        FileDecryptionProperties fileDecryptionProperties = decryptionConfiguration.getDecryptionProperties();
+
+        LOG.info("--> Read file {} {}", file.toString(), encryptionConfiguration);
+
+        // Read only the non-encrypted columns
+        if ((decryptionConfiguration == DecryptionConfiguration.NO_DECRYPTION) &&
+          (encryptionConfiguration == EncryptionConfiguration.ENCRYPT_COLUMNS_PLAINTEXT_FOOTER)) {
+          conf.set("parquet.read.schema", Types.buildMessage()
+            .optional(INT32).named(PLAINTEXT_INT32_FIELD_NAME)
+            .named("FormatTestObject").toString());
+        }
+
+        int rowNum = 0;
+        try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file)
+          .withConf(conf)
+          .withDecryption(fileDecryptionProperties)
+          .build()) {
+          for (Group group = reader.read(); group != null; group = reader.read()) {
+            SingleRow rowExpected = data.get(rowNum++);
+            // plaintext columns
+            if (rowExpected.plaintext_int32_field != group.getInteger(PLAINTEXT_INT32_FIELD_NAME, 0)) {
+              addErrorToErrorCollectorAndLog("Wrong int", encryptionConfiguration, decryptionConfiguration);
+            }
+            // encrypted columns
+            if (decryptionConfiguration != DecryptionConfiguration.NO_DECRYPTION) {
+              if (rowExpected.boolean_field != group.getBoolean(BOOLEAN_FIELD_NAME, 0)) {
+                addErrorToErrorCollectorAndLog("Wrong bool", encryptionConfiguration, decryptionConfiguration);
+              }
+              if (rowExpected.int32_field != group.getInteger(INT32_FIELD_NAME, 0)) {
+                addErrorToErrorCollectorAndLog("Wrong int", encryptionConfiguration, decryptionConfiguration);
+              }
+              if (rowExpected.float_field != group.getFloat(FLOAT_FIELD_NAME, 0)) {
+                addErrorToErrorCollectorAndLog("Wrong float", encryptionConfiguration, decryptionConfiguration);
+              }
+              if (rowExpected.double_field != group.getDouble(DOUBLE_FIELD_NAME, 0)) {
+                addErrorToErrorCollectorAndLog("Wrong double", encryptionConfiguration, decryptionConfiguration);
+              }
+              if ((null != rowExpected.ba_field) &&
+                !Arrays.equals(rowExpected.ba_field, group.getBinary(BINARY_FIELD_NAME, 0).getBytes())) {
+                addErrorToErrorCollectorAndLog("Wrong byte array", encryptionConfiguration, decryptionConfiguration);
+              }
+              if (!Arrays.equals(rowExpected.flba_field,
+                group.getBinary(FIXED_LENGTH_BINARY_FIELD_NAME, 0).getBytes())) {
+                addErrorToErrorCollectorAndLog("Wrong fixed-length byte array",
+                  encryptionConfiguration, decryptionConfiguration);
+              }
+            }
+          }
+        } catch (ParquetCryptoRuntimeException e) {
+          String errorMessage = e.getMessage();
+          checkResult(file.getName(), decryptionConfiguration, (null == errorMessage ? e.toString() : errorMessage));
+        } catch (Exception e) {
+          addErrorToErrorCollectorAndLog(
+            "Unexpected exception: " + e.getClass().getName() + " with message: " + e.getMessage(),
+            encryptionConfiguration, decryptionConfiguration);
+        }
+        conf.unset("parquet.read.schema");
+      }
+    }
+  }
+
+  private void testInteropReadEncryptedParquetFiles(Path root, boolean readOnlyEncrypted,
+                                                    List<SingleRow> data) throws IOException {
+    Configuration conf = new Configuration();
+    DecryptionConfiguration[] decryptionConfigurations = DecryptionConfiguration.values();
+    for (DecryptionConfiguration decryptionConfiguration : decryptionConfigurations) {
+      EncryptionConfiguration[] encryptionConfigurations = EncryptionConfiguration.values();
+      for (EncryptionConfiguration encryptionConfiguration : encryptionConfigurations) {
+        if (readOnlyEncrypted && (EncryptionConfiguration.NO_ENCRYPTION == encryptionConfiguration)) {
+          continue;
+        }
+        Path file = new Path(root, getFileName(encryptionConfiguration));
+        LOG.info("==> Decryption configuration {}", decryptionConfiguration);
+        FileDecryptionProperties fileDecryptionProperties = decryptionConfiguration.getDecryptionProperties();
+
+        LOG.info("--> Read file {} {}", file.toString(), encryptionConfiguration);
+
+        // Read only the non-encrypted columns
+        if ((decryptionConfiguration == DecryptionConfiguration.NO_DECRYPTION) &&
+          (encryptionConfiguration == EncryptionConfiguration.ENCRYPT_COLUMNS_PLAINTEXT_FOOTER)) {
+          conf.set("parquet.read.schema", Types.buildMessage()
+            .required(BOOLEAN).named(BOOLEAN_FIELD_NAME)
+            .required(INT32).named(INT32_FIELD_NAME)
+            .named("FormatTestObject").toString());
+        }
+
+        int rowNum = 0;
+        try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file)
+          .withConf(conf)
+          .withDecryption(fileDecryptionProperties)
+          .build()) {
+          for (Group group = reader.read(); group != null; group = reader.read()) {
+            SingleRow rowExpected = data.get(rowNum++);
+            // plaintext columns
+            if (rowExpected.boolean_field != group.getBoolean(BOOLEAN_FIELD_NAME, 0)) {
+              addErrorToErrorCollectorAndLog("Wrong bool", encryptionConfiguration, decryptionConfiguration);
+            }
+            if (rowExpected.int32_field != group.getInteger(INT32_FIELD_NAME, 0)) {
+              addErrorToErrorCollectorAndLog("Wrong int", encryptionConfiguration, decryptionConfiguration);
+            }
+            // encrypted columns
+            if (decryptionConfiguration != DecryptionConfiguration.NO_DECRYPTION) {
+              if (rowExpected.float_field != group.getFloat(FLOAT_FIELD_NAME, 0)) {
+                addErrorToErrorCollectorAndLog("Wrong float", encryptionConfiguration, decryptionConfiguration);
+              }
+              if (rowExpected.double_field != group.getDouble(DOUBLE_FIELD_NAME, 0)) {
+                addErrorToErrorCollectorAndLog("Wrong double", encryptionConfiguration, decryptionConfiguration);
+              }
+            }
+          }
+        } catch (ParquetCryptoRuntimeException e) {
+          String errorMessage = e.getMessage();
+          checkResult(file.getName(), decryptionConfiguration, (null == errorMessage ? e.toString() : errorMessage));
+        } catch (Exception e) {
+          addErrorToErrorCollectorAndLog(
+            "Unexpected exception: " + e.getClass().getName() + " with message: " + e.getMessage(),
+            encryptionConfiguration, decryptionConfiguration);
+        }
+        conf.unset("parquet.read.schema");
+      }
+    }
+  }
+
+  /**
+   * Check that the decryption result is as expected.
+   */
+  private void checkResult(String file, DecryptionConfiguration decryptionConfiguration, String exceptionMsg) {
+    // Extract the encryption configuration from the parquet file name.
+    EncryptionConfiguration encryptionConfiguration = getEncryptionConfigurationFromFilename(file);
+
+    // Encryption configuration 5 contains aad_prefix and disable_aad_prefix_storage.
+    // An exception is expected to be thrown if the file is not decrypted with aad_prefix.
+    if (encryptionConfiguration == EncryptionConfiguration.ENCRYPT_COLUMNS_AND_FOOTER_DISABLE_AAD_STORAGE) {
+      if (decryptionConfiguration == DecryptionConfiguration.DECRYPT_WITH_KEY_RETRIEVER ||
+        decryptionConfiguration == DecryptionConfiguration.DECRYPT_WITH_EXPLICIT_KEYS) {
+        if (!exceptionMsg.contains("AAD")) {
+          addErrorToErrorCollectorAndLog("Expecting AAD related exception", exceptionMsg,
+            encryptionConfiguration, decryptionConfiguration);
+        } else {
+          LOG.info("Exception as expected: " + exceptionMsg);
+        }
+        return;
+      }
+    }
+    // Decryption configuration 2 contains aad_prefix. An exception is expected to
+    // be thrown if the file was not encrypted with the same aad_prefix.
+    if (decryptionConfiguration == DecryptionConfiguration.DECRYPT_WITH_KEY_RETRIEVER_AAD) {
+      if (encryptionConfiguration != EncryptionConfiguration.ENCRYPT_COLUMNS_AND_FOOTER_DISABLE_AAD_STORAGE &&
+        encryptionConfiguration != EncryptionConfiguration.ENCRYPT_COLUMNS_AND_FOOTER_AAD &&
+        encryptionConfiguration != EncryptionConfiguration.NO_ENCRYPTION) {
+        if (!exceptionMsg.contains("AAD")) {
+          addErrorToErrorCollectorAndLog("Expecting AAD related exception", exceptionMsg,
+            encryptionConfiguration, decryptionConfiguration);
+        } else {
+          LOG.info("Exception as expected: " + exceptionMsg);
+        }
+        return;
+      }
+    }
+    // Encryption configuration 7 has a null encryptor, so the file is written as plaintext.
+    // An exception is expected to be thrown if the file is being decrypted.
+    if (encryptionConfiguration == EncryptionConfiguration.NO_ENCRYPTION) {
+      if ((decryptionConfiguration == DecryptionConfiguration.DECRYPT_WITH_KEY_RETRIEVER) ||
+        (decryptionConfiguration == DecryptionConfiguration.DECRYPT_WITH_KEY_RETRIEVER_AAD) ||
+        (decryptionConfiguration == DecryptionConfiguration.DECRYPT_WITH_EXPLICIT_KEYS)) {
+        if (!exceptionMsg.endsWith("Applying decryptor on plaintext file")) {
+          addErrorToErrorCollectorAndLog("Expecting exception Applying decryptor on plaintext file",
+            exceptionMsg, encryptionConfiguration, decryptionConfiguration);
+        } else {
+          LOG.info("Exception as expected: " + exceptionMsg);
+        }
+        return;
+      }
+    }
+    // Decryption configuration 4 is null, so only plaintext files can be read.
+    // An exception is expected to be thrown if the file is encrypted.
+    if (decryptionConfiguration == DecryptionConfiguration.NO_DECRYPTION) {
+      if ((encryptionConfiguration != EncryptionConfiguration.NO_ENCRYPTION &&
+        encryptionConfiguration != EncryptionConfiguration.ENCRYPT_COLUMNS_PLAINTEXT_FOOTER)) {
+        if (!exceptionMsg.endsWith("No keys available") && !exceptionMsg.endsWith("Null File Decryptor") && !exceptionMsg.endsWith("Footer key unavailable")) {
+          addErrorToErrorCollectorAndLog("Expecting No keys available exception", exceptionMsg,
+            encryptionConfiguration, decryptionConfiguration);
+        } else {
+          LOG.info("Exception as expected: " + exceptionMsg);
+        }
+        return;
+      }
+    }
+    if (null != exceptionMsg && !exceptionMsg.isEmpty()) {
+      addErrorToErrorCollectorAndLog("Didn't expect an exception", exceptionMsg,
+        encryptionConfiguration, decryptionConfiguration);
+    }
+  }
+
+  private EncryptionConfiguration getEncryptionConfigurationFromFilename(String file) {
+    if (!file.endsWith(".parquet.encrypted")) {
+      return null;
+    }
+    String fileNamePrefix = file.replaceFirst("\\.parquet\\.encrypted$", "");
+    try {
+      return EncryptionConfiguration.valueOf(fileNamePrefix.toUpperCase());
+    } catch (IllegalArgumentException e) {
+      LOG.error("File name doesn't match any known encryption configuration: " + file);
+      errorCollector.addError(e);
+      return null;
+    }
+  }
+
+  private void addErrorToErrorCollectorAndLog(String errorMessage, String exceptionMessage,
+                                              EncryptionConfiguration encryptionConfiguration,
+                                              DecryptionConfiguration decryptionConfiguration) {
+    String fullErrorMessage = String.format("%s - %s Error: %s, but got [%s]",
+      encryptionConfiguration, decryptionConfiguration, errorMessage, exceptionMessage);
+
+    errorCollector.addError(new Throwable(fullErrorMessage));
+    LOG.error(fullErrorMessage);
+  }
+
+  private void addErrorToErrorCollectorAndLog(String errorMessage, EncryptionConfiguration encryptionConfiguration,
+                                              DecryptionConfiguration decryptionConfiguration) {
+    String fullErrorMessage = String.format("%s - %s Error: %s",
+      encryptionConfiguration, decryptionConfiguration, errorMessage);
+
+    errorCollector.addError(new Throwable(fullErrorMessage));
+    LOG.error(fullErrorMessage);
+  }
+
+  private static Map<ColumnPath, ColumnEncryptionProperties> getColumnEncryptionPropertiesMap() {
+    Map<ColumnPath, ColumnEncryptionProperties> columnPropertiesMap = new HashMap<>();
+
+    ColumnEncryptionProperties columnPropertiesDouble = ColumnEncryptionProperties
+      .builder(DOUBLE_FIELD_NAME)
+      .withKey(COLUMN_ENCRYPTION_KEYS[0])
+      .withKeyID(COLUMN_ENCRYPTION_KEY_IDS[0])
+      .build();
+    columnPropertiesMap.put(columnPropertiesDouble.getPath(), columnPropertiesDouble);
+
+    ColumnEncryptionProperties columnPropertiesFloat = ColumnEncryptionProperties
+      .builder(FLOAT_FIELD_NAME)
+      .withKey(COLUMN_ENCRYPTION_KEYS[1])
+      .withKeyID(COLUMN_ENCRYPTION_KEY_IDS[1])
+      .build();
+    columnPropertiesMap.put(columnPropertiesFloat.getPath(), columnPropertiesFloat);
+
+    ColumnEncryptionProperties columnPropertiesBool = ColumnEncryptionProperties
+      .builder(BOOLEAN_FIELD_NAME)
+      .withKey(COLUMN_ENCRYPTION_KEYS[2])
+      .withKeyID(COLUMN_ENCRYPTION_KEY_IDS[2])
+      .build();
+    columnPropertiesMap.put(columnPropertiesBool.getPath(), columnPropertiesBool);
+
+    ColumnEncryptionProperties columnPropertiesInt32 = ColumnEncryptionProperties
+      .builder(INT32_FIELD_NAME)
+      .withKey(COLUMN_ENCRYPTION_KEYS[3])
+      .withKeyID(COLUMN_ENCRYPTION_KEY_IDS[3])
+      .build();
+    columnPropertiesMap.put(columnPropertiesInt32.getPath(), columnPropertiesInt32);
+
+    ColumnEncryptionProperties columnPropertiesBinary = ColumnEncryptionProperties
+      .builder(BINARY_FIELD_NAME)
+      .withKey(COLUMN_ENCRYPTION_KEYS[4])
+      .withKeyID(COLUMN_ENCRYPTION_KEY_IDS[4])
+      .build();
+    columnPropertiesMap.put(columnPropertiesBinary.getPath(), columnPropertiesBinary);
+
+    ColumnEncryptionProperties columnPropertiesFixed = ColumnEncryptionProperties
+      .builder(FIXED_LENGTH_BINARY_FIELD_NAME)
+      .withKey(COLUMN_ENCRYPTION_KEYS[5])
+      .withKeyID(COLUMN_ENCRYPTION_KEY_IDS[5])
+      .build();
+    columnPropertiesMap.put(columnPropertiesFixed.getPath(), columnPropertiesFixed);
+
+    return columnPropertiesMap;
+  }
+
+  private static Map<ColumnPath, ColumnDecryptionProperties> getColumnDecryptionPropertiesMap() {
+    Map<ColumnPath, ColumnDecryptionProperties> columnMap = new HashMap<>();
+
+    ColumnDecryptionProperties columnDecryptionPropsDouble = ColumnDecryptionProperties
+      .builder(DOUBLE_FIELD_NAME)
+      .withKey(COLUMN_ENCRYPTION_KEYS[0])
+      .build();
+    columnMap.put(columnDecryptionPropsDouble.getPath(), columnDecryptionPropsDouble);
+
+    ColumnDecryptionProperties columnDecryptionPropsFloat = ColumnDecryptionProperties
+      .builder(FLOAT_FIELD_NAME)
+      .withKey(COLUMN_ENCRYPTION_KEYS[1])
+      .build();
+    columnMap.put(columnDecryptionPropsFloat.getPath(), columnDecryptionPropsFloat);
+
+    ColumnDecryptionProperties columnDecryptionPropsBool = ColumnDecryptionProperties
+      .builder(BOOLEAN_FIELD_NAME)
+      .withKey(COLUMN_ENCRYPTION_KEYS[2])
+      .build();
+    columnMap.put(columnDecryptionPropsBool.getPath(), columnDecryptionPropsBool);
+
+    ColumnDecryptionProperties columnDecryptionPropsInt32 = ColumnDecryptionProperties
+      .builder(INT32_FIELD_NAME)
+      .withKey(COLUMN_ENCRYPTION_KEYS[3])
+      .build();
+    columnMap.put(columnDecryptionPropsInt32.getPath(), columnDecryptionPropsInt32);
+
+    ColumnDecryptionProperties columnDecryptionPropsBinary = ColumnDecryptionProperties
+      .builder(BINARY_FIELD_NAME)
+      .withKey(COLUMN_ENCRYPTION_KEYS[4])
+      .build();
+    columnMap.put(columnDecryptionPropsBinary.getPath(), columnDecryptionPropsBinary);
+
+    ColumnDecryptionProperties columnDecryptionPropsFixed = ColumnDecryptionProperties
+      .builder(FIXED_LENGTH_BINARY_FIELD_NAME)
+      .withKey(COLUMN_ENCRYPTION_KEYS[5])
+      .build();
+    columnMap.put(columnDecryptionPropsFixed.getPath(), columnDecryptionPropsFixed);
+
+    return columnMap;
+  }
+}
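To make the intent of the configuration enums concrete, here is a minimal write-path sketch assembled from the pieces above (constants and builders are the ones defined in TestEncryptionOptions; treat it as an illustration of ENCRYPT_COLUMNS_PLAINTEXT_FOOTER, not canonical usage):

    // Encrypt one column with its own key; footer stays plaintext.
    ColumnEncryptionProperties doubleCol = ColumnEncryptionProperties
      .builder(DOUBLE_FIELD_NAME)
      .withKey(COLUMN_ENCRYPTION_KEYS[0])
      .withKeyID(COLUMN_ENCRYPTION_KEY_IDS[0])
      .build();
    Map<ColumnPath, ColumnEncryptionProperties> columns = new HashMap<>();
    columns.put(doubleCol.getPath(), doubleCol);

    FileEncryptionProperties encryption = FileEncryptionProperties.builder(FOOTER_ENCRYPTION_KEY)
      .withFooterKeyMetadata(footerKeyMetadata)
      .withEncryptedColumns(columns)
      .withPlaintextFooter()   // legacy readers can still parse the footer
      .build();

    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(file)
        .withType(SCHEMA)
        .withConf(new Configuration())
        .withEncryption(encryption)
        .build()) {
      // writer.write(group) per record, as in testWriteEncryptedParquetFiles()
    }

A file written this way can be opened by a reader with no decryption properties at all, but only the plaintext columns (and the footer) remain readable; the NO_DECRYPTION / ENCRYPT_COLUMNS_PLAINTEXT_FOOTER branch in testReadEncryptedParquetFiles() exercises exactly that case.
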
diff --git a/pom.xml b/pom.xml
index 9ef8081..b98e6e2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -105,6 +105,7 @@
     <mockito.version>1.10.19</mockito.version>
     <net.openhft.version>0.9</net.openhft.version>
     <maven-javadoc-plugin.version>3.1.0</maven-javadoc-plugin.version>
+    <exec-maven-plugin.version>1.6.0</exec-maven-plugin.version>
 
     <!-- parquet-cli dependencies -->
     <opencsv.version>2.3</opencsv.version>
@@ -215,6 +216,30 @@
     <pluginManagement>
       <plugins>
         <plugin>
+          <groupId>org.codehaus.mojo</groupId>
+          <artifactId>exec-maven-plugin</artifactId>
+          <version>${exec-maven-plugin.version}</version>
+          <executions>
+            <execution>
+              <id>git submodule update</id>
+              <phase>initialize</phase>
+              <configuration>
+                <executable>git</executable>
+                <arguments>
+                  <argument>submodule</argument>
+                  <argument>update</argument>
+                  <argument>--init</argument>
+                  <argument>--recursive</argument>
+                </arguments>
+              </configuration>
+              <goals>
+                <goal>exec</goal>
+              </goals>
+            </execution>
+          </executions>
+        </plugin>
+
+        <plugin>
           <!-- Disable the source artifact from ASF parent -->
           <artifactId>maven-assembly-plugin</artifactId>
           <executions>
@@ -465,6 +490,7 @@
             <exclude>**/target/**</exclude>
             <exclude>.git/**</exclude>
             <exclude>.gitignore</exclude>
+            <exclude>.gitmodules</exclude>
             <exclude>.idea/**</exclude>
             <exclude>*/jdiff/*.xml</exclude>
             <exclude>.travis.yml</exclude>
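The exec-maven-plugin execution registered above runs `git submodule update --init --recursive` during the initialize phase, so the parquet-testing data files read by testInteropReadEncryptedParquetFiles() are fetched automatically by the Maven build. Anyone building outside Maven should run the same git command once after cloning.
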
diff --git a/submodules/parquet-testing b/submodules/parquet-testing
new file mode 160000
index 0000000..40379b3
--- /dev/null
+++ b/submodules/parquet-testing
@@ -0,0 +1 @@
+Subproject commit 40379b3c58298fd22589dec7e41748375b5a8e82