Posted to commits@parquet.apache.org by sh...@apache.org on 2021/10/04 21:04:35 UTC
[parquet-mr] branch master updated: PARQUET-2081: Encryption
translation tool - Parquet-hadoop (#928)
This is an automated email from the ASF dual-hosted git repository.
shangxinli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new 1adc228 PARQUET-2081: Encryption translation tool - Parquet-hadoop (#928)
1adc228 is described below
commit 1adc22804a700d78f8480667d083e91d6147339f
Author: Xinli Shang <sh...@uber.com>
AuthorDate: Mon Oct 4 14:04:27 2021 -0700
PARQUET-2081: Encryption translation tool - Parquet-hadoop (#928)
* PARQUET-2081: Encryption translation tool - Parquet-hadoop
Summary:
Design doc - High Throughput CLAC Writer: https://docs.google.com/document/d/1-XdE8-QyDHnBsYrClwNsR8X3ks0JmKJ1-rXq7_th0hc
Added unit tests
Integration tests with real data
* Address feedback
* Address more comments
* Revert the refactoring code to avoid exclusion in the public API check
* Address more feedback
* Refactor the code to always rewrite the offset index
* Rename methods to reflect the change better
* Use 'encrypt' flag to create different encryption runtime
Add checking of encrypted columns
* Address comments
* Address more comments
---
.../column/columnindex/OffsetIndexBuilder.java | 26 +-
.../parquet/crypto/ColumnEncryptionProperties.java | 11 +-
.../format/converter/ParquetMetadataConverter.java | 20 +-
.../apache/parquet/hadoop/ParquetFileReader.java | 2 +-
.../apache/parquet/hadoop/ParquetFileWriter.java | 79 ++++-
.../hadoop/metadata/ColumnChunkMetaData.java | 38 ++-
.../parquet/hadoop/util/ColumnEncryptor.java | 329 +++++++++++++++++++++
.../apache/parquet/hadoop/util/ColumnMasker.java | 4 +-
.../parquet/hadoop/util/H2SeekableInputStream.java | 1 +
.../parquet/hadoop/util/ColumnEncryptorTest.java | 293 ++++++++++++++++++
.../hadoop/util/CompressionConveterTest.java | 7 -
.../parquet/hadoop/util/EncDecProperties.java | 99 +++++++
.../apache/parquet/hadoop/util/TestFileHelper.java | 153 ++++++++++
13 files changed, 1016 insertions(+), 46 deletions(-)
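For illustration, a minimal usage sketch of the new tool, put together from the APIs added in this diff; the key bytes, key-metadata strings and file paths below are placeholders, not part of the commit:

  import java.nio.charset.StandardCharsets;
  import java.util.Collections;
  import java.util.HashMap;
  import java.util.Map;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.parquet.crypto.ColumnEncryptionProperties;
  import org.apache.parquet.crypto.FileEncryptionProperties;
  import org.apache.parquet.crypto.ParquetCipher;
  import org.apache.parquet.hadoop.metadata.ColumnPath;
  import org.apache.parquet.hadoop.util.ColumnEncryptor;

  Configuration conf = new Configuration();
  // Placeholder 16-byte AES keys; a real deployment would obtain them from a KMS.
  byte[] footerKey = new byte[16];
  byte[] columnKey = new byte[16];

  ColumnPath docId = ColumnPath.fromDotString("DocId");
  Map<ColumnPath, ColumnEncryptionProperties> columnProps = new HashMap<>();
  columnProps.put(docId, ColumnEncryptionProperties.builder(docId, true)
      .withKey(columnKey)
      .withKeyMetaData("col_key_id".getBytes(StandardCharsets.UTF_8))
      .build());

  FileEncryptionProperties encryptionProps = FileEncryptionProperties.builder(footerKey)
      .withFooterKeyMetadata("footer_key_id".getBytes(StandardCharsets.UTF_8))
      .withAlgorithm(ParquetCipher.AES_GCM_V1)
      .withEncryptedColumns(columnProps)
      .build();

  // Rewrite the plaintext file into a new file with only the "DocId" column encrypted.
  new ColumnEncryptor(conf).encryptColumns(
      "/tmp/plain.parquet", "/tmp/encrypted.parquet",
      Collections.singletonList("DocId"), encryptionProps);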
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndexBuilder.java
index 4909744..169c82e 100644
--- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndexBuilder.java
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndexBuilder.java
@@ -65,7 +65,7 @@ public class OffsetIndexBuilder {
public long getFirstRowIndex(int pageIndex) {
return firstRowIndexes[pageIndex];
}
-
+
@Override
public int getPageOrdinal(int pageIndex) {
return pageIndex;
@@ -151,22 +151,36 @@ public class OffsetIndexBuilder {
return build(0);
}
+ public OffsetIndexBuilder fromOffsetIndex(OffsetIndex offsetIndex) {
+ assert offsetIndex instanceof OffsetIndexImpl;
+ OffsetIndexImpl offsetIndexImpl = (OffsetIndexImpl) offsetIndex;
+ this.offsets.addAll(new LongArrayList(offsetIndexImpl.offsets));
+ this.compressedPageSizes.addAll(new IntArrayList(offsetIndexImpl.compressedPageSizes));
+ this.firstRowIndexes.addAll(new LongArrayList(offsetIndexImpl.firstRowIndexes));
+ this.previousOffset = 0;
+ this.previousPageSize = 0;
+ this.previousRowIndex = 0;
+ this.previousRowCount = 0;
+
+ return this;
+ }
+
/**
* Builds the offset index. Used by the writers to building up {@link OffsetIndex} objects to be
* written to the Parquet file.
*
- * @param firstPageOffset
- * the actual offset in the file to be used to translate all the collected offsets
+ * @param shift
+ * the amount by which to shift all the collected offsets
* @return the newly created offset index or {@code null} if the {@link OffsetIndex} object would be empty
*/
- public OffsetIndex build(long firstPageOffset) {
+ public OffsetIndex build(long shift) {
if (compressedPageSizes.isEmpty()) {
return null;
}
long[] offsets = this.offsets.toLongArray();
- if (firstPageOffset != 0) {
+ if (shift != 0) {
for (int i = 0, n = offsets.length; i < n; ++i) {
- offsets[i] += firstPageOffset;
+ offsets[i] += shift;
}
}
OffsetIndexImpl offsetIndex = new OffsetIndexImpl();
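The fromOffsetIndex/build(shift) pair added above lets an existing offset index be re-based when a column chunk is copied to a new file position; a short sketch mirroring its use in ParquetFileWriter later in this diff (the variable names are illustrative):

  // Rebuild an existing OffsetIndex, shifting every page offset by the distance the chunk moved.
  OffsetIndex shifted = OffsetIndexBuilder.getBuilder()
      .fromOffsetIndex(existingOffsetIndex)
      .build(newChunkStart - oldChunkStart);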
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/ColumnEncryptionProperties.java b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/ColumnEncryptionProperties.java
index 19419fc..fd89b8c 100755
--- a/parquet-hadoop/src/main/java/org/apache/parquet/crypto/ColumnEncryptionProperties.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/crypto/ColumnEncryptionProperties.java
@@ -86,7 +86,16 @@ public class ColumnEncryptionProperties {
return builder(path, true);
}
- static Builder builder(ColumnPath path, boolean encrypt) {
+ /**
+ * Builder for encrypted columns.
+ * To make sure column path is not misspelled or misplaced,
+ * file writer will verify this column is in file schema.
+ *
+ * @param path Column path
+ * @param encrypt whether or not this column to be encrypted
+ * @return Builder
+ */
+ public static Builder builder(ColumnPath path, boolean encrypt) {
return new Builder(path, encrypt);
}
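A brief sketch of the now-public builder, as exercised by the EncDecProperties test helper added later in this diff; the key and key-metadata bytes are placeholders:

  // Column to be encrypted with its own key.
  ColumnEncryptionProperties encrypted = ColumnEncryptionProperties.builder(ColumnPath.fromDotString("Name"), true)
      .withKey(new byte[16])
      .withKeyMetaData("name_key_id".getBytes(StandardCharsets.UTF_8))
      .build();
  // Column explicitly left in plaintext.
  ColumnEncryptionProperties plaintext =
      ColumnEncryptionProperties.builder(ColumnPath.fromDotString("Gender"), false).build();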
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
index 3c6e32c..96980a4 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@ -1791,14 +1791,14 @@ public class ParquetMetadataConverter {
org.apache.parquet.column.Encoding valuesEncoding,
OutputStream to,
BlockCipher.Encryptor blockEncryptor,
- byte[] AAD) throws IOException {
+ byte[] pageHeaderAAD) throws IOException {
writePageHeader(newDataPageHeader(uncompressedSize,
compressedSize,
valueCount,
rlEncoding,
dlEncoding,
valuesEncoding),
- to, blockEncryptor, AAD);
+ to, blockEncryptor, pageHeaderAAD);
}
public void writeDataPageV1Header(
@@ -1824,7 +1824,7 @@ public class ParquetMetadataConverter {
int crc,
OutputStream to,
BlockCipher.Encryptor blockEncryptor,
- byte[] AAD) throws IOException {
+ byte[] pageHeaderAAD) throws IOException {
writePageHeader(newDataPageHeader(uncompressedSize,
compressedSize,
valueCount,
@@ -1832,7 +1832,7 @@ public class ParquetMetadataConverter {
dlEncoding,
valuesEncoding,
crc),
- to, blockEncryptor, AAD);
+ to, blockEncryptor, pageHeaderAAD);
}
public void writeDataPageV2Header(
@@ -1852,13 +1852,13 @@ public class ParquetMetadataConverter {
org.apache.parquet.column.Encoding dataEncoding,
int rlByteLength, int dlByteLength,
OutputStream to, BlockCipher.Encryptor blockEncryptor,
- byte[] AAD) throws IOException {
+ byte[] pageHeaderAAD) throws IOException {
writePageHeader(
newDataPageV2Header(
uncompressedSize, compressedSize,
valueCount, nullCount, rowCount,
dataEncoding,
- rlByteLength, dlByteLength), to, blockEncryptor, AAD);
+ rlByteLength, dlByteLength), to, blockEncryptor, pageHeaderAAD);
}
private PageHeader newDataPageV2Header(
@@ -1886,10 +1886,10 @@ public class ParquetMetadataConverter {
public void writeDictionaryPageHeader(
int uncompressedSize, int compressedSize, int valueCount,
org.apache.parquet.column.Encoding valuesEncoding, OutputStream to,
- BlockCipher.Encryptor blockEncryptor, byte[] AAD) throws IOException {
+ BlockCipher.Encryptor blockEncryptor, byte[] pageHeaderAAD) throws IOException {
PageHeader pageHeader = new PageHeader(PageType.DICTIONARY_PAGE, uncompressedSize, compressedSize);
pageHeader.setDictionary_page_header(new DictionaryPageHeader(valueCount, getEncoding(valuesEncoding)));
- writePageHeader(pageHeader, to, blockEncryptor, AAD);
+ writePageHeader(pageHeader, to, blockEncryptor, pageHeaderAAD);
}
public void writeDictionaryPageHeader(
@@ -1902,11 +1902,11 @@ public class ParquetMetadataConverter {
public void writeDictionaryPageHeader(
int uncompressedSize, int compressedSize, int valueCount,
org.apache.parquet.column.Encoding valuesEncoding, int crc, OutputStream to,
- BlockCipher.Encryptor blockEncryptor, byte[] AAD) throws IOException {
+ BlockCipher.Encryptor blockEncryptor, byte[] pageHeaderAAD) throws IOException {
PageHeader pageHeader = new PageHeader(PageType.DICTIONARY_PAGE, uncompressedSize, compressedSize);
pageHeader.setCrc(crc);
pageHeader.setDictionary_page_header(new DictionaryPageHeader(valueCount, getEncoding(valuesEncoding)));
- writePageHeader(pageHeader, to, blockEncryptor, AAD);
+ writePageHeader(pageHeader, to, blockEncryptor, pageHeaderAAD);
}
private static BoundaryOrder toParquetBoundaryOrder(
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 3a68e01..63a22d1 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -523,7 +523,7 @@ public class ParquetFileReader implements Closeable {
}
}
- private static final ParquetMetadata readFooter(InputFile file, ParquetReadOptions options, SeekableInputStream f) throws IOException {
+ public static final ParquetMetadata readFooter(InputFile file, ParquetReadOptions options, SeekableInputStream f) throws IOException {
ParquetMetadataConverter converter = new ParquetMetadataConverter(options);
return readFooter(file, options, f, converter);
}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index 2e5d55c..210d6c4 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -394,7 +394,7 @@ public class ParquetFileWriter {
out.write(magic);
}
- InternalFileEncryptor getEncryptor() {
+ public InternalFileEncryptor getEncryptor() {
return fileEncryptor;
}
@@ -563,7 +563,7 @@ public class ParquetFileWriter {
// We are unable to build indexes without rowCount so skip them for this column
offsetIndexBuilder = OffsetIndexBuilder.getNoOpBuilder();
columnIndexBuilder = ColumnIndexBuilder.getNoOpBuilder();
- innerWriteDataPage(valueCount, uncompressedPageSize, bytes, statistics, rlEncoding, dlEncoding, valuesEncoding);
+ innerWriteDataPage(valueCount, uncompressedPageSize, bytes, statistics, rlEncoding, dlEncoding, valuesEncoding, null, null);
}
/**
@@ -579,16 +579,42 @@ public class ParquetFileWriter {
* @throws IOException if any I/O error occurs during writing the file
*/
public void writeDataPage(
+ int valueCount, int uncompressedPageSize,
+ BytesInput bytes,
+ Statistics statistics,
+ long rowCount,
+ Encoding rlEncoding,
+ Encoding dlEncoding,
+ Encoding valuesEncoding) throws IOException {
+ writeDataPage(valueCount, uncompressedPageSize, bytes, statistics, rowCount, rlEncoding, dlEncoding, valuesEncoding, null, null);
+ }
+
+ /**
+ * Writes a single page
+ * @param valueCount count of values
+ * @param uncompressedPageSize the size of the data once uncompressed
+ * @param bytes the compressed data for the page without header
+ * @param statistics the statistics of the page
+ * @param rowCount the number of rows in the page
+ * @param rlEncoding encoding of the repetition level
+ * @param dlEncoding encoding of the definition level
+ * @param valuesEncoding encoding of values
+ * @param metadataBlockEncryptor encryptor for block data
+ * @param pageHeaderAAD pageHeader AAD
+ * @throws IOException if any I/O error occurs during writing the file
+ */
+ public void writeDataPage(
int valueCount, int uncompressedPageSize,
BytesInput bytes,
Statistics statistics,
long rowCount,
Encoding rlEncoding,
Encoding dlEncoding,
- Encoding valuesEncoding) throws IOException {
+ Encoding valuesEncoding,
+ BlockCipher.Encryptor metadataBlockEncryptor,
+ byte[] pageHeaderAAD) throws IOException {
long beforeHeader = out.getPos();
- innerWriteDataPage(valueCount, uncompressedPageSize, bytes, statistics, rlEncoding, dlEncoding, valuesEncoding);
-
+ innerWriteDataPage(valueCount, uncompressedPageSize, bytes, statistics, rlEncoding, dlEncoding, valuesEncoding, metadataBlockEncryptor, pageHeaderAAD);
offsetIndexBuilder.add((int) (out.getPos() - beforeHeader), rowCount);
}
@@ -598,7 +624,34 @@ public class ParquetFileWriter {
Statistics statistics,
Encoding rlEncoding,
Encoding dlEncoding,
- Encoding valuesEncoding) throws IOException {
+ Encoding valuesEncoding,
+ BlockCipher.Encryptor metadataBlockEncryptor,
+ byte[] pageHeaderAAD) throws IOException {
+ writeDataPage(valueCount, uncompressedPageSize, bytes, statistics, rlEncoding, dlEncoding, valuesEncoding, metadataBlockEncryptor, pageHeaderAAD);
+ }
+
+ /**
+ * writes a single page
+ * @param valueCount count of values
+ * @param uncompressedPageSize the size of the data once uncompressed
+ * @param bytes the compressed data for the page without header
+ * @param statistics statistics for the page
+ * @param rlEncoding encoding of the repetition level
+ * @param dlEncoding encoding of the definition level
+ * @param valuesEncoding encoding of values
+ * @param metadataBlockEncryptor encryptor for block data
+ * @param pageHeaderAAD pageHeader AAD
+ * @throws IOException if there is an error while writing
+ */
+ public void writeDataPage(
+ int valueCount, int uncompressedPageSize,
+ BytesInput bytes,
+ Statistics statistics,
+ Encoding rlEncoding,
+ Encoding dlEncoding,
+ Encoding valuesEncoding,
+ BlockCipher.Encryptor metadataBlockEncryptor,
+ byte[] pageHeaderAAD) throws IOException {
state = state.write();
long beforeHeader = out.getPos();
if (currentChunkFirstDataPage < 0) {
@@ -616,7 +669,9 @@ public class ParquetFileWriter {
dlEncoding,
valuesEncoding,
(int) crc.getValue(),
- out);
+ out,
+ metadataBlockEncryptor,
+ pageHeaderAAD);
} else {
metadataConverter.writeDataPageV1Header(
uncompressedPageSize, compressedPageSize,
@@ -624,7 +679,9 @@ public class ParquetFileWriter {
rlEncoding,
dlEncoding,
valuesEncoding,
- out);
+ out,
+ metadataBlockEncryptor,
+ pageHeaderAAD);
}
long headerSize = out.getPos() - beforeHeader;
this.uncompressedLength += uncompressedPageSize + headerSize;
@@ -1035,6 +1092,12 @@ public class ParquetFileWriter {
long length = chunk.getTotalSize();
long newChunkStart = out.getPos();
+ if (newChunkStart != start) {
+ offsetIndex = OffsetIndexBuilder.getBuilder()
+ .fromOffsetIndex(offsetIndex)
+ .build(newChunkStart - start);
+ }
+
copy(from, out, start, length);
currentBloomFilters.put(String.join(".", descriptor.getPath()), bloomFilter);
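A hedged sketch of how the new writeDataPage encryption parameters can be derived, mirroring the EncryptorRunTime helper added later in this diff (blockId, columnId and pageOrdinal are illustrative):

  InternalColumnEncryptionSetup setup = fileEncryptor.getColumnSetup(columnPath, true, columnId);
  byte[] fileAAD = fileEncryptor.getFileAAD();
  byte[] pageHeaderAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DataPageHeader, blockId, columnId, 0);
  // Refresh the AAD for the page about to be written, then pass it along with the metadata encryptor.
  AesCipher.quickUpdatePageAAD(pageHeaderAAD, pageOrdinal);
  writer.writeDataPage(valueCount, uncompressedPageSize, bytes, statistics, rowCount,
      rlEncoding, dlEncoding, valuesEncoding,
      setup.getMetaDataEncryptor(), pageHeaderAAD);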
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
index ed26618..f149b1b 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
@@ -112,7 +112,7 @@ abstract public class ColumnChunkMetaData {
long valueCount,
long totalSize,
long totalUncompressedSize) {
-
+
return get(path, Types.optional(type).named("fake_type"), codec, encodingStats, encodings, statistics,
firstDataPage, dictionaryPageOffset, valueCount, totalSize, totalUncompressedSize);
}
@@ -157,14 +157,14 @@ abstract public class ColumnChunkMetaData {
totalUncompressedSize);
}
}
-
+
// In sensitive columns, the ColumnMetaData structure is encrypted (with column-specific keys), making the fields like Statistics invisible.
// Decryption is not performed pro-actively, due to performance and authorization reasons.
// This method creates a shell ColumnChunkMetaData object that keeps the encrypted metadata and the decryption tools.
// These tools will be activated later - when/if the column is projected.
- public static ColumnChunkMetaData getWithEncryptedMetadata(ParquetMetadataConverter parquetMetadataConverter, ColumnPath path,
+ public static ColumnChunkMetaData getWithEncryptedMetadata(ParquetMetadataConverter parquetMetadataConverter, ColumnPath path,
PrimitiveType type, byte[] encryptedMetadata, byte[] columnKeyMetadata,
- InternalFileDecryptor fileDecryptor, int rowGroupOrdinal, int columnOrdinal,
+ InternalFileDecryptor fileDecryptor, int rowGroupOrdinal, int columnOrdinal,
String createdBy) {
return new EncryptedColumnChunkMetaData(parquetMetadataConverter, path, type, encryptedMetadata, columnKeyMetadata,
fileDecryptor, rowGroupOrdinal, columnOrdinal, createdBy);
@@ -359,8 +359,8 @@ abstract public class ColumnChunkMetaData {
decryptIfNeeded();
return "ColumnMetaData{" + properties.toString() + ", " + getFirstDataPageOffset() + "}";
}
-
- public boolean hasDictionaryPage() {
+
+ public boolean hasDictionaryPage() {
EncodingStats stats = getEncodingStats();
if (stats != null) {
// ensure there is a dictionary page and that it is used to encode data pages
@@ -370,6 +370,14 @@ abstract public class ColumnChunkMetaData {
Set<Encoding> encodings = getEncodings();
return (encodings.contains(PLAIN_DICTIONARY) || encodings.contains(RLE_DICTIONARY));
}
+
+ /**
+ * @return whether or not this column is encrypted
+ */
+ @Private
+ public boolean isEncrypted() {
+ return false;
+ }
}
class IntColumnChunkMetaData extends ColumnChunkMetaData {
@@ -567,7 +575,7 @@ class EncryptedColumnChunkMetaData extends ColumnChunkMetaData {
private final ParquetMetadataConverter parquetMetadataConverter;
private final byte[] encryptedMetadata;
private final byte[] columnKeyMetadata;
- private final InternalFileDecryptor fileDecryptor;
+ private final InternalFileDecryptor fileDecryptor;
private final int columnOrdinal;
private final PrimitiveType primitiveType;
@@ -577,7 +585,7 @@ class EncryptedColumnChunkMetaData extends ColumnChunkMetaData {
private boolean decrypted;
private ColumnChunkMetaData shadowColumnChunkMetaData;
- EncryptedColumnChunkMetaData(ParquetMetadataConverter parquetMetadataConverter, ColumnPath path, PrimitiveType type,
+ EncryptedColumnChunkMetaData(ParquetMetadataConverter parquetMetadataConverter, ColumnPath path, PrimitiveType type,
byte[] encryptedMetadata, byte[] columnKeyMetadata,
InternalFileDecryptor fileDecryptor, int rowGroupOrdinal, int columnOrdinal, String createdBy) {
super((EncodingStats) null, (ColumnChunkProperties) null);
@@ -603,12 +611,12 @@ class EncryptedColumnChunkMetaData extends ColumnChunkMetaData {
}
// Decrypt the ColumnMetaData
- InternalColumnDecryptionSetup columnDecryptionSetup = fileDecryptor.setColumnCryptoMetadata(path, true, false,
+ InternalColumnDecryptionSetup columnDecryptionSetup = fileDecryptor.setColumnCryptoMetadata(path, true, false,
columnKeyMetadata, columnOrdinal);
-
+
ColumnMetaData metaData;
ByteArrayInputStream tempInputStream = new ByteArrayInputStream(encryptedMetadata);
- byte[] columnMetaDataAAD = AesCipher.createModuleAAD(fileDecryptor.getFileAAD(), ModuleType.ColumnMetaData,
+ byte[] columnMetaDataAAD = AesCipher.createModuleAAD(fileDecryptor.getFileAAD(), ModuleType.ColumnMetaData,
rowGroupOrdinal, columnOrdinal, -1);
try {
metaData = readColumnMetaData(tempInputStream, columnDecryptionSetup.getMetaDataDecryptor(), columnMetaDataAAD);
@@ -664,4 +672,12 @@ class EncryptedColumnChunkMetaData extends ColumnChunkMetaData {
decryptIfNeeded();
return shadowColumnChunkMetaData.getStatistics();
}
+
+ /**
+ * @return whether or not this column is encrypted
+ */
+ @Override
+ public boolean isEncrypted() {
+ return true;
+ }
}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnEncryptor.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnEncryptor.java
new file mode 100644
index 0000000..32ac49a
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnEncryptor.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.crypto.AesCipher;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.crypto.InternalColumnEncryptionSetup;
+import org.apache.parquet.crypto.InternalFileEncryptor;
+import org.apache.parquet.format.BlockCipher;
+import org.apache.parquet.format.DataPageHeader;
+import org.apache.parquet.format.DataPageHeaderV2;
+import org.apache.parquet.format.DictionaryPageHeader;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.CompressionConverter.TransParquetFileReader;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.schema.MessageType;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.parquet.column.ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
+import static org.apache.parquet.column.ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH;
+import static org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
+import static org.apache.parquet.hadoop.ParquetWriter.MAX_PADDING_SIZE_DEFAULT;
+
+/**
+ * This class is for fast rewriting of an existing file with column encryption.
+ *
+ * For columns to be encrypted, all the pages of those columns are read but not decompressed/decoded;
+ * each page is encrypted immediately and written back.
+ *
+ * For columns not to be encrypted, the whole column chunk is appended directly to the writer.
+ */
+public class ColumnEncryptor {
+ private static class EncryptorRunTime {
+ private final InternalColumnEncryptionSetup colEncrSetup;
+ private final BlockCipher.Encryptor dataEncryptor;
+ private final BlockCipher.Encryptor metaDataEncryptor;
+ private final byte[] fileAAD ;
+
+ private byte[] dataPageHeaderAAD;
+ private byte[] dataPageAAD;
+ private byte[] dictPageHeaderAAD;
+ private byte[] dictPageAAD;
+
+ public EncryptorRunTime(InternalFileEncryptor fileEncryptor, ColumnChunkMetaData chunk,
+ int blockId, int columnId) throws IOException {
+ if (fileEncryptor == null) {
+ this.colEncrSetup = null;
+ this.dataEncryptor = null;
+ this.metaDataEncryptor = null;
+
+ this.fileAAD = null;
+ this.dataPageHeaderAAD = null;
+ this.dataPageAAD = null;
+ this.dictPageHeaderAAD = null;
+ this.dictPageAAD = null;
+ } else {
+ this.colEncrSetup = fileEncryptor.getColumnSetup(chunk.getPath(), true, columnId);
+ this.dataEncryptor = colEncrSetup.getDataEncryptor();
+ this.metaDataEncryptor = colEncrSetup.getMetaDataEncryptor();
+
+ this.fileAAD = fileEncryptor.getFileAAD();
+ this.dataPageHeaderAAD = createAAD(colEncrSetup, ModuleType.DataPageHeader, blockId, columnId);
+ this.dataPageAAD = createAAD(colEncrSetup, ModuleType.DataPage, blockId, columnId);
+ this.dictPageHeaderAAD = createAAD(colEncrSetup, ModuleType.DictionaryPageHeader, blockId, columnId);
+ this.dictPageAAD = createAAD(colEncrSetup, ModuleType.DictionaryPage, blockId, columnId);
+ }
+ }
+
+ private byte[] createAAD(InternalColumnEncryptionSetup colEncrSetup, ModuleType moduleType, int blockId, int columnId) {
+ if (colEncrSetup != null && colEncrSetup.isEncrypted()) {
+ return AesCipher.createModuleAAD(fileAAD, moduleType, blockId, columnId, 0);
+ }
+ return null;
+ }
+
+ public BlockCipher.Encryptor getDataEncryptor() {
+ return this.dataEncryptor;
+ }
+
+ public BlockCipher.Encryptor getMetaDataEncryptor() {
+ return this.metaDataEncryptor;
+ }
+
+ public byte[] getDataPageHeaderAAD() {
+ return this.dataPageHeaderAAD;
+ }
+
+ public byte[] getDataPageAAD() {
+ return this.dataPageAAD;
+ }
+
+ public byte[] getDictPageHeaderAAD() {
+ return this.dictPageHeaderAAD;
+ }
+
+ public byte[] getDictPageAAD() {
+ return this.dictPageAAD;
+ }
+ }
+
+ private Configuration conf;
+
+ public ColumnEncryptor(Configuration conf) {
+ this.conf = conf;
+ }
+
+ /**
+ * Given the input file, encrypt the columns specified by paths, and output the file.
+ * The encryption settings can be specified in the parameter of fileEncryptionProperties
+ * @param inputFile Input file
+ * @param outputFile Output file
+ * @param paths columns to be encrypted
+ * @param fileEncryptionProperties FileEncryptionProperties of the file
+ * @throws IOException
+ */
+ public void encryptColumns(String inputFile, String outputFile, List<String> paths, FileEncryptionProperties fileEncryptionProperties) throws IOException {
+ Path inPath = new Path(inputFile);
+ Path outPath = new Path(outputFile);
+
+ ParquetMetadata metaData = ParquetFileReader.readFooter(conf, inPath, NO_FILTER);
+ MessageType schema = metaData.getFileMetaData().getSchema();
+
+ ParquetFileWriter writer = new ParquetFileWriter(HadoopOutputFile.fromPath(outPath, conf), schema, ParquetFileWriter.Mode.OVERWRITE,
+ DEFAULT_BLOCK_SIZE, MAX_PADDING_SIZE_DEFAULT, DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, DEFAULT_STATISTICS_TRUNCATE_LENGTH,
+ ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED, fileEncryptionProperties);
+ writer.start();
+
+ try (TransParquetFileReader reader = new TransParquetFileReader(HadoopInputFile.fromPath(inPath, conf), HadoopReadOptions.builder(conf).build())) {
+ processBlocks(reader, writer, metaData, schema, paths);
+ }
+ writer.end(metaData.getFileMetaData().getKeyValueMetaData());
+ }
+
+ private void processBlocks(TransParquetFileReader reader, ParquetFileWriter writer, ParquetMetadata meta,
+ MessageType schema, List<String> encryptPaths) throws IOException {
+ Set<ColumnPath> encryptColumnsPath = convertToColumnPaths(encryptPaths);
+ int blockId = 0;
+ PageReadStore store = reader.readNextRowGroup();
+
+ while (store != null) {
+ writer.startBlock(store.getRowCount());
+
+ List<ColumnChunkMetaData> columnsInOrder = meta.getBlocks().get(blockId).getColumns();
+ Map<ColumnPath, ColumnDescriptor> descriptorsMap = schema.getColumns().stream().collect(
+ Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
+
+ for (int i = 0; i < columnsInOrder.size(); i += 1) {
+ ColumnChunkMetaData chunk = columnsInOrder.get(i);
+ // If a column is encrypted, we simply throw exception.
+ // Later we can add a feature to trans-encrypt it with different keys
+ if (chunk.isEncrypted()) {
+ throw new IOException("Column " + chunk.getPath().toDotString() + " is already encrypted");
+ }
+ ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath());
+ processChunk(descriptor, chunk, reader, writer, encryptColumnsPath, blockId, i, meta.getFileMetaData().getCreatedBy());
+ }
+
+ writer.endBlock();
+ store = reader.readNextRowGroup();
+ blockId++;
+ }
+ }
+
+ private void processChunk(ColumnDescriptor descriptor, ColumnChunkMetaData chunk, TransParquetFileReader reader, ParquetFileWriter writer,
+ Set<ColumnPath> encryptPaths, int blockId, int columnId, String createdBy) throws IOException {
+ reader.setStreamPosition(chunk.getStartingPos());
+ writer.startColumn(descriptor, chunk.getValueCount(), chunk.getCodec());
+ processPages(reader, chunk, writer, createdBy, blockId, columnId, encryptPaths.contains(chunk.getPath()));
+ writer.endColumn();
+ }
+
+ private void processPages(TransParquetFileReader reader, ColumnChunkMetaData chunk, ParquetFileWriter writer,
+ String createdBy, int blockId, int columnId, boolean encrypt) throws IOException {
+ int pageOrdinal = 0;
+ EncryptorRunTime encryptorRunTime = new EncryptorRunTime(writer.getEncryptor(), chunk, blockId, columnId);
+ DictionaryPage dictionaryPage = null;
+ long readValues = 0;
+ ParquetMetadataConverter converter = new ParquetMetadataConverter();
+ OffsetIndex offsetIndex = reader.readOffsetIndex(chunk);
+ reader.setStreamPosition(chunk.getStartingPos());
+ long totalChunkValues = chunk.getValueCount();
+ while (readValues < totalChunkValues) {
+ PageHeader pageHeader = reader.readPageHeader();
+ byte[] pageLoad;
+ switch (pageHeader.type) {
+ case DICTIONARY_PAGE:
+ if (dictionaryPage != null) {
+ throw new IOException("has more than one dictionary page in column chunk");
+ }
+ //No quickUpdatePageAAD needed for dictionary page
+ DictionaryPageHeader dictPageHeader = pageHeader.dictionary_page_header;
+ pageLoad = processPayload(reader, pageHeader.getCompressed_page_size(), encryptorRunTime.getDataEncryptor(), encryptorRunTime.getDictPageAAD(), encrypt);
+ writer.writeDictionaryPage(new DictionaryPage(BytesInput.from(pageLoad),
+ pageHeader.getUncompressed_page_size(),
+ dictPageHeader.getNum_values(),
+ converter.getEncoding(dictPageHeader.getEncoding())),
+ encryptorRunTime.getMetaDataEncryptor(), encryptorRunTime.getDictPageHeaderAAD());
+ break;
+ case DATA_PAGE:
+ if (encrypt) {
+ AesCipher.quickUpdatePageAAD(encryptorRunTime.getDataPageHeaderAAD(), pageOrdinal);
+ AesCipher.quickUpdatePageAAD(encryptorRunTime.getDataPageAAD(), pageOrdinal);
+ }
+ DataPageHeader headerV1 = pageHeader.data_page_header;
+ pageLoad = processPayload(reader, pageHeader.getCompressed_page_size(), encryptorRunTime.getDataEncryptor(), encryptorRunTime.getDataPageAAD(), encrypt);
+ readValues += headerV1.getNum_values();
+ if (offsetIndex != null) {
+ long rowCount = 1 + offsetIndex.getLastRowIndex(pageOrdinal, totalChunkValues) - offsetIndex.getFirstRowIndex(pageOrdinal);
+ writer.writeDataPage(Math.toIntExact(headerV1.getNum_values()),
+ pageHeader.getUncompressed_page_size(),
+ BytesInput.from(pageLoad),
+ converter.fromParquetStatistics(createdBy, headerV1.getStatistics(), chunk.getPrimitiveType()),
+ rowCount,
+ converter.getEncoding(headerV1.getRepetition_level_encoding()),
+ converter.getEncoding(headerV1.getDefinition_level_encoding()),
+ converter.getEncoding(headerV1.getEncoding()),
+ encryptorRunTime.getMetaDataEncryptor(),
+ encryptorRunTime.getDataPageHeaderAAD());
+ } else {
+ writer.writeDataPage(Math.toIntExact(headerV1.getNum_values()),
+ pageHeader.getUncompressed_page_size(),
+ BytesInput.from(pageLoad),
+ converter.fromParquetStatistics(createdBy, headerV1.getStatistics(), chunk.getPrimitiveType()),
+ converter.getEncoding(headerV1.getRepetition_level_encoding()),
+ converter.getEncoding(headerV1.getDefinition_level_encoding()),
+ converter.getEncoding(headerV1.getEncoding()),
+ encryptorRunTime.getMetaDataEncryptor(),
+ encryptorRunTime.getDataPageHeaderAAD());
+ }
+ pageOrdinal++;
+ break;
+ case DATA_PAGE_V2:
+ if (encrypt) {
+ AesCipher.quickUpdatePageAAD(encryptorRunTime.getDataPageHeaderAAD(), pageOrdinal);
+ AesCipher.quickUpdatePageAAD(encryptorRunTime.getDataPageAAD(), pageOrdinal);
+ }
+ DataPageHeaderV2 headerV2 = pageHeader.data_page_header_v2;
+ int rlLength = headerV2.getRepetition_levels_byte_length();
+ BytesInput rlLevels = readBlockAllocate(rlLength, reader);
+ int dlLength = headerV2.getDefinition_levels_byte_length();
+ BytesInput dlLevels = readBlockAllocate(dlLength, reader);
+ int payLoadLength = pageHeader.getCompressed_page_size() - rlLength - dlLength;
+ int rawDataLength = pageHeader.getUncompressed_page_size() - rlLength - dlLength;
+ pageLoad = processPayload(reader, payLoadLength, encryptorRunTime.getDataEncryptor(), encryptorRunTime.getDataPageAAD(), encrypt);
+ readValues += headerV2.getNum_values();
+ writer.writeDataPageV2(headerV2.getNum_rows(),
+ headerV2.getNum_nulls(),
+ headerV2.getNum_values(),
+ rlLevels,
+ dlLevels,
+ converter.getEncoding(headerV2.getEncoding()),
+ BytesInput.from(pageLoad),
+ rawDataLength,
+ converter.fromParquetStatistics(createdBy, headerV2.getStatistics(), chunk.getPrimitiveType()));
+ pageOrdinal++;
+ break;
+ default:
+ break;
+ }
+ }
+ }
+
+ private byte[] processPayload(TransParquetFileReader reader, int payloadLength, BlockCipher.Encryptor dataEncryptor,
+ byte[] AAD, boolean encrypt) throws IOException {
+ byte[] data = readBlock(payloadLength, reader);
+ if (!encrypt) {
+ return data;
+ }
+ return dataEncryptor.encrypt(data, AAD);
+ }
+
+ public byte[] readBlock(int length, TransParquetFileReader reader) throws IOException {
+ byte[] data = new byte[length];
+ reader.blockRead(data, 0, length);
+ return data;
+ }
+
+ public BytesInput readBlockAllocate(int length, TransParquetFileReader reader) throws IOException {
+ byte[] data = new byte[length];
+ reader.blockRead(data, 0, length);
+ return BytesInput.from(data, 0, length);
+ }
+
+ public static Set<ColumnPath> convertToColumnPaths(List<String> cols) {
+ Set<ColumnPath> prunePaths = new HashSet<>();
+ for (String col : cols) {
+ prunePaths.add(ColumnPath.fromDotString(col));
+ }
+ return prunePaths;
+ }
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnMasker.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnMasker.java
index 278e3a7..74bc534 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnMasker.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnMasker.java
@@ -123,7 +123,7 @@ public class ColumnMasker {
.build();
CodecFactory codecFactory = new CodecFactory(new Configuration(), props.getPageSizeThreshold());
CodecFactory.BytesCompressor compressor = codecFactory.getCompressor(chunk.getCodec());
-
+
// Create new schema that only has the current column
MessageType newSchema = newSchema(schema, descriptor);
ColumnChunkPageWriteStore cPageStore = new ColumnChunkPageWriteStore(compressor, newSchema, props.getAllocator(), props.getColumnIndexTruncateLength());
@@ -154,7 +154,7 @@ public class ColumnMasker {
cStore.close();
cWriter.close();
}
-
+
private MessageType newSchema(MessageType schema, ColumnDescriptor descriptor) {
String[] path = descriptor.getPath();
Type type = schema.getType(path);
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java
index 0694558..2994ca8 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java
@@ -21,6 +21,7 @@ package org.apache.parquet.hadoop.util;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.parquet.io.DelegatingSeekableInputStream;
+
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/ColumnEncryptorTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/ColumnEncryptorTest.java
new file mode 100644
index 0000000..670f6e2
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/ColumnEncryptorTest.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.crypto.FileDecryptionProperties;
+import org.apache.parquet.crypto.ParquetCipher;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.format.DataPageHeader;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.CompressionConverter.TransParquetFileReader;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.SeekableInputStream;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ColumnEncryptorTest {
+
+ private Configuration conf = new Configuration();
+ private Map<String, String> extraMeta = ImmutableMap.of("key1", "value1", "key2", "value2");
+ private ColumnEncryptor columnEncryptor = null;
+ private final int numRecord = 100000;
+ private String inputFile = null;
+ private String outputFile = null;
+ private TestFileHelper.TestDocs testDocs = null;
+
+ private void testSetup(String compression) throws IOException {
+ columnEncryptor = new ColumnEncryptor(conf);
+ testDocs = new TestFileHelper.TestDocs(numRecord);
+ inputFile = TestFileHelper.createParquetFile(conf, extraMeta, numRecord, "input", compression,
+ ParquetProperties.WriterVersion.PARQUET_1_0, ParquetProperties.DEFAULT_PAGE_SIZE, testDocs);
+ outputFile = TestFileHelper.createTempFile("test");
+ }
+
+ @Test
+ public void testFlatColumn() throws IOException {
+ testSetup("GZIP");
+ String[] encryptColumns = {"DocId"};
+ columnEncryptor.encryptColumns(inputFile, outputFile, Arrays.asList(encryptColumns),
+ EncDecProperties.getFileEncryptionProperties(encryptColumns, ParquetCipher.AES_GCM_CTR_V1, false));
+
+ verifyResultDecryptionWithValidKey();
+ }
+
+ @Test
+ public void testNestedColumn() throws IOException {
+ testSetup("GZIP");
+ String[] encryptColumns = {"Links.Forward"};
+ columnEncryptor.encryptColumns(inputFile, outputFile, Arrays.asList(encryptColumns),
+ EncDecProperties.getFileEncryptionProperties(encryptColumns, ParquetCipher.AES_GCM_CTR_V1, false));
+ verifyResultDecryptionWithValidKey();
+ }
+
+ @Test
+ public void testNoEncryption() throws IOException {
+ testSetup("GZIP");
+ String[] encryptColumns = {};
+ columnEncryptor.encryptColumns(inputFile, outputFile, Arrays.asList(encryptColumns),
+ EncDecProperties.getFileEncryptionProperties(encryptColumns, ParquetCipher.AES_GCM_CTR_V1, false));
+ verifyResultDecryptionWithValidKey();
+ }
+
+ @Test
+ public void testEncryptAllColumns() throws IOException {
+ testSetup("GZIP");
+ String[] encryptColumns = {"DocId", "Name", "Gender", "Links.Forward", "Links.Backward"};
+ columnEncryptor.encryptColumns(inputFile, outputFile, Arrays.asList(encryptColumns),
+ EncDecProperties.getFileEncryptionProperties(encryptColumns, ParquetCipher.AES_GCM_CTR_V1, false));
+ verifyResultDecryptionWithValidKey();
+ }
+
+ @Test
+ public void testEncryptSomeColumns() throws IOException {
+ testSetup("GZIP");
+ String[] encryptColumns = {"DocId", "Name", "Links.Forward"};
+ columnEncryptor.encryptColumns(inputFile, outputFile, Arrays.asList(encryptColumns),
+ EncDecProperties.getFileEncryptionProperties(encryptColumns, ParquetCipher.AES_GCM_CTR_V1, false));
+
+ ParquetMetadata metaData = getParquetMetadata(EncDecProperties.getFileDecryptionProperties());
+ assertTrue(metaData.getBlocks().size() > 0);
+ List<ColumnChunkMetaData> columns = metaData.getBlocks().get(0).getColumns();
+ Set<String> set = new HashSet<>(Arrays.asList(encryptColumns));
+ for (ColumnChunkMetaData column : columns) {
+ if (set.contains(column.getPath().toDotString())) {
+ assertTrue(column.isEncrypted());
+ } else {
+ assertFalse(column.isEncrypted());
+ }
+ }
+ }
+
+ @Test
+ public void testFooterEncryption() throws IOException {
+ testSetup("GZIP");
+ String[] encryptColumns = {"DocId"};
+ columnEncryptor.encryptColumns(inputFile, outputFile, Arrays.asList(encryptColumns),
+ EncDecProperties.getFileEncryptionProperties(encryptColumns, ParquetCipher.AES_GCM_CTR_V1, true));
+
+ verifyResultDecryptionWithValidKey();
+ }
+
+ @Test
+ public void testAesGcm() throws IOException {
+ testSetup("GZIP");
+ String[] encryptColumns = {"DocId"};
+ columnEncryptor.encryptColumns(inputFile, outputFile, Arrays.asList(encryptColumns),
+ EncDecProperties.getFileEncryptionProperties(encryptColumns, ParquetCipher.AES_GCM_V1, true));
+
+ verifyResultDecryptionWithValidKey();
+ }
+
+ @Test
+ public void testColumnIndex() throws IOException {
+ testSetup("GZIP");
+ String[] encryptColumns = {"Name"};
+ columnEncryptor.encryptColumns(inputFile, outputFile, Arrays.asList(encryptColumns),
+ EncDecProperties.getFileEncryptionProperties(encryptColumns, ParquetCipher.AES_GCM_V1, false));
+
+ verifyResultDecryptionWithValidKey();
+ verifyOffsetIndexes();
+ }
+
+ @Test
+ public void testDifferentCompression() throws IOException {
+ String[] compressions = {"GZIP", "ZSTD", "SNAPPY", "UNCOMPRESSED"};
+ for (String compression : compressions) {
+ testSetup(compression);
+ }
+ String[] encryptColumns = {"Links.Forward"};
+ columnEncryptor.encryptColumns(inputFile, outputFile, Arrays.asList(encryptColumns),
+ EncDecProperties.getFileEncryptionProperties(encryptColumns, ParquetCipher.AES_GCM_CTR_V1, false));
+ verifyResultDecryptionWithValidKey();
+
+ }
+
+ private void verifyResultDecryptionWithValidKey() throws IOException {
+ ParquetReader<Group> reader = createReader(outputFile);
+ for (int i = 0; i < numRecord; i++) {
+ Group group = reader.read();
+ assertTrue(group.getLong("DocId", 0) == testDocs.docId[i]);
+ assertArrayEquals(group.getBinary("Name", 0).getBytes(), testDocs.name[i].getBytes());
+ assertArrayEquals(group.getBinary("Gender", 0).getBytes(), testDocs.gender[i].getBytes());
+ Group subGroup = group.getGroup("Links", 0);
+ assertArrayEquals(subGroup.getBinary("Forward", 0).getBytes(), testDocs.linkForward[i].getBytes());
+ assertArrayEquals(subGroup.getBinary("Backward", 0).getBytes(), testDocs.linkBackward[i].getBytes());
+ }
+ reader.close();
+ }
+
+ private void verifyOffsetIndexes() throws IOException {
+ ParquetReadOptions readOptions = HadoopReadOptions.builder(conf)
+ .withDecryption(EncDecProperties.getFileDecryptionProperties())
+ .build();
+
+ try (TransParquetFileReader inReader = createFileReader(inputFile);
+ TransParquetFileReader outReader = createFileReader(outputFile)) {
+ ParquetMetadata inMetaData = getMetadata(readOptions, inputFile, inReader);
+ ParquetMetadata outMetaData = getMetadata(readOptions, outputFile, outReader);
+ compareOffsetIndexes(inReader, outReader, inMetaData, outMetaData);
+ }
+ }
+
+ private ParquetMetadata getMetadata(ParquetReadOptions readOptions, String file, TransParquetFileReader reader) throws IOException {
+ return ParquetFileReader.readFooter(HadoopInputFile.fromPath(new Path(file), conf),
+ readOptions,
+ reader.getStream());
+ }
+
+ private void compareOffsetIndexes(TransParquetFileReader inReader, TransParquetFileReader outReader,
+ ParquetMetadata inMetaData, ParquetMetadata outMetaData) throws IOException {
+
+ PageReadStore inStore = inReader.readNextRowGroup();
+ PageReadStore outStore = outReader.readNextRowGroup();
+ int blockIndex = 0;
+ while (inStore != null && outStore != null) {
+ List<ColumnChunkMetaData> inColumns = inMetaData.getBlocks().get(blockIndex).getColumns();
+ List<ColumnChunkMetaData> outColumns = outMetaData.getBlocks().get(blockIndex).getColumns();
+ assertEquals(inColumns.size(), outColumns.size());
+ validateColumns(inReader, outReader, inColumns, outColumns);
+ inStore = inReader.readNextRowGroup();
+ outStore = outReader.readNextRowGroup();
+ blockIndex++;
+ if (inStore != null || outStore != null) {
+ throw new IOException("Number of row groups are not equal");
+ }
+ }
+ }
+
+ private void validateColumns(TransParquetFileReader inReader, TransParquetFileReader outReader,
+ List<ColumnChunkMetaData> inColumns, List<ColumnChunkMetaData> outColumns) throws IOException {
+ for (int i = 0; i < inColumns.size(); i ++) {
+ ColumnChunkMetaData inChunk = inColumns.get(i);
+ ColumnChunkMetaData outChunk = outColumns.get(i);
+ OffsetIndex inOffsetIndex = inReader.readOffsetIndex(inChunk);
+ OffsetIndex outOffsetIndex = outReader.readOffsetIndex(outChunk);
+ assertEquals(inOffsetIndex.getPageCount(), outOffsetIndex.getPageCount());
+ if (outChunk.isEncrypted()) {
+ continue;
+ }
+ validatePages(inReader, outReader, inOffsetIndex, outOffsetIndex);
+ }
+ }
+
+ private void validatePages(TransParquetFileReader inReader, TransParquetFileReader outReader,
+ OffsetIndex inOffsetIndex, OffsetIndex outOffsetIndex) throws IOException {
+ for (int pageId = 0; pageId < inOffsetIndex.getPageCount(); pageId ++) {
+ long inPageOffset = inOffsetIndex.getOffset(pageId);
+ inReader.setStreamPosition(inPageOffset);
+ long outPageOffset = outOffsetIndex.getOffset(pageId);
+ outReader.setStreamPosition(outPageOffset);
+ PageHeader inPageHeader = inReader.readPageHeader();
+ PageHeader outPageHeader = outReader.readPageHeader();
+ assertEquals(inPageHeader, outPageHeader);
+ DataPageHeader inHeaderV1 = inPageHeader.data_page_header;
+ DataPageHeader outHeaderV1 = outPageHeader.data_page_header;
+ assertEquals(inHeaderV1, outHeaderV1);
+ BytesInput inPageLoad = readBlockAllocate(inReader, inPageHeader.compressed_page_size);
+ BytesInput outPageLoad = readBlockAllocate(outReader, outPageHeader.compressed_page_size);
+ assertEquals(inPageLoad.toByteBuffer(), outPageLoad.toByteBuffer());
+ }
+ }
+
+ private BytesInput readBlockAllocate(TransParquetFileReader reader, int length) throws IOException {
+ byte[] data = new byte[length];
+ reader.blockRead(data, 0, length);
+ return BytesInput.from(data, 0, length);
+ }
+
+ private TransParquetFileReader createFileReader(String path) throws IOException {
+ return new TransParquetFileReader(HadoopInputFile.fromPath(new Path(path), conf),
+ HadoopReadOptions.builder(conf)
+ .withDecryption(EncDecProperties.getFileDecryptionProperties())
+ .build());
+ }
+
+ private ParquetReader<Group> createReader(String path) throws IOException {
+ return ParquetReader.builder(new GroupReadSupport(),
+ new Path(path)).withConf(conf).withDecryption(EncDecProperties.getFileDecryptionProperties()).build();
+ }
+
+ private ParquetMetadata getParquetMetadata(FileDecryptionProperties decryptionProperties) throws IOException {
+ ParquetMetadata metaData;
+ ParquetReadOptions readOptions = ParquetReadOptions.builder()
+ .withDecryption(decryptionProperties)
+ .build();
+ InputFile file = HadoopInputFile.fromPath(new Path(outputFile), conf);
+ try (SeekableInputStream in = file.newStream()) {
+ metaData = ParquetFileReader.readFooter(file, readOptions, in);
+ }
+ return metaData;
+ }
+}
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/CompressionConveterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/CompressionConveterTest.java
index fefa5e4..218abb9 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/CompressionConveterTest.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/CompressionConveterTest.java
@@ -22,15 +22,11 @@ import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.HadoopReadOptions;
-import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.column.ParquetProperties;
-import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroup;
-import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
import org.apache.parquet.format.DataPageHeader;
import org.apache.parquet.format.DataPageHeaderV2;
-import org.apache.parquet.format.DictionaryPageHeader;
import org.apache.parquet.format.PageHeader;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetFileWriter;
@@ -46,9 +42,6 @@ import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.CompressionConverter.TransParquetFileReader;
import org.apache.parquet.internal.column.columnindex.ColumnIndex;
import org.apache.parquet.internal.column.columnindex.OffsetIndex;
-import org.apache.parquet.io.ColumnIOFactory;
-import org.apache.parquet.io.MessageColumnIO;
-import org.apache.parquet.io.RecordReader;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/EncDecProperties.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/EncDecProperties.java
new file mode 100644
index 0000000..ac877a2
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/EncDecProperties.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util;
+
+import org.apache.parquet.crypto.ColumnEncryptionProperties;
+import org.apache.parquet.crypto.DecryptionKeyRetriever;
+import org.apache.parquet.crypto.FileDecryptionProperties;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.crypto.ParquetCipher;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+public class EncDecProperties {
+
+ public static class DecryptionKeyRetrieverMock implements DecryptionKeyRetriever {
+ private final Map<String, byte[]> keyMap = new HashMap<>();
+
+ public DecryptionKeyRetrieverMock putKey(String keyId, byte[] keyBytes) {
+ keyMap.put(keyId, keyBytes);
+ return this;
+ }
+
+ @Override
+ public byte[] getKey(byte[] keyMetaData) {
+ String keyId = new String(keyMetaData, StandardCharsets.UTF_8);
+ return keyMap.get(keyId);
+ }
+ }
+
+ private static final byte[] FOOTER_KEY = {0x01, 0x02, 0x03, 0x4, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a,
+ 0x0b, 0x0c, 0x0d, 0x0e, 0x0f, 0x10};
+ private static final byte[] FOOTER_KEY_METADATA = "footkey".getBytes(StandardCharsets.UTF_8);
+ private static final byte[] COL_KEY = {0x02, 0x03, 0x4, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b,
+ 0x0c, 0x0d, 0x0e, 0x0f, 0x10, 0x11};
+ private static final byte[] COL_KEY_METADATA = "col".getBytes(StandardCharsets.UTF_8);
+
+ public static FileDecryptionProperties getFileDecryptionProperties() throws IOException {
+ DecryptionKeyRetrieverMock keyRetriever = new DecryptionKeyRetrieverMock();
+ keyRetriever.putKey("footkey", FOOTER_KEY);
+ keyRetriever.putKey("col", COL_KEY);
+ return FileDecryptionProperties.builder().withPlaintextFilesAllowed().withKeyRetriever(keyRetriever).build();
+ }
+
+ public static FileEncryptionProperties getFileEncryptionProperties(String[] encrCols, ParquetCipher cipher, Boolean encryptFooter) {
+ if (encrCols.length == 0) {
+ return null;
+ }
+ Map<ColumnPath, ColumnEncryptionProperties> columnPropertyMap = new HashMap<>();
+ Set<String> paths = new HashSet<>(Arrays.asList(encrCols));
+
+ for (ColumnPath columnPath : TestFileHelper.getPaths()) {
+ ColumnEncryptionProperties colEncProp;
+ if (paths.contains(columnPath.toDotString())) {
+ colEncProp = ColumnEncryptionProperties.builder(columnPath, true)
+ .withKey(COL_KEY)
+ .withKeyMetaData(COL_KEY_METADATA)
+ .build();
+ } else {
+ colEncProp = ColumnEncryptionProperties.builder(columnPath, false).build();
+ }
+ columnPropertyMap.put(columnPath, colEncProp);
+ }
+
+ FileEncryptionProperties.Builder encryptionPropertiesBuilder =
+ FileEncryptionProperties.builder(FOOTER_KEY)
+ .withFooterKeyMetadata(FOOTER_KEY_METADATA)
+ .withAlgorithm(cipher)
+ .withEncryptedColumns(columnPropertyMap);
+
+ if(!encryptFooter) {
+ encryptionPropertiesBuilder.withPlaintextFooter();
+ }
+
+ return encryptionPropertiesBuilder.build();
+ }
+}
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestFileHelper.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestFileHelper.java
new file mode 100644
index 0000000..f661035
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestFileHelper.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroup;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
+import static org.apache.parquet.schema.Type.Repetition.REPEATED;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
+
+public class TestFileHelper {
+
+ public static MessageType schema = new MessageType("schema",
+ new PrimitiveType(OPTIONAL, INT64, "DocId"),
+ new PrimitiveType(REQUIRED, BINARY, "Name"),
+ new PrimitiveType(OPTIONAL, BINARY, "Gender"),
+ new GroupType(OPTIONAL, "Links",
+ new PrimitiveType(REPEATED, BINARY, "Backward"),
+ new PrimitiveType(REPEATED, BINARY, "Forward")));
+
+ private static Random rnd = new Random(5);
+
+ public static String createParquetFile(Configuration conf, Map<String, String> extraMeta, int numRecord, String prefix, String codec,
+ ParquetProperties.WriterVersion writerVersion, int pageSize, TestDocs testDocs) throws IOException {
+ conf.set(GroupWriteSupport.PARQUET_EXAMPLE_SCHEMA, schema.toString());
+
+ String file = createTempFile(prefix);
+ ExampleParquetWriter.Builder builder = ExampleParquetWriter.builder(new Path(file))
+ .withConf(conf)
+ .withWriterVersion(writerVersion)
+ .withExtraMetaData(extraMeta)
+ .withValidation(true)
+ .withPageSize(pageSize)
+ .withCompressionCodec(CompressionCodecName.valueOf(codec));
+ try (ParquetWriter writer = builder.build()) {
+ for (int i = 0; i < numRecord; i++) {
+ SimpleGroup g = new SimpleGroup(schema);
+ g.add("DocId", testDocs.docId[i]);
+ g.add("Name", testDocs.name[i]);
+ g.add("Gender", testDocs.gender[i]);
+ Group links = g.addGroup("Links");
+ links.add(0, testDocs.linkBackward[i]);
+ links.add(1, testDocs.linkForward[i]);
+ writer.write(g);
+ }
+ }
+ return file;
+ }
+
+ private static long getLong() {
+ return ThreadLocalRandom.current().nextLong(1000);
+ }
+
+ private static String getString() {
+ char[] chars = {'a', 'b', 'c', 'd', 'e', 'f', 'g', 'x', 'z', 'y'};
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < 100; i++) {
+ sb.append(chars[rnd.nextInt(10)]);
+ }
+ return sb.toString();
+ }
+
+ public static String createTempFile(String prefix) {
+ try {
+ return Files.createTempDirectory(prefix).toAbsolutePath().toString() + "/test.parquet";
+ } catch (IOException e) {
+ throw new AssertionError("Unable to create temporary file", e);
+ }
+ }
+
+ public static List<ColumnPath> getPaths() {
+ List<ColumnPath> paths = new ArrayList<>();
+ paths.add(ColumnPath.fromDotString("DocId"));
+ paths.add(ColumnPath.fromDotString("Name"));
+ paths.add(ColumnPath.fromDotString("Gender"));
+ paths.add(ColumnPath.fromDotString("Links.Backward"));
+ paths.add(ColumnPath.fromDotString("Links.Forward"));
+ return paths;
+ }
+
+ public static class TestDocs {
+ public long[] docId;
+ public String[] name;
+ public String[] gender;
+ public String[] linkBackward;
+ public String[] linkForward;
+
+ public TestDocs(int numRecord) {
+ docId = new long[numRecord];
+ for (int i = 0; i < numRecord; i++) {
+ docId[i] = getLong();
+ }
+
+ name = new String[numRecord];
+ for (int i = 0; i < numRecord; i++) {
+ name[i] = getString();
+ }
+
+ gender = new String[numRecord];
+ for (int i = 0; i < numRecord; i++) {
+ gender[i] = getString();
+ }
+
+ linkBackward = new String[numRecord];
+ for (int i = 0; i < numRecord; i++) {
+ linkBackward[i] = getString();
+ }
+
+ linkForward = new String[numRecord];
+ for (int i = 0; i < numRecord; i++) {
+ linkForward[i] = getString();
+ }
+ }
+ }
+}