Posted to commits@drill.apache.org by dz...@apache.org on 2021/09/29 08:04:44 UTC
[drill] branch master updated: DRILL-7969: Add support for reading
and writing Parquet files using Brotli, LZ4 and Zstandard codecs (#2321)
This is an automated email from the ASF dual-hosted git repository.
dzamo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 3ffcf6a DRILL-7969: Add support for reading and writing Parquet files using Brotli, LZ4 and Zstandard codecs (#2321)
3ffcf6a is described below
commit 3ffcf6a3e867007a14b1b1f2e8c0cd110e56d537
Author: dzamo <91...@users.noreply.github.com>
AuthorDate: Wed Sep 29 10:03:54 2021 +0200
DRILL-7969: Add support for reading and writing Parquet files using Brotli, LZ4 and Zstandard codecs (#2321)
Adds support for all the standardised Parquet compression codecs beyond GZip
and Snappy by making use of the airlift/aircompressor library, falling back
to parquet-mr for the codecs that aircompressor does not implement.
A new, delegating CompressionCodecFactory implementation is included. This
handles the routing of (de)compression to the correct lib while having a
minimal impact on the calling code in the Parquet reading and writing parts
of the Drill codebase.
Add read/write tests and data files for new Parquet codecs.
Exclude and shade aircompressor in other modules.
Don't package or test Brotli compression except for Linux and Mac on AMD64.
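For illustration: after this change the writer codec is still selected through
the existing "store.parquet.compression" session option, which now accepts
none, brotli, gzip, lz4, lzo, snappy and zstd. A hypothetical session writing
zstd-compressed Parquet (the output table name is illustrative):

    ALTER SESSION SET `store.parquet.compression` = 'zstd';
    CREATE TABLE dfs.tmp.`supplier_zstd` AS
    SELECT * FROM cp.`tpch/supplier.parquet`;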
---
contrib/storage-hive/core/pom.xml | 4 +
contrib/storage-hive/hive-exec-shade/pom.xml | 4 +
exec/java-exec/pom.xml | 44 ++++++
.../java/org/apache/drill/exec/ExecConstants.java | 5 +-
.../parquet/AbstractParquetScanBatchCreator.java | 11 +-
.../exec/store/parquet/ParquetRecordWriter.java | 35 +++--
.../parquet/columnreaders/AsyncPageReader.java | 63 +-------
.../store/parquet/columnreaders/PageReader.java | 19 ++-
.../parquet/columnreaders/ParquetRecordReader.java | 12 +-
.../compression/AirliftBytesInputCompressor.java | 171 +++++++++++++++++++++
.../compression/DrillCompressionCodecFactory.java | 117 ++++++++++++++
.../exec/store/parquet2/DrillParquetReader.java | 23 ++-
.../parquet/hadoop/ColumnChunkIncReadStore.java | 98 +++++++-----
.../hadoop/ParquetColumnChunkPageWriteStore.java | 13 +-
.../physical/impl/writer/TestParquetWriter.java | 73 ++++++++-
.../exec/physical/unit/MiniPlanUnitTestBase.java | 11 +-
.../store/parquet/ParquetRecordReaderTest.java | 10 +-
.../src/test/resources/supplier_brotli.parquet | Bin 0 -> 8723 bytes
.../src/test/resources/supplier_gzip.parquet | Bin 0 -> 8005 bytes
.../src/test/resources/supplier_lz4.parquet | Bin 0 -> 11330 bytes
.../src/test/resources/supplier_lzo.parquet | Bin 0 -> 10906 bytes
.../src/test/resources/supplier_snappy.parquet | Bin 10467 -> 0 bytes
.../src/test/resources/supplier_zstd.parquet | Bin 0 -> 8133 bytes
exec/jdbc-all/pom.xml | 8 +
pom.xml | 2 +
25 files changed, 578 insertions(+), 145 deletions(-)
diff --git a/contrib/storage-hive/core/pom.xml b/contrib/storage-hive/core/pom.xml
index ae44f5f..58af5ae 100644
--- a/contrib/storage-hive/core/pom.xml
+++ b/contrib/storage-hive/core/pom.xml
@@ -72,6 +72,10 @@
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-1.2-api</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>io.airlift</groupId>
+ <artifactId>aircompressor</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
diff --git a/contrib/storage-hive/hive-exec-shade/pom.xml b/contrib/storage-hive/hive-exec-shade/pom.xml
index c00cfb2..6d61f8f 100644
--- a/contrib/storage-hive/hive-exec-shade/pom.xml
+++ b/contrib/storage-hive/hive-exec-shade/pom.xml
@@ -196,6 +196,10 @@
<pattern>org.apache.commons.lang3.</pattern>
<shadedPattern>hive.org.apache.commons.lang3.</shadedPattern>
</relocation>
+ <relocation>
+ <pattern>io.airlift.</pattern>
+ <shadedPattern>hive.io.airlift.</shadedPattern>
+ </relocation>
</relocations>
<filters>
<filter>
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index 2660f60..7997c37 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -259,6 +259,11 @@
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
<exclusions>
+ <exclusion>
+ <!-- zstd (de)compression of Parquet data is handled by aircompressor -->
+ <groupId>com.github.luben</groupId>
+ <artifactId>zstd-jni</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
@@ -569,6 +574,11 @@
<classifier>${netty.tcnative.classifier}</classifier>
</dependency>
<dependency>
+ <groupId>io.airlift</groupId>
+ <artifactId>aircompressor</artifactId>
+ <version>${aircompressor.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.maven</groupId>
<artifactId>maven-embedder</artifactId>
<version>${maven.version}</version>
@@ -747,6 +757,40 @@
</dependency>
</dependencies>
</profile>
+ <profile>
+ <!-- Only package a Brotli codec for Linux and Mac OS X on AMD64, see PARQUET-1975 -->
+ <id>non-win-amd64</id>
+ <activation>
+ <os>
+ <arch>amd64</arch>
+ <name>!Windows</name>
+ </os>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>com.github.rdblue</groupId>
+ <artifactId>brotli-codec</artifactId>
+ <version>${brotli-codec.version}</version>
+ </dependency>
+ </dependencies>
+ </profile>
+ <profile>
+ <!-- A duplicate of the profile above to handle os.arch=x86_64 reported on Macs -->
+ <id>non-win-x86_64</id>
+ <activation>
+ <os>
+ <arch>x86_64</arch>
+ <name>!Windows</name>
+ </os>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>com.github.rdblue</groupId>
+ <artifactId>brotli-codec</artifactId>
+ <version>${brotli-codec.version}</version>
+ </dependency>
+ </dependencies>
+ </profile>
</profiles>
<build>
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index be44596..16d21b5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -358,7 +358,10 @@ public final class ExecConstants {
new OptionDescription("For internal use. Do not change."));
public static final String PARQUET_WRITER_COMPRESSION_TYPE = "store.parquet.compression";
public static final OptionValidator PARQUET_WRITER_COMPRESSION_TYPE_VALIDATOR = new EnumeratedStringValidator(
- PARQUET_WRITER_COMPRESSION_TYPE, new OptionDescription("Compression type for storing Parquet output. Allowed values: snappy, gzip, none"), "snappy", "gzip", "none");
+ PARQUET_WRITER_COMPRESSION_TYPE,
+ new OptionDescription("Compression type for storing Parquet output. Allowed values: none, brotli, gzip, lz4, lzo, snappy, zstd"),
+ "none", "brotli", "gzip", "lz4", "lzo", "snappy", "zstd"
+ );
public static final String PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING = "store.parquet.enable_dictionary_encoding";
public static final OptionValidator PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING_VALIDATOR = new BooleanValidator(
PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
index 9598390..ffba7f2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
@@ -37,6 +37,7 @@ import org.apache.drill.exec.store.ColumnExplorer;
import org.apache.drill.exec.store.CommonParquetRecordReader;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
+import org.apache.drill.exec.store.parquet.compression.DrillCompressionCodecFactory;
import org.apache.drill.exec.store.parquet.metadata.Metadata;
import org.apache.drill.exec.store.parquet.metadata.MetadataBase;
import org.apache.drill.exec.store.parquet.metadata.Metadata_V4;
@@ -49,7 +50,7 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
-import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.compression.CompressionCodecFactory;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
@@ -320,12 +321,18 @@ public abstract class AbstractParquetScanBatchCreator {
containsCorruptDates,
recordsToRead);
} else {
+ CompressionCodecFactory ccf = DrillCompressionCodecFactory.createDirectCodecFactory(
+ fs.getConf(),
+ new ParquetDirectByteBufferAllocator(oContext.getAllocator()),
+ 0
+ );
+
reader = new ParquetRecordReader(context,
rowGroup.getPath(),
rowGroup.getRowGroupIndex(),
recordsToRead,
fs,
- CodecFactory.createDirectCodecFactory(fs.getConf(), new ParquetDirectByteBufferAllocator(oContext.getAllocator()), 0),
+ ccf,
footer,
rowGroupScan.getColumns(),
containsCorruptDates);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index 45a2c7f..53a228f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -41,6 +41,7 @@ import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.store.StorageStrategy;
+import org.apache.drill.exec.store.parquet.compression.DrillCompressionCodecFactory;
import org.apache.drill.exec.planner.physical.WriterPrel;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.MaterializedField;
@@ -62,7 +63,7 @@ import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.ParquetProperties.WriterVersion;
import org.apache.parquet.column.impl.ColumnWriteStoreV1;
import org.apache.parquet.column.values.factory.DefaultV1ValuesWriterFactory;
-import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.compression.CompressionCodecFactory;
import org.apache.parquet.hadoop.ParquetColumnChunkPageWriteStore;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -108,7 +109,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
private boolean useSingleFSBlock = false;
private CompressionCodecName codec = CompressionCodecName.SNAPPY;
private WriterVersion writerVersion = WriterVersion.PARQUET_1_0;
- private CodecFactory codecFactory;
+ private CompressionCodecFactory codecFactory;
private long recordCount = 0;
private long recordCountForNextMemCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
@@ -136,8 +137,11 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
public ParquetRecordWriter(FragmentContext context, ParquetWriter writer) throws OutOfMemoryException {
this.oContext = context.newOperatorContext(writer);
- this.codecFactory = CodecFactory.createDirectCodecFactory(writer.getFormatPlugin().getFsConf(),
- new ParquetDirectByteBufferAllocator(oContext.getAllocator()), pageSize);
+ this.codecFactory = DrillCompressionCodecFactory.createDirectCodecFactory(
+ writer.getFormatPlugin().getFsConf(),
+ new ParquetDirectByteBufferAllocator(oContext.getAllocator()),
+ pageSize
+ );
this.partitionColumns = writer.getPartitionColumns();
this.hasPartitions = partitionColumns != null && partitionColumns.size() > 0;
this.extraMetaData.put(DRILL_VERSION_PROPERTY, DrillVersionInfo.getVersion());
@@ -158,18 +162,27 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
dictionaryPageSize= Integer.parseInt(writerOptions.get(ExecConstants.PARQUET_DICT_PAGE_SIZE));
String codecName = writerOptions.get(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE).toLowerCase();
switch(codecName) {
- case "snappy":
- codec = CompressionCodecName.SNAPPY;
+ case "none":
+ case "uncompressed":
+ codec = CompressionCodecName.UNCOMPRESSED;
break;
- case "lzo":
- codec = CompressionCodecName.LZO;
+ case "brotli":
+ codec = CompressionCodecName.BROTLI;
break;
case "gzip":
codec = CompressionCodecName.GZIP;
break;
- case "none":
- case "uncompressed":
- codec = CompressionCodecName.UNCOMPRESSED;
+ case "lz4":
+ codec = CompressionCodecName.LZ4;
+ break;
+ case "lzo":
+ codec = CompressionCodecName.LZO;
+ break;
+ case "snappy":
+ codec = CompressionCodecName.SNAPPY;
+ break;
+ case "zstd":
+ codec = CompressionCodecName.ZSTD;
break;
default:
throw new UnsupportedOperationException(String.format("Unknown compression type: %s", codecName));
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
index ffb10da..8c0e85e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
@@ -37,17 +37,13 @@ import org.apache.drill.exec.util.concurrent.ExecutorServiceUtil;
import org.apache.drill.exec.util.filereader.DirectBufInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.Decompressor;
-import org.apache.hadoop.io.compress.DirectDecompressor;
-import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
import org.apache.parquet.format.PageHeader;
import org.apache.parquet.format.PageType;
import org.apache.parquet.format.Util;
-import org.apache.parquet.hadoop.CodecFactory;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
-import org.xerial.snappy.Snappy;
import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
@@ -200,11 +196,13 @@ class AsyncPageReader extends PageReader {
pageDataBuf = allocateTemporaryBuffer(uncompressedSize);
try {
timer.start();
+
CompressionCodecName codecName = parentColumnReader.columnChunkMetaData.getCodec();
+ BytesInputDecompressor decomp = codecFactory.getDecompressor(codecName);
ByteBuffer input = compressedData.nioBuffer(0, compressedSize);
ByteBuffer output = pageDataBuf.nioBuffer(0, uncompressedSize);
- DecompressionHelper decompressionHelper = new DecompressionHelper(codecName);
- decompressionHelper.decompress(input, compressedSize, output, uncompressedSize);
+
+ decomp.decompress(input, compressedSize, output, uncompressedSize);
pageDataBuf.writerIndex(uncompressedSize);
timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
this.updateStats(pageHeader, "Decompress", 0, timeToRead, compressedSize, uncompressedSize);
@@ -495,56 +493,5 @@ class AsyncPageReader extends PageReader {
}
return null;
}
-
- }
-
- private class DecompressionHelper {
- final CompressionCodecName codecName;
-
- public DecompressionHelper(CompressionCodecName codecName){
- this.codecName = codecName;
- }
-
- public void decompress (ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize)
- throws IOException {
- // GZip != thread_safe, so we go off and do our own thing.
- // The hadoop interface does not support ByteBuffer so we incur some
- // expensive copying.
- if (codecName == CompressionCodecName.GZIP) {
- GzipCodec codec = new GzipCodec();
- // DirectDecompressor: @see https://hadoop.apache.org/docs/r2.7.2/api/org/apache/hadoop/io/compress/DirectDecompressor.html
- DirectDecompressor directDecompressor = codec.createDirectDecompressor();
- if (directDecompressor != null) {
- logger.debug("Using GZIP direct decompressor.");
- directDecompressor.decompress(input, output);
- } else {
- logger.debug("Using GZIP (in)direct decompressor.");
- Decompressor decompressor = codec.createDecompressor();
- decompressor.reset();
- byte[] inputBytes = new byte[compressedSize];
- input.position(0);
- input.get(inputBytes);
- decompressor.setInput(inputBytes, 0, inputBytes.length);
- byte[] outputBytes = new byte[uncompressedSize];
- decompressor.decompress(outputBytes, 0, uncompressedSize);
- output.clear();
- output.put(outputBytes);
- }
- } else if (codecName == CompressionCodecName.SNAPPY) {
- // For Snappy, just call the Snappy decompressor directly instead
- // of going thru the DirectDecompressor class.
- // The Snappy codec is itself thread safe, while going thru the DirectDecompressor path
- // seems to have concurrency issues.
- output.clear();
- int size = Snappy.uncompress(input, output);
- output.limit(size);
- } else {
- CodecFactory.BytesDecompressor decompressor = codecFactory.getDecompressor(parentColumnReader.columnChunkMetaData.getCodec());
- decompressor.decompress(input, compressedSize, output, uncompressedSize);
- }
- }
-
-
}
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
index 79c272a..d06b5ab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
@@ -40,13 +40,14 @@ import org.apache.parquet.column.ValuesType;
import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.column.values.dictionary.DictionaryValuesReader;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
import org.apache.parquet.format.DataPageHeader;
import org.apache.parquet.format.DataPageHeaderV2;
import org.apache.parquet.format.PageHeader;
import org.apache.parquet.format.PageType;
import org.apache.parquet.format.Util;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
-import org.apache.parquet.hadoop.CodecFactory;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.PrimitiveType;
@@ -108,7 +109,7 @@ class PageReader {
// These need to be held throughout reading of the entire column chunk
List<ByteBuf> allocatedDictionaryBuffers;
- protected final CodecFactory codecFactory;
+ protected final CompressionCodecFactory codecFactory;
protected final String fileName;
protected final ParquetReaderStats stats;
@@ -122,8 +123,8 @@ class PageReader {
PageReader(org.apache.drill.exec.store.parquet.columnreaders.ColumnReader<?> parentStatus, FileSystem fs, Path path, ColumnChunkMetaData columnChunkMetaData)
throws ExecutionSetupException {
this.parentColumnReader = parentStatus;
- allocatedDictionaryBuffers = new ArrayList<ByteBuf>();
- codecFactory = parentColumnReader.parentReader.getCodecFactory();
+ this.allocatedDictionaryBuffers = new ArrayList<ByteBuf>();
+ this.codecFactory = parentColumnReader.parentReader.getCodecFactory();
this.stats = parentColumnReader.parentReader.parquetReaderStats;
this.fileName = path.toString();
debugName = new StringBuilder()
@@ -239,9 +240,13 @@ class PageReader {
this.updateStats(pageHeader, "Page Read", start, timeToRead, compressedSize, compressedSize);
start = dataReader.getPos();
timer.start();
- codecFactory.getDecompressor(parentColumnReader.columnChunkMetaData.getCodec())
- .decompress(compressedData.nioBuffer(0, compressedSize), compressedSize,
- pageDataBuf.nioBuffer(0, uncompressedSize), uncompressedSize);
+
+ CompressionCodecName codecName = parentColumnReader.columnChunkMetaData.getCodec();
+ BytesInputDecompressor decomp = codecFactory.getDecompressor(codecName);
+ ByteBuffer input = compressedData.nioBuffer(0, compressedSize);
+ ByteBuffer output = pageDataBuf.nioBuffer(0, uncompressedSize);
+
+ decomp.decompress(input, compressedSize, output, uncompressedSize);
pageDataBuf.writerIndex(uncompressedSize);
timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
this.updateStats(pageHeader, "Decompress", start, timeToRead, compressedSize, uncompressedSize);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index 3fbe83c..b9c365c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -36,7 +36,7 @@ import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchStatsContex
import org.apache.drill.exec.vector.ValueVector;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.compression.CompressionCodecFactory;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,7 +52,7 @@ public class ParquetRecordReader extends CommonParquetRecordReader {
private final FileSystem fileSystem;
private final long numRecordsToRead; // number of records to read
private final Path hadoopPath;
- private final CodecFactory codecFactory;
+ private final CompressionCodecFactory codecFactory;
private final int rowGroupIndex;
private final ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus;
@@ -77,7 +77,7 @@ public class ParquetRecordReader extends CommonParquetRecordReader {
int rowGroupIndex,
long numRecordsToRead,
FileSystem fs,
- CodecFactory codecFactory,
+ CompressionCodecFactory codecFactory,
ParquetMetadata footer,
List<SchemaPath> columns,
ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus) {
@@ -88,7 +88,7 @@ public class ParquetRecordReader extends CommonParquetRecordReader {
Path path,
int rowGroupIndex,
FileSystem fs,
- CodecFactory codecFactory,
+ CompressionCodecFactory codecFactory,
ParquetMetadata footer,
List<SchemaPath> columns,
ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus) {
@@ -102,7 +102,7 @@ public class ParquetRecordReader extends CommonParquetRecordReader {
Path path,
int rowGroupIndex,
FileSystem fs,
- CodecFactory codecFactory,
+ CompressionCodecFactory codecFactory,
ParquetMetadata footer,
List<SchemaPath> columns,
ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus) {
@@ -135,7 +135,7 @@ public class ParquetRecordReader extends CommonParquetRecordReader {
return dateCorruptionStatus;
}
- public CodecFactory getCodecFactory() {
+ public CompressionCodecFactory getCodecFactory() {
return codecFactory;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/compression/AirliftBytesInputCompressor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/compression/AirliftBytesInputCompressor.java
new file mode 100644
index 0000000..9170e81
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/compression/AirliftBytesInputCompressor.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.compression;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Stack;
+
+import org.apache.parquet.bytes.ByteBufferAllocator;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.airlift.compress.Compressor;
+import io.airlift.compress.Decompressor;
+import io.airlift.compress.lz4.Lz4Compressor;
+import io.airlift.compress.lz4.Lz4Decompressor;
+import io.airlift.compress.lzo.LzoCompressor;
+import io.airlift.compress.lzo.LzoDecompressor;
+import io.airlift.compress.snappy.SnappyCompressor;
+import io.airlift.compress.snappy.SnappyDecompressor;
+import io.airlift.compress.zstd.ZstdCompressor;
+import io.airlift.compress.zstd.ZstdDecompressor;
+
+/**
+ * A shim making an aircompressor (de)compressor available through the BytesInputCompressor
+ * and BytesInputDecompressor interfaces.
+ */
+public class AirliftBytesInputCompressor implements CompressionCodecFactory.BytesInputCompressor, CompressionCodecFactory.BytesInputDecompressor {
+ private static final Logger logger = LoggerFactory.getLogger(AirliftBytesInputCompressor.class);
+
+ // name of the codec provided by this compressor
+ private CompressionCodecName codecName;
+
+ // backing aircompressor compressor
+ private Compressor airComp;
+
+ // backing aircompressor decompressor
+ private Decompressor airDecomp;
+
+ // the direct memory allocator to be used for (de)compression outputs
+ private ByteBufferAllocator allocator;
+
+ // all the direct memory buffers we've allocated, and must release
+ private Stack<ByteBuffer> allocatedBuffers;
+
+ public AirliftBytesInputCompressor(CompressionCodecName codecName, ByteBufferAllocator allocator) {
+ this.codecName = codecName;
+
+ switch (codecName) {
+ case LZ4:
+ airComp = new Lz4Compressor();
+ airDecomp = new Lz4Decompressor();
+ break;
+ case LZO:
+ airComp = new LzoCompressor();
+ airDecomp = new LzoDecompressor();
+ break;
+ case SNAPPY:
+ airComp = new SnappyCompressor();
+ airDecomp = new SnappyDecompressor();
+ break;
+ case ZSTD:
+ airComp = new ZstdCompressor();
+ airDecomp = new ZstdDecompressor();
+ break;
+ default:
+ throw new UnsupportedOperationException("Parquet compression codec is not supported: " + codecName);
+ }
+
+ this.allocator = allocator;
+ this.allocatedBuffers = new Stack<>();
+
+ logger.debug(
+ "constructed a {} using a backing compressor of {}",
+ getClass().getName(),
+ airComp.getClass().getName()
+ );
+ }
+
+ @Override
+ public BytesInput compress(BytesInput bytes) throws IOException {
+ ByteBuffer inBuf = bytes.toByteBuffer();
+
+ logger.trace(
+ "will use aircompressor to compress {} bytes from a {} containing a {}",
+ bytes.size(),
+ bytes.getClass().getName(),
+ inBuf.getClass().getName()
+ );
+
+ // aircompressor tells us the maximum amount of output buffer we could need
+ int maxOutLen = airComp.maxCompressedLength((int) bytes.size());
+ ByteBuffer outBuf = allocator.allocate(maxOutLen);
+ // track our allocation for later release in release()
+ this.allocatedBuffers.push(outBuf);
+
+ airComp.compress(inBuf, outBuf);
+
+ // flip: callers expect the output buffer positioned at the start of data
+ return BytesInput.from((ByteBuffer) outBuf.flip());
+ }
+
+ @Override
+ public CompressionCodecName getCodecName() {
+ return codecName;
+ }
+
+ @Override
+ public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException {
+ ByteBuffer inBuf = bytes.toByteBuffer();
+
+ logger.trace(
+ "will use aircompressor to decompress {} bytes from a {} containing a {}.",
+ uncompressedSize,
+ bytes.getClass().getName(),
+ inBuf.getClass().getName()
+ );
+
+ ByteBuffer outBuf = allocator.allocate(uncompressedSize);
+ // track our allocation for later release in release()
+ this.allocatedBuffers.push(outBuf);
+
+ airDecomp.decompress(inBuf, outBuf);
+
+ // flip: callers expect the output buffer positioned at the start of data
+ return BytesInput.from((ByteBuffer) outBuf.flip());
+ }
+
+ @Override
+ public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize)
+ throws IOException {
+ logger.trace(
+ "will use aircompressor to decompress {} bytes from a {} to a {}.",
+ uncompressedSize,
+ input.getClass().getName(),
+ output.getClass().getName()
+ );
+
+ airDecomp.decompress(input, output);
+ }
+
+ @Override
+ public void release() {
+ logger.debug(
+ "will release {} allocated buffers.",
+ this.allocatedBuffers.size()
+ );
+
+ while (!this.allocatedBuffers.isEmpty()) {
+ this.allocator.release(allocatedBuffers.pop());
+ }
+ }
+}
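As a minimal sketch of the shim above used in isolation (illustrative and
hypothetical: the class and main method are invented for the example, and
parquet-common's HeapByteBufferAllocator is an assumption, not part of this
commit):

    import java.nio.charset.StandardCharsets;

    import org.apache.drill.exec.store.parquet.compression.AirliftBytesInputCompressor;
    import org.apache.parquet.bytes.BytesInput;
    import org.apache.parquet.bytes.HeapByteBufferAllocator;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;

    public class AirliftShimRoundTrip {
      public static void main(String[] args) throws Exception {
        // back the shim with zstd, one of the four codecs it routes to aircompressor
        AirliftBytesInputCompressor shim = new AirliftBytesInputCompressor(
            CompressionCodecName.ZSTD, new HeapByteBufferAllocator());
        try {
          byte[] original = "some page bytes".getBytes(StandardCharsets.UTF_8);
          BytesInput compressed = shim.compress(BytesInput.from(original));
          BytesInput restored = shim.decompress(compressed, original.length);
          // prints "some page bytes"
          System.out.println(new String(restored.toByteArray(), StandardCharsets.UTF_8));
        } finally {
          shim.release(); // frees every output buffer tracked in allocatedBuffers
        }
      }
    }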
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/compression/DrillCompressionCodecFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/compression/DrillCompressionCodecFactory.java
new file mode 100644
index 0000000..53eee64
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/compression/DrillCompressionCodecFactory.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.compression;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.bytes.ByteBufferAllocator;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A delegating compression codec factory that returns (de)compressors based on
+ * https://github.com/airlift/aircompressor when possible and falls back to
+ * parquet-mr otherwise. The aircompressor lib was introduced into Drill
+ * because of difficulties encountered with the JNI-based implementations of
+ * lzo, lz4 and zstd in parquet-mr.
+ *
+ * By modifying the constant AIRCOMPRESSOR_CODECS it is possible to choose
+ * which codecs should be routed to which lib. In addition, this class
+ * implements parquet-mr's CompressionCodecFactory interface, meaning that
+ * swapping this factory for e.g. one in parquet-mr will have minimal impact
+ * on code in Drill relying on a CompressionCodecFactory.
+ *
+ */
+public class DrillCompressionCodecFactory implements CompressionCodecFactory {
+ private static final Logger logger = LoggerFactory.getLogger(DrillCompressionCodecFactory.class);
+
+ // The set of codecs to be handled by aircompressor
+ private static final Set<CompressionCodecName> AIRCOMPRESSOR_CODECS = new HashSet<>(
+ Arrays.asList(CompressionCodecName.LZ4, CompressionCodecName.LZO,
+ CompressionCodecName.SNAPPY, CompressionCodecName.ZSTD));
+
+ // pool of reused aircompressor compressors (parquet-mr's factory has its own)
+ private final Map<CompressionCodecName, AirliftBytesInputCompressor> airCompressors = new HashMap<>();
+
+ // fallback parquet-mr compression codec factory
+ private CompressionCodecFactory parqCodecFactory;
+
+ // direct memory allocator to be used during (de)compression
+ private ByteBufferAllocator allocator;
+
+ // static builder method, solely to mimic the parquet-mr API as closely as possible
+ public static CompressionCodecFactory createDirectCodecFactory(Configuration config, ByteBufferAllocator allocator,
+ int pageSize) {
+ return new DrillCompressionCodecFactory(config, allocator, pageSize);
+ }
+
+ public DrillCompressionCodecFactory(Configuration config, ByteBufferAllocator allocator, int pageSize) {
+ this.allocator = allocator;
+ this.parqCodecFactory = CodecFactory.createDirectCodecFactory(config, allocator, pageSize);
+
+ logger.debug(
+ "constructed a {} using a fallback factory of {}",
+ getClass().getName(),
+ parqCodecFactory.getClass().getName()
+ );
+ }
+
+ @Override
+ public BytesInputCompressor getCompressor(CompressionCodecName codecName) {
+ if (AIRCOMPRESSOR_CODECS.contains(codecName)) {
+ return airCompressors.computeIfAbsent(
+ codecName,
+ c -> new AirliftBytesInputCompressor(codecName, allocator)
+ );
+ } else {
+ return parqCodecFactory.getCompressor(codecName);
+ }
+ }
+
+ @Override
+ public BytesInputDecompressor getDecompressor(CompressionCodecName codecName) {
+ if (AIRCOMPRESSOR_CODECS.contains(codecName)) {
+ return airCompressors.computeIfAbsent(
+ codecName,
+ c -> new AirliftBytesInputCompressor(codecName, allocator)
+ );
+ } else {
+ return parqCodecFactory.getDecompressor(codecName);
+ }
+ }
+
+ @Override
+ public void release() {
+ parqCodecFactory.release();
+ logger.debug("released {}", parqCodecFactory);
+
+ for (AirliftBytesInputCompressor abic : airCompressors.values()) {
+ abic.release();
+ logger.debug("released {}", abic);
+ }
+ airCompressors.clear();
+ }
+}
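A hedged sketch of the routing described in the javadoc above (the demo class,
the default Configuration and parquet-common's DirectByteBufferAllocator are
assumptions made for the example; the parquet-mr fallback factory expects a
direct allocator):

    import org.apache.drill.exec.store.parquet.compression.DrillCompressionCodecFactory;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.parquet.bytes.DirectByteBufferAllocator;
    import org.apache.parquet.compression.CompressionCodecFactory;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;

    public class CodecRoutingDemo {
      public static void main(String[] args) {
        CompressionCodecFactory ccf = DrillCompressionCodecFactory.createDirectCodecFactory(
            new Configuration(), new DirectByteBufferAllocator(), 0);
        try {
          // ZSTD is in AIRCOMPRESSOR_CODECS: handled by an AirliftBytesInputCompressor
          System.out.println(ccf.getDecompressor(CompressionCodecName.ZSTD).getClass().getSimpleName());
          // GZIP is not: handled by the parquet-mr fallback factory
          System.out.println(ccf.getDecompressor(CompressionCodecName.GZIP).getClass().getSimpleName());
        } finally {
          ccf.release(); // releases the fallback factory and all pooled shims
        }
      }
    }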
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
index 1c0490c..8fe61aa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
@@ -23,6 +23,7 @@ import org.apache.drill.common.expression.PathSegment;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.impl.OutputMutator;
@@ -32,12 +33,13 @@ import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator;
import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
import org.apache.drill.exec.store.parquet.RowGroupReadEntry;
+import org.apache.drill.exec.store.parquet.compression.DrillCompressionCodecFactory;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.NullableIntVector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.compression.CompressionCodecFactory;
import org.apache.parquet.hadoop.ColumnChunkIncReadStore;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
@@ -331,10 +333,21 @@ public class DrillParquetReader extends CommonParquetRecordReader {
Function.identity(),
(o, n) -> n));
- pageReadStore = new ColumnChunkIncReadStore(numRecordsToRead,
- CodecFactory.createDirectCodecFactory(drillFileSystem.getConf(),
- new ParquetDirectByteBufferAllocator(operatorContext.getAllocator()), 0),
- operatorContext.getAllocator(), drillFileSystem, entry.getPath());
+ BufferAllocator allocator = operatorContext.getAllocator();
+
+ CompressionCodecFactory ccf = DrillCompressionCodecFactory.createDirectCodecFactory(
+ drillFileSystem.getConf(),
+ new ParquetDirectByteBufferAllocator(allocator),
+ 0
+ );
+
+ pageReadStore = new ColumnChunkIncReadStore(
+ numRecordsToRead,
+ ccf,
+ allocator,
+ drillFileSystem,
+ entry.getPath()
+ );
for (String[] path : schema.getPaths()) {
Type type = schema.getType(path);
diff --git a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java
index 4bb1a22..c0d6a88 100644
--- a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java
+++ b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java
@@ -40,11 +40,12 @@ import org.apache.parquet.column.page.DataPageV2;
import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
import org.apache.parquet.format.DataPageHeaderV2;
import org.apache.parquet.format.PageHeader;
import org.apache.parquet.format.Util;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
-import org.apache.parquet.hadoop.CodecFactory.BytesDecompressor;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.util.HadoopStreams;
@@ -55,14 +56,14 @@ public class ColumnChunkIncReadStore implements PageReadStore {
private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
- private CodecFactory codecFactory;
+ private CompressionCodecFactory codecFactory;
private BufferAllocator allocator;
private FileSystem fs;
private Path path;
private long rowCount;
private List<FSDataInputStream> streams = new ArrayList<>();
- public ColumnChunkIncReadStore(long rowCount, CodecFactory codecFactory, BufferAllocator allocator,
+ public ColumnChunkIncReadStore(long rowCount, CompressionCodecFactory codecFactory, BufferAllocator allocator,
FileSystem fs, Path path) {
this.codecFactory = codecFactory;
this.allocator = allocator;
@@ -82,7 +83,7 @@ public class ColumnChunkIncReadStore implements PageReadStore {
private DictionaryPage dictionaryPage;
private FSDataInputStream in;
- private BytesDecompressor decompressor;
+ private BytesInputDecompressor decompressor;
private ByteBuf lastPage;
@@ -100,19 +101,26 @@ public class ColumnChunkIncReadStore implements PageReadStore {
if (dictionaryPage == null) {
PageHeader pageHeader = new PageHeader();
long pos = 0;
+
try {
pos = in.getPos();
pageHeader = Util.readPageHeader(in);
+
if (pageHeader.getDictionary_page_header() == null) {
in.seek(pos);
return null;
}
- dictionaryPage =
- new DictionaryPage(
- decompressor.decompress(BytesInput.from(in, pageHeader.compressed_page_size), pageHeader.getUncompressed_page_size()),
- pageHeader.getDictionary_page_header().getNum_values(),
- parquetMetadataConverter.getEncoding(pageHeader.dictionary_page_header.encoding)
- );
+
+ BytesInput dictPageBytes = decompressor.decompress(
+ BytesInput.from(in, pageHeader.compressed_page_size),
+ pageHeader.getUncompressed_page_size()
+ );
+
+ dictionaryPage = new DictionaryPage(
+ dictPageBytes,
+ pageHeader.getDictionary_page_header().getNum_values(),
+ parquetMetadataConverter.getEncoding(pageHeader.dictionary_page_header.encoding)
+ );
} catch (Exception e) {
throw new DrillRuntimeException("Error reading dictionary page." +
"\nFile path: " + path.toUri().getPath() +
@@ -145,15 +153,20 @@ public class ColumnChunkIncReadStore implements PageReadStore {
pageHeader = Util.readPageHeader(in);
int uncompressedPageSize = pageHeader.getUncompressed_page_size();
int compressedPageSize = pageHeader.getCompressed_page_size();
+
switch (pageHeader.type) {
case DICTIONARY_PAGE:
if (dictionaryPage == null) {
- dictionaryPage =
- new DictionaryPage(
- decompressor.decompress(BytesInput.from(in, pageHeader.compressed_page_size), pageHeader.getUncompressed_page_size()),
- pageHeader.uncompressed_page_size,
- parquetMetadataConverter.getEncoding(pageHeader.dictionary_page_header.encoding)
- );
+ BytesInput pageBytes = decompressor.decompress(
+ BytesInput.from(in, pageHeader.compressed_page_size),
+ pageHeader.getUncompressed_page_size()
+ );
+
+ dictionaryPage = new DictionaryPage(
+ pageBytes,
+ pageHeader.uncompressed_page_size,
+ parquetMetadataConverter.getEncoding(pageHeader.dictionary_page_header.encoding)
+ );
} else {
in.skip(pageHeader.compressed_page_size);
}
@@ -165,14 +178,20 @@ public class ColumnChunkIncReadStore implements PageReadStore {
ByteBuffer buffer = buf.nioBuffer(0, pageHeader.compressed_page_size);
HadoopStreams.wrap(in).readFully(buffer);
buffer.flip();
+
+ BytesInput pageBytes = decompressor.decompress(
+ BytesInput.from(buffer),
+ pageHeader.getUncompressed_page_size()
+ );
+
return new DataPageV1(
- decompressor.decompress(BytesInput.from(buffer), pageHeader.getUncompressed_page_size()),
- pageHeader.data_page_header.num_values,
- pageHeader.uncompressed_page_size,
- fromParquetStatistics(pageHeader.data_page_header.statistics, columnDescriptor.getType()),
- parquetMetadataConverter.getEncoding(pageHeader.data_page_header.repetition_level_encoding),
- parquetMetadataConverter.getEncoding(pageHeader.data_page_header.definition_level_encoding),
- parquetMetadataConverter.getEncoding(pageHeader.data_page_header.encoding)
+ pageBytes,
+ pageHeader.data_page_header.num_values,
+ pageHeader.uncompressed_page_size,
+ fromParquetStatistics(pageHeader.data_page_header.statistics, columnDescriptor.getType()),
+ parquetMetadataConverter.getEncoding(pageHeader.data_page_header.repetition_level_encoding),
+ parquetMetadataConverter.getEncoding(pageHeader.data_page_header.definition_level_encoding),
+ parquetMetadataConverter.getEncoding(pageHeader.data_page_header.encoding)
);
// TODO - finish testing this with more files
case DATA_PAGE_V2:
@@ -184,10 +203,11 @@ public class ColumnChunkIncReadStore implements PageReadStore {
buffer.flip();
DataPageHeaderV2 dataHeaderV2 = pageHeader.getData_page_header_v2();
int dataSize = compressedPageSize - dataHeaderV2.getRepetition_levels_byte_length() - dataHeaderV2.getDefinition_levels_byte_length();
- BytesInput decompressedPageData =
- decompressor.decompress(
- BytesInput.from(buffer),
- pageHeader.uncompressed_page_size);
+ BytesInput decompressedPageData = decompressor.decompress(
+ BytesInput.from(buffer),
+ pageHeader.uncompressed_page_size
+ );
+
ByteBuffer byteBuffer = decompressedPageData.toByteBuffer();
int limit = byteBuffer.limit();
byteBuffer.limit(dataHeaderV2.getRepetition_levels_byte_length());
@@ -200,17 +220,17 @@ public class ColumnChunkIncReadStore implements PageReadStore {
BytesInput data = BytesInput.from(byteBuffer.slice());
return new DataPageV2(
- dataHeaderV2.getNum_rows(),
- dataHeaderV2.getNum_nulls(),
- dataHeaderV2.getNum_values(),
- repetitionLevels,
- definitionLevels,
- parquetMetadataConverter.getEncoding(dataHeaderV2.getEncoding()),
- data,
- uncompressedPageSize,
- fromParquetStatistics(dataHeaderV2.getStatistics(), columnDescriptor.getType()),
- dataHeaderV2.isIs_compressed()
- );
+ dataHeaderV2.getNum_rows(),
+ dataHeaderV2.getNum_nulls(),
+ dataHeaderV2.getNum_values(),
+ repetitionLevels,
+ definitionLevels,
+ parquetMetadataConverter.getEncoding(dataHeaderV2.getEncoding()),
+ data,
+ uncompressedPageSize,
+ fromParquetStatistics(dataHeaderV2.getStatistics(), columnDescriptor.getType()),
+ dataHeaderV2.isIs_compressed()
+ );
default:
in.skip(pageHeader.compressed_page_size);
break;
@@ -233,6 +253,8 @@ public class ColumnChunkIncReadStore implements PageReadStore {
}
void close() {
+ decompressor.release();
+
if (lastPage != null) {
lastPage.release();
lastPage = null;
diff --git a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java
index 790e3c3..31a304e 100644
--- a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java
+++ b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java
@@ -39,13 +39,13 @@ import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.column.values.bloomfilter.BloomFilter;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
+import org.apache.parquet.compression.CompressionCodecFactory.BytesInputCompressor;
import org.apache.parquet.crypto.AesCipher;
import org.apache.parquet.crypto.InternalColumnEncryptionSetup;
import org.apache.parquet.crypto.InternalFileEncryptor;
import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
import org.apache.parquet.format.BlockCipher;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
-import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder;
import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder;
@@ -65,8 +65,7 @@ public class ParquetColumnChunkPageWriteStore implements PageWriteStore, BloomFi
private static final class ColumnChunkPageWriter implements PageWriter, BloomFilterWriter, Closeable {
private final ColumnDescriptor path;
- private final BytesCompressor compressor;
-
+ private final BytesInputCompressor compressor;
private final CapacityByteArrayOutputStream tempOutputStream;
private final CapacityByteArrayOutputStream buf;
private DictionaryPage dictionaryPage;
@@ -100,7 +99,7 @@ public class ParquetColumnChunkPageWriteStore implements PageWriteStore, BloomFi
private final byte[] fileAAD;
private ColumnChunkPageWriter(ColumnDescriptor path,
- BytesCompressor compressor,
+ BytesInputCompressor compressor,
int initialSlabSize,
int maxCapacityHint,
ByteBufferAllocator allocator,
@@ -402,14 +401,14 @@ public class ParquetColumnChunkPageWriteStore implements PageWriteStore, BloomFi
private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers = new HashMap<ColumnDescriptor, ColumnChunkPageWriter>();
private final MessageType schema;
- public ParquetColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, int initialSlabSize,
+ public ParquetColumnChunkPageWriteStore(BytesInputCompressor compressor, MessageType schema, int initialSlabSize,
int maxCapacityHint, ByteBufferAllocator allocator,
int columnIndexTruncateLength) {
this(compressor, schema, initialSlabSize, maxCapacityHint, allocator, columnIndexTruncateLength,
ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED);
}
- public ParquetColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, int initialSlabSize,
+ public ParquetColumnChunkPageWriteStore(BytesInputCompressor compressor, MessageType schema, int initialSlabSize,
int maxCapacityHint, ByteBufferAllocator allocator,
int columnIndexTruncateLength, boolean pageWriteChecksumEnabled) {
this.schema = schema;
@@ -419,7 +418,7 @@ public class ParquetColumnChunkPageWriteStore implements PageWriteStore, BloomFi
}
}
- public ParquetColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, int initialSlabSize,
+ public ParquetColumnChunkPageWriteStore(BytesInputCompressor compressor, MessageType schema, int initialSlabSize,
int maxCapacityHint, ByteBufferAllocator allocator,
int columnIndexTruncateLength, boolean pageWriteChecksumEnabled,
InternalFileEncryptor fileEncryptor, int rowGroupOrdinal) {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
index 170bf3d..dbae776 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
@@ -47,6 +47,8 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
+import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
import java.io.File;
import java.io.FileWriter;
@@ -967,6 +969,21 @@ public class TestParquetWriter extends BaseTestQuery {
for (int i = 1; i <= repeat; i++) {
testTPCHReadWriteGzip();
testTPCHReadWriteSnappy();
+ testTPCHReadWriteBrotli();
+ testTPCHReadWriteLz4();
+ testTPCHReadWriteLzo();
+ testTPCHReadWriteZstd();
+ }
+ }
+
+ @Test
+ public void testTPCHReadWriteSnappy() throws Exception {
+ try {
+ alterSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, "snappy");
+ String inputTable = "cp.`tpch/supplier.parquet`";
+ runTestAndValidate("*", "*", inputTable, "suppkey_parquet_dict_snappy");
+ } finally {
+ resetSessionOption(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE);
}
}
@@ -974,19 +991,65 @@ public class TestParquetWriter extends BaseTestQuery {
public void testTPCHReadWriteGzip() throws Exception {
try {
alterSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, "gzip");
- String inputTable = "cp.`tpch/supplier.parquet`";
+ String inputTable = "cp.`supplier_gzip.parquet`";
runTestAndValidate("*", "*", inputTable, "suppkey_parquet_dict_gzip");
} finally {
resetSessionOption(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE);
}
}
+ // We currently bundle the JNI-based com.github.rdblue:brotli-codec and it only
+ // provides natives for Mac and Linux on AMD64. See PARQUET-1975.
@Test
- public void testTPCHReadWriteSnappy() throws Exception {
+ @DisabledIfSystemProperty(named = "os.name", matches = "Windows.*")
+ @EnabledIfSystemProperty(named = "os.arch", matches = "amd64|x86_64") // amd64 on Linux, x86_64 on OS X
+ public void testTPCHReadWriteBrotli() throws Exception {
try {
- alterSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, "snappy");
- String inputTable = "cp.`supplier_snappy.parquet`";
- runTestAndValidate("*", "*", inputTable, "suppkey_parquet_dict_snappy");
+ alterSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, "brotli");
+ // exercise the new Parquet record reader with this parquet-mr-backed codec
+ alterSession(ExecConstants.PARQUET_NEW_RECORD_READER, true);
+ String inputTable = "cp.`supplier_brotli.parquet`";
+ runTestAndValidate("*", "*", inputTable, "suppkey_parquet_dict_brotli");
+ } finally {
+ resetSessionOption(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE);
+ }
+ }
+
+ @Test
+ public void testTPCHReadWriteLz4() throws Exception {
+ try {
+ alterSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, "lz4");
+ // exercise the async Parquet column reader with this aircompressor-backed codec
+ alterSession(ExecConstants.PARQUET_COLUMNREADER_ASYNC, true);
+ String inputTable = "cp.`supplier_lz4.parquet`";
+ runTestAndValidate("*", "*", inputTable, "suppkey_parquet_dict_lz4");
+ } finally {
+ resetSessionOption(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE);
+ }
+ }
+
+ @Test
+ public void testTPCHReadWriteLzo() throws Exception {
+ try {
+ alterSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, "lzo");
+ // exercise the async Parquet page reader with this aircompressor-backed codec
+ alterSession(ExecConstants.PARQUET_PAGEREADER_ASYNC, true);
+ String inputTable = "cp.`supplier_lzo.parquet`";
+ runTestAndValidate("*", "*", inputTable, "suppkey_parquet_dict_lzo");
+ } finally {
+ resetSessionOption(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE);
+ }
+ }
+
+ @Test
+ public void testTPCHReadWriteZstd() throws Exception {
+ try {
+ alterSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, "zstd");
+ // exercise the new Parquet record reader with this aircompressor-backed codec
+ alterSession(ExecConstants.PARQUET_NEW_RECORD_READER, true);
+ String inputTable = "cp.`supplier_zstd.parquet`";
+ runTestAndValidate("*", "*", inputTable, "suppkey_parquet_dict_zstd");
} finally {
resetSessionOption(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE);
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
index ca948f9..3ea25b2 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
@@ -32,10 +32,11 @@ import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator;
import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
+import org.apache.drill.exec.store.parquet.compression.DrillCompressionCodecFactory;
import org.apache.drill.test.LegacyOperatorTestBuilder;
import org.apache.drill.test.PhysicalOpUnitTestBase;
import org.apache.hadoop.fs.Path;
-import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.compression.CompressionCodecFactory;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
@@ -447,12 +448,16 @@ public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase {
ParquetMetadata footer = ParquetFileReader.readFooter(fs.getConf(), path);
for (int i = 0; i < footer.getBlocks().size(); i++) {
+ CompressionCodecFactory ccf = DrillCompressionCodecFactory.createDirectCodecFactory(
+ fs.getConf(),
+ new ParquetDirectByteBufferAllocator(opContext.getAllocator()),
+ 0
+ );
readers.add(new ParquetRecordReader(fragContext,
path,
i,
fs,
- CodecFactory.createDirectCodecFactory(fs.getConf(),
- new ParquetDirectByteBufferAllocator(opContext.getAllocator()), 0),
+ ccf,
footer,
columnsToRead,
ParquetReaderUtility.DateCorruptionStatus.META_SHOWS_NO_CORRUPTION));
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
index 16b25ce..4526381 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
@@ -66,7 +66,8 @@ import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.column.page.DataPageV1;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.column.page.PageReader;
-import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.drill.exec.store.parquet.compression.DrillCompressionCodecFactory;
+import org.apache.parquet.compression.CompressionCodecFactory;
import org.apache.parquet.hadoop.Footer;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
@@ -632,8 +633,13 @@ public class ParquetRecordReaderTest extends BaseTestQuery {
final FileSystem fs = new CachedSingleFileSystem(fileName);
final BufferAllocator allocator = RootAllocatorFactory.newRoot(c);
for(int i = 0; i < 25; i++) {
+ CompressionCodecFactory ccf = DrillCompressionCodecFactory.createDirectCodecFactory(
+ dfsConfig,
+ new ParquetDirectByteBufferAllocator(allocator),
+ 0
+ );
final ParquetRecordReader rr = new ParquetRecordReader(context, fileName, 0, fs,
- CodecFactory.createDirectCodecFactory(dfsConfig, new ParquetDirectByteBufferAllocator(allocator), 0),
+ ccf,
f.getParquetMetadata(), columns, ParquetReaderUtility.DateCorruptionStatus.META_SHOWS_CORRUPTION);
final TestOutputMutator mutator = new TestOutputMutator(allocator);
rr.setup(null, mutator);
diff --git a/exec/java-exec/src/test/resources/supplier_brotli.parquet b/exec/java-exec/src/test/resources/supplier_brotli.parquet
new file mode 100644
index 0000000..5ae2721
Binary files /dev/null and b/exec/java-exec/src/test/resources/supplier_brotli.parquet differ
diff --git a/exec/java-exec/src/test/resources/supplier_gzip.parquet b/exec/java-exec/src/test/resources/supplier_gzip.parquet
new file mode 100644
index 0000000..e57bf05
Binary files /dev/null and b/exec/java-exec/src/test/resources/supplier_gzip.parquet differ
diff --git a/exec/java-exec/src/test/resources/supplier_lz4.parquet b/exec/java-exec/src/test/resources/supplier_lz4.parquet
new file mode 100644
index 0000000..1200590
Binary files /dev/null and b/exec/java-exec/src/test/resources/supplier_lz4.parquet differ
diff --git a/exec/java-exec/src/test/resources/supplier_lzo.parquet b/exec/java-exec/src/test/resources/supplier_lzo.parquet
new file mode 100644
index 0000000..e05283d
Binary files /dev/null and b/exec/java-exec/src/test/resources/supplier_lzo.parquet differ
diff --git a/exec/java-exec/src/test/resources/supplier_snappy.parquet b/exec/java-exec/src/test/resources/supplier_snappy.parquet
deleted file mode 100644
index 5a01d9a..0000000
Binary files a/exec/java-exec/src/test/resources/supplier_snappy.parquet and /dev/null differ
diff --git a/exec/java-exec/src/test/resources/supplier_zstd.parquet b/exec/java-exec/src/test/resources/supplier_zstd.parquet
new file mode 100644
index 0000000..89cb283
Binary files /dev/null and b/exec/java-exec/src/test/resources/supplier_zstd.parquet differ
diff --git a/exec/jdbc-all/pom.xml b/exec/jdbc-all/pom.xml
index a69c4aa..17a5182 100644
--- a/exec/jdbc-all/pom.xml
+++ b/exec/jdbc-all/pom.xml
@@ -188,6 +188,14 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>io.airlift</groupId>
+ <artifactId>aircompressor</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.github.rdblue</groupId>
+ <artifactId>brotli-codec</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
diff --git a/pom.xml b/pom.xml
index ed2968b..31bbc9e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -135,6 +135,8 @@
<httpdlog-parser.version>5.7</httpdlog-parser.version>
<yauaa.version>5.20</yauaa.version>
<lombok.version>1.18.20</lombok.version>
+ <brotli-codec.version>0.1.1</brotli-codec.version>
+ <aircompressor.version>0.20</aircompressor.version>
</properties>
<scm>