You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by ga...@apache.org on 2020/06/10 11:38:18 UTC
[parquet-mr] branch encryption updated: PARQUET-1807: Encryption:
Interop and Function test suite for Java version (#782)
This is an automated email from the ASF dual-hosted git repository.
gabor pushed a commit to branch encryption
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/encryption by this push:
new cc5e268 PARQUET-1807: Encryption: Interop and Function test suite for Java version (#782)
cc5e268 is described below
commit cc5e2689eeb20d36d5360e26dd68a3a96a557a19
Author: andersonm-ibm <63...@users.noreply.github.com>
AuthorDate: Wed Jun 10 14:38:08 2020 +0300
PARQUET-1807: Encryption: Interop and Function test suite for Java version (#782)
Add a test for writing and reading Parquet files in a number of
encryption and decryption configurations.
Add an interop test that reads files written by parquet-cpp from
the parquet-testing GitHub repository.
This adds the parquet-testing repo as a submodule.
---
.gitmodules | 3 +
.../parquet/crypto/DecryptionKeyRetrieverMock.java | 41 ++
.../apache/parquet/hadoop/TestBloomEncryption.java | 313 ---------
.../apache/parquet/hadoop/TestBloomFiltering.java | 114 ++-
.../parquet/hadoop/TestColumnIndexEncryption.java | 571 ---------------
.../parquet/hadoop/TestColumnIndexFiltering.java | 122 +++-
.../parquet/hadoop/TestEncryptionOptions.java | 779 +++++++++++++++++++++
pom.xml | 26 +
submodules/parquet-testing | 1 +
9 files changed, 1042 insertions(+), 928 deletions(-)
diff --git a/.gitmodules b/.gitmodules
new file mode 100644
index 0000000..2708799
--- /dev/null
+++ b/.gitmodules
@@ -0,0 +1,3 @@
+[submodule "submodules/parquet-testing"]
+ path = submodules/parquet-testing
+ url = https://github.com/apache/parquet-testing.git
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/crypto/DecryptionKeyRetrieverMock.java b/parquet-hadoop/src/test/java/org/apache/parquet/crypto/DecryptionKeyRetrieverMock.java
new file mode 100644
index 0000000..0bb7fc5
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/crypto/DecryptionKeyRetrieverMock.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.crypto;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Simple key retriever, based on UTF8 strings as key identifiers
+ */
+public class DecryptionKeyRetrieverMock implements DecryptionKeyRetriever {
+ private final Map<String, byte[]> keyMap = new HashMap<>();
+
+ public DecryptionKeyRetrieverMock putKey(String keyId, byte[] keyBytes) {
+ keyMap.put(keyId, keyBytes);
+ return this;
+ }
+
+ @Override
+ public byte[] getKey(byte[] keyMetaData) {
+ String keyId = new String(keyMetaData, StandardCharsets.UTF_8);
+ return keyMap.get(keyId);
+ }
+}
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomEncryption.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomEncryption.java
deleted file mode 100644
index e4cc550..0000000
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomEncryption.java
+++ /dev/null
@@ -1,313 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.parquet.hadoop;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.parquet.column.ParquetProperties;
-import org.apache.parquet.crypto.ColumnEncryptionProperties;
-import org.apache.parquet.crypto.FileDecryptionProperties;
-import org.apache.parquet.crypto.FileEncryptionProperties;
-import org.apache.parquet.filter2.compat.FilterCompat;
-import org.apache.parquet.filter2.predicate.FilterPredicate;
-import org.apache.parquet.filter2.recordlevel.PhoneBookWriter;
-import org.apache.parquet.hadoop.example.ExampleParquetWriter;
-import org.apache.parquet.hadoop.example.GroupReadSupport;
-import org.apache.parquet.hadoop.metadata.ColumnPath;
-import org.apache.parquet.hadoop.TestColumnIndexEncryption.StringKeyIdRetriever;
-import org.apache.parquet.io.api.Binary;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import static org.apache.parquet.filter2.predicate.FilterApi.*;
-import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
-import static org.junit.Assert.*;
-
-@RunWith(Parameterized.class)
-public class TestBloomEncryption {
- private static final Path FILE_V1 = createTempFile();
- private static final Path FILE_V2 = createTempFile();
- private static final Logger LOGGER = LoggerFactory.getLogger(TestBloomEncryption.class);
- private static final Random RANDOM = new Random(42);
- private static final String[] PHONE_KINDS = { null, "mobile", "home", "work" };
- private static final List<PhoneBookWriter.User> DATA = Collections.unmodifiableList(generateData(10000));
-
- private static final byte[] FOOTER_ENCRYPTION_KEY = new String("0123456789012345").getBytes();
- private static final byte[] COLUMN_ENCRYPTION_KEY1 = new String("1234567890123450").getBytes();
- private static final byte[] COLUMN_ENCRYPTION_KEY2 = new String("1234567890123451").getBytes();
-
- private final Path file;
- public TestBloomEncryption(Path file) {
- this.file = file;
- }
-
- private static Path createTempFile() {
- try {
- return new Path(Files.createTempFile("test-bloom-filter_", ".parquet").toAbsolutePath().toString());
- } catch (IOException e) {
- throw new AssertionError("Unable to create temporary file", e);
- }
- }
-
- @Parameterized.Parameters
- public static Collection<Object[]> params() {
- return Arrays.asList(new Object[] { FILE_V1 }, new Object[] { FILE_V2 });
- }
-
- private static List<PhoneBookWriter.User> generateData(int rowCount) {
- List<PhoneBookWriter.User> users = new ArrayList<>();
- List<String> names = generateNames(rowCount);
- for (int i = 0; i < rowCount; ++i) {
- users.add(new PhoneBookWriter.User(i, names.get(i), generatePhoneNumbers(), generateLocation(i, rowCount)));
- }
- return users;
- }
-
- private static List<String> generateNames(int rowCount) {
- List<String> list = new ArrayList<>();
-
- // Adding fix values for filtering
- list.add("anderson");
- list.add("anderson");
- list.add("miller");
- list.add("miller");
- list.add("miller");
- list.add("thomas");
- list.add("thomas");
- list.add("williams");
-
- int nullCount = rowCount / 100;
-
- String alphabet = "aabcdeefghiijklmnoopqrstuuvwxyz";
- int maxLength = 8;
- for (int i = rowCount - list.size() - nullCount; i >= 0; --i) {
- int l = RANDOM.nextInt(maxLength);
- StringBuilder builder = new StringBuilder(l);
- for (int j = 0; j < l; ++j) {
- builder.append(alphabet.charAt(RANDOM.nextInt(alphabet.length())));
- }
- list.add(builder.toString());
- }
- list.sort((str1, str2) -> -str1.compareTo(str2));
-
- // Adding nulls to random places
- for (int i = 0; i < nullCount; ++i) {
- list.add(RANDOM.nextInt(list.size()), null);
- }
-
- return list;
- }
-
- private static List<PhoneBookWriter.PhoneNumber> generatePhoneNumbers() {
- int length = RANDOM.nextInt(5) - 1;
- if (length < 0) {
- return null;
- }
- List<PhoneBookWriter.PhoneNumber> phoneNumbers = new ArrayList<>(length);
- for (int i = 0; i < length; ++i) {
- // 6 digits numbers
- long number = Math.abs(RANDOM.nextLong() % 900000) + 100000;
- phoneNumbers.add(new PhoneBookWriter.PhoneNumber(number, PHONE_KINDS[RANDOM.nextInt(PHONE_KINDS.length)]));
- }
- return phoneNumbers;
- }
-
- private static PhoneBookWriter.Location generateLocation(int id, int rowCount) {
- if (RANDOM.nextDouble() < 0.01) {
- return null;
- }
-
- if (RANDOM.nextDouble() < 0.001) {
- return new PhoneBookWriter.Location(99.9, 99.9);
- }
-
- double lat = RANDOM.nextDouble() * 90.0 - (id < rowCount / 2 ? 90.0 : 0.0);
- double lon = RANDOM.nextDouble() * 90.0 - (id < rowCount / 4 || id >= 3 * rowCount / 4 ? 90.0 : 0.0);
-
- return new PhoneBookWriter.Location(RANDOM.nextDouble() < 0.01 ? null : lat, RANDOM.nextDouble() < 0.01 ? null : lon);
- }
-
- private List<PhoneBookWriter.User> readUsers(FilterPredicate filter, boolean useOtherFiltering,
- boolean useBloomFilter) throws IOException {
- /*
- byte[] keyBytes = {0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15};
- FileDecryptionProperties fileDecryptionProperties = FileDecryptionProperties.builder()
- .withFooterKey(keyBytes)
- .build();
- */
-
- StringKeyIdRetriever kr1 = new StringKeyIdRetriever();
- kr1.putKey("kf", FOOTER_ENCRYPTION_KEY);
- kr1.putKey("kc1", COLUMN_ENCRYPTION_KEY1);
- kr1.putKey("kc2", COLUMN_ENCRYPTION_KEY2);
-
- FileDecryptionProperties fileDecryptionProperties = FileDecryptionProperties.builder()
- .withKeyRetriever(kr1)
- .build();
-
- return PhoneBookWriter.readUsers(ParquetReader.builder(new GroupReadSupport(), file)
- .withFilter(FilterCompat.get(filter))
- .withDecryption(fileDecryptionProperties)
- .useDictionaryFilter(useOtherFiltering)
- .useStatsFilter(useOtherFiltering)
- .useRecordFilter(useOtherFiltering)
- .useBloomFilter(useBloomFilter)
- .useColumnIndexFilter(useOtherFiltering));
- }
-
- // Assumes that both lists are in the same order
- private static void assertContains(Stream<PhoneBookWriter.User> expected, List<PhoneBookWriter.User> actual) {
- Iterator<PhoneBookWriter.User> expIt = expected.iterator();
- if (!expIt.hasNext()) {
- return;
- }
- PhoneBookWriter.User exp = expIt.next();
- for (PhoneBookWriter.User act : actual) {
- if (act.equals(exp)) {
- if (!expIt.hasNext()) {
- break;
- }
- exp = expIt.next();
- }
- }
- assertFalse("Not all expected elements are in the actual list. E.g.: " + exp, expIt.hasNext());
- }
-
- private void assertCorrectFiltering(Predicate<PhoneBookWriter.User> expectedFilter, FilterPredicate actualFilter)
- throws IOException {
- // Check with only bloom filter based filtering
- List<PhoneBookWriter.User> result = readUsers(actualFilter, false, true);
-
- assertTrue("Bloom filtering should drop some row groups", result.size() < DATA.size());
- LOGGER.info("{}/{} records read; filtering ratio: {}%", result.size(), DATA.size(),
- 100 * result.size() / DATA.size());
- // Asserts that all the required records are in the result
- assertContains(DATA.stream().filter(expectedFilter), result);
- // Asserts that all the retrieved records are in the file (validating non-matching records)
- assertContains(result.stream(), DATA);
-
- // Check with all the filtering filtering to ensure the result contains exactly the required values
- result = readUsers(actualFilter, true, false);
- assertEquals(DATA.stream().filter(expectedFilter).collect(Collectors.toList()), result);
- }
-
-
- @BeforeClass
- public static void createFile() throws IOException {
- int pageSize = DATA.size() / 100; // Ensure that several pages will be created
- int rowGroupSize = pageSize * 4; // Ensure that there are more row-groups created
-/*
- byte[] keyBytes = {0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15};
- FileEncryptionProperties encryptionProperties = FileEncryptionProperties.builder(keyBytes)
- .build();
- */
- // Encryption configuration 2: Encrypt two columns and the footer, with different keys.
- ColumnEncryptionProperties columnProperties1 = ColumnEncryptionProperties
- .builder("id")
- .withKey(COLUMN_ENCRYPTION_KEY1)
- .withKeyID("kc1")
- .build();
-
- ColumnEncryptionProperties columnProperties2 = ColumnEncryptionProperties
- .builder("name")
- .withKey(COLUMN_ENCRYPTION_KEY2)
- .withKeyID("kc2")
- .build();
- Map<ColumnPath, ColumnEncryptionProperties> columnPropertiesMap = new HashMap<>();
-
- columnPropertiesMap.put(columnProperties1.getPath(), columnProperties1);
- columnPropertiesMap.put(columnProperties2.getPath(), columnProperties2);
-
- FileEncryptionProperties encryptionProperties = FileEncryptionProperties.builder(FOOTER_ENCRYPTION_KEY)
- .withFooterKeyID("kf")
- .withEncryptedColumns(columnPropertiesMap)
- .build();
-
- PhoneBookWriter.write(ExampleParquetWriter.builder(FILE_V1)
- .withWriteMode(OVERWRITE)
- .withRowGroupSize(rowGroupSize)
- .withPageSize(pageSize)
- .withBloomFilterNDV("location.lat", 10000L)
- .withBloomFilterNDV("name", 10000L)
- .withBloomFilterNDV("id", 10000L)
- .withEncryption(encryptionProperties)
- .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0),
- DATA);
- PhoneBookWriter.write(ExampleParquetWriter.builder(FILE_V2)
- .withWriteMode(OVERWRITE)
- .withRowGroupSize(rowGroupSize)
- .withPageSize(pageSize)
- .withBloomFilterNDV("location.lat", 10000L)
- .withBloomFilterNDV("name", 10000L)
- .withBloomFilterNDV("id", 10000L)
- .withEncryption(encryptionProperties)
- .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0),
- DATA);
- }
-
- @AfterClass
- public static void deleteFile() throws IOException {
- FILE_V1.getFileSystem(new Configuration()).delete(FILE_V1, false);
- FILE_V2.getFileSystem(new Configuration()).delete(FILE_V2, false);
- }
-
-
- @Test
- public void testSimpleFiltering() throws IOException {
- assertCorrectFiltering(
- record -> record.getId() == 1234L,
- eq(longColumn("id"), 1234L));
-
- assertCorrectFiltering(
- record -> "miller".equals(record.getName()),
- eq(binaryColumn("name"), Binary.fromString("miller")));
- }
-
- @Test
- public void testNestedFiltering() throws IOException {
- assertCorrectFiltering(
- record -> {
- PhoneBookWriter.Location location = record.getLocation();
- return location != null && location.getLat() != null && location.getLat() == 99.9;
- },
- eq(doubleColumn("location.lat"), 99.9));
- }
-}
-
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java
index 3a6a002..4ebe15a 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java
@@ -22,11 +22,16 @@ package org.apache.parquet.hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.crypto.ColumnEncryptionProperties;
+import org.apache.parquet.crypto.DecryptionKeyRetrieverMock;
+import org.apache.parquet.crypto.FileDecryptionProperties;
+import org.apache.parquet.crypto.FileEncryptionProperties;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.filter2.recordlevel.PhoneBookWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.io.api.Binary;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -57,29 +62,46 @@ import static org.junit.Assert.*;
@RunWith(Parameterized.class)
public class TestBloomFiltering {
- private static final Path FILE_V1 = createTempFile();
- private static final Path FILE_V2 = createTempFile();
+ private static final Path FILE_V1 = createTempFile(false);
+ private static final Path FILE_V2 = createTempFile(false);
+ private static final Path FILE_V1_E = createTempFile(true);
+ private static final Path FILE_V2_E = createTempFile(true);
private static final Logger LOGGER = LoggerFactory.getLogger(TestBloomFiltering.class);
private static final Random RANDOM = new Random(42);
private static final String[] PHONE_KINDS = { null, "mobile", "home", "work" };
private static final List<PhoneBookWriter.User> DATA = Collections.unmodifiableList(generateData(10000));
+ private static final byte[] FOOTER_ENCRYPTION_KEY = "0123456789012345".getBytes();
+ private static final byte[] COLUMN_ENCRYPTION_KEY1 = "1234567890123450".getBytes();
+ private static final byte[] COLUMN_ENCRYPTION_KEY2 = "1234567890123451".getBytes();
+ private static final String FOOTER_ENCRYPTION_KEY_ID = "kf";
+ private static final String COLUMN_ENCRYPTION_KEY1_ID = "kc1";
+ private static final String COLUMN_ENCRYPTION_KEY2_ID = "kc2";
+
private final Path file;
- public TestBloomFiltering(Path file) {
+ private final boolean isEncrypted;
+
+ public TestBloomFiltering(Path file, boolean isEncrypted) {
this.file = file;
+ this.isEncrypted = isEncrypted;
}
- private static Path createTempFile() {
+ private static Path createTempFile(boolean encrypted) {
+ String suffix = encrypted ? ".parquet.encrypted" : ".parquet";
try {
- return new Path(Files.createTempFile("test-bloom-filter_", ".parquet").toAbsolutePath().toString());
+ return new Path(Files.createTempFile("test-bloom-filter_", suffix).toAbsolutePath().toString());
} catch (IOException e) {
throw new AssertionError("Unable to create temporary file", e);
}
}
- @Parameterized.Parameters
+ @Parameterized.Parameters(name = "Run {index}: isEncrypted={1}")
public static Collection<Object[]> params() {
- return Arrays.asList(new Object[] { FILE_V1 }, new Object[] { FILE_V2 });
+ return Arrays.asList(
+ new Object[] { FILE_V1, false /*isEncrypted*/ },
+ new Object[] { FILE_V2, false /*isEncrypted*/ },
+ new Object[] { FILE_V1_E, true /*isEncrypted*/ },
+ new Object[] { FILE_V2_E, true /*isEncrypted*/ });
}
private static List<PhoneBookWriter.User> generateData(int rowCount) {
@@ -157,8 +179,21 @@ public class TestBloomFiltering {
private List<PhoneBookWriter.User> readUsers(FilterPredicate filter, boolean useOtherFiltering,
boolean useBloomFilter) throws IOException {
+ FileDecryptionProperties fileDecryptionProperties = null;
+ if (isEncrypted) {
+ DecryptionKeyRetrieverMock decryptionKeyRetrieverMock = new DecryptionKeyRetrieverMock()
+ .putKey(FOOTER_ENCRYPTION_KEY_ID, FOOTER_ENCRYPTION_KEY)
+ .putKey(COLUMN_ENCRYPTION_KEY1_ID, COLUMN_ENCRYPTION_KEY1)
+ .putKey(COLUMN_ENCRYPTION_KEY2_ID, COLUMN_ENCRYPTION_KEY2);
+
+ fileDecryptionProperties = FileDecryptionProperties.builder()
+ .withKeyRetriever(decryptionKeyRetrieverMock)
+ .build();
+ }
+
return PhoneBookWriter.readUsers(ParquetReader.builder(new GroupReadSupport(), file)
.withFilter(FilterCompat.get(filter))
+ .withDecryption(fileDecryptionProperties)
.useDictionaryFilter(useOtherFiltering)
.useStatsFilter(useOtherFiltering)
.useRecordFilter(useOtherFiltering)
@@ -202,35 +237,68 @@ public class TestBloomFiltering {
assertEquals(DATA.stream().filter(expectedFilter).collect(Collectors.toList()), result);
}
+ private static FileEncryptionProperties getFileEncryptionProperties() {
+ ColumnEncryptionProperties columnProperties1 = ColumnEncryptionProperties
+ .builder("id")
+ .withKey(COLUMN_ENCRYPTION_KEY1)
+ .withKeyID(COLUMN_ENCRYPTION_KEY1_ID)
+ .build();
+
+ ColumnEncryptionProperties columnProperties2 = ColumnEncryptionProperties
+ .builder("name")
+ .withKey(COLUMN_ENCRYPTION_KEY2)
+ .withKeyID(COLUMN_ENCRYPTION_KEY2_ID)
+ .build();
+ Map<ColumnPath, ColumnEncryptionProperties> columnPropertiesMap = new HashMap<>();
+
+ columnPropertiesMap.put(columnProperties1.getPath(), columnProperties1);
+ columnPropertiesMap.put(columnProperties2.getPath(), columnProperties2);
+
+ FileEncryptionProperties encryptionProperties = FileEncryptionProperties.builder(FOOTER_ENCRYPTION_KEY)
+ .withFooterKeyID(FOOTER_ENCRYPTION_KEY_ID)
+ .withEncryptedColumns(columnPropertiesMap)
+ .build();
+
+ return encryptionProperties;
+ }
- @BeforeClass
- public static void createFile() throws IOException {
+ private static void writePhoneBookToFile(Path file,
+ ParquetProperties.WriterVersion parquetVersion,
+ FileEncryptionProperties encryptionProperties) throws IOException {
int pageSize = DATA.size() / 100; // Ensure that several pages will be created
int rowGroupSize = pageSize * 4; // Ensure that there are more row-groups created
- PhoneBookWriter.write(ExampleParquetWriter.builder(FILE_V1)
+ PhoneBookWriter.write(ExampleParquetWriter.builder(file)
.withWriteMode(OVERWRITE)
.withRowGroupSize(rowGroupSize)
.withPageSize(pageSize)
.withBloomFilterNDV("location.lat", 10000L)
.withBloomFilterNDV("name", 10000L)
.withBloomFilterNDV("id", 10000L)
- .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0),
- DATA);
- PhoneBookWriter.write(ExampleParquetWriter.builder(FILE_V2)
- .withWriteMode(OVERWRITE)
- .withRowGroupSize(rowGroupSize)
- .withPageSize(pageSize)
- .withBloomFilterNDV("location.lat", 10000L)
- .withBloomFilterNDV("name", 10000L)
- .withBloomFilterNDV("id", 10000L)
- .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0),
+ .withEncryption(encryptionProperties)
+ .withWriterVersion(parquetVersion),
DATA);
}
+ private static void deleteFile(Path file) throws IOException {
+ file.getFileSystem(new Configuration()).delete(file, false);
+ }
+
+ @BeforeClass
+ public static void createFiles() throws IOException {
+ writePhoneBookToFile(FILE_V1, ParquetProperties.WriterVersion.PARQUET_1_0, null);
+ writePhoneBookToFile(FILE_V2, ParquetProperties.WriterVersion.PARQUET_2_0, null);
+
+ FileEncryptionProperties encryptionProperties = getFileEncryptionProperties();
+ writePhoneBookToFile(FILE_V1_E, ParquetProperties.WriterVersion.PARQUET_1_0, encryptionProperties);
+ writePhoneBookToFile(FILE_V2_E, ParquetProperties.WriterVersion.PARQUET_2_0, encryptionProperties);
+ }
+
@AfterClass
- public static void deleteFile() throws IOException {
- FILE_V1.getFileSystem(new Configuration()).delete(FILE_V1, false);
- FILE_V2.getFileSystem(new Configuration()).delete(FILE_V2, false);
+ public static void deleteFiles() throws IOException {
+ deleteFile(FILE_V1);
+ deleteFile(FILE_V2);
+ deleteFile(FILE_V1_E);
+ deleteFile(FILE_V2_E);
}
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexEncryption.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexEncryption.java
deleted file mode 100644
index 2fd2207..0000000
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexEncryption.java
+++ /dev/null
@@ -1,571 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.hadoop;
-
-import static java.util.Collections.emptyList;
-import static java.util.stream.Collectors.toList;
-import static org.apache.parquet.filter2.predicate.FilterApi.and;
-import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
-import static org.apache.parquet.filter2.predicate.FilterApi.doubleColumn;
-import static org.apache.parquet.filter2.predicate.FilterApi.eq;
-import static org.apache.parquet.filter2.predicate.FilterApi.gtEq;
-import static org.apache.parquet.filter2.predicate.FilterApi.longColumn;
-import static org.apache.parquet.filter2.predicate.FilterApi.lt;
-import static org.apache.parquet.filter2.predicate.FilterApi.ltEq;
-import static org.apache.parquet.filter2.predicate.FilterApi.not;
-import static org.apache.parquet.filter2.predicate.FilterApi.notEq;
-import static org.apache.parquet.filter2.predicate.FilterApi.or;
-import static org.apache.parquet.filter2.predicate.FilterApi.userDefined;
-import static org.apache.parquet.filter2.predicate.LogicalInverter.invert;
-import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
-import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
-import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
-import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
-import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
-import static org.apache.parquet.schema.Types.optional;
-import static org.apache.parquet.schema.Types.required;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Hashtable;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.parquet.column.ParquetProperties.WriterVersion;
-import org.apache.parquet.crypto.ColumnEncryptionProperties;
-import org.apache.parquet.crypto.DecryptionKeyRetriever;
-import org.apache.parquet.crypto.FileDecryptionProperties;
-import org.apache.parquet.crypto.FileEncryptionProperties;
-import org.apache.parquet.filter2.compat.FilterCompat;
-import org.apache.parquet.filter2.compat.FilterCompat.Filter;
-import org.apache.parquet.filter2.predicate.FilterPredicate;
-import org.apache.parquet.filter2.predicate.Statistics;
-import org.apache.parquet.filter2.predicate.UserDefinedPredicate;
-import org.apache.parquet.filter2.recordlevel.PhoneBookWriter;
-import org.apache.parquet.filter2.recordlevel.PhoneBookWriter.Location;
-import org.apache.parquet.filter2.recordlevel.PhoneBookWriter.PhoneNumber;
-import org.apache.parquet.filter2.recordlevel.PhoneBookWriter.User;
-import org.apache.parquet.hadoop.api.ReadSupport;
-import org.apache.parquet.hadoop.example.ExampleParquetWriter;
-import org.apache.parquet.hadoop.example.GroupReadSupport;
-import org.apache.parquet.hadoop.metadata.ColumnPath;
-import org.apache.parquet.io.api.Binary;
-import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.Types;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Unit tests for high level column index based filtering.
- */
-@RunWith(Parameterized.class)
-public class TestColumnIndexEncryption {
- private static final Logger LOGGER = LoggerFactory.getLogger(TestColumnIndexEncryption.class);
- private static final Random RANDOM = new Random(42);
- private static final String[] PHONE_KINDS = { null, "mobile", "home", "work" };
- private static final List<User> DATA = Collections.unmodifiableList(generateData(10000));
- private static final Path FILE_V1 = createTempFile();
- private static final Path FILE_V2 = createTempFile();
- private static final MessageType SCHEMA_WITHOUT_NAME = Types.buildMessage()
- .required(INT64).named("id")
- .optionalGroup()
- .addField(optional(DOUBLE).named("lon"))
- .addField(optional(DOUBLE).named("lat"))
- .named("location")
- .optionalGroup()
- .repeatedGroup()
- .addField(required(INT64).named("number"))
- .addField(optional(BINARY).as(stringType()).named("kind"))
- .named("phone")
- .named("phoneNumbers")
- .named("user_without_name");
- private static final byte[] FOOTER_ENCRYPTION_KEY = new String("0123456789012345").getBytes();
- private static final byte[] COLUMN_ENCRYPTION_KEY1 = new String("1234567890123450").getBytes();
- private static final byte[] COLUMN_ENCRYPTION_KEY2 = new String("1234567890123451").getBytes();
-
-//Simple key retriever, based on UTF8 strings as key identifiers
- static class StringKeyIdRetriever implements DecryptionKeyRetriever{
-
- private final Hashtable<String,byte[]> keyMap = new Hashtable<String,byte[]>();
-
- public void putKey(String keyId, byte[] keyBytes) {
- keyMap.put(keyId, keyBytes);
- }
-
- @Override
- public byte[] getKey(byte[] keyMetaData) {
- String keyId = new String(keyMetaData, StandardCharsets.UTF_8);
- return keyMap.get(keyId);
- }
- }
-
-
- @Parameters
- public static Collection<Object[]> params() {
- return Arrays.asList(new Object[] { FILE_V1 }, new Object[] { FILE_V2 });
- }
-
- private final Path file;
-
- public TestColumnIndexEncryption(Path file) {
- this.file = file;
- }
-
- private static List<User> generateData(int rowCount) {
- List<User> users = new ArrayList<>();
- List<String> names = generateNames(rowCount);
- for (int i = 0; i < rowCount; ++i) {
- users.add(new User(i, names.get(i), generatePhoneNumbers(), generateLocation(i, rowCount)));
- }
- return users;
- }
-
- private static List<String> generateNames(int rowCount) {
- List<String> list = new ArrayList<>();
-
- // Adding fix values for filtering
- list.add("anderson");
- list.add("anderson");
- list.add("miller");
- list.add("miller");
- list.add("miller");
- list.add("thomas");
- list.add("thomas");
- list.add("williams");
-
- int nullCount = rowCount / 100;
-
- String alphabet = "aabcdeefghiijklmnoopqrstuuvwxyz";
- int maxLength = 8;
- for (int i = rowCount - list.size() - nullCount; i >= 0; --i) {
- int l = RANDOM.nextInt(maxLength);
- StringBuilder builder = new StringBuilder(l);
- for (int j = 0; j < l; ++j) {
- builder.append(alphabet.charAt(RANDOM.nextInt(alphabet.length())));
- }
- list.add(builder.toString());
- }
- Collections.sort(list, (str1, str2) -> -str1.compareTo(str2));
-
- // Adding nulls to random places
- for (int i = 0; i < nullCount; ++i) {
- list.add(RANDOM.nextInt(list.size()), null);
- }
-
- return list;
- }
-
- private static List<PhoneNumber> generatePhoneNumbers() {
- int length = RANDOM.nextInt(5) - 1;
- if (length < 0) {
- return null;
- }
- List<PhoneNumber> phoneNumbers = new ArrayList<>(length);
- for (int i = 0; i < length; ++i) {
- // 6 digits numbers
- long number = Math.abs(RANDOM.nextLong() % 900000) + 100000;
- phoneNumbers.add(new PhoneNumber(number, PHONE_KINDS[RANDOM.nextInt(PHONE_KINDS.length)]));
- }
- return phoneNumbers;
- }
-
- private static Location generateLocation(int id, int rowCount) {
- if (RANDOM.nextDouble() < 0.01) {
- return null;
- }
-
- double lat = RANDOM.nextDouble() * 90.0 - (id < rowCount / 2 ? 90.0 : 0.0);
- double lon = RANDOM.nextDouble() * 90.0 - (id < rowCount / 4 || id >= 3 * rowCount / 4 ? 90.0 : 0.0);
-
- return new Location(RANDOM.nextDouble() < 0.01 ? null : lat, RANDOM.nextDouble() < 0.01 ? null : lon);
- }
-
- private static Path createTempFile() {
- try {
- return new Path(Files.createTempFile("test-ci_", ".parquet").toAbsolutePath().toString());
- } catch (IOException e) {
- throw new AssertionError("Unable to create temporary file", e);
- }
- }
-
- private List<User> readUsers(FilterPredicate filter, boolean useOtherFiltering) throws IOException {
- return readUsers(FilterCompat.get(filter), useOtherFiltering, true);
- }
-
- private List<User> readUsers(FilterPredicate filter, boolean useOtherFiltering, boolean useColumnIndexFilter)
- throws IOException {
- return readUsers(FilterCompat.get(filter), useOtherFiltering, useColumnIndexFilter);
- }
-
- private List<User> readUsers(Filter filter, boolean useOtherFiltering) throws IOException {
- return readUsers(filter, useOtherFiltering, true);
- }
-
- private List<User> readUsers(Filter filter, boolean useOtherFiltering, boolean useColumnIndexFilter)
- throws IOException {
-
- StringKeyIdRetriever kr1 = new StringKeyIdRetriever();
- kr1.putKey("kf", FOOTER_ENCRYPTION_KEY);
- kr1.putKey("kc1", COLUMN_ENCRYPTION_KEY1);
- kr1.putKey("kc2", COLUMN_ENCRYPTION_KEY2);
-
- FileDecryptionProperties fileDecryptionProperties = FileDecryptionProperties.builder()
- .withKeyRetriever(kr1)
- .build();
-
- return PhoneBookWriter.readUsers(ParquetReader.builder(new GroupReadSupport(), file)
- .withFilter(filter)
- .withDecryption(fileDecryptionProperties)
- .useDictionaryFilter(useOtherFiltering)
- .useStatsFilter(useOtherFiltering)
- .useRecordFilter(useOtherFiltering)
- .useColumnIndexFilter(useColumnIndexFilter));
- }
-
- private List<User> readUsersWithProjection(Filter filter, MessageType schema, boolean useOtherFiltering, boolean useColumnIndexFilter) throws IOException {
- StringKeyIdRetriever kr1 = new StringKeyIdRetriever();
- kr1.putKey("kf", FOOTER_ENCRYPTION_KEY);
- kr1.putKey("kc1", COLUMN_ENCRYPTION_KEY1);
- kr1.putKey("kc2", COLUMN_ENCRYPTION_KEY2);
-
- FileDecryptionProperties fileDecryptionProperties = FileDecryptionProperties.builder()
- .withKeyRetriever(kr1)
- .build();
-
- return PhoneBookWriter.readUsers(ParquetReader.builder(new GroupReadSupport(), file)
- .withFilter(filter)
- .withDecryption(fileDecryptionProperties)
- .useDictionaryFilter(useOtherFiltering)
- .useStatsFilter(useOtherFiltering)
- .useRecordFilter(useOtherFiltering)
- .useColumnIndexFilter(useColumnIndexFilter)
- .set(ReadSupport.PARQUET_READ_SCHEMA, schema.toString()));
- }
-
- // Assumes that both lists are in the same order
- private static void assertContains(Stream<User> expected, List<User> actual) {
- Iterator<User> expIt = expected.iterator();
- if (!expIt.hasNext()) {
- return;
- }
- User exp = expIt.next();
- for (User act : actual) {
- if (act.equals(exp)) {
- if (!expIt.hasNext()) {
- break;
- }
- exp = expIt.next();
- }
- }
- assertFalse("Not all expected elements are in the actual list. E.g.: " + exp, expIt.hasNext());
- }
-
- private void assertCorrectFiltering(Predicate<User> expectedFilter, FilterPredicate actualFilter)
- throws IOException {
- // Check with only column index based filtering
- List<User> result = readUsers(actualFilter, false);
-
- assertTrue("Column-index filtering should drop some pages", result.size() < DATA.size());
- LOGGER.info("{}/{} records read; filtering ratio: {}%", result.size(), DATA.size(),
- 100 * result.size() / DATA.size());
- // Asserts that all the required records are in the result
- assertContains(DATA.stream().filter(expectedFilter), result);
- // Asserts that all the retrieved records are in the file (validating non-matching records)
- assertContains(result.stream(), DATA);
-
- // Check with all the filtering filtering to ensure the result contains exactly the required values
- result = readUsers(actualFilter, true);
- assertEquals(DATA.stream().filter(expectedFilter).collect(Collectors.toList()), result);
- }
-
- @BeforeClass
- public static void createFile() throws IOException {
- int pageSize = DATA.size() / 10; // Ensure that several pages will be created
- int rowGroupSize = pageSize * 6 * 5; // Ensure that there are more row-groups created
-
- // Encryption configuration: Encrypt two columns and the footer, with different keys.
- ColumnEncryptionProperties columnProperties1 = ColumnEncryptionProperties
- .builder("id")
- .withKey(COLUMN_ENCRYPTION_KEY1)
- .withKeyID("kc1")
- .build();
-
- ColumnEncryptionProperties columnProperties2 = ColumnEncryptionProperties
- .builder("name")
- .withKey(COLUMN_ENCRYPTION_KEY2)
- .withKeyID("kc2")
- .build();
- Map<ColumnPath, ColumnEncryptionProperties> columnPropertiesMap = new HashMap<>();
-
- columnPropertiesMap.put(columnProperties1.getPath(), columnProperties1);
- columnPropertiesMap.put(columnProperties2.getPath(), columnProperties2);
-
- FileEncryptionProperties encryptionProperties = FileEncryptionProperties.builder(FOOTER_ENCRYPTION_KEY)
- .withFooterKeyID("kf")
- .withEncryptedColumns(columnPropertiesMap)
- .build();
-
- PhoneBookWriter.write(ExampleParquetWriter.builder(FILE_V1)
- .withWriteMode(OVERWRITE)
- .withRowGroupSize(rowGroupSize)
- .withPageSize(pageSize)
- .withEncryption(encryptionProperties)
- .withWriterVersion(WriterVersion.PARQUET_1_0),
- DATA);
-
- PhoneBookWriter.write(ExampleParquetWriter.builder(FILE_V2)
- .withWriteMode(OVERWRITE)
- .withRowGroupSize(rowGroupSize)
- .withPageSize(pageSize)
- .withEncryption(encryptionProperties)
- .withWriterVersion(WriterVersion.PARQUET_2_0),
- DATA);
- }
-
- @AfterClass
- public static void deleteFile() throws IOException {
- FILE_V1.getFileSystem(new Configuration()).delete(FILE_V1, false);
- FILE_V2.getFileSystem(new Configuration()).delete(FILE_V2, false);
- }
-
- @Test
- public void testSimpleFiltering() throws IOException {
- assertCorrectFiltering(
- record -> record.getId() == 1234,
- eq(longColumn("id"), 1234l));
- assertCorrectFiltering(
- record -> "miller".equals(record.getName()),
- eq(binaryColumn("name"), Binary.fromString("miller")));
- assertCorrectFiltering(
- record -> record.getName() == null,
- eq(binaryColumn("name"), null));
- }
-
- @Test
- public void testNoFiltering() throws IOException {
- // Column index filtering with no-op filter
- assertEquals(DATA, readUsers(FilterCompat.NOOP, false));
- assertEquals(DATA, readUsers(FilterCompat.NOOP, true));
-
- // Column index filtering turned off
- assertEquals(DATA.stream().filter(user -> user.getId() == 1234).collect(Collectors.toList()),
- readUsers(eq(longColumn("id"), 1234l), true, false));
- assertEquals(DATA.stream().filter(user -> "miller".equals(user.getName())).collect(Collectors.toList()),
- readUsers(eq(binaryColumn("name"), Binary.fromString("miller")), true, false));
- assertEquals(DATA.stream().filter(user -> user.getName() == null).collect(Collectors.toList()),
- readUsers(eq(binaryColumn("name"), null), true, false));
-
- // Every filtering mechanism turned off
- assertEquals(DATA, readUsers(eq(longColumn("id"), 1234l), false, false));
- assertEquals(DATA, readUsers(eq(binaryColumn("name"), Binary.fromString("miller")), false, false));
- assertEquals(DATA, readUsers(eq(binaryColumn("name"), null), false, false));
- }
-
- @Test
- public void testComplexFiltering() throws IOException {
- assertCorrectFiltering(
- record -> {
- Location loc = record.getLocation();
- Double lat = loc == null ? null : loc.getLat();
- Double lon = loc == null ? null : loc.getLon();
- return lat != null && lon != null && 37 <= lat && lat <= 70 && -21 <= lon && lon <= 35;
- },
- and(and(gtEq(doubleColumn("location.lat"), 37.0), ltEq(doubleColumn("location.lat"), 70.0)),
- and(gtEq(doubleColumn("location.lon"), -21.0), ltEq(doubleColumn("location.lon"), 35.0))));
- assertCorrectFiltering(
- record -> {
- Location loc = record.getLocation();
- return loc == null || (loc.getLat() == null && loc.getLon() == null);
- },
- and(eq(doubleColumn("location.lat"), null), eq(doubleColumn("location.lon"), null)));
- assertCorrectFiltering(
- record -> {
- String name = record.getName();
- return name != null && name.compareTo("thomas") < 0 && record.getId() <= 3 * DATA.size() / 4;
- },
- and(lt(binaryColumn("name"), Binary.fromString("thomas")), ltEq(longColumn("id"), 3l * DATA.size() / 4)));
- }
-
- public static class NameStartsWithVowel extends UserDefinedPredicate<Binary> {
- private static final Binary A = Binary.fromString("a");
- private static final Binary B = Binary.fromString("b");
- private static final Binary E = Binary.fromString("e");
- private static final Binary F = Binary.fromString("f");
- private static final Binary I = Binary.fromString("i");
- private static final Binary J = Binary.fromString("j");
- private static final Binary O = Binary.fromString("o");
- private static final Binary P = Binary.fromString("p");
- private static final Binary U = Binary.fromString("u");
- private static final Binary V = Binary.fromString("v");
-
- private static boolean isStartingWithVowel(String str) {
- if (str == null || str.isEmpty()) {
- return false;
- }
- switch (str.charAt(0)) {
- case 'a':
- case 'e':
- case 'i':
- case 'o':
- case 'u':
- return true;
- default:
- return false;
- }
- }
-
- @Override
- public boolean keep(Binary value) {
- return value != null && isStartingWithVowel(value.toStringUsingUTF8());
- }
-
- @Override
- public boolean canDrop(Statistics<Binary> statistics) {
- Comparator<Binary> cmp = statistics.getComparator();
- Binary min = statistics.getMin();
- Binary max = statistics.getMax();
- return cmp.compare(max, A) < 0
- || (cmp.compare(min, B) >= 0 && cmp.compare(max, E) < 0)
- || (cmp.compare(min, F) >= 0 && cmp.compare(max, I) < 0)
- || (cmp.compare(min, J) >= 0 && cmp.compare(max, O) < 0)
- || (cmp.compare(min, P) >= 0 && cmp.compare(max, U) < 0)
- || cmp.compare(min, V) >= 0;
- }
-
- @Override
- public boolean inverseCanDrop(Statistics<Binary> statistics) {
- Comparator<Binary> cmp = statistics.getComparator();
- Binary min = statistics.getMin();
- Binary max = statistics.getMax();
- return (cmp.compare(min, A) >= 0 && cmp.compare(max, B) < 0)
- || (cmp.compare(min, E) >= 0 && cmp.compare(max, F) < 0)
- || (cmp.compare(min, I) >= 0 && cmp.compare(max, J) < 0)
- || (cmp.compare(min, O) >= 0 && cmp.compare(max, P) < 0)
- || (cmp.compare(min, U) >= 0 && cmp.compare(max, V) < 0);
- }
- }
-
- public static class IsDivisibleBy extends UserDefinedPredicate<Long> implements Serializable {
- private long divisor;
-
- IsDivisibleBy(long divisor) {
- this.divisor = divisor;
- }
-
- @Override
- public boolean keep(Long value) {
- // Deliberately not checking for null to verify the handling of NPE
- // Implementors shall always checks the value for null and return accordingly
- return value % divisor == 0;
- }
-
- @Override
- public boolean canDrop(Statistics<Long> statistics) {
- long min = statistics.getMin();
- long max = statistics.getMax();
- return min % divisor != 0 && max % divisor != 0 && min / divisor == max / divisor;
- }
-
- @Override
- public boolean inverseCanDrop(Statistics<Long> statistics) {
- long min = statistics.getMin();
- long max = statistics.getMax();
- return min == max && min % divisor == 0;
- }
- }
-
- @Test
- public void testUDF() throws IOException {
- assertCorrectFiltering(
- record -> NameStartsWithVowel.isStartingWithVowel(record.getName()) || record.getId() % 234 == 0,
- or(userDefined(binaryColumn("name"), NameStartsWithVowel.class),
- userDefined(longColumn("id"), new IsDivisibleBy(234))));
- assertCorrectFiltering(
- record -> !(NameStartsWithVowel.isStartingWithVowel(record.getName()) || record.getId() % 234 == 0),
- not(or(userDefined(binaryColumn("name"), NameStartsWithVowel.class),
- userDefined(longColumn("id"), new IsDivisibleBy(234)))));
- }
-
- @Test
- public void testFilteringWithMissingColumns() throws IOException {
- // Missing column filter is always true
- assertEquals(DATA, readUsers(notEq(binaryColumn("not-existing-binary"), Binary.EMPTY), true));
- assertCorrectFiltering(
- record -> record.getId() == 1234,
- and(eq(longColumn("id"), 1234l),
- eq(longColumn("not-existing-long"), null)));
- assertCorrectFiltering(
- record -> "miller".equals(record.getName()),
- and(eq(binaryColumn("name"), Binary.fromString("miller")),
- invert(userDefined(binaryColumn("not-existing-binary"), NameStartsWithVowel.class))));
-
- // Missing column filter is always false
- assertEquals(emptyList(), readUsers(lt(longColumn("not-existing-long"), 0l), true));
- assertCorrectFiltering(
- record -> "miller".equals(record.getName()),
- or(eq(binaryColumn("name"), Binary.fromString("miller")),
- gtEq(binaryColumn("not-existing-binary"), Binary.EMPTY)));
- assertCorrectFiltering(
- record -> record.getId() == 1234,
- or(eq(longColumn("id"), 1234l),
- userDefined(longColumn("not-existing-long"), new IsDivisibleBy(1))));
- }
-
- @Test
- public void testFilteringWithProjection() throws IOException {
- // All rows shall be retrieved because all values in column 'name' shall be handled as null values
- assertEquals(
- DATA.stream().map(user -> user.cloneWithName(null)).collect(toList()),
- readUsersWithProjection(FilterCompat.get(eq(binaryColumn("name"), null)), SCHEMA_WITHOUT_NAME, true, true));
-
- // Column index filter shall drop all pages because all values in column 'name' shall be handled as null values
- assertEquals(
- emptyList(),
- readUsersWithProjection(FilterCompat.get(notEq(binaryColumn("name"), null)), SCHEMA_WITHOUT_NAME, false, true));
- assertEquals(
- emptyList(),
- readUsersWithProjection(FilterCompat.get(userDefined(binaryColumn("name"), NameStartsWithVowel.class)),
- SCHEMA_WITHOUT_NAME, false, true));
- }
-}
-
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java
index c18212e..d2a5395 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java
@@ -52,8 +52,10 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Random;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -61,7 +63,12 @@ import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.ParquetProperties.WriterVersion;
+import org.apache.parquet.crypto.ColumnEncryptionProperties;
+import org.apache.parquet.crypto.DecryptionKeyRetrieverMock;
+import org.apache.parquet.crypto.FileDecryptionProperties;
+import org.apache.parquet.crypto.FileEncryptionProperties;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.compat.FilterCompat.Filter;
import org.apache.parquet.filter2.predicate.FilterPredicate;
@@ -74,6 +81,7 @@ import org.apache.parquet.filter2.recordlevel.PhoneBookWriter.User;
import org.apache.parquet.hadoop.api.ReadSupport;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;
@@ -95,8 +103,10 @@ public class TestColumnIndexFiltering {
private static final Random RANDOM = new Random(42);
private static final String[] PHONE_KINDS = { null, "mobile", "home", "work" };
private static final List<User> DATA = Collections.unmodifiableList(generateData(10000));
- private static final Path FILE_V1 = createTempFile();
- private static final Path FILE_V2 = createTempFile();
+ private static final Path FILE_V1 = createTempFile(false);
+ private static final Path FILE_V2 = createTempFile(false);
+ private static final Path FILE_V1_E = createTempFile(true);
+ private static final Path FILE_V2_E = createTempFile(true);
private static final MessageType SCHEMA_WITHOUT_NAME = Types.buildMessage()
.required(INT64).named("id")
.optionalGroup()
@@ -111,15 +121,28 @@ public class TestColumnIndexFiltering {
.named("phoneNumbers")
.named("user_without_name");
- @Parameters
+ private static final byte[] FOOTER_ENCRYPTION_KEY = "0123456789012345".getBytes();
+ private static final byte[] COLUMN_ENCRYPTION_KEY1 = "1234567890123450".getBytes();
+ private static final byte[] COLUMN_ENCRYPTION_KEY2 = "1234567890123451".getBytes();
+ private static final String FOOTER_ENCRYPTION_KEY_ID = "kf";
+ private static final String COLUMN_ENCRYPTION_KEY1_ID = "kc1";
+ private static final String COLUMN_ENCRYPTION_KEY2_ID = "kc2";
+
+ @Parameters(name = "Run {index}: isEncrypted={1}")
public static Collection<Object[]> params() {
- return Arrays.asList(new Object[] { FILE_V1 }, new Object[] { FILE_V2 });
+ return Arrays.asList(
+ new Object[] { FILE_V1, false /*isEncrypted*/ },
+ new Object[] { FILE_V2, false /*isEncrypted*/ },
+ new Object[] { FILE_V1_E, true /*isEncrypted*/ },
+ new Object[] { FILE_V2_E, true /*isEncrypted*/ });
}
private final Path file;
+ private final boolean isEncrypted;
- public TestColumnIndexFiltering(Path file) {
+ public TestColumnIndexFiltering(Path file, boolean isEncrypted) {
this.file = file;
+ this.isEncrypted = isEncrypted;
}
private static List<User> generateData(int rowCount) {
@@ -191,9 +214,10 @@ public class TestColumnIndexFiltering {
return new Location(RANDOM.nextDouble() < 0.01 ? null : lat, RANDOM.nextDouble() < 0.01 ? null : lon);
}
- private static Path createTempFile() {
+ private static Path createTempFile(boolean encrypted) {
+ String suffix = encrypted ? ".parquet.encrypted" : ".parquet";
try {
- return new Path(Files.createTempFile("test-ci_", ".parquet").toAbsolutePath().toString());
+ return new Path(Files.createTempFile("test-ci_", suffix).toAbsolutePath().toString());
} catch (IOException e) {
throw new AssertionError("Unable to create temporary file", e);
}
@@ -214,17 +238,22 @@ public class TestColumnIndexFiltering {
private List<User> readUsers(Filter filter, boolean useOtherFiltering, boolean useColumnIndexFilter)
throws IOException {
+ FileDecryptionProperties decryptionProperties = getFileDecryptionProperties();
return PhoneBookWriter.readUsers(ParquetReader.builder(new GroupReadSupport(), file)
.withFilter(filter)
+ .withDecryption(decryptionProperties)
.useDictionaryFilter(useOtherFiltering)
.useStatsFilter(useOtherFiltering)
.useRecordFilter(useOtherFiltering)
.useColumnIndexFilter(useColumnIndexFilter));
}
- private List<User> readUsersWithProjection(Filter filter, MessageType schema, boolean useOtherFiltering, boolean useColumnIndexFilter) throws IOException {
+ private List<User> readUsersWithProjection(Filter filter, MessageType schema, boolean useOtherFiltering,
+ boolean useColumnIndexFilter) throws IOException {
+ FileDecryptionProperties decryptionProperties = getFileDecryptionProperties();
return PhoneBookWriter.readUsers(ParquetReader.builder(new GroupReadSupport(), file)
.withFilter(filter)
+ .withDecryption(decryptionProperties)
.useDictionaryFilter(useOtherFiltering)
.useStatsFilter(useOtherFiltering)
.useRecordFilter(useOtherFiltering)
@@ -232,6 +261,21 @@ public class TestColumnIndexFiltering {
.set(ReadSupport.PARQUET_READ_SCHEMA, schema.toString()));
}
+ private FileDecryptionProperties getFileDecryptionProperties() {
+ FileDecryptionProperties decryptionProperties = null;
+ if (isEncrypted) {
+ DecryptionKeyRetrieverMock decryptionKeyRetrieverMock = new DecryptionKeyRetrieverMock()
+ .putKey(FOOTER_ENCRYPTION_KEY_ID, FOOTER_ENCRYPTION_KEY)
+ .putKey(COLUMN_ENCRYPTION_KEY1_ID, COLUMN_ENCRYPTION_KEY1)
+ .putKey(COLUMN_ENCRYPTION_KEY2_ID, COLUMN_ENCRYPTION_KEY2);
+
+ decryptionProperties = FileDecryptionProperties.builder()
+ .withKeyRetriever(decryptionKeyRetrieverMock)
+ .build();
+ }
+ return decryptionProperties;
+ }
+
// Assumes that both lists are in the same order
private static void assertContains(Stream<User> expected, List<User> actual) {
Iterator<User> expIt = expected.iterator();
@@ -269,27 +313,63 @@ public class TestColumnIndexFiltering {
}
@BeforeClass
- public static void createFile() throws IOException {
+ public static void createFiles() throws IOException {
+ writePhoneBookToFile(FILE_V1, WriterVersion.PARQUET_1_0, null);
+ writePhoneBookToFile(FILE_V2, WriterVersion.PARQUET_2_0, null);
+ FileEncryptionProperties encryptionProperties = getFileEncryptionProperties();
+ writePhoneBookToFile(FILE_V1_E, ParquetProperties.WriterVersion.PARQUET_1_0, encryptionProperties);
+ writePhoneBookToFile(FILE_V2_E, ParquetProperties.WriterVersion.PARQUET_2_0, encryptionProperties);
+ }
+
+ private static void writePhoneBookToFile(Path file, WriterVersion parquetVersion,
+ FileEncryptionProperties encryptionProperties) throws IOException {
int pageSize = DATA.size() / 10; // Ensure that several pages will be created
int rowGroupSize = pageSize * 6 * 5; // Ensure that there are more row-groups created
- PhoneBookWriter.write(ExampleParquetWriter.builder(FILE_V1)
- .withWriteMode(OVERWRITE)
- .withRowGroupSize(rowGroupSize)
- .withPageSize(pageSize)
- .withWriterVersion(WriterVersion.PARQUET_1_0),
- DATA);
- PhoneBookWriter.write(ExampleParquetWriter.builder(FILE_V2)
+
+ PhoneBookWriter.write(ExampleParquetWriter.builder(file)
.withWriteMode(OVERWRITE)
.withRowGroupSize(rowGroupSize)
.withPageSize(pageSize)
- .withWriterVersion(WriterVersion.PARQUET_2_0),
- DATA);
+ .withEncryption(encryptionProperties)
+ .withWriterVersion(parquetVersion),
+ DATA);
+ }
+
+ private static FileEncryptionProperties getFileEncryptionProperties() {
+ ColumnEncryptionProperties columnProperties1 = ColumnEncryptionProperties
+ .builder("id")
+ .withKey(COLUMN_ENCRYPTION_KEY1)
+ .withKeyID(COLUMN_ENCRYPTION_KEY1_ID)
+ .build();
+
+ ColumnEncryptionProperties columnProperties2 = ColumnEncryptionProperties
+ .builder("name")
+ .withKey(COLUMN_ENCRYPTION_KEY2)
+ .withKeyID(COLUMN_ENCRYPTION_KEY2_ID)
+ .build();
+ Map<ColumnPath, ColumnEncryptionProperties> columnPropertiesMap = new HashMap<>();
+
+ columnPropertiesMap.put(columnProperties1.getPath(), columnProperties1);
+ columnPropertiesMap.put(columnProperties2.getPath(), columnProperties2);
+
+ FileEncryptionProperties encryptionProperties = FileEncryptionProperties.builder(FOOTER_ENCRYPTION_KEY)
+ .withFooterKeyID(FOOTER_ENCRYPTION_KEY_ID)
+ .withEncryptedColumns(columnPropertiesMap)
+ .build();
+
+ return encryptionProperties;
+ }
+
+ private static void deleteFile(Path file) throws IOException {
+ file.getFileSystem(new Configuration()).delete(file, false);
}
@AfterClass
- public static void deleteFile() throws IOException {
- FILE_V1.getFileSystem(new Configuration()).delete(FILE_V1, false);
- FILE_V2.getFileSystem(new Configuration()).delete(FILE_V2, false);
+ public static void deleteFiles() throws IOException {
+ deleteFile(FILE_V1);
+ deleteFile(FILE_V2);
+ deleteFile(FILE_V1_E);
+ deleteFile(FILE_V2_E);
}
@Test
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestEncryptionOptions.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestEncryptionOptions.java
new file mode 100644
index 0000000..7f0111d
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestEncryptionOptions.java
@@ -0,0 +1,779 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.crypto.ColumnDecryptionProperties;
+import org.apache.parquet.crypto.ColumnEncryptionProperties;
+import org.apache.parquet.crypto.DecryptionKeyRetrieverMock;
+import org.apache.parquet.crypto.FileDecryptionProperties;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.crypto.ParquetCipher;
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Types;
+import org.apache.parquet.statistics.RandomValues;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ErrorCollector;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
+
+/*
+ * This file contains samples for writing and reading encrypted Parquet files in different
+ * encryption and decryption configurations. The samples have the following goals:
+ * 1) Demonstrate usage of different options for data encryption and decryption.
+ * 2) Produce encrypted files for interoperability tests with other (e.g. parquet-cpp)
+ * readers that support encryption.
+ * 3) Produce encrypted files with plaintext footer, for testing the ability of legacy
+ * readers to parse the footer and read unencrypted columns.
+ * 4) Perform interoperability tests with other (e.g. parquet-cpp) writers, by reading
+ * encrypted files produced by these writers.
+ *
+ * The write sample produces a number of parquet files, each encrypted with a different
+ * encryption configuration as described below.
+ * The name of each file is in the form of:
+ * tester<encryption config number>.parquet.encrypted.
+ *
+ * The read sample creates a set of decryption configurations and then uses each of them
+ * to read all encrypted files in the input directory.
+ *
+ * The different encryption and decryption configurations are listed below.
+ *
+ *
+ * A detailed description of the Parquet Modular Encryption specification can be found
+ * here:
+ * https://github.com/apache/parquet-format/blob/encryption/Encryption.md
+ *
+ * The write sample creates files with seven columns in the following
+ * encryption configurations:
+ *
+ * UNIFORM_ENCRYPTION: Encrypt all columns and the footer with the same key.
+ * (uniform encryption)
+ * ENCRYPT_COLUMNS_AND_FOOTER: Encrypt six columns and the footer, with different
+ * keys.
+ * ENCRYPT_COLUMNS_PLAINTEXT_FOOTER: Encrypt six columns, with different keys.
+ * Do not encrypt footer (to enable legacy readers)
+ * - plaintext footer mode.
+ * ENCRYPT_COLUMNS_AND_FOOTER_AAD: Encrypt six columns and the footer, with different
+ * keys. Supply aad_prefix for file identity
+ * verification.
+ * ENCRYPT_COLUMNS_AND_FOOTER_DISABLE_AAD_STORAGE: Encrypt six columns and the footer,
+ * with different keys. Supply aad_prefix, and call
+ * disable_aad_prefix_storage to prevent file
+ * identity storage in file metadata.
+ * ENCRYPT_COLUMNS_AND_FOOTER_CTR: Encrypt six columns and the footer, with different
+ * keys. Use the alternative (AES_GCM_CTR_V1) algorithm.
+ * NO_ENCRYPTION: Do not encrypt anything
+ *
+ *
+ * The read sample uses each of the following decryption configurations to read every
+ * encrypted file in the input directory:
+ *
+ * DECRYPT_WITH_KEY_RETRIEVER: Decrypt using key retriever that holds the keys of
+ * the encrypted columns and the footer key.
+ * DECRYPT_WITH_KEY_RETRIEVER_AAD: Decrypt using key retriever that holds the keys of
+ * the encrypted columns and the footer key. Supplies
+ * aad_prefix to verify file identity.
+ * DECRYPT_WITH_EXPLICIT_KEYS: Decrypt using explicit column and footer keys
+ * (instead of key retrieval callback).
+ * NO_DECRYPTION: Do not decrypt anything.
+ */
+public class TestEncryptionOptions {
+ private static final Logger LOG = LoggerFactory.getLogger(TestEncryptionOptions.class);
+
+ // Scratch directory; the write/read test stores its generated files under its root.
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ // Receives every verification failure reported through addErrorToErrorCollectorAndLog.
+ @Rule
+ public ErrorCollector errorCollector = new ErrorCollector();
+
+ // Directory the interop test reads its input files from (never reassigned, hence final).
+ private static final String PARQUET_TESTING_PATH = "../submodules/parquet-testing/data";
+ private static final int RANDOM_SEED = 42;
+ private static final int FIXED_LENGTH = 10;
+ // All value generators share RANDOM_SEED, so the generated data is the same on every run.
+ private static final Random RANDOM = new Random(RANDOM_SEED);
+ private static final RandomValues.IntGenerator intGenerator
+   = new RandomValues.IntGenerator(RANDOM_SEED);
+ private static final RandomValues.FloatGenerator floatGenerator
+   = new RandomValues.FloatGenerator(RANDOM_SEED);
+ private static final RandomValues.DoubleGenerator doubleGenerator
+   = new RandomValues.DoubleGenerator(RANDOM_SEED);
+ private static final RandomValues.BinaryGenerator binaryGenerator
+   = new RandomValues.BinaryGenerator(RANDOM_SEED);
+ private static final RandomValues.FixedGenerator fixedBinaryGenerator
+   = new RandomValues.FixedGenerator(RANDOM_SEED, FIXED_LENGTH);
+
+ // Key material is encoded with an explicit charset so it does not depend on the platform default.
+ private static final byte[] FOOTER_ENCRYPTION_KEY = "0123456789012345".getBytes(StandardCharsets.UTF_8);
+ private static final byte[][] COLUMN_ENCRYPTION_KEYS = {
+   "1234567890123450".getBytes(StandardCharsets.UTF_8),
+   "1234567890123451".getBytes(StandardCharsets.UTF_8),
+   "1234567890123452".getBytes(StandardCharsets.UTF_8),
+   "1234567890123453".getBytes(StandardCharsets.UTF_8),
+   "1234567890123454".getBytes(StandardCharsets.UTF_8),
+   "1234567890123455".getBytes(StandardCharsets.UTF_8)};
+ // COLUMN_ENCRYPTION_KEY_IDS[i] is the key id registered for COLUMN_ENCRYPTION_KEYS[i].
+ private static final String[] COLUMN_ENCRYPTION_KEY_IDS = { "kc1", "kc2", "kc3", "kc4", "kc5", "kc6"};
+ private static final String FOOTER_ENCRYPTION_KEY_ID = "kf";
+ private static final String AAD_PREFIX_STRING = "tester";
+ private static final String BOOLEAN_FIELD_NAME = "boolean_field";
+ private static final String INT32_FIELD_NAME = "int32_field";
+ private static final String FLOAT_FIELD_NAME = "float_field";
+ private static final String DOUBLE_FIELD_NAME = "double_field";
+ private static final String BINARY_FIELD_NAME = "ba_field";
+ private static final String FIXED_LENGTH_BINARY_FIELD_NAME = "flba_field";
+ private static final String PLAINTEXT_INT32_FIELD_NAME = "plain_int32_field";
+
+ private static final byte[] footerKeyMetadata = FOOTER_ENCRYPTION_KEY_ID.getBytes(StandardCharsets.UTF_8);
+ private static final byte[] AADPrefix = AAD_PREFIX_STRING.getBytes(StandardCharsets.UTF_8);
+
+ private static final int ROW_COUNT = 10000;
+ // Random rows written and read back by the write/read test.
+ private static final List<SingleRow> DATA = Collections.unmodifiableList(generateRandomData(ROW_COUNT));
+ // Deterministic rows that the interop test compares against the files under PARQUET_TESTING_PATH.
+ private static final List<SingleRow> LINEAR_DATA = Collections.unmodifiableList(generateLinearData(250));
+
+ // Schema of the files written by this test: six columns that may be encrypted plus one
+ // column (PLAINTEXT_INT32_FIELD_NAME) that has no entry in the column encryption map.
+ private static final MessageType SCHEMA =
+   new MessageType("schema",
+     new PrimitiveType(REQUIRED, BOOLEAN, BOOLEAN_FIELD_NAME),
+     new PrimitiveType(REQUIRED, INT32, INT32_FIELD_NAME),
+     new PrimitiveType(REQUIRED, FLOAT, FLOAT_FIELD_NAME),
+     new PrimitiveType(REQUIRED, DOUBLE, DOUBLE_FIELD_NAME),
+     new PrimitiveType(OPTIONAL, BINARY, BINARY_FIELD_NAME),
+     Types.required(FIXED_LEN_BYTE_ARRAY).length(FIXED_LENGTH).named(FIXED_LENGTH_BINARY_FIELD_NAME),
+     new PrimitiveType(OPTIONAL, INT32, PLAINTEXT_INT32_FIELD_NAME));
+
+ // Key retriever preloaded with the footer key and all six column keys under their key ids.
+ private static final DecryptionKeyRetrieverMock decryptionKeyRetrieverMock = new DecryptionKeyRetrieverMock()
+   .putKey(FOOTER_ENCRYPTION_KEY_ID, FOOTER_ENCRYPTION_KEY)
+   .putKey(COLUMN_ENCRYPTION_KEY_IDS[0], COLUMN_ENCRYPTION_KEYS[0])
+   .putKey(COLUMN_ENCRYPTION_KEY_IDS[1], COLUMN_ENCRYPTION_KEYS[1])
+   .putKey(COLUMN_ENCRYPTION_KEY_IDS[2], COLUMN_ENCRYPTION_KEYS[2])
+   .putKey(COLUMN_ENCRYPTION_KEY_IDS[3], COLUMN_ENCRYPTION_KEYS[3])
+   .putKey(COLUMN_ENCRYPTION_KEY_IDS[4], COLUMN_ENCRYPTION_KEYS[4])
+   .putKey(COLUMN_ENCRYPTION_KEY_IDS[5], COLUMN_ENCRYPTION_KEYS[5]);
+
+ /**
+  * Writer-side configurations covered by the tests. Each constant supplies the
+  * {@link FileEncryptionProperties} used to write one file; {@code NO_ENCRYPTION} supplies
+  * {@code null}.
+  */
+ public enum EncryptionConfiguration {
+   /** Encryption configuration 1: Encrypt all columns and the footer with the same key. */
+   UNIFORM_ENCRYPTION {
+     @Override
+     public FileEncryptionProperties getEncryptionProperties() {
+       return FileEncryptionProperties.builder(FOOTER_ENCRYPTION_KEY)
+         .withFooterKeyMetadata(footerKeyMetadata)
+         .build();
+     }
+   },
+   /** Encryption configuration 2: Encrypt six columns and the footer, with different keys. */
+   ENCRYPT_COLUMNS_AND_FOOTER {
+     @Override
+     public FileEncryptionProperties getEncryptionProperties() {
+       Map<ColumnPath, ColumnEncryptionProperties> encryptedColumns = getColumnEncryptionPropertiesMap();
+       return FileEncryptionProperties.builder(FOOTER_ENCRYPTION_KEY)
+         .withFooterKeyMetadata(footerKeyMetadata)
+         .withEncryptedColumns(encryptedColumns)
+         .build();
+     }
+   },
+   /**
+    * Encryption configuration 3: Encrypt six columns, with different keys.
+    * Don't encrypt footer.
+    * (plaintext footer mode, readable by legacy readers)
+    */
+   ENCRYPT_COLUMNS_PLAINTEXT_FOOTER {
+     @Override
+     public FileEncryptionProperties getEncryptionProperties() {
+       Map<ColumnPath, ColumnEncryptionProperties> encryptedColumns = getColumnEncryptionPropertiesMap();
+       return FileEncryptionProperties.builder(FOOTER_ENCRYPTION_KEY)
+         .withFooterKeyMetadata(footerKeyMetadata)
+         .withEncryptedColumns(encryptedColumns)
+         .withPlaintextFooter()
+         .build();
+     }
+   },
+   /**
+    * Encryption configuration 4: Encrypt six columns and the footer, with different keys.
+    * Use aad_prefix.
+    */
+   ENCRYPT_COLUMNS_AND_FOOTER_AAD {
+     @Override
+     public FileEncryptionProperties getEncryptionProperties() {
+       Map<ColumnPath, ColumnEncryptionProperties> encryptedColumns = getColumnEncryptionPropertiesMap();
+       return FileEncryptionProperties.builder(FOOTER_ENCRYPTION_KEY)
+         .withFooterKeyMetadata(footerKeyMetadata)
+         .withEncryptedColumns(encryptedColumns)
+         .withAADPrefix(AADPrefix)
+         .build();
+     }
+   },
+   /**
+    * Encryption configuration 5: Encrypt six columns and the footer, with different keys.
+    * Use aad_prefix and disable_aad_prefix_storage.
+    */
+   ENCRYPT_COLUMNS_AND_FOOTER_DISABLE_AAD_STORAGE {
+     @Override
+     public FileEncryptionProperties getEncryptionProperties() {
+       Map<ColumnPath, ColumnEncryptionProperties> encryptedColumns = getColumnEncryptionPropertiesMap();
+       return FileEncryptionProperties.builder(FOOTER_ENCRYPTION_KEY)
+         .withFooterKeyMetadata(footerKeyMetadata)
+         .withEncryptedColumns(encryptedColumns)
+         .withAADPrefix(AADPrefix)
+         .withoutAADPrefixStorage()
+         .build();
+     }
+   },
+   /**
+    * Encryption configuration 6: Encrypt six columns and the footer, with different keys.
+    * Use AES_GCM_CTR_V1 algorithm.
+    */
+   ENCRYPT_COLUMNS_AND_FOOTER_CTR {
+     @Override
+     public FileEncryptionProperties getEncryptionProperties() {
+       Map<ColumnPath, ColumnEncryptionProperties> encryptedColumns = getColumnEncryptionPropertiesMap();
+       return FileEncryptionProperties.builder(FOOTER_ENCRYPTION_KEY)
+         .withFooterKeyMetadata(footerKeyMetadata)
+         .withEncryptedColumns(encryptedColumns)
+         .withAlgorithm(ParquetCipher.AES_GCM_CTR_V1)
+         .build();
+     }
+   },
+   /** Encryption configuration 7: no encryption properties, so the file is written as plaintext. */
+   NO_ENCRYPTION {
+     @Override
+     public FileEncryptionProperties getEncryptionProperties() {
+       return null;
+     }
+   };
+
+   /**
+    * @return the properties to pass to the writer, or {@code null} for a plaintext file
+    */
+   public abstract FileEncryptionProperties getEncryptionProperties();
+ }
+
+ /**
+  * Reader-side configurations covered by the tests. Each constant supplies the
+  * {@link FileDecryptionProperties} used to open a file; {@code NO_DECRYPTION} supplies
+  * {@code null}.
+  */
+ public enum DecryptionConfiguration {
+   /**
+    * Decryption configuration 1: Decrypt using key retriever callback that holds the keys
+    * of the encrypted columns and the footer key.
+    */
+   DECRYPT_WITH_KEY_RETRIEVER {
+     @Override
+     public FileDecryptionProperties getDecryptionProperties() {
+       return FileDecryptionProperties.builder()
+         .withKeyRetriever(decryptionKeyRetrieverMock)
+         .build();
+     }
+   },
+   /**
+    * Decryption configuration 2: Decrypt using key retriever callback that holds the keys
+    * of the encrypted columns and the footer key. Supply aad_prefix.
+    */
+   DECRYPT_WITH_KEY_RETRIEVER_AAD {
+     @Override
+     public FileDecryptionProperties getDecryptionProperties() {
+       return FileDecryptionProperties.builder()
+         .withKeyRetriever(decryptionKeyRetrieverMock)
+         .withAADPrefix(AADPrefix)
+         .build();
+     }
+   },
+   /** Decryption configuration 3: Decrypt using explicit column and footer keys. */
+   DECRYPT_WITH_EXPLICIT_KEYS {
+     @Override
+     public FileDecryptionProperties getDecryptionProperties() {
+       Map<ColumnPath, ColumnDecryptionProperties> explicitColumnKeys = getColumnDecryptionPropertiesMap();
+       return FileDecryptionProperties.builder()
+         .withColumnKeys(explicitColumnKeys)
+         .withFooterKey(FOOTER_ENCRYPTION_KEY)
+         .build();
+     }
+   },
+   /** Decryption configuration 4: no decryption properties at all. */
+   NO_DECRYPTION {
+     @Override
+     public FileDecryptionProperties getDecryptionProperties() {
+       return null;
+     }
+   };
+
+   /**
+    * @return the properties to pass to the reader, or {@code null} to read without decryption
+    */
+   public abstract FileDecryptionProperties getDecryptionProperties();
+ }
+
+ /**
+  * Writes one file per encryption configuration into the temporary folder, then reads
+  * every file back under every decryption configuration.
+  *
+  * @throws IOException if writing one of the files fails
+  */
+ @Test
+ public void testWriteReadEncryptedParquetFiles() throws IOException {
+   Path rootPath = new Path(temporaryFolder.getRoot().getPath());
+   LOG.info("======== testWriteReadEncryptedParquetFiles {} ========", rootPath.toString());
+   // Write using various encryption configurations.
+   testWriteEncryptedParquetFiles(rootPath, DATA);
+   // Read using various decryption configurations.
+   testReadEncryptedParquetFiles(rootPath, DATA);
+ }
+
+ /**
+  * Reads the encrypted files found under {@code PARQUET_TESTING_PATH} with every decryption
+  * configuration and compares them with {@code LINEAR_DATA}.
+  *
+  * @throws IOException declared by the private read helper
+  */
+ @Test
+ public void testInteropReadEncryptedParquetFiles() throws IOException {
+   Path rootPath = new Path(PARQUET_TESTING_PATH);
+   LOG.info("======== testInteropReadEncryptedParquetFiles {} ========", rootPath.toString());
+   // Read using various decryption configurations.
+   testInteropReadEncryptedParquetFiles(rootPath, true/*readOnlyEncrypted*/, LINEAR_DATA);
+ }
+
+ /**
+  * Builds {@code rowCount} rows from the seeded generators.
+  *
+  * @param rowCount number of rows to generate
+  * @return a new mutable list holding the generated rows in generation order
+  */
+ private static List<SingleRow> generateRandomData(int rowCount) {
+   List<SingleRow> rows = new ArrayList<>(rowCount);
+   int generated = 0;
+   while (generated < rowCount) {
+     // Values are drawn in column order so the generator sequences stay the same.
+     boolean booleanValue = RANDOM.nextBoolean();
+     int int32Value = intGenerator.nextValue();
+     float floatValue = floatGenerator.nextValue();
+     double doubleValue = doubleGenerator.nextValue();
+     byte[] binaryValue = binaryGenerator.nextValue().getBytes();
+     byte[] fixedValue = fixedBinaryGenerator.nextValue().getBytes();
+     Integer plaintextInt32Value = intGenerator.nextValue();
+     rows.add(new SingleRow(booleanValue, int32Value, floatValue, doubleValue,
+       binaryValue, fixedValue, plaintextInt32Value));
+     ++generated;
+   }
+   return rows;
+ }
+
+ /**
+  * Builds {@code rowCount} rows whose values are a fixed function of the row index.
+  *
+  * @param rowCount number of rows to generate
+  * @return a new mutable list holding the generated rows in index order
+  */
+ private static List<SingleRow> generateLinearData(int rowCount) {
+   List<SingleRow> dataList = new ArrayList<>(rowCount);
+   String baseStr = "parquet";
+   for (int row = 0; row < rowCount; ++row) {
+     boolean booleanValue = (row % 2) == 0;
+     float floatValue = (float) row * 1.1f;
+     double doubleValue = (row * 1.1111111);
+
+     // Only even rows carry a binary value: "parquet" followed by three characters built
+     // from the hundreds, tens and units of the row index.
+     byte[] binaryValue = null;
+     if ((row % 2) == 0) {
+       char firstChar = (char) ((int) '0' + row / 100);
+       char secondChar = (char) ((int) '0' + (row / 10) % 10);
+       char thirdChar = (char) ((int) '0' + row % 10);
+       binaryValue = (baseStr + firstChar + secondChar + thirdChar).getBytes(StandardCharsets.UTF_8);
+     }
+     // Fill the bytes directly. Going through a char[] and UTF-8 encoding yields two bytes per
+     // character for rows >= 128, i.e. a value longer than FIXED_LENGTH.
+     byte[] fixedValue = new byte[FIXED_LENGTH];
+     Arrays.fill(fixedValue, (byte) row);
+
+     SingleRow newRow = new SingleRow(booleanValue,
+       row, floatValue, doubleValue,
+       binaryValue, fixedValue, null/*plaintext_int32_field*/);
+     dataList.add(newRow);
+   }
+   return dataList;
+ }
+
+ /**
+  * Expected values of one row, one public field per column. The byte arrays are stored
+  * as passed in, without copying.
+  */
+ public static class SingleRow {
+   public final boolean boolean_field;
+   public final int int32_field;
+   public final float float_field;
+   public final double double_field;
+   public final byte[] ba_field;
+   public final byte[] flba_field;
+   // Can be null, since it doesn't exist in C++-created files yet.
+   public final Integer plaintext_int32_field;
+
+   /** Stores the given column values unchanged. */
+   public SingleRow(boolean boolean_field, int int32_field, float float_field, double double_field,
+                    byte[] ba_field, byte[] flba_field, Integer plaintext_int32_field) {
+     this.boolean_field = boolean_field;
+     this.int32_field = int32_field;
+     this.float_field = float_field;
+     this.double_field = double_field;
+     this.ba_field = ba_field;
+     this.flba_field = flba_field;
+     this.plaintext_int32_field = plaintext_int32_field;
+   }
+ }
+
+ /**
+  * Writes {@code data} once per {@link EncryptionConfiguration}, each time into a file under
+  * {@code root} named by {@code getFileName}.
+  *
+  * @param root directory that receives the files
+  * @param data rows to write into every file
+  * @throws IOException if a writer cannot be created, written to or closed
+  */
+ private void testWriteEncryptedParquetFiles(Path root, List<SingleRow> data) throws IOException {
+   Configuration conf = new Configuration();
+
+   int pageSize = data.size() / 10; // Ensure that several pages will be created
+   int rowGroupSize = pageSize * 6 * 5; // Ensure that there are more row-groups created
+
+   SimpleGroupFactory groupFactory = new SimpleGroupFactory(SCHEMA);
+
+   for (EncryptionConfiguration encryptionConfiguration : EncryptionConfiguration.values()) {
+     Path targetFile = new Path(root, getFileName(encryptionConfiguration));
+     FileEncryptionProperties encryptionProperties = encryptionConfiguration.getEncryptionProperties();
+     LOG.info("\nWrite " + targetFile.toString());
+     try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(targetFile)
+       .withWriteMode(OVERWRITE)
+       .withRowGroupSize(rowGroupSize)
+       .withPageSize(pageSize)
+       .withType(SCHEMA)
+       .withConf(conf)
+       .withEncryption(encryptionProperties)
+       .build()) {
+
+       for (SingleRow expectedRow : data) {
+         Group group = groupFactory.newGroup()
+           .append(BOOLEAN_FIELD_NAME, expectedRow.boolean_field)
+           .append(INT32_FIELD_NAME, expectedRow.int32_field)
+           .append(FLOAT_FIELD_NAME, expectedRow.float_field)
+           .append(DOUBLE_FIELD_NAME, expectedRow.double_field)
+           .append(BINARY_FIELD_NAME, Binary.fromConstantByteArray(expectedRow.ba_field))
+           .append(FIXED_LENGTH_BINARY_FIELD_NAME, Binary.fromConstantByteArray(expectedRow.flba_field))
+           .append(PLAINTEXT_INT32_FIELD_NAME, expectedRow.plaintext_int32_field);
+         writer.write(group);
+       }
+     }
+   }
+ }
+
+ /**
+  * Derives the file name for a configuration: its lower-cased constant name followed by
+  * {@code .parquet.encrypted}.
+  *
+  * @param encryptionConfiguration configuration the file is (or was) written with
+  * @return the file name, without any directory part
+  */
+ private String getFileName(EncryptionConfiguration encryptionConfiguration) {
+   // Locale.ROOT keeps the name stable; under e.g. a Turkish default locale 'I' would
+   // lower-case to a dotless 'ı' and the file would not be found.
+   return encryptionConfiguration.toString().toLowerCase(java.util.Locale.ROOT) + ".parquet.encrypted";
+ }
+
+ /**
+  * Reads the file of every {@link EncryptionConfiguration} under every
+  * {@link DecryptionConfiguration} and compares the rows with {@code data}.
+  * A {@code ParquetCryptoRuntimeException} is passed to {@code checkResult}, any other
+  * exception is recorded as an error. Mismatches are recorded in the error collector
+  * rather than thrown, so all combinations are visited.
+  *
+  * @param root directory holding the files written by {@code testWriteEncryptedParquetFiles}
+  * @param data rows expected in every file, in order
+  */
+ private void testReadEncryptedParquetFiles(Path root, List<SingleRow> data) {
+   Configuration conf = new Configuration();
+   for (DecryptionConfiguration decryptionConfiguration : DecryptionConfiguration.values()) {
+     for (EncryptionConfiguration encryptionConfiguration : EncryptionConfiguration.values()) {
+       Path file = new Path(root, getFileName(encryptionConfiguration));
+       LOG.info("==> Decryption configuration {}", decryptionConfiguration);
+       FileDecryptionProperties fileDecryptionProperties = decryptionConfiguration.getDecryptionProperties();
+
+       LOG.info("--> Read file {} {}", file.toString(), encryptionConfiguration);
+
+       // Read only the non-encrypted columns
+       if ((decryptionConfiguration == DecryptionConfiguration.NO_DECRYPTION) &&
+         (encryptionConfiguration == EncryptionConfiguration.ENCRYPT_COLUMNS_PLAINTEXT_FOOTER)) {
+         conf.set("parquet.read.schema", Types.buildMessage()
+           .optional(INT32).named(PLAINTEXT_INT32_FIELD_NAME)
+           .named("FormatTestObject").toString());
+       }
+
+       int rowNum = 0;
+       try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file)
+         .withConf(conf)
+         .withDecryption(fileDecryptionProperties)
+         .build()) {
+         for (Group group = reader.read(); group != null; group = reader.read()) {
+           SingleRow rowExpected = data.get(rowNum++);
+           // plaintext columns
+           if (rowExpected.plaintext_int32_field != group.getInteger(PLAINTEXT_INT32_FIELD_NAME, 0)) {
+             addErrorToErrorCollectorAndLog("Wrong int", encryptionConfiguration, decryptionConfiguration);
+           }
+           // encrypted columns
+           if (decryptionConfiguration != DecryptionConfiguration.NO_DECRYPTION) {
+             if (rowExpected.boolean_field != group.getBoolean(BOOLEAN_FIELD_NAME, 0)) {
+               addErrorToErrorCollectorAndLog("Wrong bool", encryptionConfiguration, decryptionConfiguration);
+             }
+             if (rowExpected.int32_field != group.getInteger(INT32_FIELD_NAME, 0)) {
+               addErrorToErrorCollectorAndLog("Wrong int", encryptionConfiguration, decryptionConfiguration);
+             }
+             if (rowExpected.float_field != group.getFloat(FLOAT_FIELD_NAME, 0)) {
+               addErrorToErrorCollectorAndLog("Wrong float", encryptionConfiguration, decryptionConfiguration);
+             }
+             if (rowExpected.double_field != group.getDouble(DOUBLE_FIELD_NAME, 0)) {
+               addErrorToErrorCollectorAndLog("Wrong double", encryptionConfiguration, decryptionConfiguration);
+             }
+             if ((null != rowExpected.ba_field) &&
+               !Arrays.equals(rowExpected.ba_field, group.getBinary(BINARY_FIELD_NAME, 0).getBytes())) {
+               addErrorToErrorCollectorAndLog("Wrong byte array", encryptionConfiguration, decryptionConfiguration);
+             }
+             if (!Arrays.equals(rowExpected.flba_field,
+               group.getBinary(FIXED_LENGTH_BINARY_FIELD_NAME, 0).getBytes())) {
+               addErrorToErrorCollectorAndLog("Wrong fixed-length byte array",
+                 encryptionConfiguration, decryptionConfiguration);
+             }
+           }
+         }
+         // The file was written from the same data, so a read that completes without an
+         // exception must have produced every row; a short read would otherwise go unnoticed.
+         if (rowNum != data.size()) {
+           addErrorToErrorCollectorAndLog(
+             "Wrong number of rows: expected " + data.size() + " but read " + rowNum,
+             encryptionConfiguration, decryptionConfiguration);
+         }
+       } catch (ParquetCryptoRuntimeException e) {
+         String errorMessage = e.getMessage();
+         checkResult(file.getName(), decryptionConfiguration, (null == errorMessage ? e.toString() : errorMessage));
+       } catch (Exception e) {
+         addErrorToErrorCollectorAndLog(
+           "Unexpected exception: " + e.getClass().getName() + " with message: " + e.getMessage(),
+           encryptionConfiguration, decryptionConfiguration);
+       }
+       // Drop the projection so it does not leak into the next combination.
+       conf.unset("parquet.read.schema");
+     }
+   }
+ }
+
+ /**
+  * Reads the files under {@code root} with every decryption configuration and compares
+  * the boolean and int32 columns (always) and the float and double columns (when decrypting)
+  * with {@code data}. A {@code ParquetCryptoRuntimeException} is passed to
+  * {@code checkResult}, any other exception is recorded as an error.
+  *
+  * @param root directory holding one file per encryption configuration
+  * @param readOnlyEncrypted when true, the {@code NO_ENCRYPTION} file is skipped
+  * @param data rows expected in every file, in order
+  * @throws IOException declared for callers; NOTE(review): the body catches all exceptions itself
+  */
+ private void testInteropReadEncryptedParquetFiles(Path root, boolean readOnlyEncrypted, List<SingleRow> data) throws IOException {
+   Configuration conf = new Configuration();
+   for (DecryptionConfiguration decryptionConfiguration : DecryptionConfiguration.values()) {
+     for (EncryptionConfiguration encryptionConfiguration : EncryptionConfiguration.values()) {
+       boolean plaintextFile = EncryptionConfiguration.NO_ENCRYPTION == encryptionConfiguration;
+       if (readOnlyEncrypted && plaintextFile) {
+         continue;
+       }
+       Path inputFile = new Path(root, getFileName(encryptionConfiguration));
+       LOG.info("==> Decryption configuration {}", decryptionConfiguration);
+       FileDecryptionProperties decryptionProperties = decryptionConfiguration.getDecryptionProperties();
+
+       LOG.info("--> Read file {} {}", inputFile.toString(), encryptionConfiguration);
+
+       boolean withoutDecryption = decryptionConfiguration == DecryptionConfiguration.NO_DECRYPTION;
+       // Read only the non-encrypted columns
+       if (withoutDecryption
+         && encryptionConfiguration == EncryptionConfiguration.ENCRYPT_COLUMNS_PLAINTEXT_FOOTER) {
+         String projection = Types.buildMessage()
+           .required(BOOLEAN).named(BOOLEAN_FIELD_NAME)
+           .required(INT32).named(INT32_FIELD_NAME)
+           .named("FormatTestObject").toString();
+         conf.set("parquet.read.schema", projection);
+       }
+
+       int nextRow = 0;
+       try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), inputFile)
+         .withConf(conf)
+         .withDecryption(decryptionProperties)
+         .build()) {
+         Group group;
+         while ((group = reader.read()) != null) {
+           SingleRow expected = data.get(nextRow++);
+           // plaintext columns
+           if (expected.boolean_field != group.getBoolean(BOOLEAN_FIELD_NAME, 0)) {
+             addErrorToErrorCollectorAndLog("Wrong bool", encryptionConfiguration, decryptionConfiguration);
+           }
+           if (expected.int32_field != group.getInteger(INT32_FIELD_NAME, 0)) {
+             addErrorToErrorCollectorAndLog("Wrong int", encryptionConfiguration, decryptionConfiguration);
+           }
+           // encrypted columns
+           if (withoutDecryption) {
+             continue;
+           }
+           if (expected.float_field != group.getFloat(FLOAT_FIELD_NAME, 0)) {
+             addErrorToErrorCollectorAndLog("Wrong float", encryptionConfiguration, decryptionConfiguration);
+           }
+           if (expected.double_field != group.getDouble(DOUBLE_FIELD_NAME, 0)) {
+             addErrorToErrorCollectorAndLog("Wrong double", encryptionConfiguration, decryptionConfiguration);
+           }
+         }
+       } catch (ParquetCryptoRuntimeException e) {
+         String message = e.getMessage();
+         checkResult(inputFile.getName(), decryptionConfiguration, (null == message ? e.toString() : message));
+       } catch (Exception e) {
+         addErrorToErrorCollectorAndLog(
+           "Unexpected exception: " + e.getClass().getName() + " with message: " + e.getMessage(),
+           encryptionConfiguration, decryptionConfiguration);
+       }
+       // Drop the projection so it does not leak into the next combination.
+       conf.unset("parquet.read.schema");
+     }
+   }
+ }
+
+
+ /**
+ * Check that the decryption result is as expected.
+ */
+ /**
+  * Decides whether an exception raised while reading {@code file} was expected for the
+  * given combination of encryption and decryption configuration, and records an error
+  * if it was not (or if its message is not the expected one).
+  *
+  * @param file name of the file that was read; the encryption configuration is derived from it
+  * @param decryptionConfiguration configuration the file was read with
+  * @param exceptionMsg message of the exception that was caught
+  */
+ private void checkResult(String file, DecryptionConfiguration decryptionConfiguration, String exceptionMsg) {
+   // Recover the encryption configuration from the parquet file name.
+   EncryptionConfiguration encryptionConfiguration = getEncryptionConfigurationFromFilename(file);
+
+   // Encryption_configuration 5 contains aad_prefix and disable_aad_prefix_storage.
+   // An exception is expected to be thrown if the file is not decrypted with aad_prefix.
+   boolean readWithoutAadPrefix =
+     decryptionConfiguration == DecryptionConfiguration.DECRYPT_WITH_KEY_RETRIEVER ||
+     decryptionConfiguration == DecryptionConfiguration.DECRYPT_WITH_EXPLICIT_KEYS;
+   if (encryptionConfiguration == EncryptionConfiguration.ENCRYPT_COLUMNS_AND_FOOTER_DISABLE_AAD_STORAGE
+     && readWithoutAadPrefix) {
+     reportExpectedException(exceptionMsg.contains("AAD"), "Expecting AAD related exception",
+       exceptionMsg, encryptionConfiguration, decryptionConfiguration);
+     return;
+   }
+
+   // Decryption configuration 2 contains aad_prefix. An exception is expected to
+   // be thrown if the file was not encrypted with the same aad_prefix.
+   if (decryptionConfiguration == DecryptionConfiguration.DECRYPT_WITH_KEY_RETRIEVER_AAD
+     && encryptionConfiguration != EncryptionConfiguration.ENCRYPT_COLUMNS_AND_FOOTER_DISABLE_AAD_STORAGE
+     && encryptionConfiguration != EncryptionConfiguration.ENCRYPT_COLUMNS_AND_FOOTER_AAD
+     && encryptionConfiguration != EncryptionConfiguration.NO_ENCRYPTION) {
+     reportExpectedException(exceptionMsg.contains("AAD"), "Expecting AAD related exception",
+       exceptionMsg, encryptionConfiguration, decryptionConfiguration);
+     return;
+   }
+
+   // Encryption_configuration 7 has null encryptor, so parquet is plaintext.
+   // An exception is expected to be thrown if the file is being decrypted.
+   boolean decrypting =
+     (decryptionConfiguration == DecryptionConfiguration.DECRYPT_WITH_KEY_RETRIEVER) ||
+     (decryptionConfiguration == DecryptionConfiguration.DECRYPT_WITH_KEY_RETRIEVER_AAD) ||
+     (decryptionConfiguration == DecryptionConfiguration.DECRYPT_WITH_EXPLICIT_KEYS);
+   if (encryptionConfiguration == EncryptionConfiguration.NO_ENCRYPTION && decrypting) {
+     reportExpectedException(exceptionMsg.endsWith("Applying decryptor on plaintext file"),
+       "Expecting exception Applying decryptor on plaintext file",
+       exceptionMsg, encryptionConfiguration, decryptionConfiguration);
+     return;
+   }
+
+   // Decryption configuration 4 is null, so only plaintext file can be read. An exception is
+   // expected to be thrown if the file is encrypted.
+   if (decryptionConfiguration == DecryptionConfiguration.NO_DECRYPTION
+     && encryptionConfiguration != EncryptionConfiguration.NO_ENCRYPTION
+     && encryptionConfiguration != EncryptionConfiguration.ENCRYPT_COLUMNS_PLAINTEXT_FOOTER) {
+     boolean missingKeyMessage = exceptionMsg.endsWith("No keys available")
+       || exceptionMsg.endsWith("Null File Decryptor")
+       || exceptionMsg.endsWith("Footer key unavailable");
+     reportExpectedException(missingKeyMessage, "Expecting No keys available exception",
+       exceptionMsg, encryptionConfiguration, decryptionConfiguration);
+     return;
+   }
+
+   // No rule above applies, so any non-empty message is an unexpected failure.
+   if (null != exceptionMsg && !exceptionMsg.isEmpty()) {
+     addErrorToErrorCollectorAndLog("Didn't expect an exception", exceptionMsg,
+       encryptionConfiguration, decryptionConfiguration);
+   }
+ }
+
+ /**
+  * Logs an expected exception, or records an error when its message did not match.
+  *
+  * @param messageMatches whether the exception message is the expected one
+  * @param expectation error text recorded when the message does not match
+  */
+ private void reportExpectedException(boolean messageMatches, String expectation, String exceptionMsg,
+                                      EncryptionConfiguration encryptionConfiguration,
+                                      DecryptionConfiguration decryptionConfiguration) {
+   if (messageMatches) {
+     LOG.info("Exception as expected: " + exceptionMsg);
+   } else {
+     addErrorToErrorCollectorAndLog(expectation, exceptionMsg,
+       encryptionConfiguration, decryptionConfiguration);
+   }
+ }
+
+ private EncryptionConfiguration getEncryptionConfigurationFromFilename(String file) {
+ if (!file.endsWith(".parquet.encrypted")) {
+ return null;
+ }
+ String fileNamePrefix = file.replaceFirst(".parquet.encrypted", "");
+ try {
+ EncryptionConfiguration encryptionConfiguration = EncryptionConfiguration.valueOf(fileNamePrefix.toUpperCase());
+ return encryptionConfiguration;
+ } catch (IllegalArgumentException e) {
+ LOG.error("File name doesn't match any known encryption configuration: " + file);
+ errorCollector.addError(e);
+ return null;
+ }
+ }
+
+ private void addErrorToErrorCollectorAndLog(String errorMessage, String exceptionMessage, EncryptionConfiguration encryptionConfiguration,
+ DecryptionConfiguration decryptionConfiguration) {
+ String fullErrorMessage = String.format("%s - %s Error: %s, but got [%s]",
+ encryptionConfiguration, decryptionConfiguration, errorMessage, exceptionMessage);
+
+ errorCollector.addError(new Throwable(fullErrorMessage));
+ LOG.error(fullErrorMessage);
+ }
+
+ private void addErrorToErrorCollectorAndLog(String errorMessage, EncryptionConfiguration encryptionConfiguration,
+ DecryptionConfiguration decryptionConfiguration) {
+ String fullErrorMessage = String.format("%s - %s Error: %s",
+ encryptionConfiguration, decryptionConfiguration, errorMessage);
+
+ errorCollector.addError(new Throwable(fullErrorMessage));
+ LOG.error(fullErrorMessage);
+ }
+
+ /**
+  * Builds the per-column encryption properties for the six encrypted columns.
+  * Column {@code i} in the table below uses {@code COLUMN_ENCRYPTION_KEYS[i]} and
+  * {@code COLUMN_ENCRYPTION_KEY_IDS[i]}.
+  *
+  * @return a new map from column path to that column's encryption properties
+  */
+ private static Map<ColumnPath, ColumnEncryptionProperties> getColumnEncryptionPropertiesMap() {
+   // Position in this table selects the key and key id.
+   String[] encryptedColumns = {
+     DOUBLE_FIELD_NAME,
+     FLOAT_FIELD_NAME,
+     BOOLEAN_FIELD_NAME,
+     INT32_FIELD_NAME,
+     BINARY_FIELD_NAME,
+     FIXED_LENGTH_BINARY_FIELD_NAME
+   };
+
+   Map<ColumnPath, ColumnEncryptionProperties> propertiesByPath = new HashMap<>();
+   for (int i = 0; i < encryptedColumns.length; i++) {
+     ColumnEncryptionProperties columnProperties = ColumnEncryptionProperties
+       .builder(encryptedColumns[i])
+       .withKey(COLUMN_ENCRYPTION_KEYS[i])
+       .withKeyID(COLUMN_ENCRYPTION_KEY_IDS[i])
+       .build();
+     propertiesByPath.put(columnProperties.getPath(), columnProperties);
+   }
+   return propertiesByPath;
+ }
+
+ /**
+  * Builds the per-column decryption properties carrying explicit keys for the six
+  * encrypted columns. Column {@code i} in the table below uses
+  * {@code COLUMN_ENCRYPTION_KEYS[i]}.
+  *
+  * @return a new map from column path to that column's decryption properties
+  */
+ private static Map<ColumnPath, ColumnDecryptionProperties> getColumnDecryptionPropertiesMap() {
+   // Position in this table selects the key.
+   String[] encryptedColumns = {
+     DOUBLE_FIELD_NAME,
+     FLOAT_FIELD_NAME,
+     BOOLEAN_FIELD_NAME,
+     INT32_FIELD_NAME,
+     BINARY_FIELD_NAME,
+     FIXED_LENGTH_BINARY_FIELD_NAME
+   };
+
+   Map<ColumnPath, ColumnDecryptionProperties> propertiesByPath = new HashMap<>();
+   for (int i = 0; i < encryptedColumns.length; i++) {
+     ColumnDecryptionProperties columnProperties = ColumnDecryptionProperties
+       .builder(encryptedColumns[i])
+       .withKey(COLUMN_ENCRYPTION_KEYS[i])
+       .build();
+     propertiesByPath.put(columnProperties.getPath(), columnProperties);
+   }
+   return propertiesByPath;
+ }
+}
diff --git a/pom.xml b/pom.xml
index 9ef8081..b98e6e2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -105,6 +105,7 @@
<mockito.version>1.10.19</mockito.version>
<net.openhft.version>0.9</net.openhft.version>
<maven-javadoc-plugin.version>3.1.0</maven-javadoc-plugin.version>
+ <exec-maven-plugin.version>1.6.0</exec-maven-plugin.version>
<!-- parquet-cli dependencies -->
<opencsv.version>2.3</opencsv.version>
@@ -215,6 +216,30 @@
<pluginManagement>
<plugins>
<plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <version>${exec-maven-plugin.version}</version>
+ <executions>
+ <execution>
+ <id>git submodule update</id>
+ <phase>initialize</phase>
+ <configuration>
+ <executable>git</executable>
+ <arguments>
+ <argument>submodule</argument>
+ <argument>update</argument>
+ <argument>--init</argument>
+ <argument>--recursive</argument>
+ </arguments>
+ </configuration>
+ <goals>
+ <goal>exec</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
<!-- Disable the source artifact from ASF parent -->
<artifactId>maven-assembly-plugin</artifactId>
<executions>
@@ -465,6 +490,7 @@
<exclude>**/target/**</exclude>
<exclude>.git/**</exclude>
<exclude>.gitignore</exclude>
+ <exclude>.gitmodules</exclude>
<exclude>.idea/**</exclude>
<exclude>*/jdiff/*.xml</exclude>
<exclude>.travis.yml</exclude>
diff --git a/submodules/parquet-testing b/submodules/parquet-testing
new file mode 160000
index 0000000..40379b3
--- /dev/null
+++ b/submodules/parquet-testing
@@ -0,0 +1 @@
+Subproject commit 40379b3c58298fd22589dec7e41748375b5a8e82