Posted to commits@drill.apache.org by vi...@apache.org on 2021/04/23 09:58:44 UTC
[drill] branch master updated: DRILL-7825: Unknown logical type <LogicalType UUID:UUIDType()> in Parquet (#2143)
This is an automated email from the ASF dual-hosted git repository.
vitalii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 5d21637 DRILL-7825: Unknown logical type <LogicalType UUID:UUIDType()> in Parquet (#2143)
5d21637 is described below
commit 5d216375271fe98b3733549bd85b93248d31075e
Author: Vitalii Diravka <vi...@apache.org>
AuthorDate: Fri Apr 23 12:58:33 2021 +0300
DRILL-7825: Unknown logical type <LogicalType UUID:UUIDType()> in Parquet (#2143)
* DRILL-7825: Error: SYSTEM ERROR: RuntimeException: Unknown logical type <LogicalType UUID:UUIDType()>
---
exec/java-exec/pom.xml | 4 -
.../exec/store/parquet/ParquetRecordWriter.java | 14 +-
.../parquet/columnreaders/ColumnReaderFactory.java | 4 +-
.../store/parquet2/DrillParquetGroupConverter.java | 27 +-
.../hadoop/ParquetColumnChunkPageWriteStore.java | 419 +++--
.../apache/parquet/hadoop/ParquetFileWriter.java | 1634 ++++++++++++++++++++
.../parquet/ParquetSimpleTestFileGenerator.java | 61 +-
.../exec/store/parquet/TestParquetComplex.java | 14 +
.../store/parquet/TestParquetLogicalTypes.java | 31 +
.../parquet/parquet_test_file_simple.parquet | Bin 0 -> 5048 bytes
.../parquet/uuid-simple-fixed-length-array.parquet | Bin 0 -> 1328 bytes
.../resources/store/parquet/complex/uuid.parquet | Bin 0 -> 38974 bytes
exec/jdbc-all/pom.xml | 35 +-
pom.xml | 2 +-
14 files changed, 2039 insertions(+), 206 deletions(-)
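
For context, the error in the subject is raised when Drill reads a Parquet column annotated with the UUID logical type. A minimal sketch (not part of the patch; it uses the standard parquet-mr Types builder) of a schema that used to trigger the failure: a 16-byte FIXED_LEN_BYTE_ARRAY annotated as UUID, which Drill now maps to VARBINARY instead of erroring out.

import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Types;

public class UuidSchemaSketch {
  public static void main(String[] args) {
    // message uuid_example { optional fixed_len_byte_array(16) id (UUID); }
    MessageType schema = Types.buildMessage()
        .optional(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY).length(16)
        .as(LogicalTypeAnnotation.uuidType())
        .named("id")
        .named("uuid_example");
    System.out.println(schema);
  }
}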
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index f30700c..a08e172 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -259,10 +259,6 @@
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
- <artifactId>parquet-format</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.parquet</groupId>
<artifactId>parquet-common</artifactId>
<version>${parquet.version}</version>
</dependency>
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index 4fd5064..45a2c7f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -57,7 +57,6 @@ import org.apache.drill.exec.vector.complex.reader.FieldReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
import org.apache.parquet.column.ColumnWriteStore;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.ParquetProperties.WriterVersion;
@@ -251,10 +250,6 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
// We don't want this number to be too small either. Ideally, slightly bigger than the page size,
// but not bigger than the block buffer
int initialPageBufferSize = max(MINIMUM_BUFFER_SIZE, min(pageSize + pageSize / 10, initialBlockBufferSize));
- // TODO: Use initialSlabSize from ParquetProperties once drill will be updated to the latest version of Parquet library
- int initialSlabSize = CapacityByteArrayOutputStream.initialSlabSizeHeuristic(64, pageSize, 10);
- // TODO: Replace ParquetColumnChunkPageWriteStore with ColumnChunkPageWriteStore from parquet library
- // once PARQUET-1006 will be resolved
ParquetProperties parquetProperties = ParquetProperties.builder()
.withPageSize(pageSize)
.withDictionaryEncoding(enableDictionary)
@@ -263,10 +258,11 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
.withAllocator(new ParquetDirectByteBufferAllocator(oContext))
.withValuesWriterFactory(new DefaultV1ValuesWriterFactory())
.build();
- pageStore = new ParquetColumnChunkPageWriteStore(codecFactory.getCompressor(codec), schema, initialSlabSize,
- pageSize, parquetProperties.getAllocator(), parquetProperties.getPageWriteChecksumEnabled(),
- parquetProperties.getColumnIndexTruncateLength()
- );
+ // TODO: Replace ParquetColumnChunkPageWriteStore with ColumnChunkPageWriteStore from parquet library
+ // once DRILL-7906 (PARQUET-1006) will be resolved
+ pageStore = new ParquetColumnChunkPageWriteStore(codecFactory.getCompressor(codec), schema,
+ parquetProperties.getInitialSlabSize(), pageSize, parquetProperties.getAllocator(),
+ parquetProperties.getColumnIndexTruncateLength(), parquetProperties.getPageWriteChecksumEnabled());
store = new ColumnWriteStoreV1(pageStore, parquetProperties);
MessageColumnIO columnIO = new ColumnIOFactory(false).getColumnIO(this.schema);
consumer = columnIO.getRecordWriter(store);
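
The two removed TODOs are superseded: the bumped Parquet library now exposes the slab-size heuristic directly via ParquetProperties.getInitialSlabSize(), and the page-write-store TODO moves below the properties builder with a Drill tracking JIRA. For reference, a hedged sketch of the doubling heuristic behind the removed CapacityByteArrayOutputStream.initialSlabSizeHeuristic(64, pageSize, 10) call (an illustrative re-implementation, not the library code):

public class SlabSizeSketch {
  // Pick a starting slab small enough that about targetNumSlabs doublings
  // reach the target capacity, but never smaller than minSlabSize.
  // With minSlabSize = 64, targetCapacity = 1 MiB, targetNumSlabs = 10,
  // this yields 1 MiB / 2^10 = 1024 bytes.
  static int initialSlabSizeHeuristic(int minSlabSize, int targetCapacity, int targetNumSlabs) {
    return Math.max(minSlabSize, (int) (targetCapacity / Math.pow(2, targetNumSlabs)));
  }
}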
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
index fcb61f6..25c9904 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
@@ -201,13 +201,15 @@ public class ColumnReaderFactory {
} else if (convertedType == ConvertedType.INTERVAL) {
return new NullableFixedByteAlignedReaders.NullableIntervalReader(recordReader, descriptor,
columnChunkMetaData, fixedLength, (NullableIntervalVector) v, schemaElement);
+ } else {
+ return new NullableFixedByteAlignedReaders.NullableFixedBinaryReader(recordReader, descriptor,
+ columnChunkMetaData, fixedLength, (NullableVarBinaryVector) v, schemaElement);
}
} else {
return getNullableColumnReader(recordReader, descriptor,
columnChunkMetaData, fixedLength, v, schemaElement);
}
}
- throw new Exception("Unexpected parquet metadata configuration.");
}
static VarLengthValuesColumn<?> getReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
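
With the trailing throw removed, a nullable FIXED_LEN_BYTE_ARRAY column whose converted type Drill does not recognize (UUID among them) now falls through to NullableFixedBinaryReader and reaches the user as VARBINARY. A hypothetical client-side helper (plain JDK, not part of the patch) for decoding those 16-byte values back into java.util.UUID:

import java.nio.ByteBuffer;
import java.util.UUID;

public class UuidDecode {
  // Parquet stores UUIDs big-endian: high 8 bytes first, then the low 8.
  public static UUID fromBytes(byte[] bytes16) {
    ByteBuffer buf = ByteBuffer.wrap(bytes16);
    return new UUID(buf.getLong(), buf.getLong());
  }
}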
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
index fbe3ae3..fdb6e79 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
@@ -22,6 +22,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;
@@ -71,6 +72,7 @@ import org.apache.parquet.io.api.Converter;
import org.apache.parquet.io.api.GroupConverter;
import org.apache.parquet.io.api.PrimitiveConverter;
import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Type.Repetition;
@@ -328,23 +330,30 @@ public class DrillParquetGroupConverter extends GroupConverter {
}
}
case FIXED_LEN_BYTE_ARRAY:
- switch (type.getOriginalType()) {
- case DECIMAL: {
+ LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<PrimitiveConverter> typeAnnotationVisitor = new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<PrimitiveConverter>() {
+ @Override
+ public Optional<PrimitiveConverter> visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType) {
ParquetReaderUtility.checkDecimalTypeEnabled(options);
- return getVarDecimalConverter(name, type);
+ return Optional.of(getVarDecimalConverter(name, type));
}
- case INTERVAL: {
+
+ @Override
+ public Optional<PrimitiveConverter> visit(LogicalTypeAnnotation.IntervalLogicalTypeAnnotation intervalLogicalType) {
IntervalWriter writer = type.isRepetition(Repetition.REPEATED)
? getWriter(name, (m, f) -> m.list(f).interval(), l -> l.list().interval())
- : getWriter(name, (m, f) -> m.interval(f), l -> l.interval());
- return new DrillFixedLengthByteArrayToInterval(writer);
+ : getWriter(name, MapWriter::interval, ListWriter::interval);
+ return Optional.of(new DrillFixedLengthByteArrayToInterval(writer));
}
- default: {
+ };
+
+ LogicalTypeAnnotation logicalTypeAnnotation = type.getLogicalTypeAnnotation();
+ if (logicalTypeAnnotation != null) {
+ return logicalTypeAnnotation.accept(typeAnnotationVisitor).orElseGet(() -> {
VarBinaryWriter writer = type.isRepetition(Repetition.REPEATED)
? getWriter(name, (m, f) -> m.list(f).varBinary(), l -> l.list().varBinary())
- : getWriter(name, (m, f) -> m.varBinary(f), l -> l.varBinary());
+ : getWriter(name, MapWriter::varBinary, ListWriter::varBinary);
return new DrillFixedBinaryToVarbinaryConverter(writer, type.getTypeLength(), mutator.getManagedBuffer());
- }
+ });
}
default:
throw new UnsupportedOperationException("Unsupported type: " + type.getPrimitiveTypeName());
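
The switch over the deprecated OriginalType gives way to Parquet's LogicalTypeAnnotationVisitor: each visit override returns an Optional, and any annotation without an override (UUID included) leaves the Optional empty, so orElseGet supplies the VarBinary fallback. A stripped-down sketch of the same dispatch (illustrative names; assumes parquet-column 1.11+ on the classpath):

import java.util.Optional;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;

public class AnnotationDispatch {
  static String describe(PrimitiveType type) {
    LogicalTypeAnnotation annotation = type.getLogicalTypeAnnotation();
    if (annotation == null) {
      return "raw " + type.getPrimitiveTypeName();
    }
    // Overridden annotations return a value; everything else, e.g. UUID,
    // falls back through the empty Optional.
    return annotation.accept(new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<String>() {
      @Override
      public Optional<String> visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimal) {
        return Optional.of("decimal(" + decimal.getPrecision() + ", " + decimal.getScale() + ")");
      }
      @Override
      public Optional<String> visit(LogicalTypeAnnotation.IntervalLogicalTypeAnnotation interval) {
        return Optional.of("interval");
      }
    }).orElse("fixed binary fallback");
  }
}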
diff --git a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java
index b8f707d..790e3c3 100644
--- a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java
+++ b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java
@@ -31,77 +31,43 @@ import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.column.page.PageWriteStore;
import org.apache.parquet.column.page.PageWriter;
import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
+import org.apache.parquet.crypto.AesCipher;
+import org.apache.parquet.crypto.InternalColumnEncryptionSetup;
+import org.apache.parquet.crypto.InternalFileEncryptor;
+import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
+import org.apache.parquet.format.BlockCipher;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder;
import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder;
import org.apache.parquet.io.ParquetEncodingException;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.bytes.ByteBufferAllocator;
+import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
- * This is a copy of ColumnChunkPageWriteStore from parquet library except of OutputStream that is used here.
- * Using of CapacityByteArrayOutputStream allows to use different ByteBuffer allocators.
- * It will be no need in this class once PARQUET-1006 is resolved.
- */
-public class ParquetColumnChunkPageWriteStore implements PageWriteStore, Closeable {
-
- private static final Logger logger = LoggerFactory.getLogger(ParquetColumnChunkPageWriteStore.class);
+@InterfaceAudience.Private
+public class ParquetColumnChunkPageWriteStore implements PageWriteStore, BloomFilterWriteStore, Closeable {
+ private static final Logger LOG = LoggerFactory.getLogger(ParquetColumnChunkPageWriteStore.class);
private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
- private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers = new HashMap<>();
- private final MessageType schema;
-
- public ParquetColumnChunkPageWriteStore(BytesCompressor compressor,
- MessageType schema,
- int initialSlabSize,
- int maxCapacityHint,
- ByteBufferAllocator allocator,
- boolean pageWriteChecksumEnabled,
- int columnIndexTruncateLength) {
- this.schema = schema;
- for (ColumnDescriptor path : schema.getColumns()) {
- writers.put(path, new ColumnChunkPageWriter(path, compressor, initialSlabSize,
- maxCapacityHint, allocator, pageWriteChecksumEnabled, columnIndexTruncateLength));
- }
- }
-
- @Override
- public PageWriter getPageWriter(ColumnDescriptor path) {
- return writers.get(path);
- }
-
- /**
- * Writes the column chunks in the corresponding row group
- * @param writer the parquet file writer
- * @throws IOException if the file can not be created
- */
- public void flushToFileWriter(ParquetFileWriter writer) throws IOException {
- for (ColumnDescriptor path : schema.getColumns()) {
- ColumnChunkPageWriter pageWriter = writers.get(path);
- pageWriter.writeToFileWriter(writer);
- }
- }
-
- @Override
- public void close() {
- for (ColumnChunkPageWriter pageWriter : writers.values()) {
- pageWriter.close();
- }
- }
-
- private static final class ColumnChunkPageWriter implements PageWriter, Closeable {
+ private static final class ColumnChunkPageWriter implements PageWriter, BloomFilterWriter, Closeable {
private final ColumnDescriptor path;
private final BytesCompressor compressor;
+ private final CapacityByteArrayOutputStream tempOutputStream;
private final CapacityByteArrayOutputStream buf;
private DictionaryPage dictionaryPage;
@@ -111,37 +77,74 @@ public class ParquetColumnChunkPageWriteStore implements PageWriteStore, Closeab
private int pageCount;
// repetition and definition level encodings are used only for v1 pages and don't change
- private Set<Encoding> rlEncodings = new HashSet<>();
- private Set<Encoding> dlEncodings = new HashSet<>();
- private List<Encoding> dataEncodings = new ArrayList<>();
+ private Set<Encoding> rlEncodings = new HashSet<Encoding>();
+ private Set<Encoding> dlEncodings = new HashSet<Encoding>();
+ private List<Encoding> dataEncodings = new ArrayList<Encoding>();
+ private BloomFilter bloomFilter;
private ColumnIndexBuilder columnIndexBuilder;
private OffsetIndexBuilder offsetIndexBuilder;
private Statistics totalStatistics;
+ private final ByteBufferAllocator allocator;
private final CRC32 crc;
boolean pageWriteChecksumEnabled;
+ private final BlockCipher.Encryptor headerBlockEncryptor;
+ private final BlockCipher.Encryptor pageBlockEncryptor;
+ private final int rowGroupOrdinal;
+ private final int columnOrdinal;
+ private int pageOrdinal;
+ private final byte[] dataPageAAD;
+ private final byte[] dataPageHeaderAAD;
+ private final byte[] fileAAD;
+
private ColumnChunkPageWriter(ColumnDescriptor path,
BytesCompressor compressor,
int initialSlabSize,
int maxCapacityHint,
ByteBufferAllocator allocator,
+ int columnIndexTruncateLength,
boolean pageWriteChecksumEnabled,
- int columnIndexTruncateLength) {
+ BlockCipher.Encryptor headerBlockEncryptor,
+ BlockCipher.Encryptor pageBlockEncryptor,
+ byte[] fileAAD,
+ int rowGroupOrdinal,
+ int columnOrdinal) {
this.path = path;
this.compressor = compressor;
+ this.allocator = allocator;
+ this.tempOutputStream = new CapacityByteArrayOutputStream(initialSlabSize, maxCapacityHint, allocator);
this.buf = new CapacityByteArrayOutputStream(initialSlabSize, maxCapacityHint, allocator);
- this.totalStatistics = Statistics.createStats(this.path.getPrimitiveType());
this.columnIndexBuilder = ColumnIndexBuilder.getBuilder(path.getPrimitiveType(), columnIndexTruncateLength);
this.offsetIndexBuilder = OffsetIndexBuilder.getBuilder();
this.pageWriteChecksumEnabled = pageWriteChecksumEnabled;
this.crc = pageWriteChecksumEnabled ? new CRC32() : null;
+
+ this.headerBlockEncryptor = headerBlockEncryptor;
+ this.pageBlockEncryptor = pageBlockEncryptor;
+ this.fileAAD = fileAAD;
+ this.rowGroupOrdinal = rowGroupOrdinal;
+ this.columnOrdinal = columnOrdinal;
+ this.pageOrdinal = -1;
+ if (null != headerBlockEncryptor) {
+ dataPageHeaderAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DataPageHeader,
+ rowGroupOrdinal, columnOrdinal, 0);
+ } else {
+ dataPageHeaderAAD = null;
+ }
+ if (null != pageBlockEncryptor) {
+ dataPageAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DataPage,
+ rowGroupOrdinal, columnOrdinal, 0);
+ } else {
+ dataPageAAD = null;
+ }
}
@Override
+ @Deprecated
public void writePage(BytesInput bytesInput, int valueCount, Statistics<?> statistics, Encoding rlEncoding,
- Encoding dlEncoding, Encoding valuesEncoding) throws IOException {
+ Encoding dlEncoding, Encoding valuesEncoding) throws IOException {
// Setting the builders to the no-op ones so no column/offset indexes will be written for this column chunk
columnIndexBuilder = ColumnIndexBuilder.getNoOpBuilder();
offsetIndexBuilder = OffsetIndexBuilder.getNoOpBuilder();
@@ -150,49 +153,66 @@ public class ParquetColumnChunkPageWriteStore implements PageWriteStore, Closeab
}
@Override
- public void writePage(BytesInput bytes, int valueCount, int rowCount, Statistics statistics,
- Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding) throws IOException {
+ public void writePage(BytesInput bytes,
+ int valueCount,
+ int rowCount,
+ Statistics statistics,
+ Encoding rlEncoding,
+ Encoding dlEncoding,
+ Encoding valuesEncoding) throws IOException {
+ pageOrdinal++;
long uncompressedSize = bytes.size();
if (uncompressedSize > Integer.MAX_VALUE) {
throw new ParquetEncodingException(
- "Cannot write page larger than Integer.MAX_VALUE bytes: " +
- uncompressedSize);
+ "Cannot write page larger than Integer.MAX_VALUE bytes: " +
+ uncompressedSize);
}
-
BytesInput compressedBytes = compressor.compress(bytes);
+ if (null != pageBlockEncryptor) {
+ AesCipher.quickUpdatePageAAD(dataPageAAD, pageOrdinal);
+ compressedBytes = BytesInput.from(pageBlockEncryptor.encrypt(compressedBytes.toByteArray(), dataPageAAD));
+ }
long compressedSize = compressedBytes.size();
if (compressedSize > Integer.MAX_VALUE) {
throw new ParquetEncodingException(
- "Cannot write compressed page larger than Integer.MAX_VALUE bytes: "
- + compressedSize);
+ "Cannot write compressed page larger than Integer.MAX_VALUE bytes: "
+ + compressedSize);
+ }
+ tempOutputStream.reset();
+ if (null != headerBlockEncryptor) {
+ AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
}
-
if (pageWriteChecksumEnabled) {
crc.reset();
crc.update(compressedBytes.toByteArray());
- parquetMetadataConverter.writeDataPageV1Header((int) uncompressedSize, (int) compressedSize,
- valueCount, rlEncoding, dlEncoding, valuesEncoding, (int) crc.getValue(), buf);
+ parquetMetadataConverter.writeDataPageV1Header(
+ (int)uncompressedSize,
+ (int)compressedSize,
+ valueCount,
+ rlEncoding,
+ dlEncoding,
+ valuesEncoding,
+ (int) crc.getValue(),
+ tempOutputStream,
+ headerBlockEncryptor,
+ dataPageHeaderAAD);
} else {
- parquetMetadataConverter.writeDataPageV1Header((int) uncompressedSize, (int) compressedSize,
- valueCount, rlEncoding, dlEncoding, valuesEncoding, buf);
+ parquetMetadataConverter.writeDataPageV1Header(
+ (int)uncompressedSize,
+ (int)compressedSize,
+ valueCount,
+ rlEncoding,
+ dlEncoding,
+ valuesEncoding,
+ tempOutputStream,
+ headerBlockEncryptor,
+ dataPageHeaderAAD);
}
-
this.uncompressedLength += uncompressedSize;
this.compressedLength += compressedSize;
this.totalValueCount += valueCount;
this.pageCount += 1;
- addStatistics(statistics);
-
- offsetIndexBuilder.add(toIntWithCheck(buf.size() + compressedSize), rowCount);
-
- compressedBytes.writeAllTo(buf);
- rlEncodings.add(rlEncoding);
- dlEncodings.add(dlEncoding);
- dataEncodings.add(valuesEncoding);
- }
-
- private void addStatistics(Statistics statistics) {
// Copying the statistics if it is not initialized yet so we have the correct typed one
if (totalStatistics == null) {
totalStatistics = statistics.copy();
@@ -201,55 +221,81 @@ public class ParquetColumnChunkPageWriteStore implements PageWriteStore, Closeab
}
columnIndexBuilder.add(statistics);
+ offsetIndexBuilder.add(toIntWithCheck(tempOutputStream.size() + compressedSize), rowCount);
+
+ // by concatenating before writing instead of writing twice,
+ // we only allocate one buffer to copy into instead of multiple.
+ BytesInput.concat(BytesInput.from(tempOutputStream), compressedBytes).writeAllTo(buf); // is used instead of above
+ rlEncodings.add(rlEncoding);
+ dlEncodings.add(dlEncoding);
+ dataEncodings.add(valuesEncoding);
}
@Override
- public void writePageV2(int rowCount,
- int nullCount,
- int valueCount,
- BytesInput repetitionLevels,
- BytesInput definitionLevels,
- Encoding dataEncoding,
- BytesInput data,
- Statistics<?> statistics) throws IOException {
+ public void writePageV2(
+ int rowCount, int nullCount, int valueCount,
+ BytesInput repetitionLevels, BytesInput definitionLevels,
+ Encoding dataEncoding, BytesInput data,
+ Statistics<?> statistics) throws IOException {
+ pageOrdinal++;
+
int rlByteLength = toIntWithCheck(repetitionLevels.size());
int dlByteLength = toIntWithCheck(definitionLevels.size());
int uncompressedSize = toIntWithCheck(
- data.size() + repetitionLevels.size() + definitionLevels.size()
+ data.size() + repetitionLevels.size() + definitionLevels.size()
);
+ // TODO: decide if we compress
BytesInput compressedData = compressor.compress(data);
+ if (null != pageBlockEncryptor) {
+ AesCipher.quickUpdatePageAAD(dataPageAAD, pageOrdinal);
+ compressedData = BytesInput.from(pageBlockEncryptor.encrypt(compressedData.toByteArray(), dataPageAAD));
+ }
int compressedSize = toIntWithCheck(
- compressedData.size() + repetitionLevels.size() + definitionLevels.size()
+ compressedData.size() + repetitionLevels.size() + definitionLevels.size()
);
+ tempOutputStream.reset();
+ if (null != headerBlockEncryptor) {
+ AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
+ }
parquetMetadataConverter.writeDataPageV2Header(
- uncompressedSize, compressedSize,
- valueCount, nullCount, rowCount,
- statistics,
- dataEncoding,
- rlByteLength,
- dlByteLength,
- buf);
+ uncompressedSize, compressedSize,
+ valueCount, nullCount, rowCount,
+ dataEncoding,
+ rlByteLength,
+ dlByteLength,
+ tempOutputStream,
+ headerBlockEncryptor,
+ dataPageHeaderAAD);
this.uncompressedLength += uncompressedSize;
this.compressedLength += compressedSize;
this.totalValueCount += valueCount;
this.pageCount += 1;
- addStatistics(statistics);
-
- offsetIndexBuilder.add(toIntWithCheck(buf.size() + compressedSize), rowCount);
-
- repetitionLevels.writeAllTo(buf);
- definitionLevels.writeAllTo(buf);
- compressedData.writeAllTo(buf);
+ // Copying the statistics if it is not initialized yet so we have the correct typed one
+ if (totalStatistics == null) {
+ totalStatistics = statistics.copy();
+ } else {
+ totalStatistics.mergeStatistics(statistics);
+ }
+ columnIndexBuilder.add(statistics);
+ offsetIndexBuilder.add(toIntWithCheck((long) tempOutputStream.size() + compressedSize), rowCount);
+
+ // by concatenating before writing instead of writing twice,
+ // we only allocate one buffer to copy into instead of multiple.
+ BytesInput.concat(
+ BytesInput.from(tempOutputStream),
+ repetitionLevels,
+ definitionLevels,
+ compressedData).writeAllTo(buf);
dataEncodings.add(dataEncoding);
}
private int toIntWithCheck(long size) {
if (size > Integer.MAX_VALUE) {
throw new ParquetEncodingException(
- "Cannot write page larger than " + Integer.MAX_VALUE + " bytes: " +
- size);
+ "Cannot write page larger than " + Integer.MAX_VALUE + " bytes: " +
+ size);
}
return (int)size;
}
@@ -259,35 +305,64 @@ public class ParquetColumnChunkPageWriteStore implements PageWriteStore, Closeab
return buf.size();
}
- /**
- * Writes a number of pages within corresponding column chunk
- * @param writer the parquet file writer
- * @throws IOException if the file can not be created
- */
public void writeToFileWriter(ParquetFileWriter writer) throws IOException {
- writer.writeColumnChunk(path, totalValueCount, compressor.getCodecName(),
- dictionaryPage, BytesInput.from(buf), uncompressedLength, compressedLength, totalStatistics,
- columnIndexBuilder, offsetIndexBuilder, rlEncodings, dlEncodings, dataEncodings);
- if (logger.isDebugEnabled()) {
- logger.debug(
- String.format(
- "written %,dB for %s: %,d values, %,dB raw, %,dB comp, %d pages, encodings: %s",
- buf.size(), path, totalValueCount, uncompressedLength, compressedLength, pageCount, new HashSet<>(dataEncodings))
- + (dictionaryPage != null ? String.format(
- ", dic { %,d entries, %,dB raw, %,dB comp}",
- dictionaryPage.getDictionarySize(), dictionaryPage.getUncompressedSize(), dictionaryPage.getDictionarySize())
- : "")
- );
+ if (null == headerBlockEncryptor) {
+ writer.writeColumnChunk(
+ path,
+ totalValueCount,
+ compressor.getCodecName(),
+ dictionaryPage,
+ BytesInput.from(buf),
+ uncompressedLength,
+ compressedLength,
+ totalStatistics,
+ columnIndexBuilder,
+ offsetIndexBuilder,
+ bloomFilter,
+ rlEncodings,
+ dlEncodings,
+ dataEncodings);
+ } else {
+ writer.writeColumnChunk(
+ path,
+ totalValueCount,
+ compressor.getCodecName(),
+ dictionaryPage,
+ BytesInput.from(buf),
+ uncompressedLength,
+ compressedLength,
+ totalStatistics,
+ columnIndexBuilder,
+ offsetIndexBuilder,
+ bloomFilter,
+ rlEncodings,
+ dlEncodings,
+ dataEncodings,
+ headerBlockEncryptor,
+ rowGroupOrdinal,
+ columnOrdinal,
+ fileAAD);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ String.format(
+ "written %,dB for %s: %,d values, %,dB raw, %,dB comp, %d pages, encodings: %s",
+ buf.size(), path, totalValueCount, uncompressedLength, compressedLength, pageCount, new HashSet<Encoding>(dataEncodings))
+ + (dictionaryPage != null ? String.format(
+ ", dic { %,d entries, %,dB raw, %,dB comp}",
+ dictionaryPage.getDictionarySize(), dictionaryPage.getUncompressedSize(), dictionaryPage.getDictionarySize())
+ : ""));
}
rlEncodings.clear();
dlEncodings.clear();
dataEncodings.clear();
pageCount = 0;
+ pageOrdinal = -1;
}
@Override
public long allocatedSize() {
- return buf.getCapacity();
+ return buf.size();
}
@Override
@@ -298,8 +373,13 @@ public class ParquetColumnChunkPageWriteStore implements PageWriteStore, Closeab
BytesInput dictionaryBytes = dictionaryPage.getBytes();
int uncompressedSize = (int)dictionaryBytes.size();
BytesInput compressedBytes = compressor.compress(dictionaryBytes);
+ if (null != pageBlockEncryptor) {
+ byte[] dictonaryPageAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DictionaryPage,
+ rowGroupOrdinal, columnOrdinal, -1);
+ compressedBytes = BytesInput.from(pageBlockEncryptor.encrypt(compressedBytes.toByteArray(), dictonaryPageAAD));
+ }
this.dictionaryPage = new DictionaryPage(BytesInput.copy(compressedBytes), uncompressedSize,
- dictionaryPage.getDictionarySize(), dictionaryPage.getEncoding());
+ dictionaryPage.getDictionarySize(), dictionaryPage.getEncoding());
}
@Override
@@ -308,9 +388,96 @@ public class ParquetColumnChunkPageWriteStore implements PageWriteStore, Closeab
}
@Override
+ public void writeBloomFilter(BloomFilter bloomFilter) {
+ this.bloomFilter = bloomFilter;
+ }
+
+ @Override
public void close() {
+ tempOutputStream.close();
buf.close();
}
}
+ private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers = new HashMap<ColumnDescriptor, ColumnChunkPageWriter>();
+ private final MessageType schema;
+
+ public ParquetColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, int initialSlabSize,
+ int maxCapacityHint, ByteBufferAllocator allocator,
+ int columnIndexTruncateLength) {
+ this(compressor, schema, initialSlabSize, maxCapacityHint, allocator, columnIndexTruncateLength,
+ ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED);
+ }
+
+ public ParquetColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, int initialSlabSize,
+ int maxCapacityHint, ByteBufferAllocator allocator,
+ int columnIndexTruncateLength, boolean pageWriteChecksumEnabled) {
+ this.schema = schema;
+ for (ColumnDescriptor path : schema.getColumns()) {
+ writers.put(path, new ColumnChunkPageWriter(path, compressor, initialSlabSize, maxCapacityHint, allocator, columnIndexTruncateLength,
+ pageWriteChecksumEnabled, null, null, null, -1, -1));
+ }
+ }
+
+ public ParquetColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, int initialSlabSize,
+ int maxCapacityHint, ByteBufferAllocator allocator,
+ int columnIndexTruncateLength, boolean pageWriteChecksumEnabled,
+ InternalFileEncryptor fileEncryptor, int rowGroupOrdinal) {
+ this.schema = schema;
+ if (null == fileEncryptor) {
+ for (ColumnDescriptor path : schema.getColumns()) {
+ writers.put(path, new ColumnChunkPageWriter(path, compressor, initialSlabSize, maxCapacityHint, allocator,
+ columnIndexTruncateLength, pageWriteChecksumEnabled, null, null,
+ null, -1, -1));
+ }
+ return;
+ }
+
+ // Encrypted file
+ int columnOrdinal = -1;
+ byte[] fileAAD = fileEncryptor.getFileAAD();
+ for (ColumnDescriptor path : schema.getColumns()) {
+ columnOrdinal++;
+ BlockCipher.Encryptor headerBlockEncryptor = null;
+ BlockCipher.Encryptor pageBlockEncryptor = null;
+ ColumnPath columnPath = ColumnPath.get(path.getPath());
+
+ InternalColumnEncryptionSetup columnSetup = fileEncryptor.getColumnSetup(columnPath, true, columnOrdinal);
+ if (columnSetup.isEncrypted()) {
+ headerBlockEncryptor = columnSetup.getMetaDataEncryptor();
+ pageBlockEncryptor = columnSetup.getDataEncryptor();
+ }
+
+ writers.put(path, new ColumnChunkPageWriter(path, compressor, initialSlabSize, maxCapacityHint, allocator,
+ columnIndexTruncateLength, pageWriteChecksumEnabled, headerBlockEncryptor, pageBlockEncryptor, fileAAD,
+ rowGroupOrdinal, columnOrdinal));
+ }
+ }
+
+ @Override
+ public PageWriter getPageWriter(ColumnDescriptor path) {
+ return writers.get(path);
+ }
+
+ @Override
+ public BloomFilterWriter getBloomFilterWriter(ColumnDescriptor path) {
+ return writers.get(path);
+ }
+
+
+
+ public void flushToFileWriter(ParquetFileWriter writer) throws IOException {
+ for (ColumnDescriptor path : schema.getColumns()) {
+ ColumnChunkPageWriter pageWriter = writers.get(path);
+ pageWriter.writeToFileWriter(writer);
+ }
+ }
+
+ @Override
+ public void close() {
+ for (ColumnChunkPageWriter pageWriter : writers.values()) {
+ pageWriter.close();
+ }
+ }
+
}
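
The encryption plumbing copied in above ties every page to its physical position: one module AAD (additional authenticated data) is created per column chunk, and the trailing page ordinal is patched into it before each page write. A condensed sketch of that bookkeeping using the same parquet-mr crypto calls that appear in the diff (the zero-filled fileAAD is a placeholder; real files derive it during footer-key setup):

import org.apache.parquet.crypto.AesCipher;
import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;

public class AadSketch {
  public static void main(String[] args) {
    byte[] fileAAD = new byte[16]; // placeholder; illustration only
    // One AAD per module type and column chunk; ordinals locate the chunk.
    byte[] dataPageAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DataPage,
        /* rowGroupOrdinal = */ 0, /* columnOrdinal = */ 0, /* pageOrdinal = */ 0);
    for (int pageOrdinal = 0; pageOrdinal < 3; pageOrdinal++) {
      // Refresh the trailing page ordinal before encrypting each page.
      AesCipher.quickUpdatePageAAD(dataPageAAD, pageOrdinal);
      // pageBlockEncryptor.encrypt(pageBytes, dataPageAAD) would run here.
    }
  }
}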
diff --git a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
new file mode 100644
index 0000000..f90f4c8
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -0,0 +1,1634 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import static org.apache.parquet.format.Util.writeFileCryptoMetaData;
+import static org.apache.parquet.format.Util.writeFileMetaData;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.MAX_STATS_SIZE;
+import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
+import static org.apache.parquet.hadoop.ParquetWriter.MAX_PADDING_SIZE_DEFAULT;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.zip.CRC32;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.Version;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.EncodingStats;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.crypto.AesCipher;
+import org.apache.parquet.crypto.ColumnEncryptionProperties;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.crypto.InternalColumnEncryptionSetup;
+import org.apache.parquet.crypto.InternalFileEncryptor;
+import org.apache.parquet.crypto.ModuleCipherFactory;
+import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
+import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.format.BlockCipher;
+import org.apache.parquet.format.Util;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.StrictKeyValueMetadataMergeStrategy;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.hadoop.metadata.GlobalMetaData;
+import org.apache.parquet.hadoop.metadata.KeyValueMetadataMergeStrategy;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
+import org.apache.parquet.hadoop.util.HadoopStreams;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder;
+import org.apache.parquet.internal.hadoop.metadata.IndexReference;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.OutputFile;
+import org.apache.parquet.io.SeekableInputStream;
+import org.apache.parquet.io.ParquetEncodingException;
+import org.apache.parquet.io.PositionOutputStream;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.TypeUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Internal implementation of the Parquet file writer as a block container<br>
+ * Note: this is temporary Drill-Parquet class needed to write empty parquet files. Details in
+ * <a href="https://issues.apache.org/jira/browse/PARQUET-2026">PARQUET-2026</a> and
+ * <a href="https://issues.apache.org/jira/browse/DRILL-7907">DRILL-7907</a>
+ */
+public class ParquetFileWriter {
+ private static final Logger LOG = LoggerFactory.getLogger(ParquetFileWriter.class);
+
+ private final ParquetMetadataConverter metadataConverter;
+
+ public static final String PARQUET_METADATA_FILE = "_metadata";
+ public static final String MAGIC_STR = "PAR1";
+ public static final byte[] MAGIC = MAGIC_STR.getBytes(StandardCharsets.US_ASCII);
+ public static final String EF_MAGIC_STR = "PARE";
+ public static final byte[] EFMAGIC = EF_MAGIC_STR.getBytes(StandardCharsets.US_ASCII);
+ public static final String PARQUET_COMMON_METADATA_FILE = "_common_metadata";
+ public static final int CURRENT_VERSION = 1;
+
+ // File creation modes
+ public static enum Mode {
+ CREATE,
+ OVERWRITE
+ }
+
+ protected final PositionOutputStream out;
+
+ private final MessageType schema;
+ private final AlignmentStrategy alignment;
+ private final int columnIndexTruncateLength;
+
+ // file data
+ private List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
+
+ // The column/offset indexes per blocks per column chunks
+ private final List<List<ColumnIndex>> columnIndexes = new ArrayList<>();
+ private final List<List<OffsetIndex>> offsetIndexes = new ArrayList<>();
+
+ // The Bloom filters
+ private final List<Map<String, BloomFilter>> bloomFilters = new ArrayList<>();
+
+ // The file encryptor
+ private final InternalFileEncryptor fileEncryptor;
+
+ // row group data
+ private BlockMetaData currentBlock; // appended to by endColumn
+
+ // The column/offset indexes for the actual block
+ private List<ColumnIndex> currentColumnIndexes;
+ private List<OffsetIndex> currentOffsetIndexes;
+
+ // The Bloom filter for the actual block
+ private Map<String, BloomFilter> currentBloomFilters;
+
+ // row group data set at the start of a row group
+ private long currentRecordCount; // set in startBlock
+
+ // column chunk data accumulated as pages are written
+ private EncodingStats.Builder encodingStatsBuilder;
+ private Set<Encoding> currentEncodings;
+ private long uncompressedLength;
+ private long compressedLength;
+ private Statistics currentStatistics; // accumulated in writePage(s)
+ private ColumnIndexBuilder columnIndexBuilder;
+ private OffsetIndexBuilder offsetIndexBuilder;
+
+ // column chunk data set at the start of a column
+ private CompressionCodecName currentChunkCodec; // set in startColumn
+ private ColumnPath currentChunkPath; // set in startColumn
+ private PrimitiveType currentChunkType; // set in startColumn
+ private long currentChunkValueCount; // set in startColumn
+ private long currentChunkFirstDataPage; // set in startColumn & page writes
+ private long currentChunkDictionaryPageOffset; // set in writeDictionaryPage
+
+ // set when end is called
+ private ParquetMetadata footer = null;
+
+ private final CRC32 crc;
+ private boolean pageWriteChecksumEnabled;
+
+ /**
+ * Captures the order in which methods should be called
+ */
+ private enum STATE {
+ NOT_STARTED {
+ STATE start() {
+ return STARTED;
+ }
+ },
+ STARTED {
+ STATE startBlock() {
+ return BLOCK;
+ }
+ STATE end() {
+ return ENDED;
+ }
+ },
+ BLOCK {
+ STATE startColumn() {
+ return COLUMN;
+ }
+ STATE endBlock() {
+ return STARTED;
+ }
+ },
+ COLUMN {
+ STATE endColumn() {
+ return BLOCK;
+ };
+ STATE write() {
+ return this;
+ }
+ },
+ ENDED;
+
+ STATE start() throws IOException { return error(); }
+ STATE startBlock() throws IOException { return error(); }
+ STATE startColumn() throws IOException { return error(); }
+ STATE write() throws IOException { return error(); }
+ STATE endColumn() throws IOException { return error(); }
+ STATE endBlock() throws IOException { return error(); }
+ STATE end() throws IOException { return error(); }
+
+ private final STATE error() throws IOException {
+ throw new IOException("The file being written is in an invalid state. Probably caused by an error thrown previously. Current state: " + this.name());
+ }
+ }
+
+ private STATE state = STATE.NOT_STARTED;
+
+ /**
+ * @param configuration Hadoop configuration
+ * @param schema the schema of the data
+ * @param file the file to write to
+ * @throws IOException if the file can not be created
+ * @deprecated will be removed in 2.0.0
+ */
+ @Deprecated
+ public ParquetFileWriter(Configuration configuration, MessageType schema,
+ Path file) throws IOException {
+ this(HadoopOutputFile.fromPath(file, configuration),
+ schema, Mode.CREATE, DEFAULT_BLOCK_SIZE, MAX_PADDING_SIZE_DEFAULT);
+ }
+
+ /**
+ * @param configuration Hadoop configuration
+ * @param schema the schema of the data
+ * @param file the file to write to
+ * @param mode file creation mode
+ * @throws IOException if the file can not be created
+ * @deprecated will be removed in 2.0.0
+ */
+ @Deprecated
+ public ParquetFileWriter(Configuration configuration, MessageType schema,
+ Path file, Mode mode) throws IOException {
+ this(HadoopOutputFile.fromPath(file, configuration),
+ schema, mode, DEFAULT_BLOCK_SIZE, MAX_PADDING_SIZE_DEFAULT);
+ }
+
+ /**
+ * @param configuration Hadoop configuration
+ * @param schema the schema of the data
+ * @param file the file to write to
+ * @param mode file creation mode
+ * @param rowGroupSize the row group size
+ * @param maxPaddingSize the maximum padding
+ * @throws IOException if the file can not be created
+ * @deprecated will be removed in 2.0.0
+ */
+ @Deprecated
+ public ParquetFileWriter(Configuration configuration, MessageType schema,
+ Path file, Mode mode, long rowGroupSize,
+ int maxPaddingSize)
+ throws IOException {
+ this(HadoopOutputFile.fromPath(file, configuration),
+ schema, mode, rowGroupSize, maxPaddingSize);
+ }
+
+ /**
+ * @param file OutputFile to create or overwrite
+ * @param schema the schema of the data
+ * @param mode file creation mode
+ * @param rowGroupSize the row group size
+ * @param maxPaddingSize the maximum padding
+ * @throws IOException if the file can not be created
+ * @deprecated will be removed in 2.0.0
+ */
+ @Deprecated
+ public ParquetFileWriter(OutputFile file, MessageType schema, Mode mode,
+ long rowGroupSize, int maxPaddingSize)
+ throws IOException {
+ this(file, schema, mode, rowGroupSize, maxPaddingSize,
+ ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH,
+ ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH,
+ ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED);
+ }
+
+ /**
+ * @param file OutputFile to create or overwrite
+ * @param schema the schema of the data
+ * @param mode file creation mode
+ * @param rowGroupSize the row group size
+ * @param maxPaddingSize the maximum padding
+ * @param columnIndexTruncateLength the length which the min/max values in column indexes tried to be truncated to
+ * @param statisticsTruncateLength the length which the min/max values in row groups tried to be truncated to
+ * @param pageWriteChecksumEnabled whether to write out page level checksums
+ * @throws IOException if the file can not be created
+ */
+ public ParquetFileWriter(OutputFile file, MessageType schema, Mode mode,
+ long rowGroupSize, int maxPaddingSize, int columnIndexTruncateLength,
+ int statisticsTruncateLength, boolean pageWriteChecksumEnabled)
+ throws IOException{
+ this(file, schema, mode, rowGroupSize, maxPaddingSize, columnIndexTruncateLength,
+ statisticsTruncateLength, pageWriteChecksumEnabled, null);
+ }
+
+ public ParquetFileWriter(OutputFile file, MessageType schema, Mode mode,
+ long rowGroupSize, int maxPaddingSize, int columnIndexTruncateLength,
+ int statisticsTruncateLength, boolean pageWriteChecksumEnabled,
+ FileEncryptionProperties encryptionProperties)
+ throws IOException {
+ TypeUtil.checkValidWriteSchema(schema);
+
+ this.schema = schema;
+
+ long blockSize = rowGroupSize;
+ if (file.supportsBlockSize()) {
+ blockSize = Math.max(file.defaultBlockSize(), rowGroupSize);
+ this.alignment = PaddingAlignment.get(blockSize, rowGroupSize, maxPaddingSize);
+ } else {
+ this.alignment = NoAlignment.get(rowGroupSize);
+ }
+
+ if (mode == Mode.OVERWRITE) {
+ this.out = file.createOrOverwrite(blockSize);
+ } else {
+ this.out = file.create(blockSize);
+ }
+
+ this.encodingStatsBuilder = new EncodingStats.Builder();
+ this.columnIndexTruncateLength = columnIndexTruncateLength;
+ this.pageWriteChecksumEnabled = pageWriteChecksumEnabled;
+ this.crc = pageWriteChecksumEnabled ? new CRC32() : null;
+
+ this.metadataConverter = new ParquetMetadataConverter(statisticsTruncateLength);
+
+ if (null == encryptionProperties) {
+ this.fileEncryptor = null;
+ } else {
+ // Verify that every encrypted column is in file schema
+ Map<ColumnPath, ColumnEncryptionProperties> columnEncryptionProperties = encryptionProperties.getEncryptedColumns();
+ if (null != columnEncryptionProperties) { // if null, every column in file schema will be encrypted with footer key
+ for (Map.Entry<ColumnPath, ColumnEncryptionProperties> entry : columnEncryptionProperties.entrySet()) {
+ String[] path = entry.getKey().toArray();
+ if(!schema.containsPath(path)) {
+ throw new ParquetCryptoRuntimeException("Encrypted column " + Arrays.toString(path) + " not in file schema");
+ }
+ }
+ }
+ this.fileEncryptor = new InternalFileEncryptor(encryptionProperties);
+ }
+ }
+
+ /**
+ * FOR TESTING ONLY. This supports testing block padding behavior on the local FS.
+ *
+ * @param configuration Hadoop configuration
+ * @param schema the schema of the data
+ * @param file the file to write to
+ * @param rowAndBlockSize the row group size
+ * @param maxPaddingSize the maximum padding
+ * @throws IOException if the file can not be created
+ */
+ ParquetFileWriter(Configuration configuration, MessageType schema,
+ Path file, long rowAndBlockSize, int maxPaddingSize)
+ throws IOException {
+ FileSystem fs = file.getFileSystem(configuration);
+ this.schema = schema;
+ this.alignment = PaddingAlignment.get(
+ rowAndBlockSize, rowAndBlockSize, maxPaddingSize);
+ this.out = HadoopStreams.wrap(
+ fs.create(file, true, 8192, fs.getDefaultReplication(file), rowAndBlockSize));
+ this.encodingStatsBuilder = new EncodingStats.Builder();
+ // no truncation is needed for testing
+ this.columnIndexTruncateLength = Integer.MAX_VALUE;
+ this.pageWriteChecksumEnabled = ParquetOutputFormat.getPageWriteChecksumEnabled(configuration);
+ this.crc = pageWriteChecksumEnabled ? new CRC32() : null;
+ this.metadataConverter = new ParquetMetadataConverter(ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH);
+ this.fileEncryptor = null;
+ }
+ /**
+ * start the file
+ * @throws IOException if there is an error while writing
+ */
+ public void start() throws IOException {
+ state = state.start();
+ LOG.debug("{}: start", out.getPos());
+ byte[] magic = MAGIC;
+ if (null != fileEncryptor && fileEncryptor.isFooterEncrypted()) {
+ magic = EFMAGIC;
+ }
+ out.write(magic);
+ }
+
+ InternalFileEncryptor getEncryptor() {
+ return fileEncryptor;
+ }
+
+ /**
+ * start a block
+ * @param recordCount the record count in this block
+ * @throws IOException if there is an error while writing
+ */
+ public void startBlock(long recordCount) throws IOException {
+ state = state.startBlock();
+ LOG.debug("{}: start block", out.getPos());
+// out.write(MAGIC); // TODO: add a magic delimiter
+
+ alignment.alignForRowGroup(out);
+
+ currentBlock = new BlockMetaData();
+ currentRecordCount = recordCount;
+
+ currentColumnIndexes = new ArrayList<>();
+ currentOffsetIndexes = new ArrayList<>();
+
+ currentBloomFilters = new HashMap<>();
+ }
+
+ /**
+ * start a column inside a block
+ * @param descriptor the column descriptor
+ * @param valueCount the value count in this column
+ * @param compressionCodecName a compression codec name
+ * @throws IOException if there is an error while writing
+ */
+ public void startColumn(ColumnDescriptor descriptor,
+ long valueCount,
+ CompressionCodecName compressionCodecName) throws IOException {
+ state = state.startColumn();
+ encodingStatsBuilder.clear();
+ currentEncodings = new HashSet<Encoding>();
+ currentChunkPath = ColumnPath.get(descriptor.getPath());
+ currentChunkType = descriptor.getPrimitiveType();
+ currentChunkCodec = compressionCodecName;
+ currentChunkValueCount = valueCount;
+ currentChunkFirstDataPage = -1;
+ compressedLength = 0;
+ uncompressedLength = 0;
+ // The statistics will be copied from the first one added at writeDataPage(s) so we have the correct typed one
+ currentStatistics = null;
+
+ columnIndexBuilder = ColumnIndexBuilder.getBuilder(currentChunkType, columnIndexTruncateLength);
+ offsetIndexBuilder = OffsetIndexBuilder.getBuilder();
+ }
+
+ /**
+ * writes a dictionary page page
+ * @param dictionaryPage the dictionary page
+ * @throws IOException if there is an error while writing
+ */
+ public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException {
+ writeDictionaryPage(dictionaryPage, null, null);
+ }
+
+ public void writeDictionaryPage(DictionaryPage dictionaryPage,
+ BlockCipher.Encryptor headerBlockEncryptor, byte[] AAD) throws IOException {
+ state = state.write();
+ LOG.debug("{}: write dictionary page: {} values", out.getPos(), dictionaryPage.getDictionarySize());
+ currentChunkDictionaryPageOffset = out.getPos();
+ int uncompressedSize = dictionaryPage.getUncompressedSize();
+ int compressedPageSize = (int)dictionaryPage.getBytes().size(); // TODO: fix casts
+ if (pageWriteChecksumEnabled) {
+ crc.reset();
+ crc.update(dictionaryPage.getBytes().toByteArray());
+ metadataConverter.writeDictionaryPageHeader(
+ uncompressedSize,
+ compressedPageSize,
+ dictionaryPage.getDictionarySize(),
+ dictionaryPage.getEncoding(),
+ (int) crc.getValue(),
+ out,
+ headerBlockEncryptor,
+ AAD);
+ } else {
+ metadataConverter.writeDictionaryPageHeader(
+ uncompressedSize,
+ compressedPageSize,
+ dictionaryPage.getDictionarySize(),
+ dictionaryPage.getEncoding(),
+ out,
+ headerBlockEncryptor,
+ AAD);
+ }
+ long headerSize = out.getPos() - currentChunkDictionaryPageOffset;
+ this.uncompressedLength += uncompressedSize + headerSize;
+ this.compressedLength += compressedPageSize + headerSize;
+ LOG.debug("{}: write dictionary page content {}", out.getPos(), compressedPageSize);
+ dictionaryPage.getBytes().writeAllTo(out); // for encrypted column, dictionary page bytes are already encrypted
+ encodingStatsBuilder.addDictEncoding(dictionaryPage.getEncoding());
+ currentEncodings.add(dictionaryPage.getEncoding());
+ }
+
+
+ /**
+ * writes a single page
+ * @param valueCount count of values
+ * @param uncompressedPageSize the size of the data once uncompressed
+ * @param bytes the compressed data for the page without header
+ * @param rlEncoding encoding of the repetition level
+ * @param dlEncoding encoding of the definition level
+ * @param valuesEncoding encoding of values
+ * @throws IOException if there is an error while writing
+ */
+ @Deprecated
+ public void writeDataPage(
+ int valueCount, int uncompressedPageSize,
+ BytesInput bytes,
+ Encoding rlEncoding,
+ Encoding dlEncoding,
+ Encoding valuesEncoding) throws IOException {
+ state = state.write();
+ // We are unable to build indexes without rowCount so skip them for this column
+ offsetIndexBuilder = OffsetIndexBuilder.getNoOpBuilder();
+ columnIndexBuilder = ColumnIndexBuilder.getNoOpBuilder();
+ long beforeHeader = out.getPos();
+ LOG.debug("{}: write data page: {} values", beforeHeader, valueCount);
+ int compressedPageSize = (int)bytes.size();
+ metadataConverter.writeDataPageV1Header(
+ uncompressedPageSize, compressedPageSize,
+ valueCount,
+ rlEncoding,
+ dlEncoding,
+ valuesEncoding,
+ out);
+ long headerSize = out.getPos() - beforeHeader;
+ this.uncompressedLength += uncompressedPageSize + headerSize;
+ this.compressedLength += compressedPageSize + headerSize;
+ LOG.debug("{}: write data page content {}", out.getPos(), compressedPageSize);
+ bytes.writeAllTo(out);
+ encodingStatsBuilder.addDataEncoding(valuesEncoding);
+ currentEncodings.add(rlEncoding);
+ currentEncodings.add(dlEncoding);
+ currentEncodings.add(valuesEncoding);
+ if (currentChunkFirstDataPage < 0) {
+ currentChunkFirstDataPage = beforeHeader;
+ }
+ }
+
+ /**
+ * writes a single page
+ * @param valueCount count of values
+ * @param uncompressedPageSize the size of the data once uncompressed
+ * @param bytes the compressed data for the page without header
+ * @param statistics statistics for the page
+ * @param rlEncoding encoding of the repetition level
+ * @param dlEncoding encoding of the definition level
+ * @param valuesEncoding encoding of values
+ * @throws IOException if there is an error while writing
+ * @deprecated this method does not support writing column indexes; Use
+ * {@link #writeDataPage(int, int, BytesInput, Statistics, long, Encoding, Encoding, Encoding)} instead
+ */
+ @Deprecated
+ public void writeDataPage(
+ int valueCount, int uncompressedPageSize,
+ BytesInput bytes,
+ Statistics statistics,
+ Encoding rlEncoding,
+ Encoding dlEncoding,
+ Encoding valuesEncoding) throws IOException {
+ // We are unable to build indexes without rowCount so skip them for this column
+ offsetIndexBuilder = OffsetIndexBuilder.getNoOpBuilder();
+ columnIndexBuilder = ColumnIndexBuilder.getNoOpBuilder();
+ innerWriteDataPage(valueCount, uncompressedPageSize, bytes, statistics, rlEncoding, dlEncoding, valuesEncoding);
+ }
+
+ /**
+ * Writes a single page
+ * @param valueCount count of values
+ * @param uncompressedPageSize the size of the data once uncompressed
+ * @param bytes the compressed data for the page without header
+ * @param statistics the statistics of the page
+ * @param rowCount the number of rows in the page
+ * @param rlEncoding encoding of the repetition level
+ * @param dlEncoding encoding of the definition level
+ * @param valuesEncoding encoding of values
+ * @throws IOException if any I/O error occurs during writing the file
+ */
+ public void writeDataPage(
+ int valueCount, int uncompressedPageSize,
+ BytesInput bytes,
+ Statistics statistics,
+ long rowCount,
+ Encoding rlEncoding,
+ Encoding dlEncoding,
+ Encoding valuesEncoding) throws IOException {
+ long beforeHeader = out.getPos();
+ innerWriteDataPage(valueCount, uncompressedPageSize, bytes, statistics, rlEncoding, dlEncoding, valuesEncoding);
+
+ offsetIndexBuilder.add((int) (out.getPos() - beforeHeader), rowCount);
+ }
+
+ private void innerWriteDataPage(
+ int valueCount, int uncompressedPageSize,
+ BytesInput bytes,
+ Statistics statistics,
+ Encoding rlEncoding,
+ Encoding dlEncoding,
+ Encoding valuesEncoding) throws IOException {
+ state = state.write();
+ long beforeHeader = out.getPos();
+ if (currentChunkFirstDataPage < 0) {
+ currentChunkFirstDataPage = beforeHeader;
+ }
+ LOG.debug("{}: write data page: {} values", beforeHeader, valueCount);
+ int compressedPageSize = (int) bytes.size();
+ if (pageWriteChecksumEnabled) {
+ crc.reset();
+ crc.update(bytes.toByteArray());
+ metadataConverter.writeDataPageV1Header(
+ uncompressedPageSize, compressedPageSize,
+ valueCount,
+ rlEncoding,
+ dlEncoding,
+ valuesEncoding,
+ (int) crc.getValue(),
+ out);
+ } else {
+ metadataConverter.writeDataPageV1Header(
+ uncompressedPageSize, compressedPageSize,
+ valueCount,
+ rlEncoding,
+ dlEncoding,
+ valuesEncoding,
+ out);
+ }
+ long headerSize = out.getPos() - beforeHeader;
+ this.uncompressedLength += uncompressedPageSize + headerSize;
+ this.compressedLength += compressedPageSize + headerSize;
+ LOG.debug("{}: write data page content {}", out.getPos(), compressedPageSize);
+ bytes.writeAllTo(out);
+
+ // Copying the statistics if it is not initialized yet so we have the correct typed one
+ if (currentStatistics == null) {
+ currentStatistics = statistics.copy();
+ } else {
+ currentStatistics.mergeStatistics(statistics);
+ }
+
+ columnIndexBuilder.add(statistics);
+
+ encodingStatsBuilder.addDataEncoding(valuesEncoding);
+ currentEncodings.add(rlEncoding);
+ currentEncodings.add(dlEncoding);
+ currentEncodings.add(valuesEncoding);
+ }
+
+ /**
+ * Add a Bloom filter that will be written out. This is only used in unit test.
+ *
+ * @param column the column name
+ * @param bloomFilter the bloom filter of column values
+ */
+ void addBloomFilter(String column, BloomFilter bloomFilter) {
+ currentBloomFilters.put(column, bloomFilter);
+ }
+
+ /**
+ * Writes a single v2 data page
+ * @param rowCount count of rows
+ * @param nullCount count of nulls
+ * @param valueCount count of values
+ * @param repetitionLevels repetition level bytes
+ * @param definitionLevels definition level bytes
+ * @param dataEncoding encoding for data
+ * @param compressedData compressed data bytes
+ * @param uncompressedDataSize the size of uncompressed data
+ * @param statistics the statistics of the page
+ * @throws IOException if any I/O error occurs during writing the file
+ */
+ public void writeDataPageV2(int rowCount, int nullCount, int valueCount,
+ BytesInput repetitionLevels,
+ BytesInput definitionLevels,
+ Encoding dataEncoding,
+ BytesInput compressedData,
+ int uncompressedDataSize,
+ Statistics<?> statistics) throws IOException {
+ state = state.write();
+ int rlByteLength = toIntWithCheck(repetitionLevels.size());
+ int dlByteLength = toIntWithCheck(definitionLevels.size());
+
+ int compressedSize = toIntWithCheck(
+ compressedData.size() + repetitionLevels.size() + definitionLevels.size()
+ );
+
+ int uncompressedSize = toIntWithCheck(
+ uncompressedDataSize + repetitionLevels.size() + definitionLevels.size()
+ );
+
+ long beforeHeader = out.getPos();
+ if (currentChunkFirstDataPage < 0) {
+ currentChunkFirstDataPage = beforeHeader;
+ }
+
+ metadataConverter.writeDataPageV2Header(
+ uncompressedSize, compressedSize,
+ valueCount, nullCount, rowCount,
+ dataEncoding,
+ rlByteLength,
+ dlByteLength,
+ out);
+
+ long headersSize = out.getPos() - beforeHeader;
+ this.uncompressedLength += uncompressedSize + headersSize;
+ this.compressedLength += compressedSize + headersSize;
+
+ if (currentStatistics == null) {
+ currentStatistics = statistics.copy();
+ } else {
+ currentStatistics.mergeStatistics(statistics);
+ }
+
+ columnIndexBuilder.add(statistics);
+ currentEncodings.add(dataEncoding);
+ encodingStatsBuilder.addDataEncoding(dataEncoding);
+
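+ // v2 page layout: repetition and definition levels are written uncompressed ahead of the compressed values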
+ BytesInput.concat(repetitionLevels, definitionLevels, compressedData)
+ .writeAllTo(out);
+
+ offsetIndexBuilder.add((int) (out.getPos() - beforeHeader), rowCount);
+ }
+
+ /**
+ * Writes a column chunk at once
+ * @param descriptor the descriptor of the column
+ * @param valueCount the value count in this column
+ * @param compressionCodecName the name of the compression codec used for compressing the pages
+ * @param dictionaryPage the dictionary page for this column chunk (might be null)
+ * @param bytes the encoded pages including page headers to be written as is
+ * @param uncompressedTotalPageSize total uncompressed size (without page headers)
+ * @param compressedTotalPageSize total compressed size (without page headers)
+ * @param totalStats accumulated statistics for the column chunk
+ * @param columnIndexBuilder the builder object for the column index
+ * @param offsetIndexBuilder the builder object for the offset index
+ * @param bloomFilter the bloom filter for this column
+ * @param rlEncodings the RL encodings used in this column chunk
+ * @param dlEncodings the DL encodings used in this column chunk
+ * @param dataEncodings the data encodings used in this column chunk
+ * @throws IOException if there is an error while writing
+ */
+ void writeColumnChunk(ColumnDescriptor descriptor,
+ long valueCount,
+ CompressionCodecName compressionCodecName,
+ DictionaryPage dictionaryPage,
+ BytesInput bytes,
+ long uncompressedTotalPageSize,
+ long compressedTotalPageSize,
+ Statistics<?> totalStats,
+ ColumnIndexBuilder columnIndexBuilder,
+ OffsetIndexBuilder offsetIndexBuilder,
+ BloomFilter bloomFilter,
+ Set<Encoding> rlEncodings,
+ Set<Encoding> dlEncodings,
+ List<Encoding> dataEncodings) throws IOException {
+ writeColumnChunk(descriptor, valueCount, compressionCodecName, dictionaryPage, bytes,
+ uncompressedTotalPageSize, compressedTotalPageSize, totalStats, columnIndexBuilder, offsetIndexBuilder,
+ bloomFilter, rlEncodings, dlEncodings, dataEncodings, null, 0, 0, null);
+ }
+
+ void writeColumnChunk(ColumnDescriptor descriptor,
+ long valueCount,
+ CompressionCodecName compressionCodecName,
+ DictionaryPage dictionaryPage,
+ BytesInput bytes,
+ long uncompressedTotalPageSize,
+ long compressedTotalPageSize,
+ Statistics<?> totalStats,
+ ColumnIndexBuilder columnIndexBuilder,
+ OffsetIndexBuilder offsetIndexBuilder,
+ BloomFilter bloomFilter,
+ Set<Encoding> rlEncodings,
+ Set<Encoding> dlEncodings,
+ List<Encoding> dataEncodings,
+ BlockCipher.Encryptor headerBlockEncryptor,
+ int rowGroupOrdinal,
+ int columnOrdinal,
+ byte[] fileAAD) throws IOException {
+ startColumn(descriptor, valueCount, compressionCodecName);
+
+ state = state.write();
+ if (dictionaryPage != null) {
+ byte[] dictionaryPageHeaderAAD = null;
+ if (null != headerBlockEncryptor) {
+ dictionaryPageHeaderAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DictionaryPageHeader,
+ rowGroupOrdinal, columnOrdinal, -1);
+ }
+ writeDictionaryPage(dictionaryPage, headerBlockEncryptor, dictionaryPageHeaderAAD);
+ }
+
+ if (bloomFilter != null) {
+ // write the bloom filter only if at least one data page is not dictionary encoded
+ boolean isWriteBloomFilter = false;
+ for (Encoding encoding : dataEncodings) {
+ if (encoding != Encoding.RLE_DICTIONARY) {
+ isWriteBloomFilter = true;
+ break;
+ }
+ }
+ if (isWriteBloomFilter) {
+ currentBloomFilters.put(String.join(".", descriptor.getPath()), bloomFilter);
+ }
+ }
+ LOG.debug("{}: write data pages", out.getPos());
+ long headersSize = bytes.size() - compressedTotalPageSize;
+ this.uncompressedLength += uncompressedTotalPageSize + headersSize;
+ this.compressedLength += compressedTotalPageSize + headersSize;
+ LOG.debug("{}: write data pages content", out.getPos());
+ currentChunkFirstDataPage = out.getPos();
+ bytes.writeAllTo(out);
+ encodingStatsBuilder.addDataEncodings(dataEncodings);
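+ // an empty set of RL encodings indicates the pages were written in the v2 format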
+ if (rlEncodings.isEmpty()) {
+ encodingStatsBuilder.withV2Pages();
+ }
+ currentEncodings.addAll(rlEncodings);
+ currentEncodings.addAll(dlEncodings);
+ currentEncodings.addAll(dataEncodings);
+ currentStatistics = totalStats;
+
+ this.columnIndexBuilder = columnIndexBuilder;
+ this.offsetIndexBuilder = offsetIndexBuilder;
+
+ endColumn();
+ }
+
+ /**
+ * Ends a column (once all repetition levels, definition levels and data have been written).
+ * @throws IOException if there is an error while writing
+ */
+ public void endColumn() throws IOException {
+ state = state.endColumn();
+ LOG.debug("{}: end column", out.getPos());
+ if (columnIndexBuilder.getMinMaxSize() > columnIndexBuilder.getPageCount() * MAX_STATS_SIZE) {
+ currentColumnIndexes.add(null);
+ } else {
+ currentColumnIndexes.add(columnIndexBuilder.build());
+ }
+ currentOffsetIndexes.add(offsetIndexBuilder.build(currentChunkFirstDataPage));
+ currentBlock.addColumn(ColumnChunkMetaData.get(
+ currentChunkPath,
+ currentChunkType,
+ currentChunkCodec,
+ encodingStatsBuilder.build(),
+ currentEncodings,
+ currentStatistics,
+ currentChunkFirstDataPage,
+ currentChunkDictionaryPageOffset,
+ currentChunkValueCount,
+ compressedLength,
+ uncompressedLength));
+ this.currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + uncompressedLength);
+ this.uncompressedLength = 0;
+ this.compressedLength = 0;
+ columnIndexBuilder = null;
+ offsetIndexBuilder = null;
+ }
+
+ /**
+ * Ends a block once all column chunks have been written.
+ * @throws IOException if there is an error while writing
+ */
+ public void endBlock() throws IOException {
+// if (currentRecordCount == 0) {
+// throw new ParquetEncodingException("End block with zero record");
+// }
+
+ state = state.endBlock();
+ LOG.debug("{}: end block", out.getPos());
+ currentBlock.setRowCount(currentRecordCount);
+ currentBlock.setOrdinal(blocks.size());
+ blocks.add(currentBlock);
+ columnIndexes.add(currentColumnIndexes);
+ offsetIndexes.add(currentOffsetIndexes);
+ bloomFilters.add(currentBloomFilters);
+ currentColumnIndexes = null;
+ currentOffsetIndexes = null;
+ currentBloomFilters = null;
+ currentBlock = null;
+ }
+
+ /**
+ * @param conf a configuration
+ * @param file a file path to append the contents of to this file
+ * @throws IOException if there is an error while reading or writing
+ * @deprecated will be removed in 2.0.0; use {@link #appendFile(InputFile)} instead
+ */
+ @Deprecated
+ public void appendFile(Configuration conf, Path file) throws IOException {
+ ParquetFileReader.open(conf, file).appendTo(this);
+ }
+
+ public void appendFile(InputFile file) throws IOException {
+ try (ParquetFileReader reader = ParquetFileReader.open(file)) {
+ reader.appendTo(this);
+ }
+ }
+
+ /**
+ * @param file a file stream to read from
+ * @param rowGroups row groups to copy
+ * @param dropColumns whether to drop columns from the file that are not in this file's schema
+ * @throws IOException if there is an error while reading or writing
+ * @deprecated will be removed in 2.0.0;
+ * use {@link #appendRowGroups(SeekableInputStream,List,boolean)} instead
+ */
+ @Deprecated
+ public void appendRowGroups(FSDataInputStream file,
+ List<BlockMetaData> rowGroups,
+ boolean dropColumns) throws IOException {
+ appendRowGroups(HadoopStreams.wrap(file), rowGroups, dropColumns);
+ }
+
+ public void appendRowGroups(SeekableInputStream file,
+ List<BlockMetaData> rowGroups,
+ boolean dropColumns) throws IOException {
+ for (BlockMetaData block : rowGroups) {
+ appendRowGroup(file, block, dropColumns);
+ }
+ }
+
+ /**
+ * @param from a file stream to read from
+ * @param rowGroup row group to copy
+ * @param dropColumns whether to drop columns from the file that are not in this file's schema
+ * @throws IOException if there is an error while reading or writing
+ * @deprecated will be removed in 2.0.0;
+ * use {@link #appendRowGroup(SeekableInputStream,BlockMetaData,boolean)} instead
+ */
+ @Deprecated
+ public void appendRowGroup(FSDataInputStream from, BlockMetaData rowGroup,
+ boolean dropColumns) throws IOException {
+ appendRowGroup(HadoopStreams.wrap(from), rowGroup, dropColumns);
+ }
+
+ public void appendRowGroup(SeekableInputStream from, BlockMetaData rowGroup,
+ boolean dropColumns) throws IOException {
+ startBlock(rowGroup.getRowCount());
+
+ Map<String, ColumnChunkMetaData> columnsToCopy =
+ new HashMap<String, ColumnChunkMetaData>();
+ for (ColumnChunkMetaData chunk : rowGroup.getColumns()) {
+ columnsToCopy.put(chunk.getPath().toDotString(), chunk);
+ }
+
+ List<ColumnChunkMetaData> columnsInOrder =
+ new ArrayList<ColumnChunkMetaData>();
+
+ for (ColumnDescriptor descriptor : schema.getColumns()) {
+ String path = ColumnPath.get(descriptor.getPath()).toDotString();
+ ColumnChunkMetaData chunk = columnsToCopy.remove(path);
+ if (chunk != null) {
+ columnsInOrder.add(chunk);
+ } else {
+ throw new IllegalArgumentException(String.format(
+ "Missing column '%s', cannot copy row group: %s", path, rowGroup));
+ }
+ }
+
+ // complain if some columns would be dropped and that's not okay
+ if (!dropColumns && !columnsToCopy.isEmpty()) {
+ throw new IllegalArgumentException(String.format(
+ "Columns cannot be copied (missing from target schema): %s",
+ String.join(", ", columnsToCopy.keySet())));
+ }
+
+ // copy the data for all chunks
+ long start = -1;
+ long length = 0;
+ long blockUncompressedSize = 0L;
+ for (int i = 0; i < columnsInOrder.size(); i += 1) {
+ ColumnChunkMetaData chunk = columnsInOrder.get(i);
+
+ // get this chunk's start position in the new file
+ long newChunkStart = out.getPos() + length;
+
+ // add this chunk to be copied with any previous chunks
+ if (start < 0) {
+ // no previous chunk included, start at this chunk's starting pos
+ start = chunk.getStartingPos();
+ }
+ length += chunk.getTotalSize();
+
+ if ((i + 1) == columnsInOrder.size() ||
+ columnsInOrder.get(i + 1).getStartingPos() != (start + length)) {
+ // not contiguous. do the copy now.
+ copy(from, out, start, length);
+ // reset to start at the next column chunk
+ start = -1;
+ length = 0;
+ }
+
+ // TODO: column/offset indexes are not copied
+ // (it would require seeking to the end of the file for each row group)
+ currentColumnIndexes.add(null);
+ currentOffsetIndexes.add(null);
+
+ Offsets offsets = Offsets.getOffsets(from, chunk, newChunkStart);
+ currentBlock.addColumn(ColumnChunkMetaData.get(
+ chunk.getPath(),
+ chunk.getPrimitiveType(),
+ chunk.getCodec(),
+ chunk.getEncodingStats(),
+ chunk.getEncodings(),
+ chunk.getStatistics(),
+ offsets.firstDataPageOffset,
+ offsets.dictionaryPageOffset,
+ chunk.getValueCount(),
+ chunk.getTotalSize(),
+ chunk.getTotalUncompressedSize()));
+
+ blockUncompressedSize += chunk.getTotalUncompressedSize();
+ }
+
+ currentBlock.setTotalByteSize(blockUncompressedSize);
+
+ endBlock();
+ }
+
+ /**
+ * @param descriptor the descriptor for the target column
+ * @param from a file stream to read from
+ * @param chunk the column chunk to be copied
+ * @param bloomFilter the bloomFilter for this chunk
+ * @param columnIndex the column index for this chunk
+ * @param offsetIndex the offset index for this chunk
+ * @throws IOException if there is an error while writing
+ */
+ public void appendColumnChunk(ColumnDescriptor descriptor, SeekableInputStream from, ColumnChunkMetaData chunk,
+ BloomFilter bloomFilter, ColumnIndex columnIndex, OffsetIndex offsetIndex) throws IOException {
+ long start = chunk.getStartingPos();
+ long length = chunk.getTotalSize();
+ long newChunkStart = out.getPos();
+
+ copy(from, out, start, length);
+
+ currentBloomFilters.put(String.join(".", descriptor.getPath()), bloomFilter);
+ currentColumnIndexes.add(columnIndex);
+ currentOffsetIndexes.add(offsetIndex);
+
+ Offsets offsets = Offsets.getOffsets(from, chunk, newChunkStart);
+ currentBlock.addColumn(ColumnChunkMetaData.get(
+ chunk.getPath(),
+ chunk.getPrimitiveType(),
+ chunk.getCodec(),
+ chunk.getEncodingStats(),
+ chunk.getEncodings(),
+ chunk.getStatistics(),
+ offsets.firstDataPageOffset,
+ offsets.dictionaryPageOffset,
+ chunk.getValueCount(),
+ chunk.getTotalSize(),
+ chunk.getTotalUncompressedSize()));
+
+ currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + chunk.getTotalUncompressedSize());
+ }
+
+ // Buffers for the copy function.
+ private static final ThreadLocal<byte[]> COPY_BUFFER = ThreadLocal.withInitial(() -> new byte[8192]);
+
+ /**
+ * Copies a byte range from an input stream to an output stream. Thread-safe.
+ *
+ * @param from a {@link SeekableInputStream}
+ * @param to any {@link PositionOutputStream}
+ * @param start where in the from stream to start copying
+ * @param length the number of bytes to copy
+ * @throws IOException if there is an error while reading or writing
+ */
+ private static void copy(SeekableInputStream from, PositionOutputStream to,
+ long start, long length) throws IOException {
+ LOG.debug("Copying {} bytes at {} to {}", length, start, to.getPos());
+ from.seek(start);
+ long bytesCopied = 0;
+ byte[] buffer = COPY_BUFFER.get();
+ while (bytesCopied < length) {
+ long bytesLeft = length - bytesCopied;
+ int bytesRead = from.read(buffer, 0,
+ (buffer.length < bytesLeft ? buffer.length : (int) bytesLeft));
+ if (bytesRead < 0) {
+ throw new IllegalArgumentException(
+ "Unexpected end of input file at " + (start + bytesCopied));
+ }
+ to.write(buffer, 0, bytesRead);
+ bytesCopied += bytesRead;
+ }
+ }
+
+ /**
+ * Ends the file once all blocks have been written and closes the underlying
+ * output stream.
+ * @param extraMetaData the extra meta data to write in the footer
+ * @throws IOException if there is an error while writing
+ */
+ public void end(Map<String, String> extraMetaData) throws IOException {
+ state = state.end();
+ serializeColumnIndexes(columnIndexes, blocks, out, fileEncryptor);
+ serializeOffsetIndexes(offsetIndexes, blocks, out, fileEncryptor);
+ serializeBloomFilters(bloomFilters, blocks, out, fileEncryptor);
+ LOG.debug("{}: end", out.getPos());
+ this.footer = new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks);
+ serializeFooter(footer, out, fileEncryptor);
+ out.close();
+ }
+
+ private static void serializeColumnIndexes(
+ List<List<ColumnIndex>> columnIndexes,
+ List<BlockMetaData> blocks,
+ PositionOutputStream out,
+ InternalFileEncryptor fileEncryptor) throws IOException {
+ LOG.debug("{}: column indexes", out.getPos());
+ for (int bIndex = 0, bSize = blocks.size(); bIndex < bSize; ++bIndex) {
+ BlockMetaData block = blocks.get(bIndex);
+ List<ColumnChunkMetaData> columns = block.getColumns();
+ List<ColumnIndex> blockColumnIndexes = columnIndexes.get(bIndex);
+ for (int cIndex = 0, cSize = columns.size(); cIndex < cSize; ++cIndex) {
+ ColumnChunkMetaData column = columns.get(cIndex);
+ org.apache.parquet.format.ColumnIndex columnIndex = ParquetMetadataConverter
+ .toParquetColumnIndex(column.getPrimitiveType(), blockColumnIndexes.get(cIndex));
+ if (columnIndex == null) {
+ continue;
+ }
+ BlockCipher.Encryptor columnIndexEncryptor = null;
+ byte[] columnIndexAAD = null;
+ if (null != fileEncryptor) {
+ InternalColumnEncryptionSetup columnEncryptionSetup = fileEncryptor.getColumnSetup(column.getPath(), false, cIndex);
+ if (columnEncryptionSetup.isEncrypted()) {
+ columnIndexEncryptor = columnEncryptionSetup.getMetaDataEncryptor();
+ columnIndexAAD = AesCipher.createModuleAAD(fileEncryptor.getFileAAD(), ModuleType.ColumnIndex,
+ block.getOrdinal(), columnEncryptionSetup.getOrdinal(), -1);
+ }
+ }
+ long offset = out.getPos();
+ Util.writeColumnIndex(columnIndex, out, columnIndexEncryptor, columnIndexAAD);
+ column.setColumnIndexReference(new IndexReference(offset, (int) (out.getPos() - offset)));
+ }
+ }
+ }
+
+ private int toIntWithCheck(long size) {
+ if ((int)size != size) {
+ throw new ParquetEncodingException("Cannot write page larger than " + Integer.MAX_VALUE + " bytes: " + size);
+ }
+ return (int)size;
+ }
+
+ private static void serializeOffsetIndexes(
+ List<List<OffsetIndex>> offsetIndexes,
+ List<BlockMetaData> blocks,
+ PositionOutputStream out,
+ InternalFileEncryptor fileEncryptor) throws IOException {
+ LOG.debug("{}: offset indexes", out.getPos());
+ for (int bIndex = 0, bSize = blocks.size(); bIndex < bSize; ++bIndex) {
+ BlockMetaData block = blocks.get(bIndex);
+ List<ColumnChunkMetaData> columns = block.getColumns();
+ List<OffsetIndex> blockOffsetIndexes = offsetIndexes.get(bIndex);
+ for (int cIndex = 0, cSize = columns.size(); cIndex < cSize; ++cIndex) {
+ OffsetIndex offsetIndex = blockOffsetIndexes.get(cIndex);
+ if (offsetIndex == null) {
+ continue;
+ }
+ ColumnChunkMetaData column = columns.get(cIndex);
+ BlockCipher.Encryptor offsetIndexEncryptor = null;
+ byte[] offsetIndexAAD = null;
+ if (null != fileEncryptor) {
+ InternalColumnEncryptionSetup columnEncryptionSetup = fileEncryptor.getColumnSetup(column.getPath(), false, cIndex);
+ if (columnEncryptionSetup.isEncrypted()) {
+ offsetIndexEncryptor = columnEncryptionSetup.getMetaDataEncryptor();
+ offsetIndexAAD = AesCipher.createModuleAAD(fileEncryptor.getFileAAD(), ModuleType.OffsetIndex,
+ block.getOrdinal(), columnEncryptionSetup.getOrdinal(), -1);
+ }
+ }
+ long offset = out.getPos();
+ Util.writeOffsetIndex(ParquetMetadataConverter.toParquetOffsetIndex(offsetIndex), out, offsetIndexEncryptor, offsetIndexAAD);
+ column.setOffsetIndexReference(new IndexReference(offset, (int) (out.getPos() - offset)));
+ }
+ }
+ }
+
+ private static void serializeBloomFilters(
+ List<Map<String, BloomFilter>> bloomFilters,
+ List<BlockMetaData> blocks,
+ PositionOutputStream out,
+ InternalFileEncryptor fileEncryptor) throws IOException {
+ LOG.debug("{}: bloom filters", out.getPos());
+ for (int bIndex = 0, bSize = blocks.size(); bIndex < bSize; ++bIndex) {
+ BlockMetaData block = blocks.get(bIndex);
+ List<ColumnChunkMetaData> columns = block.getColumns();
+ Map<String, BloomFilter> blockBloomFilters = bloomFilters.get(bIndex);
+ if (blockBloomFilters.isEmpty()) {
+ continue;
+ }
+ for (int cIndex = 0, cSize = columns.size(); cIndex < cSize; ++cIndex) {
+ ColumnChunkMetaData column = columns.get(cIndex);
+ BloomFilter bloomFilter = blockBloomFilters.get(column.getPath().toDotString());
+ if (bloomFilter == null) {
+ continue;
+ }
+
+ long offset = out.getPos();
+ column.setBloomFilterOffset(offset);
+
+ BlockCipher.Encryptor bloomFilterEncryptor = null;
+ byte[] bloomFilterHeaderAAD = null;
+ byte[] bloomFilterBitsetAAD = null;
+ if (null != fileEncryptor) {
+ InternalColumnEncryptionSetup columnEncryptionSetup = fileEncryptor.getColumnSetup(column.getPath(), false, cIndex);
+ if (columnEncryptionSetup.isEncrypted()) {
+ bloomFilterEncryptor = columnEncryptionSetup.getMetaDataEncryptor();
+ int columnOrdinal = columnEncryptionSetup.getOrdinal();
+ bloomFilterHeaderAAD = AesCipher.createModuleAAD(fileEncryptor.getFileAAD(), ModuleType.BloomFilterHeader,
+ block.getOrdinal(), columnOrdinal, -1);
+ bloomFilterBitsetAAD = AesCipher.createModuleAAD(fileEncryptor.getFileAAD(), ModuleType.BloomFilterBitset,
+ block.getOrdinal(), columnOrdinal, -1);
+ }
+ }
+
+ Util.writeBloomFilterHeader(ParquetMetadataConverter.toBloomFilterHeader(bloomFilter), out,
+ bloomFilterEncryptor, bloomFilterHeaderAAD);
+
+ ByteArrayOutputStream tempOutStream = new ByteArrayOutputStream();
+ bloomFilter.writeTo(tempOutStream);
+ byte[] serializedBitset = tempOutStream.toByteArray();
+ if (null != bloomFilterEncryptor) {
+ serializedBitset = bloomFilterEncryptor.encrypt(serializedBitset, bloomFilterBitsetAAD);
+ }
+ out.write(serializedBitset);
+ }
+ }
+ }
+
+ private static void serializeFooter(ParquetMetadata footer, PositionOutputStream out,
+ InternalFileEncryptor fileEncryptor) throws IOException {
+
+ ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter();
+
+ // Unencrypted file
+ if (null == fileEncryptor) {
+ long footerIndex = out.getPos();
+ org.apache.parquet.format.FileMetaData parquetMetadata = metadataConverter.toParquetMetadata(CURRENT_VERSION, footer);
+ writeFileMetaData(parquetMetadata, out);
+ LOG.debug("{}: footer length = {}", out.getPos(), (out.getPos() - footerIndex));
+ BytesUtils.writeIntLittleEndian(out, (int) (out.getPos() - footerIndex));
+ out.write(MAGIC);
+ return;
+ }
+
+ org.apache.parquet.format.FileMetaData parquetMetadata =
+ metadataConverter.toParquetMetadata(CURRENT_VERSION, footer, fileEncryptor);
+
+ // Encrypted file with plaintext footer
+ if (!fileEncryptor.isFooterEncrypted()) {
+ long footerIndex = out.getPos();
+ parquetMetadata.setEncryption_algorithm(fileEncryptor.getEncryptionAlgorithm());
+ // create footer signature (nonce + tag of encrypted footer)
+ byte[] footerSigningKeyMetaData = fileEncryptor.getFooterSigningKeyMetaData();
+ if (null != footerSigningKeyMetaData) {
+ parquetMetadata.setFooter_signing_key_metadata(footerSigningKeyMetaData);
+ }
+ ByteArrayOutputStream tempOutStream = new ByteArrayOutputStream();
+ writeFileMetaData(parquetMetadata, tempOutStream);
+ byte[] serializedFooter = tempOutStream.toByteArray();
+ byte[] footerAAD = AesCipher.createFooterAAD(fileEncryptor.getFileAAD());
+ byte[] encryptedFooter = fileEncryptor.getSignedFooterEncryptor().encrypt(serializedFooter, footerAAD);
+ byte[] signature = new byte[AesCipher.NONCE_LENGTH + AesCipher.GCM_TAG_LENGTH];
+ System.arraycopy(encryptedFooter, ModuleCipherFactory.SIZE_LENGTH, signature, 0, AesCipher.NONCE_LENGTH); // copy Nonce
+ System.arraycopy(encryptedFooter, encryptedFooter.length - AesCipher.GCM_TAG_LENGTH,
+ signature, AesCipher.NONCE_LENGTH, AesCipher.GCM_TAG_LENGTH); // copy GCM Tag
+ out.write(serializedFooter);
+ out.write(signature);
+ LOG.debug("{}: footer and signature length = {}", out.getPos(), (out.getPos() - footerIndex));
+ BytesUtils.writeIntLittleEndian(out, (int) (out.getPos() - footerIndex));
+ out.write(MAGIC);
+ return;
+ }
+
+ // Encrypted file with encrypted footer
+ long cryptoFooterIndex = out.getPos();
+ writeFileCryptoMetaData(fileEncryptor.getFileCryptoMetaData(), out);
+ byte[] footerAAD = AesCipher.createFooterAAD(fileEncryptor.getFileAAD());
+ writeFileMetaData(parquetMetadata, out, fileEncryptor.getFooterEncryptor(), footerAAD);
+ int combinedMetaDataLength = (int)(out.getPos() - cryptoFooterIndex);
+ LOG.debug("{}: crypto metadata and footer length = {}", out.getPos(), combinedMetaDataLength);
+ BytesUtils.writeIntLittleEndian(out, combinedMetaDataLength);
+ out.write(EFMAGIC);
+ }
+
+ public ParquetMetadata getFooter() {
+ Preconditions.checkState(state == STATE.ENDED, "Cannot return unfinished footer.");
+ return footer;
+ }
+
+ /**
+ * Given a list of metadata files, merge them into a single ParquetMetadata.
+ * Requires that the schemas be compatible, and the extraMetadata be exactly equal.
+ * @param files a list of files to merge metadata from
+ * @param conf a configuration
+ * @return merged parquet metadata for the files
+ * @throws IOException if there is an error while writing
+ * @deprecated metadata files are not recommended and will be removed in 2.0.0
+ */
+ @Deprecated
+ public static ParquetMetadata mergeMetadataFiles(List<Path> files, Configuration conf) throws IOException {
+ return mergeMetadataFiles(files, conf, new StrictKeyValueMetadataMergeStrategy());
+ }
+
+ /**
+ * Given a list of metadata files, merge them into a single ParquetMetadata.
+ * Requires that the schemas be compatible, and the extraMetadata be exactly equal.
+ * @param files a list of files to merge metadata from
+ * @param conf a configuration
+ * @param keyValueMetadataMergeStrategy strategy to merge values for same key, if there are multiple
+ * @return merged parquet metadata for the files
+ * @throws IOException if there is an error while writing
+ * @deprecated metadata files are not recommended and will be removed in 2.0.0
+ */
+ @Deprecated
+ public static ParquetMetadata mergeMetadataFiles(List<Path> files, Configuration conf,
+ KeyValueMetadataMergeStrategy keyValueMetadataMergeStrategy) throws IOException {
+ Preconditions.checkArgument(!files.isEmpty(), "Cannot merge an empty list of metadata");
+
+ GlobalMetaData globalMetaData = null;
+ List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
+
+ for (Path p : files) {
+ ParquetMetadata pmd = ParquetFileReader.readFooter(conf, p, ParquetMetadataConverter.NO_FILTER);
+ FileMetaData fmd = pmd.getFileMetaData();
+ globalMetaData = mergeInto(fmd, globalMetaData, true);
+ blocks.addAll(pmd.getBlocks());
+ }
+
+ // collapse GlobalMetaData into a single FileMetaData, which will throw if they are not compatible
+ return new ParquetMetadata(globalMetaData.merge(keyValueMetadataMergeStrategy), blocks);
+ }
+
+ /**
+ * Given a list of metadata files, merge them into a single metadata file.
+ * Requires that the schemas be compatible, and the extraMetaData be exactly equal.
+ * This is useful when merging 2 directories of parquet files into a single directory, as long
+ * as both directories were written with compatible schemas and equal extraMetaData.
+ * @param files a list of files to merge metadata from
+ * @param outputPath path to write merged metadata to
+ * @param conf a configuration
+ * @throws IOException if there is an error while reading or writing
+ * @deprecated metadata files are not recommended and will be removed in 2.0.0
+ */
+ @Deprecated
+ public static void writeMergedMetadataFile(List<Path> files, Path outputPath, Configuration conf) throws IOException {
+ ParquetMetadata merged = mergeMetadataFiles(files, conf);
+ writeMetadataFile(outputPath, merged, outputPath.getFileSystem(conf));
+ }
+
+ /**
+ * writes a _metadata and _common_metadata file
+ * @param configuration the configuration to use to get the FileSystem
+ * @param outputPath the directory to write the _metadata file to
+ * @param footers the list of footers to merge
+ * @throws IOException if there is an error while writing
+ * @deprecated metadata files are not recommended and will be removed in 2.0.0
+ */
+ @Deprecated
+ public static void writeMetadataFile(Configuration configuration, Path outputPath, List<Footer> footers) throws IOException {
+ writeMetadataFile(configuration, outputPath, footers, JobSummaryLevel.ALL);
+ }
+
+ /**
+ * writes _common_metadata file, and optionally a _metadata file depending on the {@link JobSummaryLevel} provided
+ * @param configuration the configuration to use to get the FileSystem
+ * @param outputPath the directory to write the _metadata file to
+ * @param footers the list of footers to merge
+ * @param level level of summary to write
+ * @throws IOException if there is an error while writing
+ * @deprecated metadata files are not recommended and will be removed in 2.0.0
+ */
+ @Deprecated
+ public static void writeMetadataFile(Configuration configuration, Path outputPath, List<Footer> footers, JobSummaryLevel level) throws IOException {
+ Preconditions.checkArgument(level == JobSummaryLevel.ALL || level == JobSummaryLevel.COMMON_ONLY,
+ "Unsupported level: " + level);
+
+ FileSystem fs = outputPath.getFileSystem(configuration);
+ outputPath = outputPath.makeQualified(fs);
+ ParquetMetadata metadataFooter = mergeFooters(outputPath, footers);
+
+ if (level == JobSummaryLevel.ALL) {
+ writeMetadataFile(outputPath, metadataFooter, fs, PARQUET_METADATA_FILE);
+ }
+
+ metadataFooter.getBlocks().clear();
+ writeMetadataFile(outputPath, metadataFooter, fs, PARQUET_COMMON_METADATA_FILE);
+ }
+
+ /**
+ * @deprecated metadata files are not recommended and will be removed in 2.0.0
+ */
+ @Deprecated
+ private static void writeMetadataFile(Path outputPathRoot, ParquetMetadata metadataFooter, FileSystem fs, String parquetMetadataFile)
+ throws IOException {
+ Path metaDataPath = new Path(outputPathRoot, parquetMetadataFile);
+ writeMetadataFile(metaDataPath, metadataFooter, fs);
+ }
+
+ /**
+ * @deprecated metadata files are not recommended and will be removed in 2.0.0
+ */
+ @Deprecated
+ private static void writeMetadataFile(Path outputPath, ParquetMetadata metadataFooter, FileSystem fs)
+ throws IOException {
+ PositionOutputStream metadata = HadoopStreams.wrap(fs.create(outputPath));
+ metadata.write(MAGIC);
+ serializeFooter(metadataFooter, metadata, null);
+ metadata.close();
+ }
+
+ /**
+ * Will merge the metadata of all the footers together
+ * @param root the directory containing all footers
+ * @param footers the list of file footers to merge
+ * @return the global meta data for all the footers
+ */
+ static ParquetMetadata mergeFooters(Path root, List<Footer> footers) {
+ return mergeFooters(root, footers, new StrictKeyValueMetadataMergeStrategy());
+ }
+
+ /**
+ * Will merge the metadata of all the footers together
+ * @param root the directory containing all footers
+ * @param footers the list of file footers to merge
+ * @param keyValueMergeStrategy strategy to merge values for a given key (if there are multiple values)
+ * @return the global meta data for all the footers
+ */
+ static ParquetMetadata mergeFooters(Path root, List<Footer> footers, KeyValueMetadataMergeStrategy keyValueMergeStrategy) {
+ String rootPath = root.toUri().getPath();
+ GlobalMetaData fileMetaData = null;
+ List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
+ for (Footer footer : footers) {
+ String footerPath = footer.getFile().toUri().getPath();
+ if (!footerPath.startsWith(rootPath)) {
+ throw new ParquetEncodingException(footerPath + " invalid: all the files must be contained in the root " + root);
+ }
+ footerPath = footerPath.substring(rootPath.length());
+ while (footerPath.startsWith("/")) {
+ footerPath = footerPath.substring(1);
+ }
+ fileMetaData = mergeInto(footer.getParquetMetadata().getFileMetaData(), fileMetaData);
+ for (BlockMetaData block : footer.getParquetMetadata().getBlocks()) {
+ block.setPath(footerPath);
+ blocks.add(block);
+ }
+ }
+ return new ParquetMetadata(fileMetaData.merge(keyValueMergeStrategy), blocks);
+ }
+
+ /**
+ * @return the current position in the underlying file
+ * @throws IOException if there is an error while getting the current stream's position
+ */
+ public long getPos() throws IOException {
+ return out.getPos();
+ }
+
+ public long getNextRowGroupSize() throws IOException {
+ return alignment.nextRowGroupSize(out);
+ }
+
+ /**
+ * Will merge the metadata of all the footers together
+ * @param footers the list files footers to merge
+ * @return the global meta data for all the footers
+ */
+ static GlobalMetaData getGlobalMetaData(List<Footer> footers) {
+ return getGlobalMetaData(footers, true);
+ }
+
+ static GlobalMetaData getGlobalMetaData(List<Footer> footers, boolean strict) {
+ GlobalMetaData fileMetaData = null;
+ for (Footer footer : footers) {
+ ParquetMetadata currentMetadata = footer.getParquetMetadata();
+ fileMetaData = mergeInto(currentMetadata.getFileMetaData(), fileMetaData, strict);
+ }
+ return fileMetaData;
+ }
+
+ /**
+ * Will return the result of merging toMerge into mergedMetadata
+ * @param toMerge the metadata toMerge
+ * @param mergedMetadata the reference metadata to merge into
+ * @return the result of the merge
+ */
+ static GlobalMetaData mergeInto(
+ FileMetaData toMerge,
+ GlobalMetaData mergedMetadata) {
+ return mergeInto(toMerge, mergedMetadata, true);
+ }
+
+ static GlobalMetaData mergeInto(
+ FileMetaData toMerge,
+ GlobalMetaData mergedMetadata,
+ boolean strict) {
+ MessageType schema = null;
+ Map<String, Set<String>> newKeyValues = new HashMap<String, Set<String>>();
+ Set<String> createdBy = new HashSet<String>();
+ if (mergedMetadata != null) {
+ schema = mergedMetadata.getSchema();
+ newKeyValues.putAll(mergedMetadata.getKeyValueMetaData());
+ createdBy.addAll(mergedMetadata.getCreatedBy());
+ }
+ if ((schema == null && toMerge.getSchema() != null)
+ || (schema != null && !schema.equals(toMerge.getSchema()))) {
+ schema = mergeInto(toMerge.getSchema(), schema, strict);
+ }
+ for (Entry<String, String> entry : toMerge.getKeyValueMetaData().entrySet()) {
+ Set<String> values = newKeyValues.get(entry.getKey());
+ if (values == null) {
+ values = new LinkedHashSet<String>();
+ newKeyValues.put(entry.getKey(), values);
+ }
+ values.add(entry.getValue());
+ }
+ createdBy.add(toMerge.getCreatedBy());
+ return new GlobalMetaData(
+ schema,
+ newKeyValues,
+ createdBy);
+ }
+
+ /**
+ * will return the result of merging toMerge into mergedSchema
+ * @param toMerge the schema to merge into mergedSchema
+ * @param mergedSchema the schema to append the fields to
+ * @return the resulting schema
+ */
+ static MessageType mergeInto(MessageType toMerge, MessageType mergedSchema) {
+ return mergeInto(toMerge, mergedSchema, true);
+ }
+
+ /**
+ * will return the result of merging toMerge into mergedSchema
+ * @param toMerge the schema to merge into mergedSchema
+ * @param mergedSchema the schema to append the fields to
+ * @param strict should schema primitive types match
+ * @return the resulting schema
+ */
+ static MessageType mergeInto(MessageType toMerge, MessageType mergedSchema, boolean strict) {
+ if (mergedSchema == null) {
+ return toMerge;
+ }
+
+ return mergedSchema.union(toMerge, strict);
+ }
+
+ private interface AlignmentStrategy {
+ void alignForRowGroup(PositionOutputStream out) throws IOException;
+
+ long nextRowGroupSize(PositionOutputStream out) throws IOException;
+ }
+
+ private static class NoAlignment implements AlignmentStrategy {
+ public static NoAlignment get(long rowGroupSize) {
+ return new NoAlignment(rowGroupSize);
+ }
+
+ private final long rowGroupSize;
+
+ private NoAlignment(long rowGroupSize) {
+ this.rowGroupSize = rowGroupSize;
+ }
+
+ @Override
+ public void alignForRowGroup(PositionOutputStream out) {
+ }
+
+ @Override
+ public long nextRowGroupSize(PositionOutputStream out) {
+ return rowGroupSize;
+ }
+ }
+
+ /**
+ * Alignment strategy that pads with zeros up to the next DFS block boundary
+ * whenever the space remaining in the current block is at most the configured
+ * maximum padding size.
+ */
+ private static class PaddingAlignment implements AlignmentStrategy {
+ private static final byte[] zeros = new byte[4096];
+
+ public static PaddingAlignment get(long dfsBlockSize, long rowGroupSize,
+ int maxPaddingSize) {
+ return new PaddingAlignment(dfsBlockSize, rowGroupSize, maxPaddingSize);
+ }
+
+ protected final long dfsBlockSize;
+ protected final long rowGroupSize;
+ protected final int maxPaddingSize;
+
+ private PaddingAlignment(long dfsBlockSize, long rowGroupSize,
+ int maxPaddingSize) {
+ this.dfsBlockSize = dfsBlockSize;
+ this.rowGroupSize = rowGroupSize;
+ this.maxPaddingSize = maxPaddingSize;
+ }
+
+ @Override
+ public void alignForRowGroup(PositionOutputStream out) throws IOException {
+ long remaining = dfsBlockSize - (out.getPos() % dfsBlockSize);
+
+ if (isPaddingNeeded(remaining)) {
+ LOG.debug("Adding {} bytes of padding (row group size={}B, block size={}B)", remaining, rowGroupSize, dfsBlockSize);
+ for (; remaining > 0; remaining -= zeros.length) {
+ out.write(zeros, 0, (int) Math.min((long) zeros.length, remaining));
+ }
+ }
+ }
+
+ @Override
+ public long nextRowGroupSize(PositionOutputStream out) throws IOException {
+ if (maxPaddingSize <= 0) {
+ return rowGroupSize;
+ }
+
+ long remaining = dfsBlockSize - (out.getPos() % dfsBlockSize);
+
+ if (isPaddingNeeded(remaining)) {
+ return rowGroupSize;
+ }
+
+ return Math.min(remaining, rowGroupSize);
+ }
+
+ protected boolean isPaddingNeeded(long remaining) {
+ return (remaining <= maxPaddingSize);
+ }
+ }
+}
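Side note for reviewers: the generator schemas below annotate a 16-byte fixed_len_byte_array with the UUID logical type. A minimal, self-contained write sketch using the same parquet-mr example API (the output path and class name are illustrative, not part of the patch):

import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class UuidWriteSketch {
  public static void main(String[] args) throws Exception {
    // same annotation style as the generator schemas below
    MessageType schema = MessageTypeParser.parseMessageType(
        "message test { required fixed_len_byte_array(16) _UUID (UUID); }");
    try (ParquetWriter<Group> writer = ExampleParquetWriter
        .builder(new Path("/tmp/uuid-sketch.parquet"))
        .withType(schema)
        .withConf(new Configuration())
        .build()) {
      byte[] uuid = new byte[16];
      Arrays.fill(uuid, (byte) 1); // matches the baseline values used in the tests
      Group group = new SimpleGroupFactory(schema).newGroup();
      group.append("_UUID", Binary.fromConstantByteArray(uuid, 0, 16));
      writer.write(group);
    }
  }
}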
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetSimpleTestFileGenerator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetSimpleTestFileGenerator.java
index e399ee7..d4a6fcb 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetSimpleTestFileGenerator.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetSimpleTestFileGenerator.java
@@ -46,7 +46,8 @@ import static org.apache.drill.exec.store.parquet.ParquetSimpleTestFileGenerator
* that are supported by Drill. Embedded types specified in the Parquet specification are not covered by the
* examples but can be added.
* To create a new parquet file, define a schema, create a GroupWriter based on the schema, then add values
- * for individual records to the GroupWriter.
+ * for individual records to the GroupWriter.<br>
+ * TODO: DRILL-7904. To run this tool, set <guava.version> to 28.2-jre instead of 19.0 in the main POM file.
* @see org.apache.drill.exec.store.parquet.TestFileGenerator TestFileGenerator
* @see org.apache.parquet.hadoop.example.GroupWriteSupport GroupWriteSupport
* @see org.apache.parquet.example.Paper Dremel Example
@@ -65,7 +66,7 @@ public class ParquetSimpleTestFileGenerator {
" required int32 rowKey; \n" +
" required binary _UTF8 ( UTF8 ) ; \n" +
" required binary _Enum ( ENUM ) ; \n" +
- // " required binary _UUID ( UUID ) ; \n" +
+ " required fixed_len_byte_array(16) _UUID ( UUID ) ; \n" +
" required int32 _INT32_RAW ; \n" +
" required int32 _INT_8 ( INT_8 ) ; \n" +
" required int32 _INT_16 ( INT_16 ) ; \n" +
@@ -93,7 +94,7 @@ public class ParquetSimpleTestFileGenerator {
" required int32 rowKey; \n" +
" optional binary _UTF8 ( UTF8 ) ; \n" +
" optional binary _Enum ( ENUM ) ; \n" +
- // " optional binary _UUID ( UUID ) ; \n" +
+ " optional fixed_len_byte_array(16) _UUID ( UUID ) ; \n" +
" optional int32 _INT32_RAW ; \n" +
" optional int32 _INT_8 ( INT_8 ) ; \n" +
" optional int32 _INT_16 ( INT_16 ) ; \n" +
@@ -123,7 +124,7 @@ public class ParquetSimpleTestFileGenerator {
" required group StringTypes { \n" +
" required binary _UTF8 ( UTF8 ) ; \n" +
" required binary _Enum ( ENUM ) ; \n" +
- // " required binary _UUID ( UUID ) ; \n" +
+ " required fixed_len_byte_array(16) _UUID ( UUID ) ; \n" +
" } \n" +
" required group NumericTypes { \n" +
" required group Int32 { \n" +
@@ -167,7 +168,7 @@ public class ParquetSimpleTestFileGenerator {
" optional group StringTypes { \n" +
" optional binary _UTF8 ( UTF8 ) ; \n" +
" optional binary _Enum ( ENUM ) ; \n" +
- // " optional binary _UUID ( UUID ) ; \n" +
+ " optional fixed_len_byte_array(16) _UUID ( UUID ) ; \n" +
" } \n" +
" optional group NumericTypes { \n" +
" optional group Int32 { \n" +
@@ -243,9 +244,12 @@ public class ParquetSimpleTestFileGenerator {
{
Group complexGroup = gf.newGroup();
complexGroup.add("rowKey", ++rowKey);
- complexGroup.addGroup("StringTypes").append("_UTF8", "UTF8 string" + rowKey).append("_Enum", RANDOM_VALUE
- .toString());
- // .append("_UUID", "00112233445566778899aabbccddeeff");
+ byte[] bytes = new byte[30];
+ Arrays.fill(bytes, (byte) 1);
+ complexGroup.addGroup("StringTypes")
+ .append("_UTF8", "UTF8 string" + rowKey)
+ .append("_Enum", RANDOM_VALUE.toString())
+ .append("_UUID", Binary.fromConstantByteArray(bytes, 0, 16));
Group numeric = complexGroup.addGroup("NumericTypes");
numeric.addGroup("Int32")
.append("_INT32_RAW", 1234567)
@@ -275,9 +279,12 @@ public class ParquetSimpleTestFileGenerator {
{
Group complexGroup = gf.newGroup();
complexGroup.add("rowKey", ++rowKey);
- complexGroup.addGroup("StringTypes").append("_UTF8", "UTF8 string" + rowKey).append("_Enum", MAX_VALUE
- .toString());
- // .append("_UUID", "00112233445566778899aabbccddeeff");
+ byte[] bytes = new byte[30];
+ Arrays.fill(bytes, (byte) 1);
+ complexGroup.addGroup("StringTypes")
+ .append("_UTF8", "UTF8 string" + rowKey)
+ .append("_Enum", MAX_VALUE.toString())
+ .append("_UUID", Binary.fromConstantByteArray(bytes, 0, 16));
Group numeric = complexGroup.addGroup("NumericTypes");
numeric.addGroup("Int32")
.append("_INT32_RAW", 0x7FFFFFFF)
@@ -293,8 +300,6 @@ public class ParquetSimpleTestFileGenerator {
.append("_INT_64", 0x7FFFFFFFFFFFFFFFL)
.append("_UINT_64", 0xFFFFFFFFFFFFFFFFL)
.append("_DECIMAL_decimal18", 0xFFFFFFFFFFFFFFFFL);
- byte[] bytes = new byte[30];
- Arrays.fill(bytes, (byte) 1);
numeric.addGroup("FixedLen").append("_DECIMAL_fixed_n", Binary.fromConstantByteArray(bytes, 0, 20));
numeric.addGroup("Binary").append("_DECIMAL_unlimited", Binary.fromConstantByteArray(bytes, 0, 30));
numeric.addGroup("DateTimeTypes")
@@ -309,9 +314,12 @@ public class ParquetSimpleTestFileGenerator {
{
Group complexGroup = gf.newGroup();
complexGroup.add("rowKey", ++rowKey);
- complexGroup.addGroup("StringTypes").append("_UTF8", "UTF8 string" + rowKey).append("_Enum", MIN_VALUE
- .toString());
- // .append("_UUID", "00112233445566778899aabbccddeeff");
+ byte[] bytes = new byte[30];
+ Arrays.fill(bytes, (byte) 1);
+ complexGroup.addGroup("StringTypes")
+ .append("_UTF8", "UTF8 string" + rowKey)
+ .append("_Enum", MIN_VALUE.toString())
+ .append("_UUID", Binary.fromConstantByteArray(bytes, 0, 16));
Group numeric = complexGroup.addGroup("NumericTypes");
numeric.addGroup("Int32")
.append("_INT32_RAW", 0x80000000)
@@ -354,8 +362,12 @@ public class ParquetSimpleTestFileGenerator {
byte[] bytes12 = {'1', '2', '3', '4', '5', '6', '7', '8', '9', '0', 'a', 'b' };
Group simpleGroup = sgf.newGroup();
simpleGroup.append("rowKey", ++rowKey);
- simpleGroup.append("_UTF8", "UTF8 string" + rowKey).append("_Enum", RANDOM_VALUE.toString())
- // .append("_UUID", "00112233445566778899aabbccddeeff");
+ byte[] bytes = new byte[30];
+ Arrays.fill(bytes, (byte) 1);
+ simpleGroup
+ .append("_UTF8", "UTF8 string" + rowKey)
+ .append("_Enum", RANDOM_VALUE.toString())
+ .append("_UUID", Binary.fromConstantByteArray(bytes, 0, 16))
.append("_INT32_RAW", 1234567)
.append("_INT_8", 123)
.append("_INT_16", 12345)
@@ -383,9 +395,10 @@ public class ParquetSimpleTestFileGenerator {
byte[] bytes = new byte[30];
Arrays.fill(bytes, (byte) 1);
simpleGroup.append("rowKey", ++rowKey);
- simpleGroup.append("_UTF8", "UTF8 string" + rowKey)
+ simpleGroup
+ .append("_UTF8", "UTF8 string" + rowKey)
.append("_Enum", MAX_VALUE.toString())
- // .append("_UUID", "00112233445566778899aabbccddeeff");
+ .append("_UUID", Binary.fromConstantByteArray(bytes, 0, 16))
.append("_INT32_RAW", 0x7FFFFFFF)
.append("_INT_8", 0x7F)
.append("_INT_16", 0x7FFF)
@@ -411,8 +424,12 @@ public class ParquetSimpleTestFileGenerator {
{
Group simpleGroup = sgf.newGroup();
simpleGroup.append("rowKey", ++rowKey);
- simpleGroup.append("_UTF8", "UTF8 string" + rowKey).append("_Enum", MIN_VALUE.toString())
- // .append("_UUID", "00112233445566778899aabbccddeeff");
+ byte[] bytes = new byte[30];
+ Arrays.fill(bytes, (byte) 1);
+ simpleGroup
+ .append("_UTF8", "UTF8 string" + rowKey)
+ .append("_Enum", MIN_VALUE.toString())
+ .append("_UUID", Binary.fromConstantByteArray(bytes, 0, 16))
.append("_INT32_RAW", 0x80000000)
.append("_INT_8", 0xFFFFFF80)
.append("_INT_16", 0xFFFF8000)
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetComplex.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetComplex.java
index 92f3d39..e03af04 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetComplex.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetComplex.java
@@ -887,4 +887,18 @@ public class TestParquetComplex extends BaseTestQuery {
.baselineValues(2, null)
.go();
}
+
+ @Test
+ public void testNewComplexParquetReaderUUID() throws Exception {
+ String query = "select `uuid_req1`, `uuid_opt1`, `uuid_req2` from cp.`store/parquet/complex/uuid.parquet` order by `uuid_req1`, `uuid_opt1`, `uuid_req2` limit 1";
+ byte[] firstValue = {0, 39, -125, -76, -113, 95, 73, -68, -68, 61, -89, -24, 123, -40, 94, -6};
+ byte[] secondValue = {74, -38, 0, -43, -73, 101, 67, -11, -68, -17, -63, 111, -20, 70, -93, -76};
+ testBuilder()
+ .optionSettingQueriesForTestQuery("alter session set `store.parquet.use_new_reader` = false")
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("uuid_req1", "uuid_opt1", "uuid_req2")
+ .baselineValues(firstValue, null, secondValue)
+ .go();
+ }
}
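The same query can also be run from client code; a minimal JDBC sketch (the connection URL assumes a local drillbit and is not part of the patch):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class UuidJdbcSketch {
  public static void main(String[] args) throws Exception {
    try (Connection conn = DriverManager.getConnection("jdbc:drill:drillbit=localhost");
         Statement stmt = conn.createStatement();
         ResultSet rs = stmt.executeQuery(
             "select `uuid_req1` from cp.`store/parquet/complex/uuid.parquet` limit 1")) {
      while (rs.next()) {
        byte[] uuid = rs.getBytes(1); // UUID columns surface as 16-byte VARBINARY values
        System.out.println(uuid.length);
      }
    }
  }
}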
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetLogicalTypes.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetLogicalTypes.java
index 2aee104..0ac7d0a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetLogicalTypes.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetLogicalTypes.java
@@ -666,4 +666,35 @@ public class TestParquetLogicalTypes extends BaseTestQuery {
.build().run();
}
+ @Test
+ public void testUUID() throws Exception {
+ String query = "select `_UUID` from cp.`parquet/parquet_test_file_simple.parquet`";
+ byte[] bytes = new byte[16];
+ Arrays.fill(bytes, (byte) 1);
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("_UUID")
+ .baselineValues(bytes)
+ .baselineValues(bytes)
+ .baselineValues(bytes)
+ .go();
+ }
+
+ @Test
+ public void testNullableVarBinaryUUID() throws Exception {
+ String query = "select `opt1_p1` from cp.`parquet/uuid-simple-fixed-length-array.parquet` order by `opt1_p1` limit 4";
+ byte[] firstValue = {60, -19, 30, -38, -49, 8, 79, 38, -103, -105, -6, -36, 65, -27, -60, -91};
+ byte[] secondValue = {-13, 5, 4, -97, 62, -78, 73, 69, -115, -25, -62, 88, 116, 37, 86, -41};
+
+ testBuilder()
+ .sqlQuery(query)
+ .ordered()
+ .baselineColumns("opt1_p1")
+ .baselineValues(firstValue)
+ .baselineValues(secondValue)
+ .baselineValues(new Object[]{null})
+ .baselineValues(new Object[]{null})
+ .go();
+ }
}
diff --git a/exec/java-exec/src/test/resources/parquet/parquet_test_file_simple.parquet b/exec/java-exec/src/test/resources/parquet/parquet_test_file_simple.parquet
new file mode 100644
index 0000000..16a555d
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/parquet_test_file_simple.parquet differ
diff --git a/exec/java-exec/src/test/resources/parquet/uuid-simple-fixed-length-array.parquet b/exec/java-exec/src/test/resources/parquet/uuid-simple-fixed-length-array.parquet
new file mode 100644
index 0000000..e1a793e
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/uuid-simple-fixed-length-array.parquet differ
diff --git a/exec/java-exec/src/test/resources/store/parquet/complex/uuid.parquet b/exec/java-exec/src/test/resources/store/parquet/complex/uuid.parquet
new file mode 100644
index 0000000..9597a9f
Binary files /dev/null and b/exec/java-exec/src/test/resources/store/parquet/complex/uuid.parquet differ
diff --git a/exec/jdbc-all/pom.xml b/exec/jdbc-all/pom.xml
index 246bc29..f0d79f4 100644
--- a/exec/jdbc-all/pom.xml
+++ b/exec/jdbc-all/pom.xml
@@ -535,7 +535,7 @@
This is likely due to you adding new dependencies to a java-exec and not updating the excludes in this module. This is important as it minimizes the size of the dependency of Drill application users.
</message>
- <maxsize>46300000</maxsize>
+ <maxsize>46600000</maxsize>
<minsize>15000000</minsize>
<files>
<file>${project.build.directory}/drill-jdbc-all-${project.version}.jar</file>
@@ -561,39 +561,6 @@
<build>
<plugins>
<plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-enforcer-plugin</artifactId>
- <executions>
- <execution>
- <id>enforce-jdbc-jar-compactness</id>
- <goals>
- <goal>enforce</goal>
- </goals>
- <phase>verify</phase>
- <configuration>
- <rules>
- <requireFilesSize>
- <message>
-
- The file drill-jdbc-all-${project.version}.jar is outside the expected size range.
-
- This is likely due to you adding new dependencies to a java-exec and not updating the excludes in this module. This is important as it minimizes the size of the dependency of Drill application users.
-
- </message>
- <maxsize>46300000</maxsize>
- <minsize>15000000</minsize>
- <files>
- <file>${project.build.directory}/drill-jdbc-all-${project.version}.jar</file>
- </files>
- </requireFilesSize>
- </rules>
- <fail>true</fail>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
diff --git a/pom.xml b/pom.xml
index 986996e..f65ea39 100644
--- a/pom.xml
+++ b/pom.xml
@@ -49,7 +49,7 @@
<shaded.guava.version>28.2-jre</shaded.guava.version>
<guava.version>19.0</guava.version>
<forkCount>2</forkCount>
- <parquet.version>1.11.0</parquet.version>
+ <parquet.version>1.12.0</parquet.version>
<parquet.format.version>2.8.0</parquet.format.version>
<!--
For development purposes to be able to use custom Calcite versions (e.g. not present in jitpack