Posted to commits@parquet.apache.org by ga...@apache.org on 2020/05/29 10:07:30 UTC
[parquet-mr] branch encryption updated: PARQUET-1229: Parquet MR encryption (#776)
This is an automated email from the ASF dual-hosted git repository.
gabor pushed a commit to branch encryption
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/encryption by this push:
new 06b5372 PARQUET-1229: Parquet MR encryption (#776)
06b5372 is described below
commit 06b5372ddf0f1108c2109cc9011b2497a136830f
Author: ggershinsky <gg...@users.noreply.github.com>
AuthorDate: Fri May 29 13:07:03 2020 +0300
PARQUET-1229: Parquet MR encryption (#776)
---
dev/travis-before_install-encryption.sh | 29 --
.../internal/column/columnindex/OffsetIndex.java | 7 +
.../column/columnindex/OffsetIndexBuilder.java | 6 +-
.../org/apache/parquet/format/BlockCipher.java | 8 +-
.../java/org/apache/parquet/HadoopReadOptions.java | 32 +-
.../org/apache/parquet/ParquetReadOptions.java | 19 +-
.../apache/parquet/crypto/AADPrefixVerifier.java | 6 +-
.../java/org/apache/parquet/crypto/AesCipher.java | 56 +-
.../org/apache/parquet/crypto/AesCtrDecryptor.java | 16 +-
.../org/apache/parquet/crypto/AesCtrEncryptor.java | 16 +-
.../org/apache/parquet/crypto/AesGcmDecryptor.java | 19 +-
.../org/apache/parquet/crypto/AesGcmEncryptor.java | 16 +-
.../parquet/crypto/DecryptionKeyRetriever.java | 6 +-
.../crypto/DecryptionPropertiesFactory.java | 8 +-
.../crypto/EncryptionPropertiesFactory.java | 6 +-
.../crypto/InternalColumnDecryptionSetup.java | 6 +-
.../crypto/InternalColumnEncryptionSetup.java | 6 +-
.../parquet/crypto/InternalFileDecryptor.java | 70 +--
.../parquet/crypto/InternalFileEncryptor.java | 31 +-
.../parquet/crypto/KeyAccessDeniedException.java | 17 +-
.../apache/parquet/crypto/ModuleCipherFactory.java | 12 +-
.../crypto/ParquetCryptoRuntimeException.java | 2 +-
.../parquet/crypto/StringKeyIdRetriever.java | 39 --
...xception.java => TagVerificationException.java} | 21 +-
.../format/converter/ParquetMetadataConverter.java | 370 +++++++++++--
.../parquet/hadoop/ColumnChunkPageReadStore.java | 127 +++--
.../parquet/hadoop/ColumnChunkPageWriteStore.java | 165 +++++-
.../parquet/hadoop/ColumnIndexFilterUtils.java | 5 +
.../parquet/hadoop/DictionaryPageReader.java | 19 +-
.../hadoop/InternalParquetRecordWriter.java | 10 +-
.../apache/parquet/hadoop/ParquetFileReader.java | 276 ++++++++--
.../apache/parquet/hadoop/ParquetFileWriter.java | 243 +++++++--
.../apache/parquet/hadoop/ParquetOutputFormat.java | 24 +-
.../org/apache/parquet/hadoop/ParquetReader.java | 18 +-
.../apache/parquet/hadoop/ParquetRecordReader.java | 2 +-
.../org/apache/parquet/hadoop/ParquetWriter.java | 30 +-
.../parquet/hadoop/metadata/BlockMetaData.java | 20 +-
.../hadoop/metadata/ColumnChunkMetaData.java | 166 +++++-
.../parquet/hadoop/metadata/FileMetaData.java | 11 +
.../parquet/hadoop/util/HadoopInputFile.java | 4 +
.../apache/parquet/hadoop/TestBloomEncryption.java | 313 +++++++++++
.../parquet/hadoop/TestColumnIndexEncryption.java | 571 +++++++++++++++++++++
42 files changed, 2382 insertions(+), 446 deletions(-)
diff --git a/dev/travis-before_install-encryption.sh b/dev/travis-before_install-encryption.sh
deleted file mode 100755
index 0e3a3f6..0000000
--- a/dev/travis-before_install-encryption.sh
+++ /dev/null
@@ -1,29 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-################################################################################
-# This is a branch-specific script that gets invoked at the end of
-# travis-before_install.sh. It is run for the bloom-filter branch only.
-################################################################################
-
-cd ..
-git clone https://github.com/apache/parquet-format.git
-cd parquet-format
-mvn install -DskipTests --batch-mode
-cd $TRAVIS_BUILD_DIR
-
-
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndex.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndex.java
index ba984eb..02d58af 100644
--- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndex.java
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndex.java
@@ -49,6 +49,13 @@ public interface OffsetIndex {
* @return the index of the first row in the page
*/
public long getFirstRowIndex(int pageIndex);
+
+ /**
+ * @param pageIndex
+ * the index of the page
+ * @return the original ordinal of the page in the column chunk
+ */
+ public int getPageOrdinal(int pageIndex);
/**
* @param pageIndex
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndexBuilder.java
index e4907b5..4909744 100644
--- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndexBuilder.java
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndexBuilder.java
@@ -65,6 +65,11 @@ public class OffsetIndexBuilder {
public long getFirstRowIndex(int pageIndex) {
return firstRowIndexes[pageIndex];
}
+
+ @Override
+ public int getPageOrdinal(int pageIndex) {
+ return pageIndex;
+ }
}
private static final OffsetIndexBuilder NO_OP_BUILDER = new OffsetIndexBuilder() {
@@ -171,5 +176,4 @@ public class OffsetIndexBuilder {
return offsetIndex;
}
-
}
diff --git a/parquet-format-structures/src/main/java/org/apache/parquet/format/BlockCipher.java b/parquet-format-structures/src/main/java/org/apache/parquet/format/BlockCipher.java
index 48c0bf2..37b0b58 100755
--- a/parquet-format-structures/src/main/java/org/apache/parquet/format/BlockCipher.java
+++ b/parquet-format-structures/src/main/java/org/apache/parquet/format/BlockCipher.java
@@ -35,9 +35,8 @@ public interface BlockCipher{
* The ciphertext starts at offset 4 and fills up the rest of the returned byte array.
* The ciphertext includes the nonce and (in case of GCM cipher) the tag, as detailed in the
* Parquet Modular Encryption specification.
- * @throws IOException thrown upon any crypto problem encountered during encryption
*/
- public byte[] encrypt(byte[] plaintext, byte[] AAD) throws IOException;
+ public byte[] encrypt(byte[] plaintext, byte[] AAD);
}
@@ -51,9 +50,8 @@ public interface BlockCipher{
* Parquet Modular Encryption specification.
* @param AAD - Additional Authenticated Data for the decryption (ignored in case of CTR cipher)
* @return plaintext - starts at offset 0 of the output value, and fills up the entire byte array.
- * @throws IOException thrown upon any crypto problem encountered during decryption
*/
- public byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD) throws IOException;
+ public byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD);
/**
* Convenience decryption method that reads the length and ciphertext from the input stream.
@@ -61,7 +59,7 @@ public interface BlockCipher{
* @param from Input stream with length and ciphertext.
* @param AAD - Additional Authenticated Data for the decryption (ignored in case of CTR cipher)
* @return plaintext - starts at offset 0 of the output, and fills up the entire byte array.
- * @throws IOException thrown upon any crypto or IO problem encountered during decryption
+ * @throws IOException - Stream I/O problems
*/
public byte[] decrypt(InputStream from, byte[] AAD) throws IOException;
}
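For illustration only (this snippet is not part of the commit): with the change above, the byte-array encrypt/decrypt calls no longer declare IOException, so crypto failures surface as runtime exceptions; only the stream-based decrypt overload still throws IOException for stream I/O problems. A minimal round-trip sketch, assuming a throwaway 16-byte key and arbitrary AAD bytes:

  import org.apache.parquet.crypto.AesMode;
  import org.apache.parquet.crypto.ModuleCipherFactory;
  import org.apache.parquet.format.BlockCipher;

  public class BlockCipherRoundTrip {
    public static void main(String[] args) {
      byte[] key = new byte[16];           // demo key only; real code uses a properly managed AES key
      byte[] aad = new byte[] {1, 2, 3};   // Additional Authenticated Data (ignored by the CTR cipher)
      BlockCipher.Encryptor encryptor = ModuleCipherFactory.getEncryptor(AesMode.GCM, key);
      BlockCipher.Decryptor decryptor = ModuleCipherFactory.getDecryptor(AesMode.GCM, key);
      byte[] lengthAndCiphertext = encryptor.encrypt("payload".getBytes(), aad); // 4-byte length prefix + nonce + ciphertext + GCM tag
      byte[] plaintext = decryptor.decrypt(lengthAndCiphertext, aad);            // no IOException to catch any more
      System.out.println(new String(plaintext));
    }
  }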
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java
index b16a8c4..43f0f1d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java
@@ -20,8 +20,11 @@
package org.apache.parquet;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.crypto.DecryptionPropertiesFactory;
+import org.apache.parquet.crypto.FileDecryptionProperties;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.format.converter.ParquetMetadataConverter.MetadataFilter;
import org.apache.parquet.hadoop.util.HadoopCodecs;
@@ -55,11 +58,12 @@ public class HadoopReadOptions extends ParquetReadOptions {
ByteBufferAllocator allocator,
int maxAllocationSize,
Map<String, String> properties,
- Configuration conf) {
+ Configuration conf,
+ FileDecryptionProperties fileDecryptionProperties) {
super(
useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, useColumnIndexFilter,
usePageChecksumVerification, useBloomFilter, recordFilter, metadataFilter, codecFactory, allocator,
- maxAllocationSize, properties
+ maxAllocationSize, properties, fileDecryptionProperties
);
this.conf = conf;
}
@@ -81,11 +85,21 @@ public class HadoopReadOptions extends ParquetReadOptions {
return new Builder(conf);
}
+ public static Builder builder(Configuration conf, Path filePath) {
+ return new Builder(conf, filePath);
+ }
+
public static class Builder extends ParquetReadOptions.Builder {
private final Configuration conf;
+ private final Path filePath;
public Builder(Configuration conf) {
+ this(conf, null);
+ }
+
+ public Builder(Configuration conf, Path filePath) {
this.conf = conf;
+ this.filePath = filePath;
useSignedStringMinMax(conf.getBoolean("parquet.strings.signed-min-max.enabled", false));
useDictionaryFilter(conf.getBoolean(DICTIONARY_FILTERING_ENABLED, true));
useStatsFilter(conf.getBoolean(STATS_FILTERING_ENABLED, true));
@@ -105,10 +119,22 @@ public class HadoopReadOptions extends ParquetReadOptions {
@Override
public ParquetReadOptions build() {
+ if (null == fileDecryptionProperties) {
+ // if not set, check if Hadoop conf defines decryption factory and properties
+ fileDecryptionProperties = createDecryptionProperties(filePath, conf);
+ }
return new HadoopReadOptions(
useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter,
useColumnIndexFilter, usePageChecksumVerification, useBloomFilter, recordFilter, metadataFilter,
- codecFactory, allocator, maxAllocationSize, properties, conf);
+ codecFactory, allocator, maxAllocationSize, properties, conf, fileDecryptionProperties);
+ }
+ }
+
+ private static FileDecryptionProperties createDecryptionProperties(Path file, Configuration hadoopConfig) {
+ DecryptionPropertiesFactory cryptoFactory = DecryptionPropertiesFactory.loadFactory(hadoopConfig);
+ if (null == cryptoFactory) {
+ return null;
}
+ return cryptoFactory.getFileDecryptionProperties(hadoopConfig, file);
}
}
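For illustration only (not part of the commit): the new builder(conf, filePath) entry point lets HadoopReadOptions pick up decryption properties transparently, by loading a DecryptionPropertiesFactory from the Hadoop configuration when none are set explicitly. A hedged usage sketch; the file path is hypothetical, and reading through ParquetFileReader.open with HadoopInputFile.fromPath is assumed as the surrounding call pattern:

  import java.io.IOException;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.parquet.HadoopReadOptions;
  import org.apache.parquet.ParquetReadOptions;
  import org.apache.parquet.hadoop.ParquetFileReader;
  import org.apache.parquet.hadoop.util.HadoopInputFile;

  public class TransparentDecryptionRead {
    public static void main(String[] args) throws IOException {
      Configuration conf = new Configuration();
      Path filePath = new Path("/tmp/encrypted.parquet");   // hypothetical path

      // When no FileDecryptionProperties are set on the builder, build() consults
      // DecryptionPropertiesFactory.loadFactory(conf) and asks it for properties for this path;
      // explicit properties could be supplied with .withDecryption(fileDecryptionProperties) instead.
      ParquetReadOptions options = HadoopReadOptions.builder(conf, filePath).build();

      try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(filePath, conf), options)) {
        System.out.println("Row count: " + reader.getRecordCount());
      }
    }
  }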
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
index 2fdca3b..f478cef 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
@@ -22,6 +22,7 @@ package org.apache.parquet;
import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.bytes.HeapByteBufferAllocator;
import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.crypto.FileDecryptionProperties;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.util.HadoopCodecs;
@@ -57,6 +58,7 @@ public class ParquetReadOptions {
private final ByteBufferAllocator allocator;
private final int maxAllocationSize;
private final Map<String, String> properties;
+ private final FileDecryptionProperties fileDecryptionProperties;
ParquetReadOptions(boolean useSignedStringMinMax,
boolean useStatsFilter,
@@ -70,7 +72,8 @@ public class ParquetReadOptions {
CompressionCodecFactory codecFactory,
ByteBufferAllocator allocator,
int maxAllocationSize,
- Map<String, String> properties) {
+ Map<String, String> properties,
+ FileDecryptionProperties fileDecryptionProperties) {
this.useSignedStringMinMax = useSignedStringMinMax;
this.useStatsFilter = useStatsFilter;
this.useDictionaryFilter = useDictionaryFilter;
@@ -84,6 +87,7 @@ public class ParquetReadOptions {
this.allocator = allocator;
this.maxAllocationSize = maxAllocationSize;
this.properties = Collections.unmodifiableMap(properties);
+ this.fileDecryptionProperties = fileDecryptionProperties;
}
public boolean useSignedStringMinMax() {
@@ -142,6 +146,10 @@ public class ParquetReadOptions {
return properties.get(property);
}
+ public FileDecryptionProperties getDecryptionProperties() {
+ return fileDecryptionProperties;
+ }
+
public boolean isEnabled(String property, boolean defaultValue) {
Optional<String> propValue = Optional.ofNullable(properties.get(property));
return propValue.isPresent() ? Boolean.valueOf(propValue.get())
@@ -167,6 +175,7 @@ public class ParquetReadOptions {
protected ByteBufferAllocator allocator = new HeapByteBufferAllocator();
protected int maxAllocationSize = ALLOCATION_SIZE_DEFAULT;
protected Map<String, String> properties = new HashMap<>();
+ protected FileDecryptionProperties fileDecryptionProperties = null;
public Builder useSignedStringMinMax(boolean useSignedStringMinMax) {
this.useSignedStringMinMax = useSignedStringMinMax;
@@ -277,6 +286,11 @@ public class ParquetReadOptions {
return this;
}
+ public Builder withDecryption(FileDecryptionProperties fileDecryptionProperties) {
+ this.fileDecryptionProperties = fileDecryptionProperties;
+ return this;
+ }
+
public Builder set(String key, String value) {
properties.put(key, value);
return this;
@@ -292,6 +306,7 @@ public class ParquetReadOptions {
withCodecFactory(options.codecFactory);
withAllocator(options.allocator);
withPageChecksumVerification(options.usePageChecksumVerification);
+ withDecryption(options.fileDecryptionProperties);
for (Map.Entry<String, String> keyValue : options.properties.entrySet()) {
set(keyValue.getKey(), keyValue.getValue());
}
@@ -302,7 +317,7 @@ public class ParquetReadOptions {
return new ParquetReadOptions(
useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter,
useColumnIndexFilter, usePageChecksumVerification, useBloomFilter, recordFilter, metadataFilter,
- codecFactory, allocator, maxAllocationSize, properties);
+ codecFactory, allocator, maxAllocationSize, properties, fileDecryptionProperties);
}
}
}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AADPrefixVerifier.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AADPrefixVerifier.java
index a0a8029..ee5169d 100755
--- a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AADPrefixVerifier.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AADPrefixVerifier.java
@@ -19,8 +19,6 @@
package org.apache.parquet.crypto;
-import java.io.IOException;
-
public interface AADPrefixVerifier {
/**
@@ -28,7 +26,7 @@ public interface AADPrefixVerifier {
* Must be thread-safe.
*
* @param aadPrefix AAD Prefix
- * @throws IOException Throw exception if AAD prefix is wrong.
+ * @throws ParquetCryptoRuntimeException Throw exception if AAD prefix is wrong.
*/
- public void verify(byte[] aadPrefix) throws IOException;
+ public void verify(byte[] aadPrefix) throws ParquetCryptoRuntimeException;
}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCipher.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCipher.java
index fb9588d..6b9f24c 100755
--- a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCipher.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCipher.java
@@ -24,7 +24,6 @@ import javax.crypto.spec.SecretKeySpec;
import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
-import java.io.IOException;
import java.security.SecureRandom;
public class AesCipher {
@@ -44,7 +43,7 @@ public class AesCipher {
protected Cipher cipher;
protected final byte[] localNonce;
- AesCipher(AesMode mode, byte[] keyBytes) throws IllegalArgumentException, IOException {
+ AesCipher(AesMode mode, byte[] keyBytes) {
if (null == keyBytes) {
throw new IllegalArgumentException("Null key bytes");
}
@@ -67,30 +66,69 @@ public class AesCipher {
}
public static byte[] createModuleAAD(byte[] fileAAD, ModuleType moduleType,
- short rowGroupOrdinal, short columnOrdinal, short pageOrdinal) {
+ int rowGroupOrdinal, int columnOrdinal, int pageOrdinal) {
+
byte[] typeOrdinalBytes = new byte[1];
typeOrdinalBytes[0] = moduleType.getValue();
+
if (ModuleType.Footer == moduleType) {
return concatByteArrays(fileAAD, typeOrdinalBytes);
}
- byte[] rowGroupOrdinalBytes = shortToBytesLE(rowGroupOrdinal);
- byte[] columnOrdinalBytes = shortToBytesLE(columnOrdinal);
+ if (rowGroupOrdinal < 0) {
+ throw new IllegalArgumentException("Wrong row group ordinal: " + rowGroupOrdinal);
+ }
+ short shortRGOrdinal = (short) rowGroupOrdinal;
+ if (shortRGOrdinal != rowGroupOrdinal) {
+ throw new ParquetCryptoRuntimeException("Encrypted parquet files can't have "
+ + "more than " + Short.MAX_VALUE + " row groups: " + rowGroupOrdinal);
+ }
+ byte[] rowGroupOrdinalBytes = shortToBytesLE(shortRGOrdinal);
+
+ if (columnOrdinal < 0) {
+ throw new IllegalArgumentException("Wrong column ordinal: " + columnOrdinal);
+ }
+ short shortColumOrdinal = (short) columnOrdinal;
+ if (shortColumOrdinal != columnOrdinal) {
+ throw new ParquetCryptoRuntimeException("Encrypted parquet files can't have "
+ + "more than " + Short.MAX_VALUE + " columns: " + columnOrdinal);
+ }
+ byte[] columnOrdinalBytes = shortToBytesLE(shortColumOrdinal);
+
if (ModuleType.DataPage != moduleType && ModuleType.DataPageHeader != moduleType) {
return concatByteArrays(fileAAD, typeOrdinalBytes, rowGroupOrdinalBytes, columnOrdinalBytes);
}
- byte[] pageOrdinalBytes = shortToBytesLE(pageOrdinal);
+ if (pageOrdinal < 0) {
+ throw new IllegalArgumentException("Wrong page ordinal: " + pageOrdinal);
+ }
+ short shortPageOrdinal = (short) pageOrdinal;
+ if (shortPageOrdinal != pageOrdinal) {
+ throw new ParquetCryptoRuntimeException("Encrypted parquet files can't have "
+ + "more than " + Short.MAX_VALUE + " pages per chunk: " + pageOrdinal);
+ }
+ byte[] pageOrdinalBytes = shortToBytesLE(shortPageOrdinal);
+
return concatByteArrays(fileAAD, typeOrdinalBytes, rowGroupOrdinalBytes, columnOrdinalBytes, pageOrdinalBytes);
}
public static byte[] createFooterAAD(byte[] aadPrefixBytes) {
- return createModuleAAD(aadPrefixBytes, ModuleType.Footer, (short) -1, (short) -1, (short) -1);
+ return createModuleAAD(aadPrefixBytes, ModuleType.Footer, -1, -1, -1);
}
// Update last two bytes with new page ordinal (instead of creating new page AAD from scratch)
- public static void quickUpdatePageAAD(byte[] pageAAD, short newPageOrdinal) {
- byte[] pageOrdinalBytes = shortToBytesLE(newPageOrdinal);
+ public static void quickUpdatePageAAD(byte[] pageAAD, int newPageOrdinal) {
+ java.util.Objects.requireNonNull(pageAAD);
+ if (newPageOrdinal < 0) {
+ throw new IllegalArgumentException("Wrong page ordinal: " + newPageOrdinal);
+ }
+ short shortPageOrdinal = (short) newPageOrdinal;
+ if (shortPageOrdinal != newPageOrdinal) {
+ throw new ParquetCryptoRuntimeException("Encrypted parquet files can't have "
+ + "more than " + Short.MAX_VALUE + " pages per chunk: " + newPageOrdinal);
+ }
+
+ byte[] pageOrdinalBytes = shortToBytesLE(shortPageOrdinal);
System.arraycopy(pageOrdinalBytes, 0, pageAAD, pageAAD.length - 2, 2);
}
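For illustration only (not part of the commit): the AAD helpers now take int ordinals and range-check them against Short.MAX_VALUE, so callers no longer cast to short. A small sketch, assuming demo fileAAD bytes that would normally come from the file encryptor or decryptor:

  import org.apache.parquet.crypto.AesCipher;
  import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;

  public class ModuleAadExample {
    public static void main(String[] args) {
      byte[] fileAAD = new byte[] {0x0A, 0x0B};   // demo bytes; real code uses the file AAD from the encryptor/decryptor

      // Build the AAD for the first data page of column 3 in row group 0.
      byte[] pageAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DataPage, 0, 3, 0);

      // Advance the same AAD to the next page instead of rebuilding it from scratch.
      AesCipher.quickUpdatePageAAD(pageAAD, 1);

      // Negative ordinals, or ordinals above Short.MAX_VALUE, are rejected by the new checks.
    }
  }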
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCtrDecryptor.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCtrDecryptor.java
index bf17b8c..447404a 100755
--- a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCtrDecryptor.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCtrDecryptor.java
@@ -32,13 +32,13 @@ public class AesCtrDecryptor extends AesCipher implements BlockCipher.Decryptor{
private final byte[] ctrIV;
- AesCtrDecryptor(byte[] keyBytes) throws IllegalArgumentException, IOException {
+ AesCtrDecryptor(byte[] keyBytes) {
super(AesMode.CTR, keyBytes);
try {
cipher = Cipher.getInstance(AesMode.CTR.getCipherName());
} catch (GeneralSecurityException e) {
- throw new IOException("Failed to create CTR cipher", e);
+ throw new ParquetCryptoRuntimeException("Failed to create CTR cipher", e);
}
ctrIV = new byte[CTR_IV_LENGTH];
// Setting last bit of initial CTR counter to 1
@@ -46,19 +46,18 @@ public class AesCtrDecryptor extends AesCipher implements BlockCipher.Decryptor{
}
@Override
- public byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD) throws IOException {
+ public byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD) {
int cipherTextOffset = SIZE_LENGTH;
int cipherTextLength = lengthAndCiphertext.length - SIZE_LENGTH;
return decrypt(lengthAndCiphertext, cipherTextOffset, cipherTextLength, AAD);
}
- public byte[] decrypt(byte[] ciphertext, int cipherTextOffset, int cipherTextLength,
- byte[] AAD) throws IOException {
+ public byte[] decrypt(byte[] ciphertext, int cipherTextOffset, int cipherTextLength, byte[] AAD) {
int plainTextLength = cipherTextLength - NONCE_LENGTH;
if (plainTextLength < 1) {
- throw new IOException("Wrong input length " + plainTextLength);
+ throw new ParquetCryptoRuntimeException("Wrong input length " + plainTextLength);
}
// Get the nonce from ciphertext
@@ -82,7 +81,7 @@ public class AesCtrDecryptor extends AesCipher implements BlockCipher.Decryptor{
cipher.doFinal(ciphertext, inputOffset, inputLength, plainText, outputOffset);
} catch (GeneralSecurityException e) {
- throw new IOException("Failed to decrypt", e);
+ throw new ParquetCryptoRuntimeException("Failed to decrypt", e);
}
return plainText;
@@ -118,8 +117,7 @@ public class AesCtrDecryptor extends AesCipher implements BlockCipher.Decryptor{
while (gotBytes < ciphertextLength) {
int n = from.read(ciphertextBuffer, gotBytes, ciphertextLength - gotBytes);
if (n <= 0) {
- throw new IOException("Tried to read " + ciphertextLength +
- " bytes, but only got " + gotBytes + " bytes.");
+ throw new IOException("Tried to read " + ciphertextLength + " bytes, but only got " + gotBytes + " bytes.");
}
gotBytes += n;
}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCtrEncryptor.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCtrEncryptor.java
index ec1f8d4..537789c 100755
--- a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCtrEncryptor.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCtrEncryptor.java
@@ -25,20 +25,19 @@ import javax.crypto.spec.IvParameterSpec;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.format.BlockCipher;
-import java.io.IOException;
import java.security.GeneralSecurityException;
public class AesCtrEncryptor extends AesCipher implements BlockCipher.Encryptor{
private final byte[] ctrIV;
- AesCtrEncryptor(byte[] keyBytes) throws IllegalArgumentException, IOException {
+ AesCtrEncryptor(byte[] keyBytes) {
super(AesMode.CTR, keyBytes);
try {
cipher = Cipher.getInstance(AesMode.CTR.getCipherName());
} catch (GeneralSecurityException e) {
- throw new IOException("Failed to create CTR cipher", e);
+ throw new ParquetCryptoRuntimeException("Failed to create CTR cipher", e);
}
ctrIV = new byte[CTR_IV_LENGTH];
@@ -47,20 +46,19 @@ public class AesCtrEncryptor extends AesCipher implements BlockCipher.Encryptor{
}
@Override
- public byte[] encrypt(byte[] plainText, byte[] AAD) throws IOException {
+ public byte[] encrypt(byte[] plainText, byte[] AAD) {
return encrypt(true, plainText, AAD);
}
- public byte[] encrypt(boolean writeLength, byte[] plainText, byte[] AAD) throws IOException {
+ public byte[] encrypt(boolean writeLength, byte[] plainText, byte[] AAD) {
randomGenerator.nextBytes(localNonce);
return encrypt(writeLength, plainText, localNonce, AAD);
}
- public byte[] encrypt(boolean writeLength, byte[] plainText, byte[] nonce, byte[] AAD)
- throws IOException {
+ public byte[] encrypt(boolean writeLength, byte[] plainText, byte[] nonce, byte[] AAD) {
if (nonce.length != NONCE_LENGTH) {
- throw new IOException("Wrong nonce length " + nonce.length);
+ throw new ParquetCryptoRuntimeException("Wrong nonce length " + nonce.length);
}
int plainTextLength = plainText.length;
int cipherTextLength = NONCE_LENGTH + plainTextLength;
@@ -84,7 +82,7 @@ public class AesCtrEncryptor extends AesCipher implements BlockCipher.Encryptor{
cipher.doFinal(plainText, inputOffset, inputLength, cipherText, outputOffset);
} catch (GeneralSecurityException e) {
- throw new IOException("Failed to encrypt", e);
+ throw new ParquetCryptoRuntimeException("Failed to encrypt", e);
}
// Add ciphertext length
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesGcmDecryptor.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesGcmDecryptor.java
index d7f1486..1524d8e 100755
--- a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesGcmDecryptor.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesGcmDecryptor.java
@@ -19,6 +19,7 @@
package org.apache.parquet.crypto;
+import javax.crypto.AEADBadTagException;
import javax.crypto.Cipher;
import javax.crypto.spec.GCMParameterSpec;
@@ -30,31 +31,29 @@ import java.security.GeneralSecurityException;
public class AesGcmDecryptor extends AesCipher implements BlockCipher.Decryptor{
-
- AesGcmDecryptor(byte[] keyBytes) throws IllegalArgumentException, IOException {
+ AesGcmDecryptor(byte[] keyBytes) {
super(AesMode.GCM, keyBytes);
try {
cipher = Cipher.getInstance(AesMode.GCM.getCipherName());
} catch (GeneralSecurityException e) {
- throw new IOException("Failed to create GCM cipher", e);
+ throw new ParquetCryptoRuntimeException("Failed to create GCM cipher", e);
}
}
@Override
- public byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD) throws IOException {
+ public byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD) {
int cipherTextOffset = SIZE_LENGTH;
int cipherTextLength = lengthAndCiphertext.length - SIZE_LENGTH;
return decrypt(lengthAndCiphertext, cipherTextOffset, cipherTextLength, AAD);
}
- public byte[] decrypt(byte[] ciphertext, int cipherTextOffset, int cipherTextLength, byte[] AAD)
- throws IOException {
+ public byte[] decrypt(byte[] ciphertext, int cipherTextOffset, int cipherTextLength, byte[] AAD) {
int plainTextLength = cipherTextLength - GCM_TAG_LENGTH - NONCE_LENGTH;
if (plainTextLength < 1) {
- throw new IOException("Wrong input length " + plainTextLength);
+ throw new ParquetCryptoRuntimeException("Wrong input length " + plainTextLength);
}
// Get the nonce from ciphertext
@@ -70,8 +69,10 @@ public class AesGcmDecryptor extends AesCipher implements BlockCipher.Decryptor{
if (null != AAD) cipher.updateAAD(AAD);
cipher.doFinal(ciphertext, inputOffset, inputLength, plainText, outputOffset);
- } catch (GeneralSecurityException e) {
- throw new IOException("Failed to decrypt", e);
+ } catch (AEADBadTagException e) {
+ throw new TagVerificationException("GCM tag check failed", e);
+ } catch (GeneralSecurityException e) {
+ throw new ParquetCryptoRuntimeException("Failed to decrypt", e);
}
return plainText;
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesGcmEncryptor.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesGcmEncryptor.java
index bdfed7f..d456447 100755
--- a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesGcmEncryptor.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesGcmEncryptor.java
@@ -25,36 +25,34 @@ import javax.crypto.spec.GCMParameterSpec;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.format.BlockCipher;
-import java.io.IOException;
import java.security.GeneralSecurityException;
public class AesGcmEncryptor extends AesCipher implements BlockCipher.Encryptor{
- AesGcmEncryptor(byte[] keyBytes) throws IllegalArgumentException, IOException {
+ AesGcmEncryptor(byte[] keyBytes) {
super(AesMode.GCM, keyBytes);
try {
cipher = Cipher.getInstance(AesMode.GCM.getCipherName());
} catch (GeneralSecurityException e) {
- throw new IOException("Failed to create GCM cipher", e);
+ throw new ParquetCryptoRuntimeException("Failed to create GCM cipher", e);
}
}
@Override
- public byte[] encrypt(byte[] plainText, byte[] AAD) throws IOException {
+ public byte[] encrypt(byte[] plainText, byte[] AAD) {
return encrypt(true, plainText, AAD);
}
- public byte[] encrypt(boolean writeLength, byte[] plainText, byte[] AAD) throws IOException {
+ public byte[] encrypt(boolean writeLength, byte[] plainText, byte[] AAD) {
randomGenerator.nextBytes(localNonce);
return encrypt(writeLength, plainText, localNonce, AAD);
}
- public byte[] encrypt(boolean writeLength, byte[] plainText, byte[] nonce, byte[] AAD)
- throws IOException {
+ public byte[] encrypt(boolean writeLength, byte[] plainText, byte[] nonce, byte[] AAD) {
if (nonce.length != NONCE_LENGTH) {
- throw new IOException("Wrong nonce length " + nonce.length);
+ throw new ParquetCryptoRuntimeException("Wrong nonce length " + nonce.length);
}
int plainTextLength = plainText.length;
int cipherTextLength = NONCE_LENGTH + plainTextLength + GCM_TAG_LENGTH;
@@ -71,7 +69,7 @@ public class AesGcmEncryptor extends AesCipher implements BlockCipher.Encryptor{
cipher.doFinal(plainText, inputOffset, inputLength, cipherText, outputOffset);
} catch (GeneralSecurityException e) {
- throw new IOException("Failed to encrypt", e);
+ throw new ParquetCryptoRuntimeException("Failed to encrypt", e);
}
// Add ciphertext length
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/DecryptionKeyRetriever.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/DecryptionKeyRetriever.java
index f134bb8..efa3fd2 100755
--- a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/DecryptionKeyRetriever.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/DecryptionKeyRetriever.java
@@ -19,8 +19,6 @@
package org.apache.parquet.crypto;
-import java.io.IOException;
-
/**
* Interface for classes retrieving encryption keys using the key metadata.
* Implementations must be thread-safe, if same KeyRetriever object is passed to multiple file readers.
@@ -35,7 +33,7 @@ public interface DecryptionKeyRetriever {
* @param keyMetaData arbitrary byte array with encryption key metadata
* @return encryption key. Key length can be either 16, 24 or 32 bytes.
* @throws KeyAccessDeniedException thrown upon access control problems (authentication or authorization)
- * @throws IOException thrown upon key retrieval problems unrelated to access control
+ * @throws ParquetCryptoRuntimeException thrown upon key retrieval problems unrelated to access control
*/
- public byte[] getKey(byte[] keyMetaData) throws KeyAccessDeniedException, IOException;
+ public byte[] getKey(byte[] keyMetaData) throws KeyAccessDeniedException, ParquetCryptoRuntimeException;
}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/DecryptionPropertiesFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/DecryptionPropertiesFactory.java
index 4e1f5ac..c2a3d56 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/DecryptionPropertiesFactory.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/DecryptionPropertiesFactory.java
@@ -28,7 +28,7 @@ import org.slf4j.LoggerFactory;
/**
* DecryptionPropertiesFactory interface enables transparent activation of Parquet decryption.
*
- * It's customized implementations produce decryption properties for each Parquet file, using the input information
+ * Its customized implementations produce decryption properties for each Parquet file, using the input information
* available in Parquet file readers: file path and Hadoop configuration properties that can pass custom parameters
* required by a crypto factory. A factory implementation can use or ignore any of these inputs.
*
@@ -52,7 +52,8 @@ public interface DecryptionPropertiesFactory {
*
* @param conf Configuration where user specifies the class path
* @return object with class DecryptionPropertiesFactory if user specified the class path and invoking of
- * the class succeeds, null if user doesn't specify the class path
+ * the class succeeds. Null if user doesn't specify the class path (no decryption factory then - not required for plaintext files.
+ * Or for plaintext columns in encrypted files with plaintext footer).
* @throws BadConfigurationException if the instantiation of the configured class fails
*/
static DecryptionPropertiesFactory loadFactory(Configuration conf) {
@@ -79,7 +80,8 @@ public interface DecryptionPropertiesFactory {
* @param hadoopConfig Configuration that is used to pass the needed information, e.g. KMS uri
* @param filePath File path of the parquet file
* Can be used for AAD prefix verification, part of key metadata etc
- * @return object with class of FileDecryptionProperties
+ * @return object with class of FileDecryptionProperties. Null return value means no decryption properties
+ * are available for the file (not required for plaintext files. Or for plaintext columns in encrypted files with plaintext footer).
* @throws ParquetCryptoRuntimeException if there is an exception while creating the object
*/
FileDecryptionProperties getFileDecryptionProperties(Configuration hadoopConfig, Path filePath) throws ParquetCryptoRuntimeException;
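For illustration only (not part of the commit): a minimal DecryptionPropertiesFactory skeleton following the contract described above, where returning null means no decryption properties are available for the file. The class name is hypothetical, and it would be registered via the Hadoop configuration property that loadFactory(conf) reads (the property key is not shown in this hunk):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.parquet.crypto.DecryptionPropertiesFactory;
  import org.apache.parquet.crypto.FileDecryptionProperties;

  public class SampleDecryptionPropertiesFactory implements DecryptionPropertiesFactory {
    @Override
    public FileDecryptionProperties getFileDecryptionProperties(Configuration hadoopConfig, Path filePath) {
      // A real factory would build properties here, e.g. from a KMS URI passed in hadoopConfig.
      // Returning null is valid: it means the file needs no decryption properties
      // (plaintext file, or plaintext columns in an encrypted file with a plaintext footer).
      return null;
    }
  }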
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/EncryptionPropertiesFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/EncryptionPropertiesFactory.java
index 84cbadd..59d2ee3 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/EncryptionPropertiesFactory.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/EncryptionPropertiesFactory.java
@@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory;
/**
* EncryptionPropertiesFactory interface enables transparent activation of Parquet encryption.
*
- * It's customized implementations produce encryption properties for each Parquet file, using the input information
+ * Its customized implementations produce encryption properties for each Parquet file, using the input information
* available in Parquet file writers: file path, file extended schema - and also Hadoop configuration properties that
* can pass custom parameters required by a crypto factory. A factory implementation can use or ignore any of these
* inputs.
@@ -55,7 +55,7 @@ public interface EncryptionPropertiesFactory {
*
* @param conf Configuration where user specifies the class path
* @return object with class EncryptionPropertiesFactory if user specified the class path and invoking of
- * the class succeeds, null if user doesn't specify the class path
+ * the class succeeds. Null if user doesn't specify the class path (no encryption then).
* @throws BadConfigurationException if the instantiation of the configured class fails
*/
static EncryptionPropertiesFactory loadFactory(Configuration conf) {
@@ -85,7 +85,7 @@ public interface EncryptionPropertiesFactory {
* Implementations must not presume the path is permanent,
* as the file can be moved or renamed later
* @param fileWriteContext WriteContext to provide information like schema to build the FileEncryptionProperties
- * @return object with class of FileEncryptionProperties
+ * @return object with class of FileEncryptionProperties. Null return value means the file should not be encrypted.
* @throws ParquetCryptoRuntimeException if there is an exception while creating the object
*/
FileEncryptionProperties getFileEncryptionProperties(Configuration fileHadoopConfig, Path tempFilePath,
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalColumnDecryptionSetup.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalColumnDecryptionSetup.java
index 769c8d2..bab7c43 100755
--- a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalColumnDecryptionSetup.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalColumnDecryptionSetup.java
@@ -29,12 +29,12 @@ public class InternalColumnDecryptionSetup {
private final boolean isEncryptedWithFooterKey;
private final BlockCipher.Decryptor dataDecryptor;
private final BlockCipher.Decryptor metaDataDecryptor;
- private final short columnOrdinal;
+ private final int columnOrdinal;
private final byte[] keyMetadata;
InternalColumnDecryptionSetup(ColumnPath path, boolean encrypted,
boolean isEncryptedWithFooterKey, BlockCipher.Decryptor dataDecryptor,
- BlockCipher.Decryptor metaDataDecryptor, short columnOrdinal, byte[] keyMetadata) {
+ BlockCipher.Decryptor metaDataDecryptor, int columnOrdinal, byte[] keyMetadata) {
this.columnPath = path;
this.isEncrypted = encrypted;
this.isEncryptedWithFooterKey = isEncryptedWithFooterKey;
@@ -64,7 +64,7 @@ public class InternalColumnDecryptionSetup {
return columnPath;
}
- public short getOrdinal() {
+ public int getOrdinal() {
return columnOrdinal;
}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalColumnEncryptionSetup.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalColumnEncryptionSetup.java
index d4d2058..9be396a 100755
--- a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalColumnEncryptionSetup.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalColumnEncryptionSetup.java
@@ -32,9 +32,9 @@ public class InternalColumnEncryptionSetup {
private final BlockCipher.Encryptor metadataEncryptor;
private final BlockCipher.Encryptor dataEncryptor;
private final ColumnCryptoMetaData columnCryptoMetaData;
- private final short ordinal;
+ private final int ordinal;
- InternalColumnEncryptionSetup(ColumnEncryptionProperties encryptionProperties, short ordinal,
+ InternalColumnEncryptionSetup(ColumnEncryptionProperties encryptionProperties, int ordinal,
BlockCipher.Encryptor dataEncryptor, BlockCipher.Encryptor metaDataEncryptor) {
this.encryptionProperties = encryptionProperties;
this.dataEncryptor = dataEncryptor;
@@ -72,7 +72,7 @@ public class InternalColumnEncryptionSetup {
return columnCryptoMetaData;
}
- public short getOrdinal() {
+ public int getOrdinal() {
return ordinal;
}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalFileDecryptor.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalFileDecryptor.java
index d104401..683155d 100755
--- a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalFileDecryptor.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalFileDecryptor.java
@@ -22,7 +22,6 @@ package org.apache.parquet.crypto;
import org.apache.parquet.format.BlockCipher;
import org.apache.parquet.format.EncryptionAlgorithm;
import org.apache.parquet.hadoop.metadata.ColumnPath;
-import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
@@ -45,7 +44,7 @@ public class InternalFileDecryptor {
private BlockCipher.Decryptor aesCtrDecryptorWithFooterKey;
private boolean plaintextFile;
- public InternalFileDecryptor(FileDecryptionProperties fileDecryptionProperties) throws IOException {
+ public InternalFileDecryptor(FileDecryptionProperties fileDecryptionProperties) {
this.fileDecryptionProperties= fileDecryptionProperties;
checkPlaintextFooterIntegrity = fileDecryptionProperties.checkFooterIntegrity();
footerKey = fileDecryptionProperties.getFooterKey();
@@ -56,7 +55,7 @@ public class InternalFileDecryptor {
this.plaintextFile = false;
}
- private BlockCipher.Decryptor getThriftModuleDecryptor(byte[] columnKey) throws IOException {
+ private BlockCipher.Decryptor getThriftModuleDecryptor(byte[] columnKey) {
if (null == columnKey) { // Decryptor with footer key
if (null == aesGcmDecryptorWithFooterKey) {
aesGcmDecryptorWithFooterKey = ModuleCipherFactory.getDecryptor(AesMode.GCM, footerKey);
@@ -67,7 +66,7 @@ public class InternalFileDecryptor {
}
}
- private BlockCipher.Decryptor getDataModuleDecryptor(byte[] columnKey) throws IOException {
+ private BlockCipher.Decryptor getDataModuleDecryptor(byte[] columnKey) {
if (algorithm.isSetAES_GCM_V1()) {
return getThriftModuleDecryptor(columnKey);
}
@@ -83,21 +82,21 @@ public class InternalFileDecryptor {
}
}
- public InternalColumnDecryptionSetup getColumnSetup(ColumnPath path) throws IOException {
+ public InternalColumnDecryptionSetup getColumnSetup(ColumnPath path) {
if (!fileCryptoMetaDataProcessed) {
- throw new IOException("Haven't parsed the file crypto metadata yet");
+ throw new ParquetCryptoRuntimeException("Haven't parsed the file crypto metadata yet");
}
InternalColumnDecryptionSetup columnDecryptionSetup = columnMap.get(path);
if (null == columnDecryptionSetup) {
- throw new IOException("Failed to find decryption setup for column " + path);
+ throw new ParquetCryptoRuntimeException("Failed to find decryption setup for column " + path);
}
return columnDecryptionSetup;
}
- public BlockCipher.Decryptor getFooterDecryptor() throws IOException {
+ public BlockCipher.Decryptor fetchFooterDecryptor() {
if (!fileCryptoMetaDataProcessed) {
- throw new IOException("Haven't parsed the file crypto metadata yet");
+ throw new ParquetCryptoRuntimeException("Haven't parsed the file crypto metadata yet");
}
if (!encryptedFooter) {
return null;
@@ -107,7 +106,7 @@ public class InternalFileDecryptor {
}
public void setFileCryptoMetaData(EncryptionAlgorithm algorithm,
- boolean encryptedFooter, byte[] footerKeyMetaData) throws IOException {
+ boolean encryptedFooter, byte[] footerKeyMetaData) {
// first use of the decryptor
if (!fileCryptoMetaDataProcessed) {
@@ -137,19 +136,20 @@ public class InternalFileDecryptor {
mustSupplyAadPrefix = algorithm.getAES_GCM_CTR_V1().isSupply_aad_prefix();
aadFileUnique = algorithm.getAES_GCM_CTR_V1().getAad_file_unique();
} else {
- throw new IOException("Unsupported algorithm: " + algorithm);
+ throw new ParquetCryptoRuntimeException("Unsupported algorithm: " + algorithm);
}
// Handle AAD prefix
byte[] aadPrefix = aadPrefixInProperties;
if (mustSupplyAadPrefix && (null == aadPrefixInProperties)) {
- throw new IOException("AAD prefix used for file encryption, but not stored in file and not supplied in decryption properties");
+ throw new ParquetCryptoRuntimeException("AAD prefix used for file encryption, "
+ + "but not stored in file and not supplied in decryption properties");
}
if (fileHasAadPrefix) {
if (null != aadPrefixInProperties) {
if (!Arrays.equals(aadPrefixInProperties, aadPrefixInFile)) {
- throw new IOException("AAD Prefix in file and in decryption properties is not the same");
+ throw new ParquetCryptoRuntimeException("AAD Prefix in file and in decryption properties is not the same");
}
}
if (null != aadPrefixVerifier) {
@@ -159,10 +159,10 @@ public class InternalFileDecryptor {
}
else {
if (!mustSupplyAadPrefix && (null != aadPrefixInProperties)) {
- throw new IOException("AAD Prefix set in decryption properties, but was not used for file encryption");
+ throw new ParquetCryptoRuntimeException("AAD Prefix set in decryption properties, but was not used for file encryption");
}
if (null != aadPrefixVerifier) {
- throw new IOException("AAD Prefix Verifier is set, but AAD Prefix not found in file");
+ throw new ParquetCryptoRuntimeException("AAD Prefix Verifier is set, but AAD Prefix not found in file");
}
}
@@ -176,20 +176,20 @@ public class InternalFileDecryptor {
if (null == footerKey) { // ignore footer key metadata if footer key is explicitly set via API
if (encryptedFooter || checkPlaintextFooterIntegrity) {
if (null == footerKeyMetaData) {
- throw new IOException("No footer key or key metadata");
+ throw new ParquetCryptoRuntimeException("No footer key or key metadata");
}
if (null == keyRetriever) {
- throw new IOException("No footer key or key retriever");
+ throw new ParquetCryptoRuntimeException("No footer key or key retriever");
}
try {
footerKey = keyRetriever.getKey(footerKeyMetaData);
} catch (KeyAccessDeniedException e) {
- throw new IOException("Footer key: access denied", e);
+ throw new KeyAccessDeniedException("Footer key: access denied", e);
}
if (null == footerKey) {
- throw new IOException("Footer key unavailable");
+ throw new ParquetCryptoRuntimeException("Footer key unavailable");
}
}
}
@@ -197,35 +197,35 @@ public class InternalFileDecryptor {
// re-use of the decryptor
// check the crypto metadata.
if (!this.algorithm.equals(algorithm)) {
- throw new IOException("Decryptor re-use: Different algorithm");
+ throw new ParquetCryptoRuntimeException("Decryptor re-use: Different algorithm");
}
if (encryptedFooter != this.encryptedFooter) {
- throw new IOException("Decryptor re-use: Different footer encryption");
+ throw new ParquetCryptoRuntimeException("Decryptor re-use: Different footer encryption");
}
if (!Arrays.equals(this.footerKeyMetaData, footerKeyMetaData)) {
- throw new IOException("Decryptor re-use: Different footer key metadata ");
+ throw new ParquetCryptoRuntimeException("Decryptor re-use: Different footer key metadata");
}
}
}
public InternalColumnDecryptionSetup setColumnCryptoMetadata(ColumnPath path, boolean encrypted,
- boolean encryptedWithFooterKey, byte[] keyMetadata, short columnOrdinal) throws IOException {
+ boolean encryptedWithFooterKey, byte[] keyMetadata, int columnOrdinal) {
if (!fileCryptoMetaDataProcessed) {
- throw new IOException("Haven't parsed the file crypto metadata yet");
+ throw new ParquetCryptoRuntimeException("Haven't parsed the file crypto metadata yet");
}
InternalColumnDecryptionSetup columnDecryptionSetup = columnMap.get(path);
if (null != columnDecryptionSetup) {
if (columnDecryptionSetup.isEncrypted() != encrypted) {
- throw new IOException("Re-use: wrong encrypted flag. Column: " + path);
+ throw new ParquetCryptoRuntimeException("Re-use: wrong encrypted flag. Column: " + path);
}
if (encrypted) {
if (encryptedWithFooterKey != columnDecryptionSetup.isEncryptedWithFooterKey()) {
- throw new IOException("Re-use: wrong encryption key (column vs footer). Column: " + path);
+ throw new ParquetCryptoRuntimeException("Re-use: wrong encryption key (column vs footer). Column: " + path);
}
if (!encryptedWithFooterKey && !Arrays.equals(columnDecryptionSetup.getKeyMetadata(), keyMetadata)) {
- throw new IOException("Decryptor re-use: Different footer key metadata ");
+ throw new ParquetCryptoRuntimeException("Decryptor re-use: Different footer key metadata ");
}
}
return columnDecryptionSetup;
@@ -236,7 +236,7 @@ public class InternalFileDecryptor {
} else {
if (encryptedWithFooterKey) {
if (null == footerKey) {
- throw new IOException("Column " + path + " is encrypted with NULL footer key");
+ throw new ParquetCryptoRuntimeException("Column " + path + " is encrypted with NULL footer key");
}
columnDecryptionSetup = new InternalColumnDecryptionSetup(path, true, true,
getDataModuleDecryptor(null), getThriftModuleDecryptor(null), columnOrdinal, null);
@@ -248,12 +248,12 @@ public class InternalFileDecryptor {
try {
columnKeyBytes = keyRetriever.getKey(keyMetadata);
} catch (KeyAccessDeniedException e) {
- throw new IOException("Column " + path + ": key access denied", e);
+ throw new KeyAccessDeniedException("Column " + path + ": key access denied", e);
}
}
if (null == columnKeyBytes) { // Hidden column: encrypted, but key unavailable
- throw new IOException("Column " + path + ": key unavailable");
+ throw new ParquetCryptoRuntimeException("Column " + path + ": key unavailable");
} else { // Key is available
columnDecryptionSetup = new InternalColumnDecryptionSetup(path, true, false,
getDataModuleDecryptor(columnKeyBytes), getThriftModuleDecryptor(columnKeyBytes), columnOrdinal, keyMetadata);
@@ -269,12 +269,12 @@ public class InternalFileDecryptor {
return this.fileAAD;
}
- public AesGcmEncryptor getSignedFooterEncryptor() throws IOException {
+ public AesGcmEncryptor createSignedFooterEncryptor() {
if (!fileCryptoMetaDataProcessed) {
- throw new IOException("Haven't parsed the file crypto metadata yet");
+ throw new ParquetCryptoRuntimeException("Haven't parsed the file crypto metadata yet");
}
if (encryptedFooter) {
- throw new IOException("Requesting signed footer encryptor in file with encrypted footer");
+ throw new ParquetCryptoRuntimeException("Requesting signed footer encryptor in file with encrypted footer");
}
return (AesGcmEncryptor) ModuleCipherFactory.getEncryptor(AesMode.GCM, footerKey);
@@ -295,5 +295,9 @@ public class InternalFileDecryptor {
public boolean plaintextFile() {
return plaintextFile;
}
+
+ public FileDecryptionProperties getDecryptionProperties() {
+ return fileDecryptionProperties;
+ }
}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalFileEncryptor.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalFileEncryptor.java
index 9d408ef..c167a5e 100755
--- a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalFileEncryptor.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/InternalFileEncryptor.java
@@ -24,7 +24,6 @@ import org.apache.parquet.format.FileCryptoMetaData;
import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.format.EncryptionAlgorithm;
-import java.io.IOException;
import java.util.HashMap;
public class InternalFileEncryptor {
@@ -41,7 +40,7 @@ public class InternalFileEncryptor {
private BlockCipher.Encryptor aesCtrEncryptorWithFooterKey;
private boolean fileCryptoMetaDataCreated;
- public InternalFileEncryptor(FileEncryptionProperties fileEncryptionProperties) throws IOException {
+ public InternalFileEncryptor(FileEncryptionProperties fileEncryptionProperties) {
this.fileEncryptionProperties = fileEncryptionProperties;
algorithm = fileEncryptionProperties.getAlgorithm();
footerKey = fileEncryptionProperties.getFooterKey();
@@ -52,7 +51,7 @@ public class InternalFileEncryptor {
fileCryptoMetaDataCreated = false;
}
- private BlockCipher.Encryptor getThriftModuleEncryptor(byte[] columnKey) throws IOException {
+ private BlockCipher.Encryptor getThriftModuleEncryptor(byte[] columnKey) {
if (null == columnKey) { // Encryptor with footer key
if (null == aesGcmEncryptorWithFooterKey) {
aesGcmEncryptorWithFooterKey = ModuleCipherFactory.getEncryptor(AesMode.GCM, footerKey);
@@ -63,7 +62,7 @@ public class InternalFileEncryptor {
}
}
- private BlockCipher.Encryptor getDataModuleEncryptor(byte[] columnKey) throws IOException {
+ private BlockCipher.Encryptor getDataModuleEncryptor(byte[] columnKey) {
if (algorithm.isSetAES_GCM_V1()) {
return getThriftModuleEncryptor(columnKey);
}
@@ -79,27 +78,27 @@ public class InternalFileEncryptor {
}
public InternalColumnEncryptionSetup getColumnSetup(ColumnPath columnPath,
- boolean createIfNull, short ordinal) throws IOException {
+ boolean createIfNull, int ordinal) {
InternalColumnEncryptionSetup internalColumnProperties = columnMap.get(columnPath);
if (null != internalColumnProperties) {
if (ordinal != internalColumnProperties.getOrdinal()) {
- throw new IOException("Column ordinal doesnt match " + columnPath +
+ throw new ParquetCryptoRuntimeException("Column ordinal doesnt match " + columnPath +
": " + ordinal + ", "+internalColumnProperties.getOrdinal());
}
return internalColumnProperties;
}
if (!createIfNull) {
- throw new IOException("No encryption setup found for column " + columnPath);
+ throw new ParquetCryptoRuntimeException("No encryption setup found for column " + columnPath);
}
if (fileCryptoMetaDataCreated) {
- throw new IOException("Re-use: No encryption setup for column " + columnPath);
+ throw new ParquetCryptoRuntimeException("Re-use: No encryption setup for column " + columnPath);
}
ColumnEncryptionProperties columnProperties = fileEncryptionProperties.getColumnProperties(columnPath);
if (null == columnProperties) {
- throw new IOException("No encryption properties for column " + columnPath);
+ throw new ParquetCryptoRuntimeException("No encryption properties for column " + columnPath);
}
if (columnProperties.isEncrypted()) {
if (columnProperties.isEncryptedWithFooterKey()) {
@@ -118,14 +117,14 @@ public class InternalFileEncryptor {
return internalColumnProperties;
}
- public BlockCipher.Encryptor getFooterEncryptor() throws IOException {
+ public BlockCipher.Encryptor getFooterEncryptor() {
if (!encryptFooter) return null;
return getThriftModuleEncryptor(null);
}
- public FileCryptoMetaData getFileCryptoMetaData() throws IOException {
+ public FileCryptoMetaData getFileCryptoMetaData() {
if (!encryptFooter) {
- throw new IOException("Requesting FileCryptoMetaData in file with unencrypted footer");
+ throw new ParquetCryptoRuntimeException("Requesting FileCryptoMetaData in file with unencrypted footer");
}
FileCryptoMetaData fileCryptoMetaData = new FileCryptoMetaData(algorithm);
if (null != footerKeyMetadata) {
@@ -159,16 +158,16 @@ public class InternalFileEncryptor {
return this.fileAAD;
}
- public byte[] getFooterSigningKeyMetaData() throws IOException {
+ public byte[] getFooterSigningKeyMetaData() {
if (encryptFooter) {
- throw new IOException("Requesting signing footer key metadata in file with encrypted footer");
+ throw new ParquetCryptoRuntimeException("Requesting signing footer key metadata in file with encrypted footer");
}
return footerKeyMetadata;
}
- public AesGcmEncryptor getSignedFooterEncryptor() throws IOException {
+ public AesGcmEncryptor getSignedFooterEncryptor() {
if (encryptFooter) {
- throw new IOException("Requesting signed footer encryptor in file with encrypted footer");
+ throw new ParquetCryptoRuntimeException("Requesting signed footer encryptor in file with encrypted footer");
}
return (AesGcmEncryptor) ModuleCipherFactory.getEncryptor(AesMode.GCM, footerKey);
}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/KeyAccessDeniedException.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/KeyAccessDeniedException.java
index 0d9d84f..bff495a 100755
--- a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/KeyAccessDeniedException.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/KeyAccessDeniedException.java
@@ -19,12 +19,21 @@
package org.apache.parquet.crypto;
-import java.security.KeyException;
-public class KeyAccessDeniedException extends KeyException {
+public class KeyAccessDeniedException extends ParquetCryptoRuntimeException {
private static final long serialVersionUID = 1L;
- public KeyAccessDeniedException(String keyID) {
- super(keyID);
+ public KeyAccessDeniedException() {}
+
+ public KeyAccessDeniedException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public KeyAccessDeniedException(String message) {
+ super(message);
+ }
+
+ public KeyAccessDeniedException(Throwable cause) {
+ super(cause);
}
}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/ModuleCipherFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/ModuleCipherFactory.java
index bbde66e..1c25313 100755
--- a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/ModuleCipherFactory.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/ModuleCipherFactory.java
@@ -21,8 +21,6 @@ package org.apache.parquet.crypto;
import org.apache.parquet.format.BlockCipher;
-import java.io.IOException;
-
public class ModuleCipherFactory {
// Parquet Module types
@@ -34,7 +32,9 @@ public class ModuleCipherFactory {
DataPageHeader((byte)4),
DictionaryPageHeader((byte)5),
ColumnIndex((byte)6),
- OffsetIndex((byte)7);
+ OffsetIndex((byte)7),
+ BloomFilterHeader((byte)8),
+ BloomFilterBitset((byte)9);
private final byte value;
@@ -49,8 +49,7 @@ public class ModuleCipherFactory {
public static final int SIZE_LENGTH = 4;
- public static BlockCipher.Encryptor getEncryptor(AesMode mode, byte[] keyBytes)
- throws IllegalArgumentException, IOException {
+ public static BlockCipher.Encryptor getEncryptor(AesMode mode, byte[] keyBytes) {
switch (mode) {
case GCM:
return new AesGcmEncryptor(keyBytes);
@@ -61,8 +60,7 @@ public class ModuleCipherFactory {
}
}
- public static BlockCipher.Decryptor getDecryptor(AesMode mode, byte[] keyBytes)
- throws IllegalArgumentException, IOException {
+ public static BlockCipher.Decryptor getDecryptor(AesMode mode, byte[] keyBytes) {
switch (mode) {
case GCM:
return new AesGcmDecryptor(keyBytes);
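
Both factory methods are now free of checked exceptions. A minimal usage sketch, based only on the signatures above and on how BlockCipher encryptors and decryptors are called elsewhere in this patch; the key bytes, the AAD value and the org.apache.parquet.crypto location of AesMode are assumptions:

    import java.nio.charset.StandardCharsets;

    import org.apache.parquet.crypto.AesMode;
    import org.apache.parquet.crypto.ModuleCipherFactory;
    import org.apache.parquet.format.BlockCipher;

    public class ModuleCipherExample {
      public static void main(String[] args) throws Exception {
        byte[] keyBytes = new byte[16];          // illustrative 128-bit key
        byte[] moduleAAD = new byte[] {1, 2, 3}; // illustrative module AAD

        BlockCipher.Encryptor encryptor = ModuleCipherFactory.getEncryptor(AesMode.GCM, keyBytes);
        BlockCipher.Decryptor decryptor = ModuleCipherFactory.getDecryptor(AesMode.GCM, keyBytes);

        byte[] plaintext = "footer bytes".getBytes(StandardCharsets.UTF_8);
        byte[] ciphertext = encryptor.encrypt(plaintext, moduleAAD);
        byte[] roundTrip = decryptor.decrypt(ciphertext, moduleAAD); // equals plaintext
      }
    }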
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/ParquetCryptoRuntimeException.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/ParquetCryptoRuntimeException.java
index 60255b5..cb9489c 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/ParquetCryptoRuntimeException.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/ParquetCryptoRuntimeException.java
@@ -21,7 +21,7 @@ package org.apache.parquet.crypto;
import org.apache.parquet.ParquetRuntimeException;
/**
- * Thrown when an encryption or decryption operation problem is occurred
+ * Thrown upon an encryption or decryption operation problem
*/
public class ParquetCryptoRuntimeException extends ParquetRuntimeException {
private static final long serialVersionUID = 1L;
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/StringKeyIdRetriever.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/StringKeyIdRetriever.java
deleted file mode 100755
index 7b15089..0000000
--- a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/StringKeyIdRetriever.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.parquet.crypto;
-
-import java.nio.charset.StandardCharsets;
-import java.util.Hashtable;
-
-// Simple key retriever, based on UTF8 strings as key identifiers
-public class StringKeyIdRetriever implements DecryptionKeyRetriever{
-
- private final Hashtable<String,byte[]> keyMap = new Hashtable<String,byte[]>();
-
- public void putKey(String keyId, byte[] keyBytes) {
- keyMap.put(keyId, keyBytes);
- }
-
- @Override
- public byte[] getKey(byte[] keyMetaData) {
- String keyId = new String(keyMetaData, StandardCharsets.UTF_8);
- return keyMap.get(keyId);
- }
-}
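
StringKeyIdRetriever is removed from the library, but applications can keep the same pattern by implementing DecryptionKeyRetriever themselves. A minimal sketch of such a retriever, keyed by UTF-8 key identifiers as the deleted class was; throwing the now-unchecked KeyAccessDeniedException for unknown key IDs is one possible policy, and the in-memory map is illustrative:

    import java.nio.charset.StandardCharsets;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    import org.apache.parquet.crypto.DecryptionKeyRetriever;
    import org.apache.parquet.crypto.KeyAccessDeniedException;

    // Simple in-memory retriever, based on UTF-8 strings as key identifiers
    public class InMemoryKeyRetriever implements DecryptionKeyRetriever {

      private final Map<String, byte[]> keyMap = new ConcurrentHashMap<>();

      public void putKey(String keyId, byte[] keyBytes) {
        keyMap.put(keyId, keyBytes);
      }

      @Override
      public byte[] getKey(byte[] keyMetaData) {
        String keyId = new String(keyMetaData, StandardCharsets.UTF_8);
        byte[] key = keyMap.get(keyId);
        if (null == key) {
          // one possible policy: refuse access loudly instead of returning null
          throw new KeyAccessDeniedException("No key found for key id: " + keyId);
        }
        return key;
      }
    }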
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/HiddenColumnException.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/TagVerificationException.java
old mode 100755
new mode 100644
similarity index 69%
rename from parquet-hadoop/src/main/java/org/apache/parquet/crypto/HiddenColumnException.java
rename to parquet-hadoop/src/main/java/org/apache/parquet/crypto/TagVerificationException.java
index e4c3dca..5fc6acd
--- a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/HiddenColumnException.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/TagVerificationException.java
@@ -19,20 +19,21 @@
package org.apache.parquet.crypto;
-import org.apache.parquet.ParquetRuntimeException;
-/**
- * Reader doesn't have key for encrypted column,
- * but tries to access its contents
- */
-public class HiddenColumnException extends ParquetRuntimeException {
+public class TagVerificationException extends ParquetCryptoRuntimeException {
private static final long serialVersionUID = 1L;
- public HiddenColumnException(String string) {
- super(string);
+ public TagVerificationException() {}
+
+ public TagVerificationException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public TagVerificationException(String message) {
+ super(message);
}
- public HiddenColumnException(String string, Exception e) {
- super(string, e);
+ public TagVerificationException(Throwable cause) {
+ super(cause);
}
}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
index 3908463..2c93d31 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@ -22,8 +22,10 @@ import static java.util.Optional.empty;
import static java.util.Optional.of;
import static org.apache.parquet.format.Util.readFileMetaData;
+import static org.apache.parquet.format.Util.writeColumnMetaData;
import static org.apache.parquet.format.Util.writePageHeader;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -43,9 +45,19 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.CorruptStatistics;
import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.statistics.BinaryStatistics;
import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.crypto.AesCipher;
+import org.apache.parquet.crypto.AesGcmEncryptor;
+import org.apache.parquet.crypto.InternalColumnEncryptionSetup;
+import org.apache.parquet.crypto.InternalFileDecryptor;
+import org.apache.parquet.crypto.InternalFileEncryptor;
+import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
+import org.apache.parquet.crypto.TagVerificationException;
+import org.apache.parquet.format.BlockCipher;
import org.apache.parquet.format.BloomFilterAlgorithm;
import org.apache.parquet.format.BloomFilterCompression;
import org.apache.parquet.format.BloomFilterHash;
@@ -75,6 +87,7 @@ import org.apache.parquet.format.XxHash;
import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.format.BoundaryOrder;
import org.apache.parquet.format.ColumnChunk;
+import org.apache.parquet.format.ColumnCryptoMetaData;
import org.apache.parquet.format.ColumnIndex;
import org.apache.parquet.format.ColumnMetaData;
import org.apache.parquet.format.ColumnOrder;
@@ -83,6 +96,7 @@ import org.apache.parquet.format.DataPageHeader;
import org.apache.parquet.format.DataPageHeaderV2;
import org.apache.parquet.format.DictionaryPageHeader;
import org.apache.parquet.format.Encoding;
+import org.apache.parquet.format.EncryptionWithColumnKey;
import org.apache.parquet.format.FieldRepetitionType;
import org.apache.parquet.format.FileMetaData;
import org.apache.parquet.format.KeyValue;
@@ -176,12 +190,17 @@ public class ParquetMetadataConverter {
cachedEncodingSets = new ConcurrentHashMap<Set<org.apache.parquet.column.Encoding>, Set<org.apache.parquet.column.Encoding>>();
public FileMetaData toParquetMetadata(int currentVersion, ParquetMetadata parquetMetadata) {
+ return toParquetMetadata(currentVersion, parquetMetadata, null);
+ }
+
+ public FileMetaData toParquetMetadata(int currentVersion, ParquetMetadata parquetMetadata,
+ InternalFileEncryptor fileEncryptor) {
List<BlockMetaData> blocks = parquetMetadata.getBlocks();
List<RowGroup> rowGroups = new ArrayList<RowGroup>();
long numRows = 0;
for (BlockMetaData block : blocks) {
numRows += block.getRowCount();
- addRowGroup(parquetMetadata, rowGroups, block);
+ addRowGroup(parquetMetadata, rowGroups, block, fileEncryptor);
}
FileMetaData fileMetaData = new FileMetaData(
currentVersion,
@@ -463,14 +482,29 @@ public class ParquetMetadataConverter {
}
}
- private void addRowGroup(ParquetMetadata parquetMetadata, List<RowGroup> rowGroups, BlockMetaData block) {
+ private void addRowGroup(ParquetMetadata parquetMetadata, List<RowGroup> rowGroups, BlockMetaData block,
+ InternalFileEncryptor fileEncryptor) {
+
//rowGroup.total_byte_size = ;
List<ColumnChunkMetaData> columns = block.getColumns();
List<ColumnChunk> parquetColumns = new ArrayList<ColumnChunk>();
+ int rowGroupOrdinal = rowGroups.size();
+ int columnOrdinal = -1;
+ ByteArrayOutputStream tempOutStream = null;
for (ColumnChunkMetaData columnMetaData : columns) {
ColumnChunk columnChunk = new ColumnChunk(columnMetaData.getFirstDataPageOffset()); // verify this is the right offset
columnChunk.file_path = block.getPath(); // they are in the same file for now
- columnChunk.meta_data = new ColumnMetaData(
+ InternalColumnEncryptionSetup columnSetup = null;
+ boolean writeCryptoMetadata = false;
+ boolean encryptMetaData = false;
+ ColumnPath path = columnMetaData.getPath();
+ if (null != fileEncryptor) {
+ columnOrdinal++;
+ columnSetup = fileEncryptor.getColumnSetup(path, false, columnOrdinal);
+ writeCryptoMetadata = columnSetup.isEncrypted();
+ encryptMetaData = fileEncryptor.encryptColumnMetaData(columnSetup);
+ }
+ ColumnMetaData metaData = new ColumnMetaData(
getType(columnMetaData.getType()),
toFormatEncodings(columnMetaData.getEncodings()),
Arrays.asList(columnMetaData.getPath().toArray()),
@@ -479,14 +513,45 @@ public class ParquetMetadataConverter {
columnMetaData.getTotalUncompressedSize(),
columnMetaData.getTotalSize(),
columnMetaData.getFirstDataPageOffset());
- columnChunk.meta_data.dictionary_page_offset = columnMetaData.getDictionaryPageOffset();
- columnChunk.meta_data.setBloom_filter_offset(columnMetaData.getBloomFilterOffset());
+ metaData.dictionary_page_offset = columnMetaData.getDictionaryPageOffset();
+ metaData.setBloom_filter_offset(columnMetaData.getBloomFilterOffset());
if (!columnMetaData.getStatistics().isEmpty()) {
- columnChunk.meta_data.setStatistics(toParquetStatistics(columnMetaData.getStatistics(), this.statisticsTruncateLength));
+ metaData.setStatistics(toParquetStatistics(columnMetaData.getStatistics(), this.statisticsTruncateLength));
}
if (columnMetaData.getEncodingStats() != null) {
- columnChunk.meta_data.setEncoding_stats(convertEncodingStats(columnMetaData.getEncodingStats()));
+ metaData.setEncoding_stats(convertEncodingStats(columnMetaData.getEncodingStats()));
}
+
+ if (!encryptMetaData) {
+ columnChunk.setMeta_data(metaData);
+ } else {
+ // Serialize and encrypt ColumnMetadata separately
+ byte[] columnMetaDataAAD = AesCipher.createModuleAAD(fileEncryptor.getFileAAD(),
+ ModuleType.ColumnMetaData, rowGroupOrdinal, columnSetup.getOrdinal(), -1);
+ if (null == tempOutStream) {
+ tempOutStream = new ByteArrayOutputStream();
+ } else {
+ tempOutStream.reset();
+ }
+ try {
+ writeColumnMetaData(metaData, tempOutStream, columnSetup.getMetaDataEncryptor(), columnMetaDataAAD);
+ } catch (IOException e) {
+ throw new ParquetCryptoRuntimeException("Failed to serialize and encrypt ColumnMetadata for " +
+ columnMetaData.getPath(), e);
+ }
+ columnChunk.setEncrypted_column_metadata(tempOutStream.toByteArray());
+ // Keep redacted metadata version for old readers
+ if (!fileEncryptor.isFooterEncrypted()) {
+ ColumnMetaData metaDataRedacted = metaData.deepCopy();
+ if (metaDataRedacted.isSetStatistics()) metaDataRedacted.unsetStatistics();
+ if (metaDataRedacted.isSetEncoding_stats()) metaDataRedacted.unsetEncoding_stats();
+ columnChunk.setMeta_data(metaDataRedacted);
+ }
+ }
+ if (writeCryptoMetadata) {
+ columnChunk.setCrypto_metadata(columnSetup.getColumnCryptoMetaData());
+ }
+
// columnChunk.meta_data.index_page_offset = ;
// columnChunk.meta_data.key_value_metadata = ; // nothing yet
@@ -504,6 +569,9 @@ public class ParquetMetadataConverter {
parquetColumns.add(columnChunk);
}
RowGroup rowGroup = new RowGroup(parquetColumns, block.getTotalByteSize(), block.getRowCount());
+ rowGroup.setFile_offset(block.getStartingPos());
+ rowGroup.setTotal_compressed_size(block.getCompressedSize());
+ rowGroup.setOrdinal((short) rowGroupOrdinal);
rowGroups.add(rowGroup);
}
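
The encrypted-metadata path above relies on the per-module AAD scheme used throughout this patch: the file AAD is combined with a module type, a row group ordinal, a column ordinal and, for paged modules, a page ordinal. A minimal sketch of how those AADs are derived, using only calls that appear in this patch; the ordinal values are illustrative:

    import org.apache.parquet.crypto.AesCipher;
    import org.apache.parquet.crypto.InternalFileEncryptor;
    import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;

    public class ModuleAadExample {
      public static void buildAads(InternalFileEncryptor fileEncryptor) {
        byte[] fileAAD = fileEncryptor.getFileAAD();
        int rowGroupOrdinal = 0;  // illustrative ordinals
        int columnOrdinal = 3;

        // Non-paged modules (e.g. ColumnMetaData, DictionaryPage) pass -1 as the page ordinal
        byte[] columnMetaDataAAD = AesCipher.createModuleAAD(
            fileAAD, ModuleType.ColumnMetaData, rowGroupOrdinal, columnOrdinal, -1);

        // Paged modules start at page ordinal 0 and are updated in place for each page
        byte[] dataPageAAD = AesCipher.createModuleAAD(
            fileAAD, ModuleType.DataPage, rowGroupOrdinal, columnOrdinal, 0);
        for (int pageOrdinal = 0; pageOrdinal < 5; pageOrdinal++) {
          AesCipher.quickUpdatePageAAD(dataPageAAD, pageOrdinal);
          // dataPageAAD now identifies this specific page of this column chunk
        }
      }
    }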
@@ -1145,15 +1213,28 @@ public class ParquetMetadataConverter {
List<RowGroup> newRowGroups = new ArrayList<RowGroup>();
for (RowGroup rowGroup : rowGroups) {
long totalSize = 0;
- long startIndex = getOffset(rowGroup.getColumns().get(0));
- for (ColumnChunk col : rowGroup.getColumns()) {
- totalSize += col.getMeta_data().getTotal_compressed_size();
+ long startIndex;
+
+ if (rowGroup.isSetFile_offset()) {
+ startIndex = rowGroup.getFile_offset();
+ } else {
+ startIndex = getOffset(rowGroup.getColumns().get(0));
}
+
+ if (rowGroup.isSetTotal_compressed_size()) {
+ totalSize = rowGroup.getTotal_compressed_size();
+ } else {
+ for (ColumnChunk col : rowGroup.getColumns()) {
+ totalSize += col.getMeta_data().getTotal_compressed_size();
+ }
+ }
+
long midPoint = startIndex + totalSize / 2;
if (filter.contains(midPoint)) {
newRowGroups.add(rowGroup);
}
}
+
metaData.setRow_groups(newRowGroups);
return metaData;
}
@@ -1163,7 +1244,13 @@ public class ParquetMetadataConverter {
List<RowGroup> rowGroups = metaData.getRow_groups();
List<RowGroup> newRowGroups = new ArrayList<RowGroup>();
for (RowGroup rowGroup : rowGroups) {
- long startIndex = getOffset(rowGroup.getColumns().get(0));
+ long startIndex;
+ if (rowGroup.isSetFile_offset()) {
+ startIndex = rowGroup.getFile_offset();
+ } else {
+ startIndex = getOffset(rowGroup.getColumns().get(0));
+ }
+
if (filter.contains(startIndex)) {
newRowGroups.add(rowGroup);
}
@@ -1173,8 +1260,12 @@ public class ParquetMetadataConverter {
}
static long getOffset(RowGroup rowGroup) {
+ if (rowGroup.isSetFile_offset()) {
+ return rowGroup.getFile_offset();
+ }
return getOffset(rowGroup.getColumns().get(0));
}
+
// Visible for testing
static long getOffset(ColumnChunk columnChunk) {
ColumnMetaData md = columnChunk.getMeta_data();
@@ -1185,70 +1276,186 @@ public class ParquetMetadataConverter {
return offset;
}
+ private static void verifyFooterIntegrity(InputStream from, InternalFileDecryptor fileDecryptor,
+ int combinedFooterLength) throws IOException {
+
+ byte[] nonce = new byte[AesCipher.NONCE_LENGTH];
+ from.read(nonce);
+ byte[] gcmTag = new byte[AesCipher.GCM_TAG_LENGTH];
+ from.read(gcmTag);
+
+ AesGcmEncryptor footerSigner = fileDecryptor.createSignedFooterEncryptor();
+
+ byte[] footerAndSignature = ((ByteBufferInputStream) from).slice(0).array();
+ int footerSignatureLength = AesCipher.NONCE_LENGTH + AesCipher.GCM_TAG_LENGTH;
+ byte[] serializedFooter = new byte[combinedFooterLength - footerSignatureLength];
+ System.arraycopy(footerAndSignature, 0, serializedFooter, 0, serializedFooter.length);
+
+ byte[] signedFooterAAD = AesCipher.createFooterAAD(fileDecryptor.getFileAAD());
+ byte[] encryptedFooterBytes = footerSigner.encrypt(false, serializedFooter, nonce, signedFooterAAD);
+ byte[] calculatedTag = new byte[AesCipher.GCM_TAG_LENGTH];
+ System.arraycopy(encryptedFooterBytes, encryptedFooterBytes.length - AesCipher.GCM_TAG_LENGTH,
+ calculatedTag, 0, AesCipher.GCM_TAG_LENGTH);
+ if (!Arrays.equals(gcmTag, calculatedTag)) {
+ throw new TagVerificationException("Signature mismatch in plaintext footer");
+ }
+ }
+
public ParquetMetadata readParquetMetadata(final InputStream from, MetadataFilter filter) throws IOException {
+ return readParquetMetadata(from, filter, null, false, 0);
+ }
+
+ public ParquetMetadata readParquetMetadata(final InputStream from, MetadataFilter filter,
+ final InternalFileDecryptor fileDecryptor, final boolean encryptedFooter,
+ final int combinedFooterLength) throws IOException {
+
+ final BlockCipher.Decryptor footerDecryptor = (encryptedFooter? fileDecryptor.fetchFooterDecryptor() : null);
+ final byte[] encryptedFooterAAD = (encryptedFooter? AesCipher.createFooterAAD(fileDecryptor.getFileAAD()) : null);
+
FileMetaData fileMetaData = filter.accept(new MetadataFilterVisitor<FileMetaData, IOException>() {
@Override
public FileMetaData visit(NoFilter filter) throws IOException {
- return readFileMetaData(from);
+ return readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
}
@Override
public FileMetaData visit(SkipMetadataFilter filter) throws IOException {
- return readFileMetaData(from, true);
+ return readFileMetaData(from, true, footerDecryptor, encryptedFooterAAD);
}
@Override
public FileMetaData visit(OffsetMetadataFilter filter) throws IOException {
- return filterFileMetaDataByStart(readFileMetaData(from), filter);
+ return filterFileMetaDataByStart(readFileMetaData(from, footerDecryptor, encryptedFooterAAD), filter);
}
@Override
public FileMetaData visit(RangeMetadataFilter filter) throws IOException {
- return filterFileMetaDataByMidpoint(readFileMetaData(from), filter);
+ return filterFileMetaDataByMidpoint(readFileMetaData(from, footerDecryptor, encryptedFooterAAD), filter);
}
});
LOG.debug("{}", fileMetaData);
- ParquetMetadata parquetMetadata = fromParquetMetadata(fileMetaData);
+
+ if (!encryptedFooter && null != fileDecryptor) {
+ if (!fileMetaData.isSetEncryption_algorithm()) { // Plaintext file
+ fileDecryptor.setPlaintextFile();
+ // Detect files that were left unencrypted by mistake
+ if (!fileDecryptor.plaintextFilesAllowed()) {
+ throw new ParquetCryptoRuntimeException("Applying decryptor on plaintext file");
+ }
+ } else { // Encrypted file with plaintext footer
+ // if no fileDecryptor, can still read plaintext columns
+ fileDecryptor.setFileCryptoMetaData(fileMetaData.getEncryption_algorithm(), false,
+ fileMetaData.getFooter_signing_key_metadata());
+ if (fileDecryptor.checkFooterIntegrity()) {
+ verifyFooterIntegrity(from, fileDecryptor, combinedFooterLength);
+ }
+ }
+ }
+
+ ParquetMetadata parquetMetadata = fromParquetMetadata(fileMetaData, fileDecryptor, encryptedFooter);
if (LOG.isDebugEnabled()) LOG.debug(ParquetMetadata.toPrettyJSON(parquetMetadata));
return parquetMetadata;
}
+
+ public ColumnChunkMetaData buildColumnChunkMetaData(ColumnMetaData metaData, ColumnPath columnPath, PrimitiveType type, String createdBy) {
+ return ColumnChunkMetaData.get(
+ columnPath,
+ type,
+ fromFormatCodec(metaData.codec),
+ convertEncodingStats(metaData.getEncoding_stats()),
+ fromFormatEncodings(metaData.encodings),
+ fromParquetStatistics(
+ createdBy,
+ metaData.statistics,
+ type),
+ metaData.data_page_offset,
+ metaData.dictionary_page_offset,
+ metaData.num_values,
+ metaData.total_compressed_size,
+ metaData.total_uncompressed_size);
+ }
public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata) throws IOException {
+ return fromParquetMetadata(parquetMetadata, null, false);
+ }
+
+ public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata,
+ InternalFileDecryptor fileDecryptor, boolean encryptedFooter) throws IOException {
MessageType messageType = fromParquetSchema(parquetMetadata.getSchema(), parquetMetadata.getColumn_orders());
List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
List<RowGroup> row_groups = parquetMetadata.getRow_groups();
+
if (row_groups != null) {
for (RowGroup rowGroup : row_groups) {
BlockMetaData blockMetaData = new BlockMetaData();
blockMetaData.setRowCount(rowGroup.getNum_rows());
blockMetaData.setTotalByteSize(rowGroup.getTotal_byte_size());
+ // not set in legacy files
+ if (rowGroup.isSetOrdinal()) {
+ blockMetaData.setOrdinal(rowGroup.getOrdinal());
+ }
List<ColumnChunk> columns = rowGroup.getColumns();
String filePath = columns.get(0).getFile_path();
+ int columnOrdinal = -1;
for (ColumnChunk columnChunk : columns) {
+ columnOrdinal++;
if ((filePath == null && columnChunk.getFile_path() != null)
|| (filePath != null && !filePath.equals(columnChunk.getFile_path()))) {
throw new ParquetDecodingException("all column chunks of the same row group must be in the same file for now");
}
ColumnMetaData metaData = columnChunk.meta_data;
- ColumnPath path = getPath(metaData);
- ColumnChunkMetaData column = ColumnChunkMetaData.get(
- path,
- messageType.getType(path.toArray()).asPrimitiveType(),
- fromFormatCodec(metaData.codec),
- convertEncodingStats(metaData.getEncoding_stats()),
- fromFormatEncodings(metaData.encodings),
- fromParquetStatistics(
- parquetMetadata.getCreated_by(),
- metaData.statistics,
- messageType.getType(path.toArray()).asPrimitiveType()),
- metaData.data_page_offset,
- metaData.dictionary_page_offset,
- metaData.num_values,
- metaData.total_compressed_size,
- metaData.total_uncompressed_size);
+ ColumnCryptoMetaData cryptoMetaData = columnChunk.getCrypto_metadata();
+ ColumnChunkMetaData column = null;
+ ColumnPath columnPath = null;
+ boolean encryptedMetadata = false;
+
+ if (null == cryptoMetaData) { // Plaintext column
+ columnPath = getPath(metaData);
+ if (null != fileDecryptor && !fileDecryptor.plaintextFile()) {
+ // mark this column as plaintext in encrypted file decryptor
+ fileDecryptor.setColumnCryptoMetadata(columnPath, false, false, (byte[]) null, columnOrdinal);
+ }
+ } else { // Encrypted column
+ boolean encryptedWithFooterKey = cryptoMetaData.isSetENCRYPTION_WITH_FOOTER_KEY();
+ if (encryptedWithFooterKey) { // Column encrypted with footer key
+ if (!encryptedFooter) {
+ throw new ParquetCryptoRuntimeException("Column encrypted with footer key in file with plaintext footer");
+ }
+ if (null == metaData) {
+ throw new ParquetCryptoRuntimeException("ColumnMetaData not set in Encryption with Footer key");
+ }
+ if (null == fileDecryptor) {
+ throw new ParquetCryptoRuntimeException("Column encrypted with footer key: No keys available");
+ }
+ columnPath = getPath(metaData);
+ fileDecryptor.setColumnCryptoMetadata(columnPath, true, true, (byte[]) null, columnOrdinal);
+ } else { // Column encrypted with column key
+ // setColumnCryptoMetadata triggers KMS interaction, hence delayed until this column is projected
+ encryptedMetadata = true;
+ }
+ }
+
+ String createdBy = parquetMetadata.getCreated_by();
+ if (!encryptedMetadata) { // unencrypted column, or encrypted with footer key
+ column = buildColumnChunkMetaData(metaData, columnPath,
+ messageType.getType(columnPath.toArray()).asPrimitiveType(), createdBy);
+ column.setRowGroupOrdinal(rowGroup.getOrdinal());
+ column.setBloomFilterOffset(metaData.bloom_filter_offset);
+ } else { // column encrypted with column key
+ // Metadata will be decrypted later, if this column is accessed
+ EncryptionWithColumnKey columnKeyStruct = cryptoMetaData.getENCRYPTION_WITH_COLUMN_KEY();
+ List<String> pathList = columnKeyStruct.getPath_in_schema();
+ byte[] columnKeyMetadata = columnKeyStruct.getKey_metadata();
+ columnPath = ColumnPath.get(pathList.toArray(new String[pathList.size()]));
+ byte[] encryptedMetadataBuffer = columnChunk.getEncrypted_column_metadata();
+ column = ColumnChunkMetaData.getWithEncryptedMetadata(this, columnPath,
+ messageType.getType(columnPath.toArray()).asPrimitiveType(), encryptedMetadataBuffer,
+ columnKeyMetadata, fileDecryptor, rowGroup.getOrdinal(), columnOrdinal, createdBy);
+ }
+
column.setColumnIndexReference(toColumnIndexReference(columnChunk));
column.setOffsetIndexReference(toOffsetIndexReference(columnChunk));
- column.setBloomFilterOffset(metaData.bloom_filter_offset);
+
// TODO
// index_page_offset
// key_value_metadata
@@ -1266,7 +1473,7 @@ public class ParquetMetadataConverter {
}
}
return new ParquetMetadata(
- new org.apache.parquet.hadoop.metadata.FileMetaData(messageType, keyValueMetaData, parquetMetadata.getCreated_by()),
+ new org.apache.parquet.hadoop.metadata.FileMetaData(messageType, keyValueMetaData, parquetMetadata.getCreated_by(), fileDecryptor),
blocks);
}
@@ -1467,19 +1674,34 @@ public class ParquetMetadataConverter {
}
public void writeDataPageV1Header(
- int uncompressedSize,
- int compressedSize,
- int valueCount,
- org.apache.parquet.column.Encoding rlEncoding,
- org.apache.parquet.column.Encoding dlEncoding,
- org.apache.parquet.column.Encoding valuesEncoding,
- OutputStream to) throws IOException {
+ int uncompressedSize,
+ int compressedSize,
+ int valueCount,
+ org.apache.parquet.column.Encoding rlEncoding,
+ org.apache.parquet.column.Encoding dlEncoding,
+ org.apache.parquet.column.Encoding valuesEncoding,
+ OutputStream to) throws IOException {
+ writeDataPageV1Header(uncompressedSize, compressedSize, valueCount,
+ rlEncoding, dlEncoding, valuesEncoding, to, null, null);
+ }
+
+ public void writeDataPageV1Header(
+ int uncompressedSize,
+ int compressedSize,
+ int valueCount,
+ org.apache.parquet.column.Encoding rlEncoding,
+ org.apache.parquet.column.Encoding dlEncoding,
+ org.apache.parquet.column.Encoding valuesEncoding,
+ OutputStream to,
+ BlockCipher.Encryptor blockEncryptor,
+ byte[] AAD) throws IOException {
writePageHeader(newDataPageHeader(uncompressedSize,
- compressedSize,
- valueCount,
- rlEncoding,
- dlEncoding,
- valuesEncoding), to);
+ compressedSize,
+ valueCount,
+ rlEncoding,
+ dlEncoding,
+ valuesEncoding),
+ to, blockEncryptor, AAD);
}
public void writeDataPageV1Header(
@@ -1491,13 +1713,29 @@ public class ParquetMetadataConverter {
org.apache.parquet.column.Encoding valuesEncoding,
int crc,
OutputStream to) throws IOException {
+ writeDataPageV1Header(uncompressedSize, compressedSize, valueCount,
+ rlEncoding, dlEncoding, valuesEncoding, crc, to, null, null);
+ }
+
+ public void writeDataPageV1Header(
+ int uncompressedSize,
+ int compressedSize,
+ int valueCount,
+ org.apache.parquet.column.Encoding rlEncoding,
+ org.apache.parquet.column.Encoding dlEncoding,
+ org.apache.parquet.column.Encoding valuesEncoding,
+ int crc,
+ OutputStream to,
+ BlockCipher.Encryptor blockEncryptor,
+ byte[] AAD) throws IOException {
writePageHeader(newDataPageHeader(uncompressedSize,
compressedSize,
valueCount,
rlEncoding,
dlEncoding,
valuesEncoding,
- crc), to);
+ crc),
+ to, blockEncryptor, AAD);
}
public void writeDataPageV2Header(
@@ -1506,12 +1744,24 @@ public class ParquetMetadataConverter {
org.apache.parquet.column.Encoding dataEncoding,
int rlByteLength, int dlByteLength,
OutputStream to) throws IOException {
+ writeDataPageV2Header(uncompressedSize, compressedSize,
+ valueCount, nullCount, rowCount, dataEncoding,
+ rlByteLength, dlByteLength, to, null, null);
+ }
+
+ public void writeDataPageV2Header(
+ int uncompressedSize, int compressedSize,
+ int valueCount, int nullCount, int rowCount,
+ org.apache.parquet.column.Encoding dataEncoding,
+ int rlByteLength, int dlByteLength,
+ OutputStream to, BlockCipher.Encryptor blockEncryptor,
+ byte[] AAD) throws IOException {
writePageHeader(
newDataPageV2Header(
uncompressedSize, compressedSize,
valueCount, nullCount, rowCount,
dataEncoding,
- rlByteLength, dlByteLength), to);
+ rlByteLength, dlByteLength), to, blockEncryptor, AAD);
}
private PageHeader newDataPageV2Header(
@@ -1530,20 +1780,36 @@ public class ParquetMetadataConverter {
}
public void writeDictionaryPageHeader(
- int uncompressedSize, int compressedSize, int valueCount,
- org.apache.parquet.column.Encoding valuesEncoding, OutputStream to) throws IOException {
+ int uncompressedSize, int compressedSize, int valueCount,
+ org.apache.parquet.column.Encoding valuesEncoding, OutputStream to) throws IOException {
+ writeDictionaryPageHeader(uncompressedSize, compressedSize, valueCount,
+ valuesEncoding, to, null, null);
+ }
+
+ public void writeDictionaryPageHeader(
+ int uncompressedSize, int compressedSize, int valueCount,
+ org.apache.parquet.column.Encoding valuesEncoding, OutputStream to,
+ BlockCipher.Encryptor blockEncryptor, byte[] AAD) throws IOException {
PageHeader pageHeader = new PageHeader(PageType.DICTIONARY_PAGE, uncompressedSize, compressedSize);
pageHeader.setDictionary_page_header(new DictionaryPageHeader(valueCount, getEncoding(valuesEncoding)));
- writePageHeader(pageHeader, to);
+ writePageHeader(pageHeader, to, blockEncryptor, AAD);
}
public void writeDictionaryPageHeader(
int uncompressedSize, int compressedSize, int valueCount,
org.apache.parquet.column.Encoding valuesEncoding, int crc, OutputStream to) throws IOException {
+ writeDictionaryPageHeader(uncompressedSize, compressedSize, valueCount,
+ valuesEncoding, crc, to, null, null);
+ }
+
+ public void writeDictionaryPageHeader(
+ int uncompressedSize, int compressedSize, int valueCount,
+ org.apache.parquet.column.Encoding valuesEncoding, int crc, OutputStream to,
+ BlockCipher.Encryptor blockEncryptor, byte[] AAD) throws IOException {
PageHeader pageHeader = new PageHeader(PageType.DICTIONARY_PAGE, uncompressedSize, compressedSize);
pageHeader.setCrc(crc);
pageHeader.setDictionary_page_header(new DictionaryPageHeader(valueCount, getEncoding(valuesEncoding)));
- writePageHeader(pageHeader, to);
+ writePageHeader(pageHeader, to, blockEncryptor, AAD);
}
private static BoundaryOrder toParquetBoundaryOrder(
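
All page-header writers now take an optional header encryptor and AAD; passing null for both preserves the old plaintext behavior. A minimal sketch of writing a dictionary page header with the new overload, assuming only the signatures added above and a public no-arg converter constructor; sizes, value count and encoding are illustrative:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    import org.apache.parquet.column.Encoding;
    import org.apache.parquet.format.BlockCipher;
    import org.apache.parquet.format.converter.ParquetMetadataConverter;

    public class EncryptedPageHeaderExample {
      public static void writeHeader(BlockCipher.Encryptor headerEncryptor,
                                     byte[] dictionaryPageHeaderAAD) throws IOException {
        ParquetMetadataConverter converter = new ParquetMetadataConverter();
        ByteArrayOutputStream out = new ByteArrayOutputStream();

        // Encrypted header: the serialized Thrift header is passed through the
        // given encryptor with the module AAD before reaching 'out'
        converter.writeDictionaryPageHeader(
            1024,            // uncompressed size (illustrative)
            512,             // compressed size (illustrative)
            100,             // value count (illustrative)
            Encoding.PLAIN,  // values encoding
            out,
            headerEncryptor,
            dictionaryPageHeaderAAD);

        // Plaintext header: pass nulls, or use the original overload
        converter.writeDictionaryPageHeader(1024, 512, 100, Encoding.PLAIN, out, null, null);
      }
    }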
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
index 6f21fa3..3d1bafe 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
@@ -37,6 +37,9 @@ import org.apache.parquet.column.page.DictionaryPageReadStore;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.column.page.PageReader;
import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
+import org.apache.parquet.crypto.AesCipher;
+import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
+import org.apache.parquet.format.BlockCipher;
import org.apache.parquet.internal.column.columnindex.OffsetIndex;
import org.apache.parquet.internal.filter2.columnindex.RowRanges;
import org.apache.parquet.io.ParquetDecodingException;
@@ -69,9 +72,15 @@ class ColumnChunkPageReadStore implements PageReadStore, DictionaryPageReadStore
private final OffsetIndex offsetIndex;
private final long rowCount;
private int pageIndex = 0;
+
+ private final BlockCipher.Decryptor blockDecryptor;
+ private final byte[] dataPageAAD;
+ private final byte[] dictionaryPageAAD;
ColumnChunkPageReader(BytesInputDecompressor decompressor, List<DataPage> compressedPages,
- DictionaryPage compressedDictionaryPage, OffsetIndex offsetIndex, long rowCount) {
+ DictionaryPage compressedDictionaryPage, OffsetIndex offsetIndex, long rowCount,
+ BlockCipher.Decryptor blockDecryptor, byte[] fileAAD,
+ int rowGroupOrdinal, int columnOrdinal) {
this.decompressor = decompressor;
this.compressedPages = new ArrayDeque<DataPage>(compressedPages);
this.compressedDictionaryPage = compressedDictionaryPage;
@@ -82,6 +91,24 @@ class ColumnChunkPageReadStore implements PageReadStore, DictionaryPageReadStore
this.valueCount = count;
this.offsetIndex = offsetIndex;
this.rowCount = rowCount;
+
+ this.blockDecryptor = blockDecryptor;
+
+ if (null != blockDecryptor) {
+ dataPageAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DataPage, rowGroupOrdinal, columnOrdinal, 0);
+ dictionaryPageAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DictionaryPage, rowGroupOrdinal, columnOrdinal, -1);
+ } else {
+ dataPageAAD = null;
+ dictionaryPageAAD = null;
+ }
+ }
+
+ private int getPageOrdinal(int currentPageIndex) {
+ if (null == offsetIndex) {
+ return currentPageIndex;
+ }
+
+ return offsetIndex.getPageOrdinal(currentPageIndex);
}
@Override
@@ -96,11 +123,21 @@ class ColumnChunkPageReadStore implements PageReadStore, DictionaryPageReadStore
return null;
}
final int currentPageIndex = pageIndex++;
+
+ if (null != blockDecryptor) {
+ AesCipher.quickUpdatePageAAD(dataPageAAD, getPageOrdinal(currentPageIndex));
+ }
+
return compressedPage.accept(new DataPage.Visitor<DataPage>() {
@Override
public DataPage visit(DataPageV1 dataPageV1) {
try {
- BytesInput decompressed = decompressor.decompress(dataPageV1.getBytes(), dataPageV1.getUncompressedSize());
+ BytesInput bytes = dataPageV1.getBytes();
+ if (null != blockDecryptor) {
+ bytes = BytesInput.from(blockDecryptor.decrypt(bytes.toByteArray(), dataPageAAD));
+ }
+ BytesInput decompressed = decompressor.decompress(bytes, dataPageV1.getUncompressedSize());
+
final DataPageV1 decompressedPage;
if (offsetIndex == null) {
decompressedPage = new DataPageV1(
@@ -135,54 +172,53 @@ class ColumnChunkPageReadStore implements PageReadStore, DictionaryPageReadStore
@Override
public DataPage visit(DataPageV2 dataPageV2) {
- if (!dataPageV2.isCompressed()) {
- if (offsetIndex == null) {
- return dataPageV2;
- } else {
- return DataPageV2.uncompressed(
- dataPageV2.getRowCount(),
- dataPageV2.getNullCount(),
- dataPageV2.getValueCount(),
- offsetIndex.getFirstRowIndex(currentPageIndex),
- dataPageV2.getRepetitionLevels(),
- dataPageV2.getDefinitionLevels(),
- dataPageV2.getDataEncoding(),
- dataPageV2.getData(),
- dataPageV2.getStatistics());
+ if (!dataPageV2.isCompressed() && offsetIndex == null && null == blockDecryptor) {
+ return dataPageV2;
+ }
+ BytesInput pageBytes = dataPageV2.getData();
+
+ if (null != blockDecryptor) {
+ try {
+ pageBytes = BytesInput.from(blockDecryptor.decrypt(pageBytes.toByteArray(), dataPageAAD));
+ } catch (IOException e) {
+ throw new ParquetDecodingException("could not convert page BytesInput to byte array", e);
}
}
- try {
+ if (dataPageV2.isCompressed()) {
int uncompressedSize = Math.toIntExact(
dataPageV2.getUncompressedSize()
- dataPageV2.getDefinitionLevels().size()
- dataPageV2.getRepetitionLevels().size());
- BytesInput decompressed = decompressor.decompress(dataPageV2.getData(), uncompressedSize);
- if (offsetIndex == null) {
- return DataPageV2.uncompressed(
- dataPageV2.getRowCount(),
- dataPageV2.getNullCount(),
- dataPageV2.getValueCount(),
- dataPageV2.getRepetitionLevels(),
- dataPageV2.getDefinitionLevels(),
- dataPageV2.getDataEncoding(),
- decompressed,
- dataPageV2.getStatistics());
- } else {
- return DataPageV2.uncompressed(
- dataPageV2.getRowCount(),
- dataPageV2.getNullCount(),
- dataPageV2.getValueCount(),
- offsetIndex.getFirstRowIndex(currentPageIndex),
- dataPageV2.getRepetitionLevels(),
- dataPageV2.getDefinitionLevels(),
- dataPageV2.getDataEncoding(),
- decompressed,
- dataPageV2.getStatistics());
+ try {
+ pageBytes = decompressor.decompress(pageBytes, uncompressedSize);
+ } catch (IOException e) {
+ throw new ParquetDecodingException("could not decompress page", e);
}
- } catch (IOException e) {
- throw new ParquetDecodingException("could not decompress page", e);
}
- }
+
+ if (offsetIndex == null) {
+ return DataPageV2.uncompressed(
+ dataPageV2.getRowCount(),
+ dataPageV2.getNullCount(),
+ dataPageV2.getValueCount(),
+ dataPageV2.getRepetitionLevels(),
+ dataPageV2.getDefinitionLevels(),
+ dataPageV2.getDataEncoding(),
+ pageBytes,
+ dataPageV2.getStatistics());
+ } else {
+ return DataPageV2.uncompressed(
+ dataPageV2.getRowCount(),
+ dataPageV2.getNullCount(),
+ dataPageV2.getValueCount(),
+ offsetIndex.getFirstRowIndex(currentPageIndex),
+ dataPageV2.getRepetitionLevels(),
+ dataPageV2.getDefinitionLevels(),
+ dataPageV2.getDataEncoding(),
+ pageBytes,
+ dataPageV2.getStatistics());
+ }
+ }
});
}
@@ -192,8 +228,12 @@ class ColumnChunkPageReadStore implements PageReadStore, DictionaryPageReadStore
return null;
}
try {
+ BytesInput bytes = compressedDictionaryPage.getBytes();
+ if (null != blockDecryptor) {
+ bytes = BytesInput.from(blockDecryptor.decrypt(bytes.toByteArray(), dictionaryPageAAD));
+ }
DictionaryPage decompressedPage = new DictionaryPage(
- decompressor.decompress(compressedDictionaryPage.getBytes(), compressedDictionaryPage.getUncompressedSize()),
+ decompressor.decompress(bytes, compressedDictionaryPage.getUncompressedSize()),
compressedDictionaryPage.getDictionarySize(),
compressedDictionaryPage.getEncoding());
if (compressedDictionaryPage.getCrc().isPresent()) {
@@ -249,5 +289,4 @@ class ColumnChunkPageReadStore implements PageReadStore, DictionaryPageReadStore
throw new RuntimeException(path+ " was added twice");
}
}
-
}
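
On the read path the order is: update the page AAD to the current page ordinal, decrypt the page body, then decompress it. A minimal standalone sketch of that sequence, using the same calls as the code above; the decryptor, decompressor and page bytes are assumed to come from the surrounding reader:

    import java.io.IOException;

    import org.apache.parquet.bytes.BytesInput;
    import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
    import org.apache.parquet.crypto.AesCipher;
    import org.apache.parquet.format.BlockCipher;

    public class PageDecryptExample {
      static BytesInput readPageBody(BytesInput pageBytes, int uncompressedSize, int pageOrdinal,
          BlockCipher.Decryptor blockDecryptor, byte[] dataPageAAD,
          BytesInputDecompressor decompressor) throws IOException {
        if (null != blockDecryptor) {
          // Bind the AAD to this page, then decrypt the (still compressed) page body
          AesCipher.quickUpdatePageAAD(dataPageAAD, pageOrdinal);
          pageBytes = BytesInput.from(blockDecryptor.decrypt(pageBytes.toByteArray(), dataPageAAD));
        }
        // Decompression always happens after decryption
        return decompressor.decompress(pageBytes, uncompressedSize);
      }
    }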
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
index d2e4c96..134f3da 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
@@ -40,8 +40,14 @@ import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.column.values.bloomfilter.BloomFilter;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
+import org.apache.parquet.crypto.AesCipher;
+import org.apache.parquet.crypto.InternalColumnEncryptionSetup;
+import org.apache.parquet.crypto.InternalFileEncryptor;
+import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
+import org.apache.parquet.format.BlockCipher;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder;
import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder;
import org.apache.parquet.io.ParquetEncodingException;
@@ -82,12 +88,26 @@ class ColumnChunkPageWriteStore implements PageWriteStore, BloomFilterWriteStore
private final CRC32 crc;
boolean pageWriteChecksumEnabled;
+
+ private final BlockCipher.Encryptor headerBlockEncryptor;
+ private final BlockCipher.Encryptor pageBlockEncryptor;
+ private final int rowGroupOrdinal;
+ private final int columnOrdinal;
+ private int pageOrdinal;
+ private final byte[] dataPageAAD;
+ private final byte[] dataPageHeaderAAD;
+ private final byte[] fileAAD;
private ColumnChunkPageWriter(ColumnDescriptor path,
BytesCompressor compressor,
ByteBufferAllocator allocator,
int columnIndexTruncateLength,
- boolean pageWriteChecksumEnabled) {
+ boolean pageWriteChecksumEnabled,
+ BlockCipher.Encryptor headerBlockEncryptor,
+ BlockCipher.Encryptor pageBlockEncryptor,
+ byte[] fileAAD,
+ int rowGroupOrdinal,
+ int columnOrdinal) {
this.path = path;
this.compressor = compressor;
this.allocator = allocator;
@@ -96,6 +116,25 @@ class ColumnChunkPageWriteStore implements PageWriteStore, BloomFilterWriteStore
this.offsetIndexBuilder = OffsetIndexBuilder.getBuilder();
this.pageWriteChecksumEnabled = pageWriteChecksumEnabled;
this.crc = pageWriteChecksumEnabled ? new CRC32() : null;
+
+ this.headerBlockEncryptor = headerBlockEncryptor;
+ this.pageBlockEncryptor = pageBlockEncryptor;
+ this.fileAAD = fileAAD;
+ this.rowGroupOrdinal = rowGroupOrdinal;
+ this.columnOrdinal = columnOrdinal;
+ this.pageOrdinal = -1;
+ if (null != headerBlockEncryptor) {
+ dataPageHeaderAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DataPageHeader,
+ rowGroupOrdinal, columnOrdinal, 0);
+ } else {
+ dataPageHeaderAAD = null;
+ }
+ if (null != pageBlockEncryptor) {
+ dataPageAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DataPage,
+ rowGroupOrdinal, columnOrdinal, 0);
+ } else {
+ dataPageAAD = null;
+ }
}
@Override
@@ -117,6 +156,7 @@ class ColumnChunkPageWriteStore implements PageWriteStore, BloomFilterWriteStore
Encoding rlEncoding,
Encoding dlEncoding,
Encoding valuesEncoding) throws IOException {
+ pageOrdinal++;
long uncompressedSize = bytes.size();
if (uncompressedSize > Integer.MAX_VALUE) {
throw new ParquetEncodingException(
@@ -124,6 +164,10 @@ class ColumnChunkPageWriteStore implements PageWriteStore, BloomFilterWriteStore
uncompressedSize);
}
BytesInput compressedBytes = compressor.compress(bytes);
+ if (null != pageBlockEncryptor) {
+ AesCipher.quickUpdatePageAAD(dataPageAAD, pageOrdinal);
+ compressedBytes = BytesInput.from(pageBlockEncryptor.encrypt(compressedBytes.toByteArray(), dataPageAAD));
+ }
long compressedSize = compressedBytes.size();
if (compressedSize > Integer.MAX_VALUE) {
throw new ParquetEncodingException(
@@ -131,6 +175,9 @@ class ColumnChunkPageWriteStore implements PageWriteStore, BloomFilterWriteStore
+ compressedSize);
}
tempOutputStream.reset();
+ if (null != headerBlockEncryptor) {
+ AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
+ }
if (pageWriteChecksumEnabled) {
crc.reset();
crc.update(compressedBytes.toByteArray());
@@ -142,7 +189,9 @@ class ColumnChunkPageWriteStore implements PageWriteStore, BloomFilterWriteStore
dlEncoding,
valuesEncoding,
(int) crc.getValue(),
- tempOutputStream);
+ tempOutputStream,
+ headerBlockEncryptor,
+ dataPageHeaderAAD);
} else {
parquetMetadataConverter.writeDataPageV1Header(
(int)uncompressedSize,
@@ -151,7 +200,9 @@ class ColumnChunkPageWriteStore implements PageWriteStore, BloomFilterWriteStore
rlEncoding,
dlEncoding,
valuesEncoding,
- tempOutputStream);
+ tempOutputStream,
+ headerBlockEncryptor,
+ dataPageHeaderAAD);
}
this.uncompressedLength += uncompressedSize;
this.compressedLength += compressedSize;
@@ -182,6 +233,8 @@ class ColumnChunkPageWriteStore implements PageWriteStore, BloomFilterWriteStore
BytesInput repetitionLevels, BytesInput definitionLevels,
Encoding dataEncoding, BytesInput data,
Statistics<?> statistics) throws IOException {
+ pageOrdinal++;
+
int rlByteLength = toIntWithCheck(repetitionLevels.size());
int dlByteLength = toIntWithCheck(definitionLevels.size());
int uncompressedSize = toIntWithCheck(
@@ -189,17 +242,26 @@ class ColumnChunkPageWriteStore implements PageWriteStore, BloomFilterWriteStore
);
// TODO: decide if we compress
BytesInput compressedData = compressor.compress(data);
+ if (null != pageBlockEncryptor) {
+ AesCipher.quickUpdatePageAAD(dataPageAAD, pageOrdinal);
+ compressedData = BytesInput.from(pageBlockEncryptor.encrypt(compressedData.toByteArray(), dataPageAAD));
+ }
int compressedSize = toIntWithCheck(
compressedData.size() + repetitionLevels.size() + definitionLevels.size()
);
tempOutputStream.reset();
+ if (null != headerBlockEncryptor) {
+ AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
+ }
parquetMetadataConverter.writeDataPageV2Header(
uncompressedSize, compressedSize,
valueCount, nullCount, rowCount,
dataEncoding,
rlByteLength,
dlByteLength,
- tempOutputStream);
+ tempOutputStream,
+ headerBlockEncryptor,
+ dataPageHeaderAAD);
this.uncompressedLength += uncompressedSize;
this.compressedLength += compressedSize;
this.totalValueCount += valueCount;
@@ -242,21 +304,43 @@ class ColumnChunkPageWriteStore implements PageWriteStore, BloomFilterWriteStore
}
public void writeToFileWriter(ParquetFileWriter writer) throws IOException {
- writer.writeColumnChunk(
- path,
- totalValueCount,
- compressor.getCodecName(),
- dictionaryPage,
- buf,
- uncompressedLength,
- compressedLength,
- totalStatistics,
- columnIndexBuilder,
- offsetIndexBuilder,
- bloomFilter,
- rlEncodings,
- dlEncodings,
- dataEncodings);
+ if (null == headerBlockEncryptor) {
+ writer.writeColumnChunk(
+ path,
+ totalValueCount,
+ compressor.getCodecName(),
+ dictionaryPage,
+ buf,
+ uncompressedLength,
+ compressedLength,
+ totalStatistics,
+ columnIndexBuilder,
+ offsetIndexBuilder,
+ bloomFilter,
+ rlEncodings,
+ dlEncodings,
+ dataEncodings);
+ } else {
+ writer.writeColumnChunk(
+ path,
+ totalValueCount,
+ compressor.getCodecName(),
+ dictionaryPage,
+ buf,
+ uncompressedLength,
+ compressedLength,
+ totalStatistics,
+ columnIndexBuilder,
+ offsetIndexBuilder,
+ bloomFilter,
+ rlEncodings,
+ dlEncodings,
+ dataEncodings,
+ headerBlockEncryptor,
+ rowGroupOrdinal,
+ columnOrdinal,
+ fileAAD);
+ }
if (LOG.isDebugEnabled()) {
LOG.debug(
String.format(
@@ -271,6 +355,7 @@ class ColumnChunkPageWriteStore implements PageWriteStore, BloomFilterWriteStore
dlEncodings.clear();
dataEncodings.clear();
pageCount = 0;
+ pageOrdinal = -1;
}
@Override
@@ -286,7 +371,13 @@ class ColumnChunkPageWriteStore implements PageWriteStore, BloomFilterWriteStore
BytesInput dictionaryBytes = dictionaryPage.getBytes();
int uncompressedSize = (int)dictionaryBytes.size();
BytesInput compressedBytes = compressor.compress(dictionaryBytes);
- this.dictionaryPage = new DictionaryPage(BytesInput.copy(compressedBytes), uncompressedSize, dictionaryPage.getDictionarySize(), dictionaryPage.getEncoding());
+ if (null != pageBlockEncryptor) {
+ byte[] dictionaryPageAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DictionaryPage,
+ rowGroupOrdinal, columnOrdinal, -1);
+ compressedBytes = BytesInput.from(pageBlockEncryptor.encrypt(compressedBytes.toByteArray(), dictionaryPageAAD));
+ }
+ this.dictionaryPage = new DictionaryPage(BytesInput.copy(compressedBytes), uncompressedSize,
+ dictionaryPage.getDictionarySize(), dictionaryPage.getEncoding());
}
@Override
@@ -313,7 +404,39 @@ class ColumnChunkPageWriteStore implements PageWriteStore, BloomFilterWriteStore
int columnIndexTruncateLength, boolean pageWriteChecksumEnabled) {
this.schema = schema;
for (ColumnDescriptor path : schema.getColumns()) {
- writers.put(path, new ColumnChunkPageWriter(path, compressor, allocator, columnIndexTruncateLength, pageWriteChecksumEnabled));
+ writers.put(path, new ColumnChunkPageWriter(path, compressor, allocator, columnIndexTruncateLength,
+ pageWriteChecksumEnabled, null, null, null, -1, -1));
+ }
+ }
+
+ public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, ByteBufferAllocator allocator,
+ int columnIndexTruncateLength, boolean pageWriteChecksumEnabled, InternalFileEncryptor fileEncryptor, int rowGroupOrdinal) {
+ this.schema = schema;
+ if (null == fileEncryptor) {
+ for (ColumnDescriptor path : schema.getColumns()) {
+ writers.put(path, new ColumnChunkPageWriter(path, compressor, allocator, columnIndexTruncateLength,
+ pageWriteChecksumEnabled, null, null, null, -1, -1));
+ }
+ return;
+ }
+
+ // Encrypted file
+ int columnOrdinal = -1;
+ byte[] fileAAD = fileEncryptor.getFileAAD();
+ for (ColumnDescriptor path : schema.getColumns()) {
+ columnOrdinal++;
+ BlockCipher.Encryptor headerBlockEncryptor = null;
+ BlockCipher.Encryptor pageBlockEncryptor = null;
+ ColumnPath columnPath = ColumnPath.get(path.getPath());
+
+ InternalColumnEncryptionSetup columnSetup = fileEncryptor.getColumnSetup(columnPath, true, columnOrdinal);
+ if (columnSetup.isEncrypted()) {
+ headerBlockEncryptor = columnSetup.getMetaDataEncryptor();
+ pageBlockEncryptor = columnSetup.getDataEncryptor();
+ }
+
+ writers.put(path, new ColumnChunkPageWriter(path, compressor, allocator, columnIndexTruncateLength, pageWriteChecksumEnabled,
+ headerBlockEncryptor, pageBlockEncryptor, fileAAD, rowGroupOrdinal, columnOrdinal));
}
}
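
The write path mirrors the read path: the page body is compressed first, then encrypted with the page AAD, while the page header is written through the header encryptor with its own AAD. A minimal sketch of that ordering for a V1 data page, using the calls introduced above; the compressor, encryptors and encodings are assumed to come from the column setup and are illustrative here:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    import org.apache.parquet.bytes.BytesInput;
    import org.apache.parquet.column.Encoding;
    import org.apache.parquet.crypto.AesCipher;
    import org.apache.parquet.format.BlockCipher;
    import org.apache.parquet.format.converter.ParquetMetadataConverter;
    import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;

    public class PageEncryptExample {
      static void writeEncryptedPage(BytesInput bytes, int valueCount, int pageOrdinal,
          BytesCompressor compressor, BlockCipher.Encryptor headerBlockEncryptor,
          BlockCipher.Encryptor pageBlockEncryptor, byte[] dataPageHeaderAAD, byte[] dataPageAAD,
          ParquetMetadataConverter converter, ByteArrayOutputStream out) throws IOException {
        long uncompressedSize = bytes.size();

        // 1. compress, 2. encrypt the compressed body with the page AAD
        BytesInput compressedBytes = compressor.compress(bytes);
        AesCipher.quickUpdatePageAAD(dataPageAAD, pageOrdinal);
        compressedBytes = BytesInput.from(pageBlockEncryptor.encrypt(compressedBytes.toByteArray(), dataPageAAD));

        // 3. write the page header through the header encryptor with the header AAD
        AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
        converter.writeDataPageV1Header(
            (int) uncompressedSize, (int) compressedBytes.size(), valueCount,
            Encoding.RLE, Encoding.RLE, Encoding.PLAIN,
            out, headerBlockEncryptor, dataPageHeaderAAD);
      }
    }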
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnIndexFilterUtils.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnIndexFilterUtils.java
index 448515e..fbec3b2 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnIndexFilterUtils.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnIndexFilterUtils.java
@@ -69,6 +69,11 @@ class ColumnIndexFilterUtils {
this.offsetIndex = offsetIndex;
this.indexMap = indexMap;
}
+
+ @Override
+ public int getPageOrdinal(int pageIndex) {
+ return indexMap[pageIndex];
+ }
@Override
public int getPageCount() {
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java
index 99643ed..d90fda1 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java
@@ -20,8 +20,6 @@ package org.apache.parquet.hadoop;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.Encoding;
-import org.apache.parquet.column.EncodingStats;
import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.column.page.DictionaryPageReadStore;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
@@ -32,12 +30,8 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import static org.apache.parquet.column.Encoding.PLAIN_DICTIONARY;
-import static org.apache.parquet.column.Encoding.RLE_DICTIONARY;
-
/**
* A {@link DictionaryPageReadStore} implementation that reads dictionaries from
* an open {@link ParquetFileReader}.
@@ -101,7 +95,7 @@ class DictionaryPageReader implements DictionaryPageReadStore {
return dictionaryPageCache.computeIfAbsent(dotPath, key -> {
try {
final DictionaryPage dict =
- hasDictionaryPage(column) ? reader.readDictionary(column) : null;
+ column.hasDictionaryPage() ? reader.readDictionary(column) : null;
// Copy the dictionary to ensure it can be reused if it is returned
// more than once. This can happen when a DictionaryFilter has two or
@@ -118,15 +112,4 @@ class DictionaryPageReader implements DictionaryPageReadStore {
return new DictionaryPage(BytesInput.from(dict.getBytes().toByteArray()),
dict.getDictionarySize(), dict.getEncoding());
}
-
- private boolean hasDictionaryPage(ColumnChunkMetaData column) {
- EncodingStats stats = column.getEncodingStats();
- if (stats != null) {
- // ensure there is a dictionary page and that it is used to encode data pages
- return stats.hasDictionaryPages() && stats.hasDictionaryEncodedPages();
- }
-
- Set<Encoding> encodings = column.getEncodings();
- return (encodings.contains(PLAIN_DICTIONARY) || encodings.contains(RLE_DICTIONARY));
- }
}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
index 205509c..0ecdabf 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
@@ -29,6 +29,7 @@ import java.util.Objects;
import org.apache.parquet.column.ColumnWriteStore;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
+import org.apache.parquet.crypto.InternalFileEncryptor;
import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.hadoop.api.WriteSupport.FinalizedWriteContext;
@@ -67,6 +68,9 @@ class InternalParquetRecordWriter<T> {
private ColumnChunkPageWriteStore pageStore;
private BloomFilterWriteStore bloomFilterWriteStore;
private RecordConsumer recordConsumer;
+
+ private InternalFileEncryptor fileEncryptor;
+ private int rowGroupOrdinal;
/**
* @param parquetFileWriter the file to write to
@@ -95,6 +99,8 @@ class InternalParquetRecordWriter<T> {
this.compressor = compressor;
this.validating = validating;
this.props = props;
+ this.fileEncryptor = parquetFileWriter.getEncryptor();
+ this.rowGroupOrdinal = 0;
initStore();
}
@@ -104,7 +110,8 @@ class InternalParquetRecordWriter<T> {
private void initStore() {
ColumnChunkPageWriteStore columnChunkPageWriteStore = new ColumnChunkPageWriteStore(compressor,
- schema, props.getAllocator(), props.getColumnIndexTruncateLength(), props.getPageWriteChecksumEnabled());
+ schema, props.getAllocator(), props.getColumnIndexTruncateLength(), props.getPageWriteChecksumEnabled(),
+ fileEncryptor, rowGroupOrdinal);
pageStore = columnChunkPageWriteStore;
bloomFilterWriteStore = columnChunkPageWriteStore;
@@ -173,6 +180,7 @@ class InternalParquetRecordWriter<T> {
}
if (recordCount > 0) {
+ rowGroupOrdinal++;
parquetFileWriter.startBlock(recordCount);
columnStore.flush();
pageStore.flushToFileWriter(parquetFileWriter);
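
Each flushed row group gets a fresh page write store built with the file encryptor and the next row group ordinal, so module AADs stay unique across row groups. A minimal sketch of that wiring, using the constructor added above; it is placed in the same package because the store class is package-private, and the surrounding writer state is illustrative:

    package org.apache.parquet.hadoop;

    import org.apache.parquet.bytes.ByteBufferAllocator;
    import org.apache.parquet.crypto.InternalFileEncryptor;
    import org.apache.parquet.schema.MessageType;

    public class RowGroupOrdinalExample {
      private int rowGroupOrdinal = 0;

      ColumnChunkPageWriteStore newStoreForNextRowGroup(CodecFactory.BytesCompressor compressor,
          MessageType schema, ByteBufferAllocator allocator, int columnIndexTruncateLength,
          boolean pageWriteChecksumEnabled, InternalFileEncryptor fileEncryptor) {
        // A null fileEncryptor falls back to unencrypted writers; the ordinal is then unused
        ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(compressor, schema, allocator,
            columnIndexTruncateLength, pageWriteChecksumEnabled, fileEncryptor, rowGroupOrdinal);
        rowGroupOrdinal++;  // the next flush starts a new row group with its own AADs
        return store;
      }
    }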
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 18fbf6d..0590638 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -22,17 +22,19 @@ import static org.apache.parquet.bytes.BytesUtils.readIntLittleEndian;
import static org.apache.parquet.filter2.compat.RowGroupFilter.FilterLevel.BLOOMFILTER;
import static org.apache.parquet.filter2.compat.RowGroupFilter.FilterLevel.DICTIONARY;
import static org.apache.parquet.filter2.compat.RowGroupFilter.FilterLevel.STATISTICS;
+import static org.apache.parquet.format.Util.readFileCryptoMetaData;
import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
import static org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS;
import static org.apache.parquet.hadoop.ColumnIndexFilterUtils.calculateOffsetRanges;
import static org.apache.parquet.hadoop.ColumnIndexFilterUtils.filterOffsetIndex;
+import static org.apache.parquet.hadoop.ParquetFileWriter.EFMAGIC;
import static org.apache.parquet.hadoop.ParquetFileWriter.MAGIC;
import static org.apache.parquet.hadoop.ParquetFileWriter.PARQUET_COMMON_METADATA_FILE;
import static org.apache.parquet.hadoop.ParquetFileWriter.PARQUET_METADATA_FILE;
import java.io.Closeable;
-import java.io.InputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.io.SequenceInputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -63,7 +65,6 @@ import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.page.DataPage;
import org.apache.parquet.column.page.DataPageV1;
import org.apache.parquet.column.page.DataPageV2;
@@ -73,12 +74,20 @@ import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter;
import org.apache.parquet.column.values.bloomfilter.BloomFilter;
import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
+import org.apache.parquet.crypto.AesCipher;
+import org.apache.parquet.crypto.FileDecryptionProperties;
+import org.apache.parquet.crypto.InternalColumnDecryptionSetup;
+import org.apache.parquet.crypto.InternalFileDecryptor;
+import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.compat.RowGroupFilter;
+import org.apache.parquet.format.BlockCipher;
import org.apache.parquet.format.BloomFilterHeader;
import org.apache.parquet.format.DataPageHeader;
import org.apache.parquet.format.DataPageHeaderV2;
import org.apache.parquet.format.DictionaryPageHeader;
+import org.apache.parquet.format.FileCryptoMetaData;
import org.apache.parquet.format.PageHeader;
import org.apache.parquet.format.Util;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
@@ -501,7 +510,8 @@ public class ParquetFileReader implements Closeable {
public static final ParquetMetadata readFooter(InputFile file, MetadataFilter filter) throws IOException {
ParquetReadOptions options;
if (file instanceof HadoopInputFile) {
- options = HadoopReadOptions.builder(((HadoopInputFile) file).getConfiguration())
+ HadoopInputFile hadoopFile = (HadoopInputFile) file;
+ options = HadoopReadOptions.builder(hadoopFile.getConfiguration(), hadoopFile.getPath())
.withMetadataFilter(filter).build();
} else {
options = ParquetReadOptions.builder().withMetadataFilter(filter).build();
@@ -517,37 +527,71 @@ public class ParquetFileReader implements Closeable {
return readFooter(file, options, f, converter);
}
- private static final ParquetMetadata readFooter(InputFile file, ParquetReadOptions options, SeekableInputStream f, ParquetMetadataConverter converter) throws IOException {
+ private static final ParquetMetadata readFooter(InputFile file, ParquetReadOptions options,
+ SeekableInputStream f, ParquetMetadataConverter converter) throws IOException {
+
long fileLen = file.getLength();
+ String filePath = file.toString();
LOG.debug("File length {}", fileLen);
+
int FOOTER_LENGTH_SIZE = 4;
if (fileLen < MAGIC.length + FOOTER_LENGTH_SIZE + MAGIC.length) { // MAGIC + data + footer + footerIndex + MAGIC
- throw new RuntimeException(file.toString() + " is not a Parquet file (too small length: " + fileLen + ")");
+ throw new RuntimeException(filePath + " is not a Parquet file (length is too low: " + fileLen + ")");
}
- long footerLengthIndex = fileLen - FOOTER_LENGTH_SIZE - MAGIC.length;
- LOG.debug("reading footer index at {}", footerLengthIndex);
- f.seek(footerLengthIndex);
- int footerLength = readIntLittleEndian(f);
+ // Read footer length and magic string - with a single seek
byte[] magic = new byte[MAGIC.length];
+ long fileMetadataLengthIndex = fileLen - magic.length - FOOTER_LENGTH_SIZE;
+ LOG.debug("reading footer index at {}", fileMetadataLengthIndex);
+ f.seek(fileMetadataLengthIndex);
+ int fileMetadataLength = readIntLittleEndian(f);
f.readFully(magic);
- if (!Arrays.equals(MAGIC, magic)) {
- throw new RuntimeException(file.toString() + " is not a Parquet file. expected magic number at tail " + Arrays.toString(MAGIC) + " but found " + Arrays.toString(magic));
+
+ boolean encryptedFooterMode;
+ if (Arrays.equals(MAGIC, magic)) {
+ encryptedFooterMode = false;
+ } else if (Arrays.equals(EFMAGIC, magic)) {
+ encryptedFooterMode = true;
+ } else {
+ throw new RuntimeException(filePath + " is not a Parquet file. Expected magic number at tail, but found " + Arrays.toString(magic));
+ }
+
+ long fileMetadataIndex = fileMetadataLengthIndex - fileMetadataLength;
+ LOG.debug("read footer length: {}, footer index: {}", fileMetadataLength, fileMetadataIndex);
+ if (fileMetadataIndex < magic.length || fileMetadataIndex >= fileMetadataLengthIndex) {
+ throw new RuntimeException("corrupted file: the footer index is not within the file: " + fileMetadataIndex);
}
- long footerIndex = footerLengthIndex - footerLength;
- LOG.debug("read footer length: {}, footer index: {}", footerLength, footerIndex);
- if (footerIndex < MAGIC.length || footerIndex >= footerLengthIndex) {
- throw new RuntimeException("corrupted file: the footer index is not within the file: " + footerIndex);
+ f.seek(fileMetadataIndex);
+
+ FileDecryptionProperties fileDecryptionProperties = options.getDecryptionProperties();
+ InternalFileDecryptor fileDecryptor = null;
+ if (null != fileDecryptionProperties) {
+ fileDecryptor = new InternalFileDecryptor(fileDecryptionProperties);
}
- f.seek(footerIndex);
+
// Read all the footer bytes in one time to avoid multiple read operations,
// since it can be pretty time consuming for a single read operation in HDFS.
- ByteBuffer footerBytesBuffer = ByteBuffer.allocate(footerLength);
+ ByteBuffer footerBytesBuffer = ByteBuffer.allocate(fileMetadataLength);
f.readFully(footerBytesBuffer);
LOG.debug("Finished to read all footer bytes.");
footerBytesBuffer.flip();
InputStream footerBytesStream = ByteBufferInputStream.wrap(footerBytesBuffer);
- return converter.readParquetMetadata(footerBytesStream, options.getMetadataFilter());
+
+ // Regular file, or encrypted file with plaintext footer
+ if (!encryptedFooterMode) {
+ return converter.readParquetMetadata(footerBytesStream, options.getMetadataFilter(), fileDecryptor, false,
+ fileMetadataLength);
+ }
+
+ // Encrypted file with encrypted footer
+ if (null == fileDecryptor) {
+ throw new ParquetCryptoRuntimeException("Trying to read file with encrypted footer. No keys available");
+ }
+ FileCryptoMetaData fileCryptoMetaData = readFileCryptoMetaData(footerBytesStream);
+ fileDecryptor.setFileCryptoMetaData(fileCryptoMetaData.getEncryption_algorithm(),
+ true, fileCryptoMetaData.getKey_metadata());
+ // footer length is required only for signed plaintext footers
+ return converter.readParquetMetadata(footerBytesStream, options.getMetadataFilter(), fileDecryptor, true, 0);
}
/**
@@ -560,7 +604,7 @@ public class ParquetFileReader implements Closeable {
@Deprecated
public static ParquetFileReader open(Configuration conf, Path file) throws IOException {
return new ParquetFileReader(HadoopInputFile.fromPath(file, conf),
- HadoopReadOptions.builder(conf).build());
+ HadoopReadOptions.builder(conf, file).build());
}
/**
@@ -574,7 +618,7 @@ public class ParquetFileReader implements Closeable {
@Deprecated
public static ParquetFileReader open(Configuration conf, Path file, MetadataFilter filter) throws IOException {
return open(HadoopInputFile.fromPath(file, conf),
- HadoopReadOptions.builder(conf).withMetadataFilter(filter).build());
+ HadoopReadOptions.builder(conf, file).withMetadataFilter(filter).build());
}
/**
@@ -613,6 +657,7 @@ public class ParquetFileReader implements Closeable {
return new ParquetFileReader(file, options);
}
+
private final InputFile file;
private final SeekableInputStream f;
private final ParquetReadOptions options;
@@ -629,6 +674,8 @@ public class ParquetFileReader implements Closeable {
private ColumnChunkPageReadStore currentRowGroup = null;
private DictionaryPageReader nextDictionaryReader = null;
+ private InternalFileDecryptor fileDecryptor = null;
+
/**
* @param configuration the Hadoop conf
* @param filePath Path for the parquet file
@@ -660,7 +707,14 @@ public class ParquetFileReader implements Closeable {
this.file = HadoopInputFile.fromPath(filePath, configuration);
this.fileMetaData = fileMetaData;
this.f = file.newStream();
- this.options = HadoopReadOptions.builder(configuration).build();
+ this.fileDecryptor = fileMetaData.getFileDecryptor();
+ if (null == fileDecryptor) {
+ this.options = HadoopReadOptions.builder(configuration).build();
+ } else {
+ this.options = HadoopReadOptions.builder(configuration)
+ .withDecryption(fileDecryptor.getDecryptionProperties())
+ .build();
+ }
this.blocks = filterRowGroups(blocks);
this.blockIndexStores = listWithNulls(this.blocks.size());
this.blockRowRanges = listWithNulls(this.blocks.size());
@@ -680,7 +734,7 @@ public class ParquetFileReader implements Closeable {
@Deprecated
public ParquetFileReader(Configuration conf, Path file, MetadataFilter filter) throws IOException {
this(HadoopInputFile.fromPath(file, conf),
- HadoopReadOptions.builder(conf).withMetadataFilter(filter).build());
+ HadoopReadOptions.builder(conf, file).withMetadataFilter(filter).build());
}
/**
@@ -695,9 +749,16 @@ public class ParquetFileReader implements Closeable {
this.converter = new ParquetMetadataConverter(conf);
this.file = HadoopInputFile.fromPath(file, conf);
this.f = this.file.newStream();
- this.options = HadoopReadOptions.builder(conf).build();
- this.footer = footer;
this.fileMetaData = footer.getFileMetaData();
+ this.fileDecryptor = fileMetaData.getFileDecryptor();
+ if (null == fileDecryptor) {
+ this.options = HadoopReadOptions.builder(conf).build();
+ } else {
+ this.options = HadoopReadOptions.builder(conf)
+ .withDecryption(fileDecryptor.getDecryptionProperties())
+ .build();
+ }
+ this.footer = footer;
this.blocks = filterRowGroups(footer.getBlocks());
this.blockIndexStores = listWithNulls(this.blocks.size());
this.blockRowRanges = listWithNulls(this.blocks.size());
@@ -721,6 +782,11 @@ public class ParquetFileReader implements Closeable {
throw e;
}
this.fileMetaData = footer.getFileMetaData();
+ this.fileDecryptor = fileMetaData.getFileDecryptor(); // must be called before filterRowGroups!
+ if (null != fileDecryptor && fileDecryptor.plaintextFile()) {
+ this.fileDecryptor = null; // Plaintext file. No need for a decryptor
+ }
+
this.blocks = filterRowGroups(footer.getBlocks());
this.blockIndexStores = listWithNulls(this.blocks.size());
this.blockRowRanges = listWithNulls(this.blocks.size());
@@ -843,9 +909,9 @@ public class ParquetFileReader implements Closeable {
ConsecutivePartList currentParts = null;
for (ColumnChunkMetaData mc : block.getColumns()) {
ColumnPath pathKey = mc.getPath();
- BenchmarkCounter.incrementTotalBytes(mc.getTotalSize());
ColumnDescriptor columnDescriptor = paths.get(pathKey);
if (columnDescriptor != null) {
+ BenchmarkCounter.incrementTotalBytes(mc.getTotalSize());
long startingPos = mc.getStartingPos();
// first part or not consecutive => new list
if (currentParts == null || currentParts.endPos() != startingPos) {
@@ -861,7 +927,7 @@ public class ParquetFileReader implements Closeable {
consecutiveChunks.readAll(f, builder);
}
for (Chunk chunk : builder.build()) {
- currentRowGroup.addColumn(chunk.descriptor.col, chunk.readAllPages());
+ readChunkPages(chunk, block);
}
// avoid re-reading bytes the dictionary reader is used after this call
@@ -941,7 +1007,7 @@ public class ParquetFileReader implements Closeable {
consecutiveChunks.readAll(f, builder);
}
for (Chunk chunk : builder.build()) {
- currentRowGroup.addColumn(chunk.descriptor.col, chunk.readAllPages());
+ readChunkPages(chunk, block);
}
// avoid re-reading bytes the dictionary reader is used after this call
@@ -954,6 +1020,23 @@ public class ParquetFileReader implements Closeable {
return currentRowGroup;
}
+ private void readChunkPages(Chunk chunk, BlockMetaData block) throws IOException {
+ if (null == fileDecryptor || fileDecryptor.plaintextFile()) {
+ currentRowGroup.addColumn(chunk.descriptor.col, chunk.readAllPages());
+ return;
+ }
+ // Encrypted file
+ ColumnPath columnPath = ColumnPath.get(chunk.descriptor.col.getPath());
+ InternalColumnDecryptionSetup columnDecryptionSetup = fileDecryptor.getColumnSetup(columnPath);
+ if (!columnDecryptionSetup.isEncrypted()) { // plaintext column
+ currentRowGroup.addColumn(chunk.descriptor.col, chunk.readAllPages());
+ } else { // encrypted column
+ currentRowGroup.addColumn(chunk.descriptor.col,
+ chunk.readAllPages(columnDecryptionSetup.getMetaDataDecryptor(), columnDecryptionSetup.getDataDecryptor(),
+ fileDecryptor.getFileAAD(), block.getOrdinal(), columnDecryptionSetup.getOrdinal()));
+ }
+ }
+
private ColumnIndexStore getColumnIndexStore(int blockIndex) {
ColumnIndexStore ciStore = blockIndexStores.get(blockIndex);
if (ciStore == null) {
@@ -1017,8 +1100,7 @@ public class ParquetFileReader implements Closeable {
* @throws IOException if there is an error while reading the dictionary
*/
DictionaryPage readDictionary(ColumnChunkMetaData meta) throws IOException {
- if (!meta.getEncodings().contains(Encoding.PLAIN_DICTIONARY) &&
- !meta.getEncodings().contains(Encoding.RLE_DICTIONARY)) {
+ if (!meta.hasDictionaryPage()) {
return null;
}
@@ -1027,12 +1109,34 @@ public class ParquetFileReader implements Closeable {
f.seek(meta.getStartingPos());
}
- PageHeader pageHeader = Util.readPageHeader(f);
+ boolean encryptedColumn = false;
+ InternalColumnDecryptionSetup columnDecryptionSetup = null;
+ byte[] dictionaryPageAAD = null;
+ BlockCipher.Decryptor pageDecryptor = null;
+ if (null != fileDecryptor && !fileDecryptor.plaintextFile()) {
+ columnDecryptionSetup = fileDecryptor.getColumnSetup(meta.getPath());
+ if (columnDecryptionSetup.isEncrypted()) {
+ encryptedColumn = true;
+ }
+ }
+
+ PageHeader pageHeader;
+ if (!encryptedColumn) {
+ pageHeader = Util.readPageHeader(f);
+ } else {
+ byte[] dictionaryPageHeaderAAD = AesCipher.createModuleAAD(fileDecryptor.getFileAAD(), ModuleType.DictionaryPageHeader,
+ meta.getRowGroupOrdinal(), columnDecryptionSetup.getOrdinal(), -1);
+ pageHeader = Util.readPageHeader(f, columnDecryptionSetup.getMetaDataDecryptor(), dictionaryPageHeaderAAD);
+ dictionaryPageAAD = AesCipher.createModuleAAD(fileDecryptor.getFileAAD(), ModuleType.DictionaryPage,
+ meta.getRowGroupOrdinal(), columnDecryptionSetup.getOrdinal(), -1);
+ pageDecryptor = columnDecryptionSetup.getDataDecryptor();
+ }
+
if (!pageHeader.isSetDictionary_page_header()) {
return null; // TODO: should this complain?
}
- DictionaryPage compressedPage = readCompressedDictionary(pageHeader, f);
+ DictionaryPage compressedPage = readCompressedDictionary(pageHeader, f, pageDecryptor, dictionaryPageAAD);
BytesInputDecompressor decompressor = options.getCodecFactory().getDecompressor(meta.getCodec());
return new DictionaryPage(
@@ -1042,7 +1146,8 @@ public class ParquetFileReader implements Closeable {
}
private DictionaryPage readCompressedDictionary(
- PageHeader pageHeader, SeekableInputStream fin) throws IOException {
+ PageHeader pageHeader, SeekableInputStream fin,
+ BlockCipher.Decryptor pageDecryptor, byte[] dictionaryPageAAD) throws IOException {
DictionaryPageHeader dictHeader = pageHeader.getDictionary_page_header();
int uncompressedPageSize = pageHeader.getUncompressed_page_size();
@@ -1053,6 +1158,10 @@ public class ParquetFileReader implements Closeable {
BytesInput bin = BytesInput.from(dictPageBytes);
+ if (null != pageDecryptor) {
+ bin = BytesInput.from(pageDecryptor.decrypt(bin.toByteArray(), dictionaryPageAAD));
+ }
+
return new DictionaryPage(
bin, uncompressedPageSize, dictHeader.getNum_values(),
converter.getEncoding(dictHeader.getEncoding()));
@@ -1071,12 +1180,30 @@ public class ParquetFileReader implements Closeable {
*/
public BloomFilter readBloomFilter(ColumnChunkMetaData meta) throws IOException {
long bloomFilterOffset = meta.getBloomFilterOffset();
- f.seek(bloomFilterOffset);
- BloomFilterHeader bloomFilterHeader;
+ if (0 == bloomFilterOffset) {
+ return null;
+ }
+
+ // Prepare to decrypt Bloom filter (for encrypted columns)
+ BlockCipher.Decryptor bloomFilterDecryptor = null;
+ byte[] bloomFilterHeaderAAD = null;
+ byte[] bloomFilterBitsetAAD = null;
+ if (null != fileDecryptor && !fileDecryptor.plaintextFile()) {
+ InternalColumnDecryptionSetup columnDecryptionSetup = fileDecryptor.getColumnSetup(meta.getPath());
+ if (columnDecryptionSetup.isEncrypted()) {
+ bloomFilterDecryptor = columnDecryptionSetup.getMetaDataDecryptor();
+ bloomFilterHeaderAAD = AesCipher.createModuleAAD(fileDecryptor.getFileAAD(), ModuleType.BloomFilterHeader,
+ meta.getRowGroupOrdinal(), columnDecryptionSetup.getOrdinal(), -1);
+ bloomFilterBitsetAAD = AesCipher.createModuleAAD(fileDecryptor.getFileAAD(), ModuleType.BloomFilterBitset,
+ meta.getRowGroupOrdinal(), columnDecryptionSetup.getOrdinal(), -1);
+ }
+ }
// Read Bloom filter data header.
+ f.seek(bloomFilterOffset);
+ BloomFilterHeader bloomFilterHeader;
try {
- bloomFilterHeader = Util.readBloomFilterHeader(f);
+ bloomFilterHeader = Util.readBloomFilterHeader(f, bloomFilterDecryptor, bloomFilterHeaderAAD);
} catch (IOException e) {
LOG.warn("read no bloom filter");
return null;
@@ -1095,8 +1222,16 @@ public class ParquetFileReader implements Closeable {
return null;
}
- byte[] bitset = new byte[numBytes];
- f.readFully(bitset);
+ byte[] bitset;
+ if (null == bloomFilterDecryptor) {
+ bitset = new byte[numBytes];
+ f.readFully(bitset);
+ } else {
+ bitset = bloomFilterDecryptor.decrypt(f, bloomFilterBitsetAAD);
+ if (bitset.length != numBytes) {
+ throw new ParquetCryptoRuntimeException("Wrong length of decrypted bloom filter bitset");
+ }
+ }
return new BlockSplitBloomFilter(bitset);
}
@@ -1114,7 +1249,19 @@ public class ParquetFileReader implements Closeable {
return null;
}
f.seek(ref.getOffset());
- return ParquetMetadataConverter.fromParquetColumnIndex(column.getPrimitiveType(), Util.readColumnIndex(f));
+
+ BlockCipher.Decryptor columnIndexDecryptor = null;
+ byte[] columnIndexAAD = null;
+ if (null != fileDecryptor && !fileDecryptor.plaintextFile()) {
+ InternalColumnDecryptionSetup columnDecryptionSetup = fileDecryptor.getColumnSetup(column.getPath());
+ if (columnDecryptionSetup.isEncrypted()) {
+ columnIndexDecryptor = columnDecryptionSetup.getMetaDataDecryptor();
+ columnIndexAAD = AesCipher.createModuleAAD(fileDecryptor.getFileAAD(), ModuleType.ColumnIndex,
+ column.getRowGroupOrdinal(), columnDecryptionSetup.getOrdinal(), -1);
+ }
+ }
+ return ParquetMetadataConverter.fromParquetColumnIndex(column.getPrimitiveType(),
+ Util.readColumnIndex(f, columnIndexDecryptor, columnIndexAAD));
}
/**
@@ -1131,7 +1278,18 @@ public class ParquetFileReader implements Closeable {
return null;
}
f.seek(ref.getOffset());
- return ParquetMetadataConverter.fromParquetOffsetIndex(Util.readOffsetIndex(f));
+
+ BlockCipher.Decryptor offsetIndexDecryptor = null;
+ byte[] offsetIndexAAD = null;
+ if (null != fileDecryptor && !fileDecryptor.plaintextFile()) {
+ InternalColumnDecryptionSetup columnDecryptionSetup = fileDecryptor.getColumnSetup(column.getPath());
+ if (columnDecryptionSetup.isEncrypted()) {
+ offsetIndexDecryptor = columnDecryptionSetup.getMetaDataDecryptor();
+ offsetIndexAAD = AesCipher.createModuleAAD(fileDecryptor.getFileAAD(), ModuleType.OffsetIndex,
+ column.getRowGroupOrdinal(), columnDecryptionSetup.getOrdinal(), -1);
+ }
+ }
+ return ParquetMetadataConverter.fromParquetOffsetIndex(Util.readOffsetIndex(f, offsetIndexDecryptor, offsetIndexAAD));
}
@Override
@@ -1217,7 +1375,11 @@ public class ParquetFileReader implements Closeable {
}
protected PageHeader readPageHeader() throws IOException {
- return Util.readPageHeader(stream);
+ return readPageHeader(null, null);
+ }
+
+ protected PageHeader readPageHeader(BlockCipher.Decryptor blockDecryptor, byte[] pageHeaderAAD) throws IOException {
+ return Util.readPageHeader(stream, blockDecryptor, pageHeaderAAD);
}
/**
@@ -1237,14 +1399,34 @@ public class ParquetFileReader implements Closeable {
* @return the list of pages
*/
public ColumnChunkPageReader readAllPages() throws IOException {
+ return readAllPages(null, null, null, -1, -1);
+ }
+
+ public ColumnChunkPageReader readAllPages(BlockCipher.Decryptor headerBlockDecryptor, BlockCipher.Decryptor pageBlockDecryptor,
+ byte[] aadPrefix, int rowGroupOrdinal, int columnOrdinal) throws IOException {
List<DataPage> pagesInChunk = new ArrayList<DataPage>();
DictionaryPage dictionaryPage = null;
PrimitiveType type = getFileMetaData().getSchema()
.getType(descriptor.col.getPath()).asPrimitiveType();
long valuesCountReadSoFar = 0;
int dataPageCountReadSoFar = 0;
+ byte[] dataPageHeaderAAD = null;
+ if (null != headerBlockDecryptor) {
+ dataPageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DataPageHeader, rowGroupOrdinal,
+ columnOrdinal, getPageOrdinal(dataPageCountReadSoFar));
+ }
while (hasMorePages(valuesCountReadSoFar, dataPageCountReadSoFar)) {
- PageHeader pageHeader = readPageHeader();
+ byte[] pageHeaderAAD = dataPageHeaderAAD;
+ if (null != headerBlockDecryptor) {
+ // Important: this verifies file integrity (makes sure the dictionary page has not been removed)
+ if (null == dictionaryPage && descriptor.metadata.hasDictionaryPage()) {
+ pageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DictionaryPageHeader, rowGroupOrdinal, columnOrdinal, -1);
+ } else {
+ int pageOrdinal = getPageOrdinal(dataPageCountReadSoFar);
+ AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
+ }
+ }
+ PageHeader pageHeader = readPageHeader(headerBlockDecryptor, pageHeaderAAD);
int uncompressedPageSize = pageHeader.getUncompressed_page_size();
int compressedPageSize = pageHeader.getCompressed_page_size();
final BytesInput pageBytes;
@@ -1336,7 +1518,7 @@ public class ParquetFileReader implements Closeable {
}
BytesInputDecompressor decompressor = options.getCodecFactory().getDecompressor(descriptor.metadata.getCodec());
return new ColumnChunkPageReader(decompressor, pagesInChunk, dictionaryPage, offsetIndex,
- blocks.get(currentBlock).getRowCount());
+ blocks.get(currentBlock).getRowCount(), pageBlockDecryptor, aadPrefix, rowGroupOrdinal, columnOrdinal);
}
private boolean hasMorePages(long valuesCountReadSoFar, int dataPageCountReadSoFar) {
@@ -1344,6 +1526,14 @@ public class ParquetFileReader implements Closeable {
: dataPageCountReadSoFar < offsetIndex.getPageCount();
}
+ private int getPageOrdinal(int dataPageCountReadSoFar) {
+ if (null == offsetIndex) {
+ return dataPageCountReadSoFar;
+ }
+
+ return offsetIndex.getPageOrdinal(dataPageCountReadSoFar);
+ }
+
/**
* @param size the size of the page
* @return the page
@@ -1530,7 +1720,5 @@ public class ParquetFileReader implements Closeable {
public long endPos() {
return offset + length;
}
-
}
-
}
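
For context, a minimal read-side sketch of how these hooks are exercised. The FileDecryptionProperties builder calls are an assumption (that API is not part of this hunk); HadoopReadOptions.builder(conf, path), withDecryption() and the PAR1/PARE footer handling come from the changes above.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.crypto.FileDecryptionProperties;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class EncryptedReadSketch {
  public static void readFirstRowGroup(Path path, byte[] footerKey) throws IOException {
    Configuration conf = new Configuration();
    FileDecryptionProperties decryption = FileDecryptionProperties.builder() // assumed builder API
        .withFooterKey(footerKey)
        .build();
    ParquetReadOptions options = HadoopReadOptions.builder(conf, path)
        .withDecryption(decryption)
        .build();
    try (ParquetFileReader reader =
        ParquetFileReader.open(HadoopInputFile.fromPath(path, conf), options)) {
      // The footer is read with either the PAR1 or the PARE tail; pages, dictionary pages,
      // bloom filters and column/offset indexes of encrypted columns are decrypted on access.
      PageReadStore rowGroup = reader.readNextRowGroup();
    }
  }
}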
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index c6ee1e9..a1017ca 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -18,14 +18,17 @@
*/
package org.apache.parquet.hadoop;
+import static org.apache.parquet.format.Util.writeFileCryptoMetaData;
import static org.apache.parquet.format.Util.writeFileMetaData;
import static org.apache.parquet.format.converter.ParquetMetadataConverter.MAX_STATS_SIZE;
import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
import static org.apache.parquet.hadoop.ParquetWriter.MAX_PADDING_SIZE_DEFAULT;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
@@ -51,8 +54,17 @@ import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.crypto.AesCipher;
+import org.apache.parquet.crypto.ColumnEncryptionProperties;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.crypto.InternalColumnEncryptionSetup;
+import org.apache.parquet.crypto.InternalFileEncryptor;
+import org.apache.parquet.crypto.ModuleCipherFactory;
+import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel;
import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.format.BlockCipher;
import org.apache.parquet.format.Util;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
@@ -90,6 +102,8 @@ public class ParquetFileWriter {
public static final String PARQUET_METADATA_FILE = "_metadata";
public static final String MAGIC_STR = "PAR1";
public static final byte[] MAGIC = MAGIC_STR.getBytes(StandardCharsets.US_ASCII);
+ public static final String EF_MAGIC_STR = "PARE";
+ public static final byte[] EFMAGIC = EF_MAGIC_STR.getBytes(StandardCharsets.US_ASCII);
public static final String PARQUET_COMMON_METADATA_FILE = "_common_metadata";
public static final int CURRENT_VERSION = 1;
@@ -113,6 +127,9 @@ public class ParquetFileWriter {
// The Bloom filters
private final List<Map<String, BloomFilter>> bloomFilters = new ArrayList<>();
+
+ // The file encryptor
+ private final InternalFileEncryptor fileEncryptor;
// row group data
private BlockMetaData currentBlock; // appended to by endColumn
@@ -280,8 +297,17 @@ public class ParquetFileWriter {
* @throws IOException if the file can not be created
*/
public ParquetFileWriter(OutputFile file, MessageType schema, Mode mode,
+ long rowGroupSize, int maxPaddingSize, int columnIndexTruncateLength,
+ int statisticsTruncateLength, boolean pageWriteChecksumEnabled)
+ throws IOException{
+ this(file, schema, mode, rowGroupSize, maxPaddingSize, columnIndexTruncateLength,
+ statisticsTruncateLength, pageWriteChecksumEnabled, null);
+ }
+
+ public ParquetFileWriter(OutputFile file, MessageType schema, Mode mode,
long rowGroupSize, int maxPaddingSize, int columnIndexTruncateLength,
- int statisticsTruncateLength, boolean pageWriteChecksumEnabled)
+ int statisticsTruncateLength, boolean pageWriteChecksumEnabled,
+ FileEncryptionProperties encryptionProperties)
throws IOException {
TypeUtil.checkValidWriteSchema(schema);
@@ -307,6 +333,22 @@ public class ParquetFileWriter {
this.crc = pageWriteChecksumEnabled ? new CRC32() : null;
this.metadataConverter = new ParquetMetadataConverter(statisticsTruncateLength);
+
+ if (null == encryptionProperties) {
+ this.fileEncryptor = null;
+ } else {
+ // Verify that every encrypted column is in file schema
+ Map<ColumnPath, ColumnEncryptionProperties> columnEncryptionProperties = encryptionProperties.getEncryptedColumns();
+ if (null != columnEncryptionProperties) { // if null, every column in file schema will be encrypted with footer key
+ for (Map.Entry<ColumnPath, ColumnEncryptionProperties> entry : columnEncryptionProperties.entrySet()) {
+ String[] path = entry.getKey().toArray();
+ if(!schema.containsPath(path)) {
+ throw new ParquetCryptoRuntimeException("Encrypted column " + Arrays.toString(path) + " not in file schema");
+ }
+ }
+ }
+ this.fileEncryptor = new InternalFileEncryptor(encryptionProperties);
+ }
}
/**
@@ -334,6 +376,7 @@ public class ParquetFileWriter {
this.pageWriteChecksumEnabled = ParquetOutputFormat.getPageWriteChecksumEnabled(configuration);
this.crc = pageWriteChecksumEnabled ? new CRC32() : null;
this.metadataConverter = new ParquetMetadataConverter(ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH);
+ this.fileEncryptor = null;
}
/**
* start the file
@@ -342,7 +385,15 @@ public class ParquetFileWriter {
public void start() throws IOException {
state = state.start();
LOG.debug("{}: start", out.getPos());
- out.write(MAGIC);
+ byte[] magic = MAGIC;
+ if (null != fileEncryptor && fileEncryptor.isFooterEncrypted()) {
+ magic = EFMAGIC;
+ }
+ out.write(magic);
+ }
+
+ InternalFileEncryptor getEncryptor() {
+ return fileEncryptor;
}
/**
@@ -400,6 +451,11 @@ public class ParquetFileWriter {
* @throws IOException if there is an error while writing
*/
public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException {
+ writeDictionaryPage(dictionaryPage, null, null);
+ }
+
+ public void writeDictionaryPage(DictionaryPage dictionaryPage,
+ BlockCipher.Encryptor headerBlockEncryptor, byte[] AAD) throws IOException {
state = state.write();
LOG.debug("{}: write dictionary page: {} values", out.getPos(), dictionaryPage.getDictionarySize());
currentChunkDictionaryPageOffset = out.getPos();
@@ -414,20 +470,24 @@ public class ParquetFileWriter {
dictionaryPage.getDictionarySize(),
dictionaryPage.getEncoding(),
(int) crc.getValue(),
- out);
+ out,
+ headerBlockEncryptor,
+ AAD);
} else {
metadataConverter.writeDictionaryPageHeader(
uncompressedSize,
compressedPageSize,
dictionaryPage.getDictionarySize(),
dictionaryPage.getEncoding(),
- out);
+ out,
+ headerBlockEncryptor,
+ AAD);
}
long headerSize = out.getPos() - currentChunkDictionaryPageOffset;
this.uncompressedLength += uncompressedSize + headerSize;
this.compressedLength += compressedPageSize + headerSize;
LOG.debug("{}: write dictionary page content {}", out.getPos(), compressedPageSize);
- dictionaryPage.getBytes().writeAllTo(out);
+ dictionaryPage.getBytes().writeAllTo(out); // for an encrypted column, the dictionary page bytes are already encrypted
encodingStatsBuilder.addDictEncoding(dictionaryPage.getEncoding());
currentEncodings.add(dictionaryPage.getEncoding());
}
@@ -690,11 +750,39 @@ public class ParquetFileWriter {
Set<Encoding> rlEncodings,
Set<Encoding> dlEncodings,
List<Encoding> dataEncodings) throws IOException {
+ writeColumnChunk(descriptor, valueCount, compressionCodecName, dictionaryPage, bytes,
+ uncompressedTotalPageSize, compressedTotalPageSize, totalStats, columnIndexBuilder, offsetIndexBuilder,
+ bloomFilter, rlEncodings, dlEncodings, dataEncodings, null, 0, 0, null);
+ }
+
+ void writeColumnChunk(ColumnDescriptor descriptor,
+ long valueCount,
+ CompressionCodecName compressionCodecName,
+ DictionaryPage dictionaryPage,
+ BytesInput bytes,
+ long uncompressedTotalPageSize,
+ long compressedTotalPageSize,
+ Statistics<?> totalStats,
+ ColumnIndexBuilder columnIndexBuilder,
+ OffsetIndexBuilder offsetIndexBuilder,
+ BloomFilter bloomFilter,
+ Set<Encoding> rlEncodings,
+ Set<Encoding> dlEncodings,
+ List<Encoding> dataEncodings,
+ BlockCipher.Encryptor headerBlockEncryptor,
+ int rowGroupOrdinal,
+ int columnOrdinal,
+ byte[] fileAAD) throws IOException {
startColumn(descriptor, valueCount, compressionCodecName);
state = state.write();
if (dictionaryPage != null) {
- writeDictionaryPage(dictionaryPage);
+ byte[] dictionaryPageHeaderAAD = null;
+ if (null != headerBlockEncryptor) {
+ dictionaryPageHeaderAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DictionaryPageHeader,
+ rowGroupOrdinal, columnOrdinal, -1);
+ }
+ writeDictionaryPage(dictionaryPage, headerBlockEncryptor, dictionaryPageHeaderAAD);
}
if (bloomFilter != null) {
@@ -772,6 +860,7 @@ public class ParquetFileWriter {
state = state.endBlock();
LOG.debug("{}: end block", out.getPos());
currentBlock.setRowCount(currentRecordCount);
+ currentBlock.setOrdinal(blocks.size());
blocks.add(currentBlock);
columnIndexes.add(currentColumnIndexes);
offsetIndexes.add(currentOffsetIndexes);
@@ -958,22 +1047,24 @@ public class ParquetFileWriter {
*/
public void end(Map<String, String> extraMetaData) throws IOException {
state = state.end();
- serializeColumnIndexes(columnIndexes, blocks, out);
- serializeOffsetIndexes(offsetIndexes, blocks, out);
- serializeBloomFilters(bloomFilters, blocks, out);
+ serializeColumnIndexes(columnIndexes, blocks, out, fileEncryptor);
+ serializeOffsetIndexes(offsetIndexes, blocks, out, fileEncryptor);
+ serializeBloomFilters(bloomFilters, blocks, out, fileEncryptor);
LOG.debug("{}: end", out.getPos());
this.footer = new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks);
- serializeFooter(footer, out);
+ serializeFooter(footer, out, fileEncryptor);
out.close();
}
private static void serializeColumnIndexes(
List<List<ColumnIndex>> columnIndexes,
List<BlockMetaData> blocks,
- PositionOutputStream out) throws IOException {
+ PositionOutputStream out,
+ InternalFileEncryptor fileEncryptor) throws IOException {
LOG.debug("{}: column indexes", out.getPos());
for (int bIndex = 0, bSize = blocks.size(); bIndex < bSize; ++bIndex) {
- List<ColumnChunkMetaData> columns = blocks.get(bIndex).getColumns();
+ BlockMetaData block = blocks.get(bIndex);
+ List<ColumnChunkMetaData> columns = block.getColumns();
List<ColumnIndex> blockColumnIndexes = columnIndexes.get(bIndex);
for (int cIndex = 0, cSize = columns.size(); cIndex < cSize; ++cIndex) {
ColumnChunkMetaData column = columns.get(cIndex);
@@ -982,8 +1073,18 @@ public class ParquetFileWriter {
if (columnIndex == null) {
continue;
}
+ BlockCipher.Encryptor columnIndexEncryptor = null;
+ byte[] columnIndexAAD = null;
+ if (null != fileEncryptor) {
+ InternalColumnEncryptionSetup columnEncryptionSetup = fileEncryptor.getColumnSetup(column.getPath(), false, cIndex);
+ if (columnEncryptionSetup.isEncrypted()) {
+ columnIndexEncryptor = columnEncryptionSetup.getMetaDataEncryptor();
+ columnIndexAAD = AesCipher.createModuleAAD(fileEncryptor.getFileAAD(), ModuleType.ColumnIndex,
+ block.getOrdinal(), columnEncryptionSetup.getOrdinal(), -1);
+ }
+ }
long offset = out.getPos();
- Util.writeColumnIndex(columnIndex, out);
+ Util.writeColumnIndex(columnIndex, out, columnIndexEncryptor, columnIndexAAD);
column.setColumnIndexReference(new IndexReference(offset, (int) (out.getPos() - offset)));
}
}
@@ -999,10 +1100,12 @@ public class ParquetFileWriter {
private static void serializeOffsetIndexes(
List<List<OffsetIndex>> offsetIndexes,
List<BlockMetaData> blocks,
- PositionOutputStream out) throws IOException {
+ PositionOutputStream out,
+ InternalFileEncryptor fileEncryptor) throws IOException {
LOG.debug("{}: offset indexes", out.getPos());
for (int bIndex = 0, bSize = blocks.size(); bIndex < bSize; ++bIndex) {
- List<ColumnChunkMetaData> columns = blocks.get(bIndex).getColumns();
+ BlockMetaData block = blocks.get(bIndex);
+ List<ColumnChunkMetaData> columns = block.getColumns();
List<OffsetIndex> blockOffsetIndexes = offsetIndexes.get(bIndex);
for (int cIndex = 0, cSize = columns.size(); cIndex < cSize; ++cIndex) {
OffsetIndex offsetIndex = blockOffsetIndexes.get(cIndex);
@@ -1010,8 +1113,18 @@ public class ParquetFileWriter {
continue;
}
ColumnChunkMetaData column = columns.get(cIndex);
+ BlockCipher.Encryptor offsetIndexEncryptor = null;
+ byte[] offsetIndexAAD = null;
+ if (null != fileEncryptor) {
+ InternalColumnEncryptionSetup columnEncryptionSetup = fileEncryptor.getColumnSetup(column.getPath(), false, cIndex);
+ if (columnEncryptionSetup.isEncrypted()) {
+ offsetIndexEncryptor = columnEncryptionSetup.getMetaDataEncryptor();
+ offsetIndexAAD = AesCipher.createModuleAAD(fileEncryptor.getFileAAD(), ModuleType.OffsetIndex,
+ block.getOrdinal(), columnEncryptionSetup.getOrdinal(), -1);
+ }
+ }
long offset = out.getPos();
- Util.writeOffsetIndex(ParquetMetadataConverter.toParquetOffsetIndex(offsetIndex), out);
+ Util.writeOffsetIndex(ParquetMetadataConverter.toParquetOffsetIndex(offsetIndex), out, offsetIndexEncryptor, offsetIndexAAD);
column.setOffsetIndexReference(new IndexReference(offset, (int) (out.getPos() - offset)));
}
}
@@ -1020,10 +1133,12 @@ public class ParquetFileWriter {
private static void serializeBloomFilters(
List<Map<String, BloomFilter>> bloomFilters,
List<BlockMetaData> blocks,
- PositionOutputStream out) throws IOException {
+ PositionOutputStream out,
+ InternalFileEncryptor fileEncryptor) throws IOException {
LOG.debug("{}: bloom filters", out.getPos());
for (int bIndex = 0, bSize = blocks.size(); bIndex < bSize; ++bIndex) {
- List<ColumnChunkMetaData> columns = blocks.get(bIndex).getColumns();
+ BlockMetaData block = blocks.get(bIndex);
+ List<ColumnChunkMetaData> columns = block.getColumns();
Map<String, BloomFilter> blockBloomFilters = bloomFilters.get(bIndex);
if (blockBloomFilters.isEmpty()) continue;
for (int cIndex = 0, cSize = columns.size(); cIndex < cSize; ++cIndex) {
@@ -1035,20 +1150,90 @@ public class ParquetFileWriter {
long offset = out.getPos();
column.setBloomFilterOffset(offset);
- Util.writeBloomFilterHeader(ParquetMetadataConverter.toBloomFilterHeader(bloomFilter), out);
- bloomFilter.writeTo(out);
+
+ BlockCipher.Encryptor bloomFilterEncryptor = null;
+ byte[] bloomFilterHeaderAAD = null;
+ byte[] bloomFilterBitsetAAD = null;
+ if (null != fileEncryptor) {
+ InternalColumnEncryptionSetup columnEncryptionSetup = fileEncryptor.getColumnSetup(column.getPath(), false, cIndex);
+ if (columnEncryptionSetup.isEncrypted()) {
+ bloomFilterEncryptor = columnEncryptionSetup.getMetaDataEncryptor();
+ int columnOrdinal = columnEncryptionSetup.getOrdinal();
+ bloomFilterHeaderAAD = AesCipher.createModuleAAD(fileEncryptor.getFileAAD(), ModuleType.BloomFilterHeader,
+ block.getOrdinal(), columnOrdinal, -1);
+ bloomFilterBitsetAAD = AesCipher.createModuleAAD(fileEncryptor.getFileAAD(), ModuleType.BloomFilterBitset,
+ block.getOrdinal(), columnOrdinal, -1);
+ }
+ }
+
+ Util.writeBloomFilterHeader(ParquetMetadataConverter.toBloomFilterHeader(bloomFilter), out,
+ bloomFilterEncryptor, bloomFilterHeaderAAD);
+
+ ByteArrayOutputStream tempOutStream = new ByteArrayOutputStream();
+ bloomFilter.writeTo(tempOutStream);
+ byte[] serializedBitset = tempOutStream.toByteArray();
+ if (null != bloomFilterEncryptor) {
+ serializedBitset = bloomFilterEncryptor.encrypt(serializedBitset, bloomFilterBitsetAAD);
+ }
+ out.write(serializedBitset);
}
}
}
-
- private static void serializeFooter(ParquetMetadata footer, PositionOutputStream out) throws IOException {
- long footerIndex = out.getPos();
+
+ private static void serializeFooter(ParquetMetadata footer, PositionOutputStream out,
+ InternalFileEncryptor fileEncryptor) throws IOException {
+
ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter();
- org.apache.parquet.format.FileMetaData parquetMetadata = metadataConverter.toParquetMetadata(CURRENT_VERSION, footer);
- writeFileMetaData(parquetMetadata, out);
- LOG.debug("{}: footer length = {}" , out.getPos(), (out.getPos() - footerIndex));
- BytesUtils.writeIntLittleEndian(out, (int) (out.getPos() - footerIndex));
- out.write(MAGIC);
+
+ // Unencrypted file
+ if (null == fileEncryptor) {
+ long footerIndex = out.getPos();
+ org.apache.parquet.format.FileMetaData parquetMetadata = metadataConverter.toParquetMetadata(CURRENT_VERSION, footer);
+ writeFileMetaData(parquetMetadata, out);
+ LOG.debug("{}: footer length = {}" , out.getPos(), (out.getPos() - footerIndex));
+ BytesUtils.writeIntLittleEndian(out, (int) (out.getPos() - footerIndex));
+ out.write(MAGIC);
+ return;
+ }
+
+ org.apache.parquet.format.FileMetaData parquetMetadata =
+ metadataConverter.toParquetMetadata(CURRENT_VERSION, footer, fileEncryptor);
+
+ // Encrypted file with plaintext footer
+ if (!fileEncryptor.isFooterEncrypted()) {
+ long footerIndex = out.getPos();
+ parquetMetadata.setEncryption_algorithm(fileEncryptor.getEncryptionAlgorithm());
+ // create footer signature (nonce + tag of encrypted footer)
+ byte[] footerSigningKeyMetaData = fileEncryptor.getFooterSigningKeyMetaData();
+ if (null != footerSigningKeyMetaData) {
+ parquetMetadata.setFooter_signing_key_metadata(footerSigningKeyMetaData);
+ }
+ ByteArrayOutputStream tempOutStream = new ByteArrayOutputStream();
+ writeFileMetaData(parquetMetadata, tempOutStream);
+ byte[] serializedFooter = tempOutStream.toByteArray();
+ byte[] footerAAD = AesCipher.createFooterAAD(fileEncryptor.getFileAAD());
+ byte[] encryptedFooter = fileEncryptor.getSignedFooterEncryptor().encrypt(serializedFooter, footerAAD);
+ byte[] signature = new byte[AesCipher.NONCE_LENGTH + AesCipher.GCM_TAG_LENGTH];
+ System.arraycopy(encryptedFooter, ModuleCipherFactory.SIZE_LENGTH, signature, 0, AesCipher.NONCE_LENGTH); // copy Nonce
+ System.arraycopy(encryptedFooter, encryptedFooter.length - AesCipher.GCM_TAG_LENGTH,
+ signature, AesCipher.NONCE_LENGTH, AesCipher.GCM_TAG_LENGTH); // copy GCM Tag
+ out.write(serializedFooter);
+ out.write(signature);
+ LOG.debug("{}: footer and signature length = {}" , out.getPos(), (out.getPos() - footerIndex));
+ BytesUtils.writeIntLittleEndian(out, (int) (out.getPos() - footerIndex));
+ out.write(MAGIC);
+ return;
+ }
+
+ // Encrypted file with encrypted footer
+ long cryptoFooterIndex = out.getPos();
+ writeFileCryptoMetaData(fileEncryptor.getFileCryptoMetaData(), out);
+ byte[] footerAAD = AesCipher.createFooterAAD(fileEncryptor.getFileAAD());
+ writeFileMetaData(parquetMetadata, out, fileEncryptor.getFooterEncryptor(), footerAAD);
+ int combinedMetaDataLength = (int)(out.getPos() - cryptoFooterIndex);
+ LOG.debug("{}: crypto metadata and footer length = {}" , out.getPos(), combinedMetaDataLength);
+ BytesUtils.writeIntLittleEndian(out, combinedMetaDataLength);
+ out.write(EFMAGIC);
}
public ParquetMetadata getFooter() {
@@ -1157,7 +1342,7 @@ public class ParquetFileWriter {
throws IOException {
PositionOutputStream metadata = HadoopStreams.wrap(fs.create(outputPath));
metadata.write(MAGIC);
- serializeFooter(metadataFooter, metadata);
+ serializeFooter(metadataFooter, metadata, null);
metadata.close();
}
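
Every encrypted module above is tied to its location in the file via an AAD derived with AesCipher.createModuleAAD(fileAAD, moduleType, rowGroupOrdinal, columnOrdinal, pageOrdinal). A small sketch of the convention used throughout this change; the helper class is illustrative, while the AesCipher calls and ModuleType values are the ones introduced here.

import org.apache.parquet.crypto.AesCipher;
import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;

public class ModuleAadSketch {
  // Data page headers carry a page ordinal; the writer advances it per page with quickUpdatePageAAD.
  static byte[] dataPageHeaderAad(byte[] fileAAD, int rowGroupOrdinal, int columnOrdinal) {
    byte[] aad = AesCipher.createModuleAAD(fileAAD, ModuleType.DataPageHeader,
        rowGroupOrdinal, columnOrdinal, 0);
    AesCipher.quickUpdatePageAAD(aad, 1); // advance in place to page ordinal 1
    return aad;
  }

  // Non-paged modules (dictionary page headers, column/offset indexes, bloom filters)
  // use a page ordinal of -1.
  static byte[] columnIndexAad(byte[] fileAAD, int rowGroupOrdinal, int columnOrdinal) {
    return AesCipher.createModuleAAD(fileAAD, ModuleType.ColumnIndex,
        rowGroupOrdinal, columnOrdinal, -1);
  }
}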
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
index a4c8e45..4eb0408 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
@@ -37,6 +37,8 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.ParquetProperties.WriterVersion;
+import org.apache.parquet.crypto.EncryptionPropertiesFactory;
+import org.apache.parquet.crypto.FileEncryptionProperties;
import org.apache.parquet.hadoop.ParquetFileWriter.Mode;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.hadoop.api.WriteSupport.WriteContext;
@@ -471,10 +473,13 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
LOG.info("Parquet properties are:\n{}", props);
}
- WriteContext init = writeSupport.init(conf);
+ WriteContext fileWriteContext = writeSupport.init(conf);
+
+ FileEncryptionProperties encryptionProperties = createEncryptionProperties(conf, file, fileWriteContext);
+
ParquetFileWriter w = new ParquetFileWriter(HadoopOutputFile.fromPath(file, conf),
- init.getSchema(), mode, blockSize, maxPaddingSize, props.getColumnIndexTruncateLength(),
- props.getStatisticsTruncateLength(), props.getPageWriteChecksumEnabled());
+ fileWriteContext.getSchema(), mode, blockSize, maxPaddingSize, props.getColumnIndexTruncateLength(),
+ props.getStatisticsTruncateLength(), props.getPageWriteChecksumEnabled(), encryptionProperties);
w.start();
float maxLoad = conf.getFloat(ParquetOutputFormat.MEMORY_POOL_RATIO,
@@ -494,8 +499,8 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
return new ParquetRecordWriter<T>(
w,
writeSupport,
- init.getSchema(),
- init.getExtraMetaData(),
+ fileWriteContext.getSchema(),
+ fileWriteContext.getExtraMetaData(),
blockSize,
codec,
validating,
@@ -539,4 +544,13 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
public synchronized static MemoryManager getMemoryManager() {
return memoryManager;
}
+
+ private static FileEncryptionProperties createEncryptionProperties(Configuration fileHadoopConfig, Path tempFilePath,
+ WriteContext fileWriteContext) {
+ EncryptionPropertiesFactory cryptoFactory = EncryptionPropertiesFactory.loadFactory(fileHadoopConfig);
+ if (null == cryptoFactory) {
+ return null;
+ }
+ return cryptoFactory.getFileEncryptionProperties(fileHadoopConfig, tempFilePath, fileWriteContext);
+ }
}
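
createEncryptionProperties() above looks up a pluggable EncryptionPropertiesFactory from the Hadoop configuration. A minimal factory sketch, assuming EncryptionPropertiesFactory is an interface with the single method called above and that FileEncryptionProperties exposes a builder taking the footer key (neither detail is shown in this diff); how the factory class is registered in the configuration is defined by EncryptionPropertiesFactory itself and not repeated here.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.crypto.EncryptionPropertiesFactory;
import org.apache.parquet.crypto.FileEncryptionProperties;
import org.apache.parquet.hadoop.api.WriteSupport.WriteContext;

public class StaticKeyEncryptionPropertiesFactory implements EncryptionPropertiesFactory {
  @Override
  public FileEncryptionProperties getFileEncryptionProperties(
      Configuration fileHadoopConfig, Path tempFilePath, WriteContext fileWriteContext) {
    // Illustrative only: in production the key would come from a KMS, never from plain config.
    byte[] footerKey = java.util.Base64.getDecoder()
        .decode(fileHadoopConfig.get("example.parquet.footer.key"));
    return FileEncryptionProperties.builder(footerKey).build(); // assumed builder API
  }
}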
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
index 71fde69..c215f5e 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.Preconditions;
import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.crypto.FileDecryptionProperties;
import org.apache.parquet.filter.UnboundRecordFilter;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.compat.FilterCompat.Filter;
@@ -108,7 +109,7 @@ public class ParquetReader<T> implements Closeable {
ReadSupport<T> readSupport,
FilterCompat.Filter filter) throws IOException {
this(Collections.singletonList((InputFile) HadoopInputFile.fromPath(file, conf)),
- HadoopReadOptions.builder(conf)
+ HadoopReadOptions.builder(conf, file)
.withRecordFilter(Objects.requireNonNull(filter, "filter cannot be null"))
.build(),
readSupport);
@@ -185,7 +186,7 @@ public class ParquetReader<T> implements Closeable {
this.file = null;
this.path = Objects.requireNonNull(path, "path cannot be null");
this.conf = new Configuration();
- this.optionsBuilder = HadoopReadOptions.builder(conf);
+ this.optionsBuilder = HadoopReadOptions.builder(conf, path);
}
@Deprecated
@@ -194,7 +195,7 @@ public class ParquetReader<T> implements Closeable {
this.file = null;
this.path = Objects.requireNonNull(path, "path cannot be null");
this.conf = new Configuration();
- this.optionsBuilder = HadoopReadOptions.builder(conf);
+ this.optionsBuilder = HadoopReadOptions.builder(conf, path);
}
protected Builder(InputFile file) {
@@ -202,11 +203,13 @@ public class ParquetReader<T> implements Closeable {
this.file = Objects.requireNonNull(file, "file cannot be null");
this.path = null;
if (file instanceof HadoopInputFile) {
- this.conf = ((HadoopInputFile) file).getConfiguration();
+ HadoopInputFile hadoopFile = (HadoopInputFile) file;
+ this.conf = hadoopFile.getConfiguration();
+ optionsBuilder = HadoopReadOptions.builder(conf, hadoopFile.getPath());
} else {
this.conf = new Configuration();
+ optionsBuilder = HadoopReadOptions.builder(conf);
}
- optionsBuilder = HadoopReadOptions.builder(conf);
}
// when called, resets options to the defaults from conf
@@ -308,6 +311,11 @@ public class ParquetReader<T> implements Closeable {
optionsBuilder.withCodecFactory(codecFactory);
return this;
}
+
+ public Builder<T> withDecryption(FileDecryptionProperties fileDecryptionProperties) {
+ optionsBuilder.withDecryption(fileDecryptionProperties);
+ return this;
+ }
public Builder<T> set(String key, String value) {
optionsBuilder.set(key, value);
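
A sketch of the new withDecryption() option on the record-level reader; GroupReadSupport and the FileDecryptionProperties builder are pre-existing or assumed APIs, not part of this change.

import org.apache.hadoop.fs.Path;
import org.apache.parquet.crypto.FileDecryptionProperties;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;

public class DecryptingReaderSketch {
  public static void readAll(Path path, byte[] footerKey) throws java.io.IOException {
    FileDecryptionProperties decryption = FileDecryptionProperties.builder() // assumed builder API
        .withFooterKey(footerKey)
        .build();
    try (ParquetReader<Group> reader =
        ParquetReader.builder(new GroupReadSupport(), path)
            .withDecryption(decryption)
            .build()) {
      Group record;
      while ((record = reader.read()) != null) {
        // process the decrypted record
      }
    }
  }
}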
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
index 492d917..4653410 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
@@ -151,7 +151,7 @@ public class ParquetRecordReader<T> extends RecordReader<Void, T> {
long[] rowGroupOffsets = split.getRowGroupOffsets();
// if task.side.metadata is set, rowGroupOffsets is null
- ParquetReadOptions.Builder optionsBuilder = HadoopReadOptions.builder(configuration);
+ ParquetReadOptions.Builder optionsBuilder = HadoopReadOptions.builder(configuration, path);
if (rowGroupOffsets != null) {
optionsBuilder.withOffsets(rowGroupOffsets);
} else {
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index c6b2828..ecc12de 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.ParquetProperties.WriterVersion;
+import org.apache.parquet.crypto.FileEncryptionProperties;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
@@ -193,7 +194,7 @@ public class ParquetWriter<T> implements Closeable {
compressionCodecName, blockSize, pageSize, dictionaryPageSize,
enableDictionary, validating, writerVersion, conf);
}
-
+
/**
* Create a new ParquetWriter.
*
@@ -232,7 +233,7 @@ public class ParquetWriter<T> implements Closeable {
.withDictionaryPageSize(dictionaryPageSize)
.withDictionaryEncoding(enableDictionary)
.withWriterVersion(writerVersion)
- .build());
+ .build(), null);
}
/**
@@ -272,7 +273,8 @@ public class ParquetWriter<T> implements Closeable {
boolean validating,
Configuration conf,
int maxPaddingSize,
- ParquetProperties encodingProps) throws IOException {
+ ParquetProperties encodingProps,
+ FileEncryptionProperties encryptionProperties) throws IOException {
WriteSupport.WriteContext writeContext = writeSupport.init(conf);
MessageType schema = writeContext.getSchema();
@@ -280,7 +282,7 @@ public class ParquetWriter<T> implements Closeable {
ParquetFileWriter fileWriter = new ParquetFileWriter(
file, schema, mode, rowGroupSize, maxPaddingSize,
encodingProps.getColumnIndexTruncateLength(), encodingProps.getStatisticsTruncateLength(),
- encodingProps.getPageWriteChecksumEnabled());
+ encodingProps.getPageWriteChecksumEnabled(), encryptionProperties);
fileWriter.start();
this.codecFactory = new CodecFactory(conf, encodingProps.getPageSizeThreshold());
@@ -342,6 +344,7 @@ public class ParquetWriter<T> implements Closeable {
public abstract static class Builder<T, SELF extends Builder<T, SELF>> {
private OutputFile file = null;
private Path path = null;
+ private FileEncryptionProperties encryptionProperties = null;
private Configuration conf = new Configuration();
private ParquetFileWriter.Mode mode;
private CompressionCodecName codecName = DEFAULT_COMPRESSION_CODEC_NAME;
@@ -406,6 +409,18 @@ public class ParquetWriter<T> implements Closeable {
}
/**
+ * Set the {@link FileEncryptionProperties file encryption properties} used by the
+ * constructed writer.
+ *
+ * @param encryptionProperties a {@code FileEncryptionProperties}
+ * @return this builder for method chaining.
+ */
+ public SELF withEncryption (FileEncryptionProperties encryptionProperties) {
+ this.encryptionProperties = encryptionProperties;
+ return self();
+ }
+
+ /**
* Set the Parquet format row group size used by the constructed writer.
*
* @param rowGroupSize an integer size in bytes
@@ -565,6 +580,7 @@ public class ParquetWriter<T> implements Closeable {
*/
public SELF withBloomFilterNDV(String columnPath, long ndv) {
encodingPropsBuilder.withBloomFilterNDV(columnPath, ndv);
+
return self();
}
@@ -615,13 +631,13 @@ public class ParquetWriter<T> implements Closeable {
if (file != null) {
return new ParquetWriter<>(file,
mode, getWriteSupport(conf), codecName, rowGroupSize, enableValidation, conf,
- maxPaddingSize, encodingPropsBuilder.build());
+ maxPaddingSize, encodingPropsBuilder.build(), encryptionProperties);
} else {
return new ParquetWriter<>(HadoopOutputFile.fromPath(path, conf),
mode, getWriteSupport(conf), codecName,
rowGroupSize, enableValidation, conf, maxPaddingSize,
- encodingPropsBuilder.build());
+ encodingPropsBuilder.build(), encryptionProperties);
}
}
}
-}
+}
\ No newline at end of file
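
On the write side, the new withEncryption() builder option plugs the encryption properties into ParquetFileWriter. A sketch using the example writer from parquet-hadoop; ExampleParquetWriter and the FileEncryptionProperties builder are not part of this change.

import org.apache.hadoop.fs.Path;
import org.apache.parquet.crypto.FileEncryptionProperties;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.schema.MessageType;

public class EncryptedWriteSketch {
  public static void write(Path path, MessageType schema, Group record, byte[] footerKey)
      throws java.io.IOException {
    FileEncryptionProperties encryption =
        FileEncryptionProperties.builder(footerKey).build(); // assumed builder API
    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path)
        .withType(schema)
        .withEncryption(encryption) // new builder option introduced above
        .build()) {
      writer.write(record);
    }
  }
}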
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/BlockMetaData.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/BlockMetaData.java
index 0cad0a5..ce204dc 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/BlockMetaData.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/BlockMetaData.java
@@ -32,11 +32,11 @@ public class BlockMetaData {
private long rowCount;
private long totalByteSize;
private String path;
+ private int ordinal;
public BlockMetaData() {
}
-
-
+
/**
* @param path the path to the file containing the data. Or null if same file the metadata was found
*/
@@ -102,6 +102,7 @@ public class BlockMetaData {
public long getStartingPos() {
return getColumns().get(0).getStartingPos();
}
+
@Override
public String toString() {
return "BlockMetaData{" + rowCount + ", " + totalByteSize + " " + columns + "}";
@@ -117,4 +118,19 @@ public class BlockMetaData {
}
return totalSize;
}
+
+ /**
+ * @return row group ordinal
+ */
+ public int getOrdinal() {
+ return ordinal;
+ }
+
+ /**
+ *
+ * @param ordinal - row group ordinal
+ */
+ public void setOrdinal(int ordinal) {
+ this.ordinal = ordinal;
+ }
}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
index 2c24356..e816b27 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
@@ -18,12 +18,25 @@
*/
package org.apache.parquet.hadoop.metadata;
+import static org.apache.parquet.column.Encoding.PLAIN_DICTIONARY;
+import static org.apache.parquet.column.Encoding.RLE_DICTIONARY;
+import static org.apache.parquet.format.Util.readColumnMetaData;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
import java.util.Set;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.EncodingStats;
import org.apache.parquet.column.statistics.BooleanStatistics;
import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.crypto.AesCipher;
+import org.apache.parquet.crypto.InternalColumnDecryptionSetup;
+import org.apache.parquet.crypto.InternalFileDecryptor;
+import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
+import org.apache.parquet.format.ColumnMetaData;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.internal.hadoop.metadata.IndexReference;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
@@ -34,6 +47,7 @@ import org.apache.yetus.audience.InterfaceAudience.Private;
* Column meta data for a block stored in the file footer and passed in the InputSplit
*/
abstract public class ColumnChunkMetaData {
+ protected int rowGroupOrdinal = -1;
@Deprecated
public static ColumnChunkMetaData get(
@@ -98,6 +112,7 @@ abstract public class ColumnChunkMetaData {
long valueCount,
long totalSize,
long totalUncompressedSize) {
+
return get(path, Types.optional(type).named("fake_type"), codec, encodingStats, encodings, statistics,
firstDataPage, dictionaryPageOffset, valueCount, totalSize, totalUncompressedSize);
}
@@ -114,6 +129,7 @@ abstract public class ColumnChunkMetaData {
long valueCount,
long totalSize,
long totalUncompressedSize) {
+
// to save space we store those always positive longs in ints when they fit.
if (positiveLongFitsInAnInt(firstDataPage)
&& positiveLongFitsInAnInt(dictionaryPageOffset)
@@ -141,11 +157,32 @@ abstract public class ColumnChunkMetaData {
totalUncompressedSize);
}
}
+
+ // In sensitive columns, the ColumnMetaData structure is encrypted (with column-specific keys), making fields like Statistics invisible.
+ // Decryption is not performed proactively, for performance and authorization reasons.
+ // This method creates a shell ColumnChunkMetaData object that keeps the encrypted metadata and the decryption tools.
+ // These tools will be activated later - when/if the column is projected.
+ public static ColumnChunkMetaData getWithEncryptedMetadata(ParquetMetadataConverter parquetMetadataConverter, ColumnPath path,
+ PrimitiveType type, byte[] encryptedMetadata, byte[] columnKeyMetadata,
+ InternalFileDecryptor fileDecryptor, int rowGroupOrdinal, int columnOrdinal,
+ String createdBy) {
+ return new EncryptedColumnChunkMetaData(parquetMetadataConverter, path, type, encryptedMetadata, columnKeyMetadata,
+ fileDecryptor, rowGroupOrdinal, columnOrdinal, createdBy);
+ }
+
+ public void setRowGroupOrdinal (int rowGroupOrdinal) {
+ this.rowGroupOrdinal = rowGroupOrdinal;
+ }
+
+ public int getRowGroupOrdinal() {
+ return rowGroupOrdinal;
+ }
/**
* @return the offset of the first byte in the chunk
*/
public long getStartingPos() {
+ decryptIfNeeded();
long dictionaryPageOffset = getDictionaryPageOffset();
long firstDataPageOffset = getFirstDataPageOffset();
if (dictionaryPageOffset > 0 && dictionaryPageOffset < firstDataPageOffset) {
@@ -165,10 +202,10 @@ abstract public class ColumnChunkMetaData {
return (value >= 0) && (value + Integer.MIN_VALUE <= Integer.MAX_VALUE);
}
- private final EncodingStats encodingStats;
+ EncodingStats encodingStats;
// we save 3 references by storing together the column properties that have few distinct values
- private final ColumnChunkProperties properties;
+ ColumnChunkProperties properties;
private IndexReference columnIndexReference;
private IndexReference offsetIndexReference;
@@ -184,7 +221,12 @@ abstract public class ColumnChunkMetaData {
this.properties = columnChunkProperties;
}
+ protected void decryptIfNeeded() {
+ return;
+ }
+
public CompressionCodecName getCodec() {
+ decryptIfNeeded();
return properties.getCodec();
}
@@ -202,6 +244,7 @@ abstract public class ColumnChunkMetaData {
*/
@Deprecated
public PrimitiveTypeName getType() {
+ decryptIfNeeded();
return properties.getType();
}
@@ -209,6 +252,7 @@ abstract public class ColumnChunkMetaData {
* @return the primitive type object of the column
*/
public PrimitiveType getPrimitiveType() {
+ decryptIfNeeded();
return properties.getPrimitiveType();
}
@@ -247,6 +291,7 @@ abstract public class ColumnChunkMetaData {
*/
@Private
public IndexReference getColumnIndexReference() {
+ decryptIfNeeded();
return columnIndexReference;
}
@@ -264,6 +309,7 @@ abstract public class ColumnChunkMetaData {
*/
@Private
public IndexReference getOffsetIndexReference() {
+ decryptIfNeeded();
return offsetIndexReference;
}
@@ -290,6 +336,7 @@ abstract public class ColumnChunkMetaData {
*/
@Private
public long getBloomFilterOffset() {
+ decryptIfNeeded();
return bloomFilterOffset;
}
@@ -297,17 +344,31 @@ abstract public class ColumnChunkMetaData {
* @return all the encodings used in this column
*/
public Set<Encoding> getEncodings() {
+ decryptIfNeeded();
return properties.getEncodings();
}
public EncodingStats getEncodingStats() {
+ decryptIfNeeded();
return encodingStats;
}
@Override
public String toString() {
+ decryptIfNeeded();
return "ColumnMetaData{" + properties.toString() + ", " + getFirstDataPageOffset() + "}";
}
+
+ public boolean hasDictionaryPage() {
+ EncodingStats stats = getEncodingStats();
+ if (stats != null) {
+ // ensure there is a dictionary page and that it is used to encode data pages
+ return stats.hasDictionaryPages() && stats.hasDictionaryEncodedPages();
+ }
+
+ Set<Encoding> encodings = getEncodings();
+ return (encodings.contains(PLAIN_DICTIONARY) || encodings.contains(RLE_DICTIONARY));
+ }
}
class IntColumnChunkMetaData extends ColumnChunkMetaData {
@@ -415,6 +476,7 @@ class IntColumnChunkMetaData extends ColumnChunkMetaData {
return statistics;
}
}
+
class LongColumnChunkMetaData extends ColumnChunkMetaData {
private final long firstDataPageOffset;
@@ -500,3 +562,103 @@ class LongColumnChunkMetaData extends ColumnChunkMetaData {
}
}
+class EncryptedColumnChunkMetaData extends ColumnChunkMetaData {
+ private final ParquetMetadataConverter parquetMetadataConverter;
+ private final byte[] encryptedMetadata;
+ private final byte[] columnKeyMetadata;
+ private final InternalFileDecryptor fileDecryptor;
+
+ private final int columnOrdinal;
+ private final PrimitiveType primitiveType;
+ private final String createdBy;
+ private ColumnPath path;
+
+ private boolean decrypted;
+ private ColumnChunkMetaData shadowColumnChunkMetaData;
+
+ EncryptedColumnChunkMetaData(ParquetMetadataConverter parquetMetadataConverter, ColumnPath path, PrimitiveType type,
+ byte[] encryptedMetadata, byte[] columnKeyMetadata,
+ InternalFileDecryptor fileDecryptor, int rowGroupOrdinal, int columnOrdinal, String createdBy) {
+ super((EncodingStats) null, (ColumnChunkProperties) null);
+ this.parquetMetadataConverter = parquetMetadataConverter;
+ this.path = path;
+ this.encryptedMetadata = encryptedMetadata;
+ this.columnKeyMetadata = columnKeyMetadata;
+ this.fileDecryptor = fileDecryptor;
+ this.rowGroupOrdinal = rowGroupOrdinal;
+ this.columnOrdinal = columnOrdinal;
+ this.primitiveType = type;
+ this.createdBy = createdBy;
+
+ this.decrypted = false;
+ }
+
+ @Override
+ protected void decryptIfNeeded() {
+ if (decrypted) return;
+
+ if (null == fileDecryptor) {
+ throw new ParquetCryptoRuntimeException(path + ". Null File Decryptor");
+ }
+
+ // Decrypt the ColumnMetaData
+ InternalColumnDecryptionSetup columnDecryptionSetup = fileDecryptor.setColumnCryptoMetadata(path, true, false,
+ columnKeyMetadata, columnOrdinal);
+
+ ColumnMetaData metaData;
+ ByteArrayInputStream tempInputStream = new ByteArrayInputStream(encryptedMetadata);
+ byte[] columnMetaDataAAD = AesCipher.createModuleAAD(fileDecryptor.getFileAAD(), ModuleType.ColumnMetaData,
+ rowGroupOrdinal, columnOrdinal, -1);
+ try {
+ metaData = readColumnMetaData(tempInputStream, columnDecryptionSetup.getMetaDataDecryptor(), columnMetaDataAAD);
+ } catch (IOException e) {
+ throw new ParquetCryptoRuntimeException(path + ". Failed to decrypt column metadata", e);
+ }
+ decrypted = true;
+ shadowColumnChunkMetaData = parquetMetadataConverter.buildColumnChunkMetaData(metaData, path, primitiveType, createdBy);
+ this.encodingStats = shadowColumnChunkMetaData.encodingStats;
+ this.properties = shadowColumnChunkMetaData.properties;
+ setBloomFilterOffset(metaData.bloom_filter_offset);
+ }
+
+ @Override
+ public ColumnPath getPath() {
+ return path;
+ }
+
+ @Override
+ public long getFirstDataPageOffset() {
+ decryptIfNeeded();
+ return shadowColumnChunkMetaData.getFirstDataPageOffset();
+ }
+
+ @Override
+ public long getDictionaryPageOffset() {
+ decryptIfNeeded();
+ return shadowColumnChunkMetaData.getDictionaryPageOffset();
+ }
+
+ @Override
+ public long getValueCount() {
+ decryptIfNeeded();
+ return shadowColumnChunkMetaData.getValueCount();
+ }
+
+ @Override
+ public long getTotalUncompressedSize() {
+ decryptIfNeeded();
+ return shadowColumnChunkMetaData.getTotalUncompressedSize();
+ }
+
+ @Override
+ public long getTotalSize() {
+ decryptIfNeeded();
+ return shadowColumnChunkMetaData.getTotalSize();
+ }
+
+ @Override
+ public Statistics getStatistics() {
+ decryptIfNeeded();
+ return shadowColumnChunkMetaData.getStatistics();
+ }
+}
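
Note on the class above (an editorial sketch, not part of the commit): EncryptedColumnChunkMetaData defers decryption until a caller actually touches the column metadata. Every getter first calls decryptIfNeeded(), which decrypts the serialized ColumnMetaData exactly once and then delegates to a plain "shadow" ColumnChunkMetaData. A minimal, self-contained Java sketch of the same lazy decrypt-on-first-access idea follows; the names (Metadata, PlainMetadata, EncryptedMetadata, decryptor) are hypothetical and are not Parquet APIs.

// Lazy decrypt-on-first-access sketch; all names here are hypothetical.
import java.util.function.Function;

abstract class Metadata {
  abstract long valueCount();
}

final class PlainMetadata extends Metadata {
  private final long valueCount;

  PlainMetadata(long valueCount) {
    this.valueCount = valueCount;
  }

  @Override
  long valueCount() {
    return valueCount;
  }
}

final class EncryptedMetadata extends Metadata {
  private final byte[] ciphertext;
  private final Function<byte[], PlainMetadata> decryptor;
  private PlainMetadata shadow; // populated on first access

  EncryptedMetadata(byte[] ciphertext, Function<byte[], PlainMetadata> decryptor) {
    this.ciphertext = ciphertext;
    this.decryptor = decryptor;
  }

  // Decrypts exactly once; later calls reuse the decrypted "shadow" object.
  private void decryptIfNeeded() {
    if (shadow == null) {
      shadow = decryptor.apply(ciphertext);
    }
  }

  @Override
  long valueCount() {
    decryptIfNeeded(); // every getter triggers decryption lazily
    return shadow.valueCount();
  }
}
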
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/FileMetaData.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/FileMetaData.java
index ca9488f..1668a7f 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/FileMetaData.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/FileMetaData.java
@@ -24,6 +24,7 @@ import java.io.Serializable;
import java.util.Map;
import java.util.Objects;
+import org.apache.parquet.crypto.InternalFileDecryptor;
import org.apache.parquet.schema.MessageType;
@@ -38,6 +39,8 @@ public final class FileMetaData implements Serializable {
private final Map<String, String> keyValueMetaData;
private final String createdBy;
+
+ private final InternalFileDecryptor fileDecryptor;
/**
* @param schema the schema for the file
@@ -47,11 +50,16 @@ public final class FileMetaData implements Serializable {
* @throws NullPointerException if schema or keyValueMetaData is {@code null}
*/
public FileMetaData(MessageType schema, Map<String, String> keyValueMetaData, String createdBy) {
+ this(schema, keyValueMetaData, createdBy, null);
+ }
+
+ public FileMetaData(MessageType schema, Map<String, String> keyValueMetaData, String createdBy, InternalFileDecryptor fileDecryptor) {
super();
this.schema = Objects.requireNonNull(schema, "schema cannot be null");
this.keyValueMetaData = unmodifiableMap(Objects
.requireNonNull(keyValueMetaData, "keyValueMetaData cannot be null"));
this.createdBy = createdBy;
+ this.fileDecryptor = fileDecryptor;
}
/**
@@ -80,4 +88,7 @@ public final class FileMetaData implements Serializable {
return createdBy;
}
+ public InternalFileDecryptor getFileDecryptor() {
+ return fileDecryptor;
+ }
}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java
index fb876a8..ad26430 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java
@@ -54,6 +54,10 @@ public class HadoopInputFile implements InputFile {
public Configuration getConfiguration() {
return conf;
}
+
+ public Path getPath() {
+ return stat.getPath();
+ }
@Override
public long getLength() {
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomEncryption.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomEncryption.java
new file mode 100644
index 0000000..e4cc550
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomEncryption.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.crypto.ColumnEncryptionProperties;
+import org.apache.parquet.crypto.FileDecryptionProperties;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.filter2.recordlevel.PhoneBookWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.hadoop.TestColumnIndexEncryption.StringKeyIdRetriever;
+import org.apache.parquet.io.api.Binary;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.parquet.filter2.predicate.FilterApi.*;
+import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
+import static org.junit.Assert.*;
+
+@RunWith(Parameterized.class)
+public class TestBloomEncryption {
+ private static final Path FILE_V1 = createTempFile();
+ private static final Path FILE_V2 = createTempFile();
+ private static final Logger LOGGER = LoggerFactory.getLogger(TestBloomEncryption.class);
+ private static final Random RANDOM = new Random(42);
+ private static final String[] PHONE_KINDS = { null, "mobile", "home", "work" };
+ private static final List<PhoneBookWriter.User> DATA = Collections.unmodifiableList(generateData(10000));
+
+ private static final byte[] FOOTER_ENCRYPTION_KEY = new String("0123456789012345").getBytes();
+ private static final byte[] COLUMN_ENCRYPTION_KEY1 = new String("1234567890123450").getBytes();
+ private static final byte[] COLUMN_ENCRYPTION_KEY2 = new String("1234567890123451").getBytes();
+
+ private final Path file;
+ public TestBloomEncryption(Path file) {
+ this.file = file;
+ }
+
+ private static Path createTempFile() {
+ try {
+ return new Path(Files.createTempFile("test-bloom-filter_", ".parquet").toAbsolutePath().toString());
+ } catch (IOException e) {
+ throw new AssertionError("Unable to create temporary file", e);
+ }
+ }
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> params() {
+ return Arrays.asList(new Object[] { FILE_V1 }, new Object[] { FILE_V2 });
+ }
+
+ private static List<PhoneBookWriter.User> generateData(int rowCount) {
+ List<PhoneBookWriter.User> users = new ArrayList<>();
+ List<String> names = generateNames(rowCount);
+ for (int i = 0; i < rowCount; ++i) {
+ users.add(new PhoneBookWriter.User(i, names.get(i), generatePhoneNumbers(), generateLocation(i, rowCount)));
+ }
+ return users;
+ }
+
+ private static List<String> generateNames(int rowCount) {
+ List<String> list = new ArrayList<>();
+
+ // Adding fixed values for filtering
+ list.add("anderson");
+ list.add("anderson");
+ list.add("miller");
+ list.add("miller");
+ list.add("miller");
+ list.add("thomas");
+ list.add("thomas");
+ list.add("williams");
+
+ int nullCount = rowCount / 100;
+
+ String alphabet = "aabcdeefghiijklmnoopqrstuuvwxyz";
+ int maxLength = 8;
+ for (int i = rowCount - list.size() - nullCount; i >= 0; --i) {
+ int l = RANDOM.nextInt(maxLength);
+ StringBuilder builder = new StringBuilder(l);
+ for (int j = 0; j < l; ++j) {
+ builder.append(alphabet.charAt(RANDOM.nextInt(alphabet.length())));
+ }
+ list.add(builder.toString());
+ }
+ list.sort((str1, str2) -> -str1.compareTo(str2));
+
+ // Adding nulls at random positions
+ for (int i = 0; i < nullCount; ++i) {
+ list.add(RANDOM.nextInt(list.size()), null);
+ }
+
+ return list;
+ }
+
+ private static List<PhoneBookWriter.PhoneNumber> generatePhoneNumbers() {
+ int length = RANDOM.nextInt(5) - 1;
+ if (length < 0) {
+ return null;
+ }
+ List<PhoneBookWriter.PhoneNumber> phoneNumbers = new ArrayList<>(length);
+ for (int i = 0; i < length; ++i) {
+ // 6-digit numbers
+ long number = Math.abs(RANDOM.nextLong() % 900000) + 100000;
+ phoneNumbers.add(new PhoneBookWriter.PhoneNumber(number, PHONE_KINDS[RANDOM.nextInt(PHONE_KINDS.length)]));
+ }
+ return phoneNumbers;
+ }
+
+ private static PhoneBookWriter.Location generateLocation(int id, int rowCount) {
+ if (RANDOM.nextDouble() < 0.01) {
+ return null;
+ }
+
+ if (RANDOM.nextDouble() < 0.001) {
+ return new PhoneBookWriter.Location(99.9, 99.9);
+ }
+
+ double lat = RANDOM.nextDouble() * 90.0 - (id < rowCount / 2 ? 90.0 : 0.0);
+ double lon = RANDOM.nextDouble() * 90.0 - (id < rowCount / 4 || id >= 3 * rowCount / 4 ? 90.0 : 0.0);
+
+ return new PhoneBookWriter.Location(RANDOM.nextDouble() < 0.01 ? null : lat, RANDOM.nextDouble() < 0.01 ? null : lon);
+ }
+
+ private List<PhoneBookWriter.User> readUsers(FilterPredicate filter, boolean useOtherFiltering,
+ boolean useBloomFilter) throws IOException {
+ /*
+ byte[] keyBytes = {0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15};
+ FileDecryptionProperties fileDecryptionProperties = FileDecryptionProperties.builder()
+ .withFooterKey(keyBytes)
+ .build();
+ */
+
+ StringKeyIdRetriever kr1 = new StringKeyIdRetriever();
+ kr1.putKey("kf", FOOTER_ENCRYPTION_KEY);
+ kr1.putKey("kc1", COLUMN_ENCRYPTION_KEY1);
+ kr1.putKey("kc2", COLUMN_ENCRYPTION_KEY2);
+
+ FileDecryptionProperties fileDecryptionProperties = FileDecryptionProperties.builder()
+ .withKeyRetriever(kr1)
+ .build();
+
+ return PhoneBookWriter.readUsers(ParquetReader.builder(new GroupReadSupport(), file)
+ .withFilter(FilterCompat.get(filter))
+ .withDecryption(fileDecryptionProperties)
+ .useDictionaryFilter(useOtherFiltering)
+ .useStatsFilter(useOtherFiltering)
+ .useRecordFilter(useOtherFiltering)
+ .useBloomFilter(useBloomFilter)
+ .useColumnIndexFilter(useOtherFiltering));
+ }
+
+ // Assumes that both lists are in the same order
+ private static void assertContains(Stream<PhoneBookWriter.User> expected, List<PhoneBookWriter.User> actual) {
+ Iterator<PhoneBookWriter.User> expIt = expected.iterator();
+ if (!expIt.hasNext()) {
+ return;
+ }
+ PhoneBookWriter.User exp = expIt.next();
+ for (PhoneBookWriter.User act : actual) {
+ if (act.equals(exp)) {
+ if (!expIt.hasNext()) {
+ break;
+ }
+ exp = expIt.next();
+ }
+ }
+ assertFalse("Not all expected elements are in the actual list. E.g.: " + exp, expIt.hasNext());
+ }
+
+ private void assertCorrectFiltering(Predicate<PhoneBookWriter.User> expectedFilter, FilterPredicate actualFilter)
+ throws IOException {
+ // Check with only bloom filter based filtering
+ List<PhoneBookWriter.User> result = readUsers(actualFilter, false, true);
+
+ assertTrue("Bloom filtering should drop some row groups", result.size() < DATA.size());
+ LOGGER.info("{}/{} records read; filtering ratio: {}%", result.size(), DATA.size(),
+ 100 * result.size() / DATA.size());
+ // Asserts that all the required records are in the result
+ assertContains(DATA.stream().filter(expectedFilter), result);
+ // Asserts that all the retrieved records are in the file (validating non-matching records)
+ assertContains(result.stream(), DATA);
+
+ // Check with all the other filtering enabled to ensure the result contains exactly the required values
+ result = readUsers(actualFilter, true, false);
+ assertEquals(DATA.stream().filter(expectedFilter).collect(Collectors.toList()), result);
+ }
+
+
+ @BeforeClass
+ public static void createFile() throws IOException {
+ int pageSize = DATA.size() / 100; // Ensure that several pages will be created
+ int rowGroupSize = pageSize * 4; // Ensure that multiple row groups are created
+/*
+ byte[] keyBytes = {0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15};
+ FileEncryptionProperties encryptionProperties = FileEncryptionProperties.builder(keyBytes)
+ .build();
+ */
+ // Encryption configuration 2: Encrypt two columns and the footer, with different keys.
+ ColumnEncryptionProperties columnProperties1 = ColumnEncryptionProperties
+ .builder("id")
+ .withKey(COLUMN_ENCRYPTION_KEY1)
+ .withKeyID("kc1")
+ .build();
+
+ ColumnEncryptionProperties columnProperties2 = ColumnEncryptionProperties
+ .builder("name")
+ .withKey(COLUMN_ENCRYPTION_KEY2)
+ .withKeyID("kc2")
+ .build();
+ Map<ColumnPath, ColumnEncryptionProperties> columnPropertiesMap = new HashMap<>();
+
+ columnPropertiesMap.put(columnProperties1.getPath(), columnProperties1);
+ columnPropertiesMap.put(columnProperties2.getPath(), columnProperties2);
+
+ FileEncryptionProperties encryptionProperties = FileEncryptionProperties.builder(FOOTER_ENCRYPTION_KEY)
+ .withFooterKeyID("kf")
+ .withEncryptedColumns(columnPropertiesMap)
+ .build();
+
+ PhoneBookWriter.write(ExampleParquetWriter.builder(FILE_V1)
+ .withWriteMode(OVERWRITE)
+ .withRowGroupSize(rowGroupSize)
+ .withPageSize(pageSize)
+ .withBloomFilterNDV("location.lat", 10000L)
+ .withBloomFilterNDV("name", 10000L)
+ .withBloomFilterNDV("id", 10000L)
+ .withEncryption(encryptionProperties)
+ .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0),
+ DATA);
+ PhoneBookWriter.write(ExampleParquetWriter.builder(FILE_V2)
+ .withWriteMode(OVERWRITE)
+ .withRowGroupSize(rowGroupSize)
+ .withPageSize(pageSize)
+ .withBloomFilterNDV("location.lat", 10000L)
+ .withBloomFilterNDV("name", 10000L)
+ .withBloomFilterNDV("id", 10000L)
+ .withEncryption(encryptionProperties)
+ .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0),
+ DATA);
+ }
+
+ @AfterClass
+ public static void deleteFile() throws IOException {
+ FILE_V1.getFileSystem(new Configuration()).delete(FILE_V1, false);
+ FILE_V2.getFileSystem(new Configuration()).delete(FILE_V2, false);
+ }
+
+
+ @Test
+ public void testSimpleFiltering() throws IOException {
+ assertCorrectFiltering(
+ record -> record.getId() == 1234L,
+ eq(longColumn("id"), 1234L));
+
+ assertCorrectFiltering(
+ record -> "miller".equals(record.getName()),
+ eq(binaryColumn("name"), Binary.fromString("miller")));
+ }
+
+ @Test
+ public void testNestedFiltering() throws IOException {
+ assertCorrectFiltering(
+ record -> {
+ PhoneBookWriter.Location location = record.getLocation();
+ return location != null && location.getLat() != null && location.getLat() == 99.9;
+ },
+ eq(doubleColumn("location.lat"), 99.9));
+ }
+}
+
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexEncryption.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexEncryption.java
new file mode 100644
index 0000000..2fd2207
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexEncryption.java
@@ -0,0 +1,571 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import static java.util.Collections.emptyList;
+import static java.util.stream.Collectors.toList;
+import static org.apache.parquet.filter2.predicate.FilterApi.and;
+import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.doubleColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.eq;
+import static org.apache.parquet.filter2.predicate.FilterApi.gtEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.longColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.lt;
+import static org.apache.parquet.filter2.predicate.FilterApi.ltEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.not;
+import static org.apache.parquet.filter2.predicate.FilterApi.notEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.or;
+import static org.apache.parquet.filter2.predicate.FilterApi.userDefined;
+import static org.apache.parquet.filter2.predicate.LogicalInverter.invert;
+import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+import static org.apache.parquet.schema.Types.optional;
+import static org.apache.parquet.schema.Types.required;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.ParquetProperties.WriterVersion;
+import org.apache.parquet.crypto.ColumnEncryptionProperties;
+import org.apache.parquet.crypto.DecryptionKeyRetriever;
+import org.apache.parquet.crypto.FileDecryptionProperties;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.filter2.predicate.Statistics;
+import org.apache.parquet.filter2.predicate.UserDefinedPredicate;
+import org.apache.parquet.filter2.recordlevel.PhoneBookWriter;
+import org.apache.parquet.filter2.recordlevel.PhoneBookWriter.Location;
+import org.apache.parquet.filter2.recordlevel.PhoneBookWriter.PhoneNumber;
+import org.apache.parquet.filter2.recordlevel.PhoneBookWriter.User;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Types;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Unit tests for high level column index based filtering.
+ */
+@RunWith(Parameterized.class)
+public class TestColumnIndexEncryption {
+ private static final Logger LOGGER = LoggerFactory.getLogger(TestColumnIndexEncryption.class);
+ private static final Random RANDOM = new Random(42);
+ private static final String[] PHONE_KINDS = { null, "mobile", "home", "work" };
+ private static final List<User> DATA = Collections.unmodifiableList(generateData(10000));
+ private static final Path FILE_V1 = createTempFile();
+ private static final Path FILE_V2 = createTempFile();
+ private static final MessageType SCHEMA_WITHOUT_NAME = Types.buildMessage()
+ .required(INT64).named("id")
+ .optionalGroup()
+ .addField(optional(DOUBLE).named("lon"))
+ .addField(optional(DOUBLE).named("lat"))
+ .named("location")
+ .optionalGroup()
+ .repeatedGroup()
+ .addField(required(INT64).named("number"))
+ .addField(optional(BINARY).as(stringType()).named("kind"))
+ .named("phone")
+ .named("phoneNumbers")
+ .named("user_without_name");
+ private static final byte[] FOOTER_ENCRYPTION_KEY = new String("0123456789012345").getBytes();
+ private static final byte[] COLUMN_ENCRYPTION_KEY1 = new String("1234567890123450").getBytes();
+ private static final byte[] COLUMN_ENCRYPTION_KEY2 = new String("1234567890123451").getBytes();
+
+ // Simple key retriever, based on UTF-8 strings as key identifiers
+ static class StringKeyIdRetriever implements DecryptionKeyRetriever {
+
+ private final Hashtable<String,byte[]> keyMap = new Hashtable<String,byte[]>();
+
+ public void putKey(String keyId, byte[] keyBytes) {
+ keyMap.put(keyId, keyBytes);
+ }
+
+ @Override
+ public byte[] getKey(byte[] keyMetaData) {
+ String keyId = new String(keyMetaData, StandardCharsets.UTF_8);
+ return keyMap.get(keyId);
+ }
+ }
+
+
+ @Parameters
+ public static Collection<Object[]> params() {
+ return Arrays.asList(new Object[] { FILE_V1 }, new Object[] { FILE_V2 });
+ }
+
+ private final Path file;
+
+ public TestColumnIndexEncryption(Path file) {
+ this.file = file;
+ }
+
+ private static List<User> generateData(int rowCount) {
+ List<User> users = new ArrayList<>();
+ List<String> names = generateNames(rowCount);
+ for (int i = 0; i < rowCount; ++i) {
+ users.add(new User(i, names.get(i), generatePhoneNumbers(), generateLocation(i, rowCount)));
+ }
+ return users;
+ }
+
+ private static List<String> generateNames(int rowCount) {
+ List<String> list = new ArrayList<>();
+
+ // Adding fixed values for filtering
+ list.add("anderson");
+ list.add("anderson");
+ list.add("miller");
+ list.add("miller");
+ list.add("miller");
+ list.add("thomas");
+ list.add("thomas");
+ list.add("williams");
+
+ int nullCount = rowCount / 100;
+
+ String alphabet = "aabcdeefghiijklmnoopqrstuuvwxyz";
+ int maxLength = 8;
+ for (int i = rowCount - list.size() - nullCount; i >= 0; --i) {
+ int l = RANDOM.nextInt(maxLength);
+ StringBuilder builder = new StringBuilder(l);
+ for (int j = 0; j < l; ++j) {
+ builder.append(alphabet.charAt(RANDOM.nextInt(alphabet.length())));
+ }
+ list.add(builder.toString());
+ }
+ Collections.sort(list, (str1, str2) -> -str1.compareTo(str2));
+
+ // Adding nulls at random positions
+ for (int i = 0; i < nullCount; ++i) {
+ list.add(RANDOM.nextInt(list.size()), null);
+ }
+
+ return list;
+ }
+
+ private static List<PhoneNumber> generatePhoneNumbers() {
+ int length = RANDOM.nextInt(5) - 1;
+ if (length < 0) {
+ return null;
+ }
+ List<PhoneNumber> phoneNumbers = new ArrayList<>(length);
+ for (int i = 0; i < length; ++i) {
+ // 6-digit numbers
+ long number = Math.abs(RANDOM.nextLong() % 900000) + 100000;
+ phoneNumbers.add(new PhoneNumber(number, PHONE_KINDS[RANDOM.nextInt(PHONE_KINDS.length)]));
+ }
+ return phoneNumbers;
+ }
+
+ private static Location generateLocation(int id, int rowCount) {
+ if (RANDOM.nextDouble() < 0.01) {
+ return null;
+ }
+
+ double lat = RANDOM.nextDouble() * 90.0 - (id < rowCount / 2 ? 90.0 : 0.0);
+ double lon = RANDOM.nextDouble() * 90.0 - (id < rowCount / 4 || id >= 3 * rowCount / 4 ? 90.0 : 0.0);
+
+ return new Location(RANDOM.nextDouble() < 0.01 ? null : lat, RANDOM.nextDouble() < 0.01 ? null : lon);
+ }
+
+ private static Path createTempFile() {
+ try {
+ return new Path(Files.createTempFile("test-ci_", ".parquet").toAbsolutePath().toString());
+ } catch (IOException e) {
+ throw new AssertionError("Unable to create temporary file", e);
+ }
+ }
+
+ private List<User> readUsers(FilterPredicate filter, boolean useOtherFiltering) throws IOException {
+ return readUsers(FilterCompat.get(filter), useOtherFiltering, true);
+ }
+
+ private List<User> readUsers(FilterPredicate filter, boolean useOtherFiltering, boolean useColumnIndexFilter)
+ throws IOException {
+ return readUsers(FilterCompat.get(filter), useOtherFiltering, useColumnIndexFilter);
+ }
+
+ private List<User> readUsers(Filter filter, boolean useOtherFiltering) throws IOException {
+ return readUsers(filter, useOtherFiltering, true);
+ }
+
+ private List<User> readUsers(Filter filter, boolean useOtherFiltering, boolean useColumnIndexFilter)
+ throws IOException {
+
+ StringKeyIdRetriever kr1 = new StringKeyIdRetriever();
+ kr1.putKey("kf", FOOTER_ENCRYPTION_KEY);
+ kr1.putKey("kc1", COLUMN_ENCRYPTION_KEY1);
+ kr1.putKey("kc2", COLUMN_ENCRYPTION_KEY2);
+
+ FileDecryptionProperties fileDecryptionProperties = FileDecryptionProperties.builder()
+ .withKeyRetriever(kr1)
+ .build();
+
+ return PhoneBookWriter.readUsers(ParquetReader.builder(new GroupReadSupport(), file)
+ .withFilter(filter)
+ .withDecryption(fileDecryptionProperties)
+ .useDictionaryFilter(useOtherFiltering)
+ .useStatsFilter(useOtherFiltering)
+ .useRecordFilter(useOtherFiltering)
+ .useColumnIndexFilter(useColumnIndexFilter));
+ }
+
+ private List<User> readUsersWithProjection(Filter filter, MessageType schema, boolean useOtherFiltering, boolean useColumnIndexFilter) throws IOException {
+ StringKeyIdRetriever kr1 = new StringKeyIdRetriever();
+ kr1.putKey("kf", FOOTER_ENCRYPTION_KEY);
+ kr1.putKey("kc1", COLUMN_ENCRYPTION_KEY1);
+ kr1.putKey("kc2", COLUMN_ENCRYPTION_KEY2);
+
+ FileDecryptionProperties fileDecryptionProperties = FileDecryptionProperties.builder()
+ .withKeyRetriever(kr1)
+ .build();
+
+ return PhoneBookWriter.readUsers(ParquetReader.builder(new GroupReadSupport(), file)
+ .withFilter(filter)
+ .withDecryption(fileDecryptionProperties)
+ .useDictionaryFilter(useOtherFiltering)
+ .useStatsFilter(useOtherFiltering)
+ .useRecordFilter(useOtherFiltering)
+ .useColumnIndexFilter(useColumnIndexFilter)
+ .set(ReadSupport.PARQUET_READ_SCHEMA, schema.toString()));
+ }
+
+ // Assumes that both lists are in the same order
+ private static void assertContains(Stream<User> expected, List<User> actual) {
+ Iterator<User> expIt = expected.iterator();
+ if (!expIt.hasNext()) {
+ return;
+ }
+ User exp = expIt.next();
+ for (User act : actual) {
+ if (act.equals(exp)) {
+ if (!expIt.hasNext()) {
+ break;
+ }
+ exp = expIt.next();
+ }
+ }
+ assertFalse("Not all expected elements are in the actual list. E.g.: " + exp, expIt.hasNext());
+ }
+
+ private void assertCorrectFiltering(Predicate<User> expectedFilter, FilterPredicate actualFilter)
+ throws IOException {
+ // Check with only column index based filtering
+ List<User> result = readUsers(actualFilter, false);
+
+ assertTrue("Column-index filtering should drop some pages", result.size() < DATA.size());
+ LOGGER.info("{}/{} records read; filtering ratio: {}%", result.size(), DATA.size(),
+ 100 * result.size() / DATA.size());
+ // Asserts that all the required records are in the result
+ assertContains(DATA.stream().filter(expectedFilter), result);
+ // Asserts that all the retrieved records are in the file (validating non-matching records)
+ assertContains(result.stream(), DATA);
+
+ // Check with all the filtering enabled to ensure the result contains exactly the required values
+ result = readUsers(actualFilter, true);
+ assertEquals(DATA.stream().filter(expectedFilter).collect(Collectors.toList()), result);
+ }
+
+ @BeforeClass
+ public static void createFile() throws IOException {
+ int pageSize = DATA.size() / 10; // Ensure that several pages will be created
+ int rowGroupSize = pageSize * 6 * 5; // Ensure that multiple row groups are created
+
+ // Encryption configuration: Encrypt two columns and the footer, with different keys.
+ ColumnEncryptionProperties columnProperties1 = ColumnEncryptionProperties
+ .builder("id")
+ .withKey(COLUMN_ENCRYPTION_KEY1)
+ .withKeyID("kc1")
+ .build();
+
+ ColumnEncryptionProperties columnProperties2 = ColumnEncryptionProperties
+ .builder("name")
+ .withKey(COLUMN_ENCRYPTION_KEY2)
+ .withKeyID("kc2")
+ .build();
+ Map<ColumnPath, ColumnEncryptionProperties> columnPropertiesMap = new HashMap<>();
+
+ columnPropertiesMap.put(columnProperties1.getPath(), columnProperties1);
+ columnPropertiesMap.put(columnProperties2.getPath(), columnProperties2);
+
+ FileEncryptionProperties encryptionProperties = FileEncryptionProperties.builder(FOOTER_ENCRYPTION_KEY)
+ .withFooterKeyID("kf")
+ .withEncryptedColumns(columnPropertiesMap)
+ .build();
+
+ PhoneBookWriter.write(ExampleParquetWriter.builder(FILE_V1)
+ .withWriteMode(OVERWRITE)
+ .withRowGroupSize(rowGroupSize)
+ .withPageSize(pageSize)
+ .withEncryption(encryptionProperties)
+ .withWriterVersion(WriterVersion.PARQUET_1_0),
+ DATA);
+
+ PhoneBookWriter.write(ExampleParquetWriter.builder(FILE_V2)
+ .withWriteMode(OVERWRITE)
+ .withRowGroupSize(rowGroupSize)
+ .withPageSize(pageSize)
+ .withEncryption(encryptionProperties)
+ .withWriterVersion(WriterVersion.PARQUET_2_0),
+ DATA);
+ }
+
+ @AfterClass
+ public static void deleteFile() throws IOException {
+ FILE_V1.getFileSystem(new Configuration()).delete(FILE_V1, false);
+ FILE_V2.getFileSystem(new Configuration()).delete(FILE_V2, false);
+ }
+
+ @Test
+ public void testSimpleFiltering() throws IOException {
+ assertCorrectFiltering(
+ record -> record.getId() == 1234,
+ eq(longColumn("id"), 1234l));
+ assertCorrectFiltering(
+ record -> "miller".equals(record.getName()),
+ eq(binaryColumn("name"), Binary.fromString("miller")));
+ assertCorrectFiltering(
+ record -> record.getName() == null,
+ eq(binaryColumn("name"), null));
+ }
+
+ @Test
+ public void testNoFiltering() throws IOException {
+ // Column index filtering with no-op filter
+ assertEquals(DATA, readUsers(FilterCompat.NOOP, false));
+ assertEquals(DATA, readUsers(FilterCompat.NOOP, true));
+
+ // Column index filtering turned off
+ assertEquals(DATA.stream().filter(user -> user.getId() == 1234).collect(Collectors.toList()),
+ readUsers(eq(longColumn("id"), 1234l), true, false));
+ assertEquals(DATA.stream().filter(user -> "miller".equals(user.getName())).collect(Collectors.toList()),
+ readUsers(eq(binaryColumn("name"), Binary.fromString("miller")), true, false));
+ assertEquals(DATA.stream().filter(user -> user.getName() == null).collect(Collectors.toList()),
+ readUsers(eq(binaryColumn("name"), null), true, false));
+
+ // Every filtering mechanism turned off
+ assertEquals(DATA, readUsers(eq(longColumn("id"), 1234l), false, false));
+ assertEquals(DATA, readUsers(eq(binaryColumn("name"), Binary.fromString("miller")), false, false));
+ assertEquals(DATA, readUsers(eq(binaryColumn("name"), null), false, false));
+ }
+
+ @Test
+ public void testComplexFiltering() throws IOException {
+ assertCorrectFiltering(
+ record -> {
+ Location loc = record.getLocation();
+ Double lat = loc == null ? null : loc.getLat();
+ Double lon = loc == null ? null : loc.getLon();
+ return lat != null && lon != null && 37 <= lat && lat <= 70 && -21 <= lon && lon <= 35;
+ },
+ and(and(gtEq(doubleColumn("location.lat"), 37.0), ltEq(doubleColumn("location.lat"), 70.0)),
+ and(gtEq(doubleColumn("location.lon"), -21.0), ltEq(doubleColumn("location.lon"), 35.0))));
+ assertCorrectFiltering(
+ record -> {
+ Location loc = record.getLocation();
+ return loc == null || (loc.getLat() == null && loc.getLon() == null);
+ },
+ and(eq(doubleColumn("location.lat"), null), eq(doubleColumn("location.lon"), null)));
+ assertCorrectFiltering(
+ record -> {
+ String name = record.getName();
+ return name != null && name.compareTo("thomas") < 0 && record.getId() <= 3 * DATA.size() / 4;
+ },
+ and(lt(binaryColumn("name"), Binary.fromString("thomas")), ltEq(longColumn("id"), 3l * DATA.size() / 4)));
+ }
+
+ public static class NameStartsWithVowel extends UserDefinedPredicate<Binary> {
+ private static final Binary A = Binary.fromString("a");
+ private static final Binary B = Binary.fromString("b");
+ private static final Binary E = Binary.fromString("e");
+ private static final Binary F = Binary.fromString("f");
+ private static final Binary I = Binary.fromString("i");
+ private static final Binary J = Binary.fromString("j");
+ private static final Binary O = Binary.fromString("o");
+ private static final Binary P = Binary.fromString("p");
+ private static final Binary U = Binary.fromString("u");
+ private static final Binary V = Binary.fromString("v");
+
+ private static boolean isStartingWithVowel(String str) {
+ if (str == null || str.isEmpty()) {
+ return false;
+ }
+ switch (str.charAt(0)) {
+ case 'a':
+ case 'e':
+ case 'i':
+ case 'o':
+ case 'u':
+ return true;
+ default:
+ return false;
+ }
+ }
+
+ @Override
+ public boolean keep(Binary value) {
+ return value != null && isStartingWithVowel(value.toStringUsingUTF8());
+ }
+
+ @Override
+ public boolean canDrop(Statistics<Binary> statistics) {
+ Comparator<Binary> cmp = statistics.getComparator();
+ Binary min = statistics.getMin();
+ Binary max = statistics.getMax();
+ return cmp.compare(max, A) < 0
+ || (cmp.compare(min, B) >= 0 && cmp.compare(max, E) < 0)
+ || (cmp.compare(min, F) >= 0 && cmp.compare(max, I) < 0)
+ || (cmp.compare(min, J) >= 0 && cmp.compare(max, O) < 0)
+ || (cmp.compare(min, P) >= 0 && cmp.compare(max, U) < 0)
+ || cmp.compare(min, V) >= 0;
+ }
+
+ @Override
+ public boolean inverseCanDrop(Statistics<Binary> statistics) {
+ Comparator<Binary> cmp = statistics.getComparator();
+ Binary min = statistics.getMin();
+ Binary max = statistics.getMax();
+ return (cmp.compare(min, A) >= 0 && cmp.compare(max, B) < 0)
+ || (cmp.compare(min, E) >= 0 && cmp.compare(max, F) < 0)
+ || (cmp.compare(min, I) >= 0 && cmp.compare(max, J) < 0)
+ || (cmp.compare(min, O) >= 0 && cmp.compare(max, P) < 0)
+ || (cmp.compare(min, U) >= 0 && cmp.compare(max, V) < 0);
+ }
+ }
+
+ public static class IsDivisibleBy extends UserDefinedPredicate<Long> implements Serializable {
+ private long divisor;
+
+ IsDivisibleBy(long divisor) {
+ this.divisor = divisor;
+ }
+
+ @Override
+ public boolean keep(Long value) {
+ // Deliberately not checking for null to verify the handling of NPE
+ // Implementors shall always check the value for null and return accordingly
+ return value % divisor == 0;
+ }
+
+ @Override
+ public boolean canDrop(Statistics<Long> statistics) {
+ long min = statistics.getMin();
+ long max = statistics.getMax();
+ return min % divisor != 0 && max % divisor != 0 && min / divisor == max / divisor;
+ }
+
+ @Override
+ public boolean inverseCanDrop(Statistics<Long> statistics) {
+ long min = statistics.getMin();
+ long max = statistics.getMax();
+ return min == max && min % divisor == 0;
+ }
+ }
+
+ @Test
+ public void testUDF() throws IOException {
+ assertCorrectFiltering(
+ record -> NameStartsWithVowel.isStartingWithVowel(record.getName()) || record.getId() % 234 == 0,
+ or(userDefined(binaryColumn("name"), NameStartsWithVowel.class),
+ userDefined(longColumn("id"), new IsDivisibleBy(234))));
+ assertCorrectFiltering(
+ record -> !(NameStartsWithVowel.isStartingWithVowel(record.getName()) || record.getId() % 234 == 0),
+ not(or(userDefined(binaryColumn("name"), NameStartsWithVowel.class),
+ userDefined(longColumn("id"), new IsDivisibleBy(234)))));
+ }
+
+ @Test
+ public void testFilteringWithMissingColumns() throws IOException {
+ // Missing column filter is always true
+ assertEquals(DATA, readUsers(notEq(binaryColumn("not-existing-binary"), Binary.EMPTY), true));
+ assertCorrectFiltering(
+ record -> record.getId() == 1234,
+ and(eq(longColumn("id"), 1234l),
+ eq(longColumn("not-existing-long"), null)));
+ assertCorrectFiltering(
+ record -> "miller".equals(record.getName()),
+ and(eq(binaryColumn("name"), Binary.fromString("miller")),
+ invert(userDefined(binaryColumn("not-existing-binary"), NameStartsWithVowel.class))));
+
+ // Missing column filter is always false
+ assertEquals(emptyList(), readUsers(lt(longColumn("not-existing-long"), 0l), true));
+ assertCorrectFiltering(
+ record -> "miller".equals(record.getName()),
+ or(eq(binaryColumn("name"), Binary.fromString("miller")),
+ gtEq(binaryColumn("not-existing-binary"), Binary.EMPTY)));
+ assertCorrectFiltering(
+ record -> record.getId() == 1234,
+ or(eq(longColumn("id"), 1234l),
+ userDefined(longColumn("not-existing-long"), new IsDivisibleBy(1))));
+ }
+
+ @Test
+ public void testFilteringWithProjection() throws IOException {
+ // All rows shall be retrieved because all values in column 'name' shall be handled as null values
+ assertEquals(
+ DATA.stream().map(user -> user.cloneWithName(null)).collect(toList()),
+ readUsersWithProjection(FilterCompat.get(eq(binaryColumn("name"), null)), SCHEMA_WITHOUT_NAME, true, true));
+
+ // Column index filter shall drop all pages because all values in column 'name' shall be handled as null values
+ assertEquals(
+ emptyList(),
+ readUsersWithProjection(FilterCompat.get(notEq(binaryColumn("name"), null)), SCHEMA_WITHOUT_NAME, false, true));
+ assertEquals(
+ emptyList(),
+ readUsersWithProjection(FilterCompat.get(userDefined(binaryColumn("name"), NameStartsWithVowel.class)),
+ SCHEMA_WITHOUT_NAME, false, true));
+ }
+}
+