Posted to commits@drill.apache.org by dz...@apache.org on 2021/09/29 08:04:44 UTC

[drill] branch master updated: DRILL-7969: Add support for reading and writing Parquet files using Brotli, LZ4 and Zstandard codecs (#2321)

This is an automated email from the ASF dual-hosted git repository.

dzamo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ffcf6a  DRILL-7969: Add support for reading and writing Parquet files using Brotli, LZ4 and Zstandard codecs (#2321)
3ffcf6a is described below

commit 3ffcf6a3e867007a14b1b1f2e8c0cd110e56d537
Author: dzamo <91...@users.noreply.github.com>
AuthorDate: Wed Sep 29 10:03:54 2021 +0200

    DRILL-7969: Add support for reading and writing Parquet files using Brotli, LZ4 and Zstandard codecs (#2321)
    
    Adds support for all the standardised Parquet compression codecs beyond GZip
    and Snappy by making use of the airlift/aircompressor library, with a
    fallback to parquet-mr for codecs that aircompressor does not implement.
    
    A new, delegating CompressionCodecFactory implementation is included.  It
    routes (de)compression to the correct library while keeping the impact on
    the calling code in the Parquet reading and writing parts of the Drill
    codebase to a minimum.
    
    Add read/write tests and data files for new Parquet codecs.
    
    Exclude and shade aircompressor in other modules.
    
    Don't package or test Brotli compression except for Linux and Mac on AMD64.
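
    A minimal sketch of the calling pattern this introduces on the read path
    (illustrative only: fs, allocator and the page buffers stand in for whatever
    the caller already holds; the class and method names are the ones added in
    this change):

        CompressionCodecFactory ccf = DrillCompressionCodecFactory.createDirectCodecFactory(
            fs.getConf(),
            new ParquetDirectByteBufferAllocator(allocator),
            0);
        // LZ4, LZO, SNAPPY and ZSTD are routed to aircompressor, everything else to parquet-mr
        BytesInputDecompressor decomp = ccf.getDecompressor(CompressionCodecName.ZSTD);
        decomp.decompress(compressedBuf, compressedSize, uncompressedBuf, uncompressedSize);
        ccf.release();  // frees the direct buffers held by both codec implementations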
---
 contrib/storage-hive/core/pom.xml                  |   4 +
 contrib/storage-hive/hive-exec-shade/pom.xml       |   4 +
 exec/java-exec/pom.xml                             |  44 ++++++
 .../java/org/apache/drill/exec/ExecConstants.java  |   5 +-
 .../parquet/AbstractParquetScanBatchCreator.java   |  11 +-
 .../exec/store/parquet/ParquetRecordWriter.java    |  35 +++--
 .../parquet/columnreaders/AsyncPageReader.java     |  63 +-------
 .../store/parquet/columnreaders/PageReader.java    |  19 ++-
 .../parquet/columnreaders/ParquetRecordReader.java |  12 +-
 .../compression/AirliftBytesInputCompressor.java   | 171 +++++++++++++++++++++
 .../compression/DrillCompressionCodecFactory.java  | 117 ++++++++++++++
 .../exec/store/parquet2/DrillParquetReader.java    |  23 ++-
 .../parquet/hadoop/ColumnChunkIncReadStore.java    |  98 +++++++-----
 .../hadoop/ParquetColumnChunkPageWriteStore.java   |  13 +-
 .../physical/impl/writer/TestParquetWriter.java    |  73 ++++++++-
 .../exec/physical/unit/MiniPlanUnitTestBase.java   |  11 +-
 .../store/parquet/ParquetRecordReaderTest.java     |  10 +-
 .../src/test/resources/supplier_brotli.parquet     | Bin 0 -> 8723 bytes
 .../src/test/resources/supplier_gzip.parquet       | Bin 0 -> 8005 bytes
 .../src/test/resources/supplier_lz4.parquet        | Bin 0 -> 11330 bytes
 .../src/test/resources/supplier_lzo.parquet        | Bin 0 -> 10906 bytes
 .../src/test/resources/supplier_snappy.parquet     | Bin 10467 -> 0 bytes
 .../src/test/resources/supplier_zstd.parquet       | Bin 0 -> 8133 bytes
 exec/jdbc-all/pom.xml                              |   8 +
 pom.xml                                            |   2 +
 25 files changed, 578 insertions(+), 145 deletions(-)

diff --git a/contrib/storage-hive/core/pom.xml b/contrib/storage-hive/core/pom.xml
index ae44f5f..58af5ae 100644
--- a/contrib/storage-hive/core/pom.xml
+++ b/contrib/storage-hive/core/pom.xml
@@ -72,6 +72,10 @@
           <groupId>org.apache.logging.log4j</groupId>
           <artifactId>log4j-1.2-api</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>io.airlift</groupId>
+          <artifactId>aircompressor</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
     <dependency>
diff --git a/contrib/storage-hive/hive-exec-shade/pom.xml b/contrib/storage-hive/hive-exec-shade/pom.xml
index c00cfb2..6d61f8f 100644
--- a/contrib/storage-hive/hive-exec-shade/pom.xml
+++ b/contrib/storage-hive/hive-exec-shade/pom.xml
@@ -196,6 +196,10 @@
               <pattern>org.apache.commons.lang3.</pattern>
               <shadedPattern>hive.org.apache.commons.lang3.</shadedPattern>
             </relocation>
+            <relocation>
+              <pattern>io.airlift.</pattern>
+              <shadedPattern>hive.io.airlift.</shadedPattern>
+            </relocation>
           </relocations>
           <filters>
             <filter>
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index 2660f60..7997c37 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -259,6 +259,11 @@
       <groupId>org.apache.parquet</groupId>
       <artifactId>parquet-hadoop</artifactId>
       <exclusions>
+        <exclusion>
+          <!-- zstd (de)compression of Parquet data is handled by aircompressor -->
+          <groupId>com.github.luben</groupId>
+          <artifactId>zstd-jni</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
     <dependency>
@@ -569,6 +574,11 @@
       <classifier>${netty.tcnative.classifier}</classifier>
     </dependency>
     <dependency>
+      <groupId>io.airlift</groupId>
+      <artifactId>aircompressor</artifactId>
+      <version>${aircompressor.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.maven</groupId>
       <artifactId>maven-embedder</artifactId>
       <version>${maven.version}</version>
@@ -747,6 +757,40 @@
         </dependency>
       </dependencies>
     </profile>
+    <profile>
+      <!-- Only package a Brotli codec for Linux and Mac OS X on AMD64, see PARQUET-1975 -->
+      <id>non-win-amd64</id>
+      <activation>
+        <os>
+          <arch>amd64</arch>
+          <name>!Windows</name>
+        </os>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>com.github.rdblue</groupId>
+          <artifactId>brotli-codec</artifactId>
+          <version>${brotli-codec.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
+    <profile>
+      <!-- A duplicate of the profile above to handle os.arch=x86_64 reported on Macs -->
+      <id>non-win-x86_64</id>
+      <activation>
+        <os>
+          <arch>x86_64</arch>
+          <name>!Windows</name>
+        </os>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>com.github.rdblue</groupId>
+          <artifactId>brotli-codec</artifactId>
+          <version>${brotli-codec.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
   </profiles>
 
   <build>
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index be44596..16d21b5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -358,7 +358,10 @@ public final class ExecConstants {
       new OptionDescription("For internal use. Do not change."));
   public static final String PARQUET_WRITER_COMPRESSION_TYPE = "store.parquet.compression";
   public static final OptionValidator PARQUET_WRITER_COMPRESSION_TYPE_VALIDATOR = new EnumeratedStringValidator(
-      PARQUET_WRITER_COMPRESSION_TYPE, new OptionDescription("Compression type for storing Parquet output. Allowed values: snappy, gzip, none"), "snappy", "gzip", "none");
+      PARQUET_WRITER_COMPRESSION_TYPE,
+      new OptionDescription("Compression type for storing Parquet output. Allowed values: none, brotli, gzip, lz4, lzo, snappy, zstd"),
+      "none", "brotli", "gzip", "lz4", "lzo", "snappy", "zstd"
+  );
   public static final String PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING = "store.parquet.enable_dictionary_encoding";
   public static final OptionValidator PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING_VALIDATOR = new BooleanValidator(
       PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
index 9598390..ffba7f2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
@@ -37,6 +37,7 @@ import org.apache.drill.exec.store.ColumnExplorer;
 import org.apache.drill.exec.store.CommonParquetRecordReader;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
+import org.apache.drill.exec.store.parquet.compression.DrillCompressionCodecFactory;
 import org.apache.drill.exec.store.parquet.metadata.Metadata;
 import org.apache.drill.exec.store.parquet.metadata.MetadataBase;
 import org.apache.drill.exec.store.parquet.metadata.Metadata_V4;
@@ -49,7 +50,7 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.compression.CompressionCodecFactory;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.hadoop.util.HadoopInputFile;
@@ -320,12 +321,18 @@ public abstract class AbstractParquetScanBatchCreator {
         containsCorruptDates,
         recordsToRead);
     } else {
+      CompressionCodecFactory ccf = DrillCompressionCodecFactory.createDirectCodecFactory(
+          fs.getConf(),
+          new ParquetDirectByteBufferAllocator(oContext.getAllocator()),
+          0
+      );
+
       reader = new ParquetRecordReader(context,
         rowGroup.getPath(),
         rowGroup.getRowGroupIndex(),
         recordsToRead,
         fs,
-        CodecFactory.createDirectCodecFactory(fs.getConf(), new ParquetDirectByteBufferAllocator(oContext.getAllocator()), 0),
+        ccf,
         footer,
         rowGroupScan.getColumns(),
         containsCorruptDates);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index 45a2c7f..53a228f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -41,6 +41,7 @@ import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.store.StorageStrategy;
+import org.apache.drill.exec.store.parquet.compression.DrillCompressionCodecFactory;
 import org.apache.drill.exec.planner.physical.WriterPrel;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.MaterializedField;
@@ -62,7 +63,7 @@ import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.ParquetProperties.WriterVersion;
 import org.apache.parquet.column.impl.ColumnWriteStoreV1;
 import org.apache.parquet.column.values.factory.DefaultV1ValuesWriterFactory;
-import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.compression.CompressionCodecFactory;
 import org.apache.parquet.hadoop.ParquetColumnChunkPageWriteStore;
 import org.apache.parquet.hadoop.ParquetFileWriter;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -108,7 +109,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
   private boolean useSingleFSBlock = false;
   private CompressionCodecName codec = CompressionCodecName.SNAPPY;
   private WriterVersion writerVersion = WriterVersion.PARQUET_1_0;
-  private CodecFactory codecFactory;
+  private CompressionCodecFactory codecFactory;
 
   private long recordCount = 0;
   private long recordCountForNextMemCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
@@ -136,8 +137,11 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
 
   public ParquetRecordWriter(FragmentContext context, ParquetWriter writer) throws OutOfMemoryException {
     this.oContext = context.newOperatorContext(writer);
-    this.codecFactory = CodecFactory.createDirectCodecFactory(writer.getFormatPlugin().getFsConf(),
-        new ParquetDirectByteBufferAllocator(oContext.getAllocator()), pageSize);
+    this.codecFactory = DrillCompressionCodecFactory.createDirectCodecFactory(
+        writer.getFormatPlugin().getFsConf(),
+        new ParquetDirectByteBufferAllocator(oContext.getAllocator()),
+        pageSize
+    );
     this.partitionColumns = writer.getPartitionColumns();
     this.hasPartitions = partitionColumns != null && partitionColumns.size() > 0;
     this.extraMetaData.put(DRILL_VERSION_PROPERTY, DrillVersionInfo.getVersion());
@@ -158,18 +162,27 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
     dictionaryPageSize= Integer.parseInt(writerOptions.get(ExecConstants.PARQUET_DICT_PAGE_SIZE));
     String codecName = writerOptions.get(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE).toLowerCase();
     switch(codecName) {
-    case "snappy":
-      codec = CompressionCodecName.SNAPPY;
+    case "none":
+    case "uncompressed":
+      codec = CompressionCodecName.UNCOMPRESSED;
       break;
-    case "lzo":
-      codec = CompressionCodecName.LZO;
+    case "brotli":
+      codec = CompressionCodecName.BROTLI;
       break;
     case "gzip":
       codec = CompressionCodecName.GZIP;
       break;
-    case "none":
-    case "uncompressed":
-      codec = CompressionCodecName.UNCOMPRESSED;
+    case "lz4":
+      codec = CompressionCodecName.LZ4;
+      break;
+    case "lzo":
+      codec = CompressionCodecName.LZO;
+      break;
+    case "snappy":
+      codec = CompressionCodecName.SNAPPY;
+      break;
+    case "zstd":
+      codec = CompressionCodecName.ZSTD;
       break;
     default:
       throw new UnsupportedOperationException(String.format("Unknown compression type: %s", codecName));
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
index ffb10da..8c0e85e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
@@ -37,17 +37,13 @@ import org.apache.drill.exec.util.concurrent.ExecutorServiceUtil;
 import org.apache.drill.exec.util.filereader.DirectBufInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.Decompressor;
-import org.apache.hadoop.io.compress.DirectDecompressor;
-import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
 import org.apache.parquet.format.PageHeader;
 import org.apache.parquet.format.PageType;
 import org.apache.parquet.format.Util;
-import org.apache.parquet.hadoop.CodecFactory;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
-import org.xerial.snappy.Snappy;
 
 import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
 
@@ -200,11 +196,13 @@ class AsyncPageReader extends PageReader {
     pageDataBuf = allocateTemporaryBuffer(uncompressedSize);
     try {
       timer.start();
+
       CompressionCodecName codecName = parentColumnReader.columnChunkMetaData.getCodec();
+      BytesInputDecompressor decomp = codecFactory.getDecompressor(codecName);
       ByteBuffer input = compressedData.nioBuffer(0, compressedSize);
       ByteBuffer output = pageDataBuf.nioBuffer(0, uncompressedSize);
-      DecompressionHelper decompressionHelper = new DecompressionHelper(codecName);
-      decompressionHelper.decompress(input, compressedSize, output, uncompressedSize);
+
+      decomp.decompress(input, compressedSize, output, uncompressedSize);
       pageDataBuf.writerIndex(uncompressedSize);
       timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
       this.updateStats(pageHeader, "Decompress", 0, timeToRead, compressedSize, uncompressedSize);
@@ -495,56 +493,5 @@ class AsyncPageReader extends PageReader {
     }
       return null;
     }
-
-  }
-
-  private class DecompressionHelper {
-    final CompressionCodecName codecName;
-
-    public DecompressionHelper(CompressionCodecName codecName){
-      this.codecName = codecName;
-    }
-
-    public void decompress (ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize)
-        throws IOException {
-      // GZip != thread_safe, so we go off and do our own thing.
-      // The hadoop interface does not support ByteBuffer so we incur some
-      // expensive copying.
-      if (codecName == CompressionCodecName.GZIP) {
-        GzipCodec codec = new GzipCodec();
-        // DirectDecompressor: @see https://hadoop.apache.org/docs/r2.7.2/api/org/apache/hadoop/io/compress/DirectDecompressor.html
-        DirectDecompressor directDecompressor = codec.createDirectDecompressor();
-        if (directDecompressor != null) {
-          logger.debug("Using GZIP direct decompressor.");
-          directDecompressor.decompress(input, output);
-        } else {
-          logger.debug("Using GZIP (in)direct decompressor.");
-          Decompressor decompressor = codec.createDecompressor();
-          decompressor.reset();
-          byte[] inputBytes = new byte[compressedSize];
-          input.position(0);
-          input.get(inputBytes);
-          decompressor.setInput(inputBytes, 0, inputBytes.length);
-          byte[] outputBytes = new byte[uncompressedSize];
-          decompressor.decompress(outputBytes, 0, uncompressedSize);
-          output.clear();
-          output.put(outputBytes);
-        }
-      } else if (codecName == CompressionCodecName.SNAPPY) {
-        // For Snappy, just call the Snappy decompressor directly instead
-        // of going thru the DirectDecompressor class.
-        // The Snappy codec is itself thread safe, while going thru the DirectDecompressor path
-        // seems to have concurrency issues.
-        output.clear();
-        int size = Snappy.uncompress(input, output);
-        output.limit(size);
-      } else {
-        CodecFactory.BytesDecompressor decompressor = codecFactory.getDecompressor(parentColumnReader.columnChunkMetaData.getCodec());
-        decompressor.decompress(input, compressedSize, output, uncompressedSize);
-      }
-    }
-
-
   }
-
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
index 79c272a..d06b5ab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
@@ -40,13 +40,14 @@ import org.apache.parquet.column.ValuesType;
 import org.apache.parquet.column.page.DictionaryPage;
 import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.column.values.dictionary.DictionaryValuesReader;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
 import org.apache.parquet.format.DataPageHeader;
 import org.apache.parquet.format.DataPageHeaderV2;
 import org.apache.parquet.format.PageHeader;
 import org.apache.parquet.format.PageType;
 import org.apache.parquet.format.Util;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
-import org.apache.parquet.hadoop.CodecFactory;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.schema.PrimitiveType;
@@ -108,7 +109,7 @@ class PageReader {
   // These need to be held throughout reading of the entire column chunk
   List<ByteBuf> allocatedDictionaryBuffers;
 
-  protected final CodecFactory codecFactory;
+  protected final CompressionCodecFactory codecFactory;
   protected final String fileName;
 
   protected final ParquetReaderStats stats;
@@ -122,8 +123,8 @@ class PageReader {
   PageReader(org.apache.drill.exec.store.parquet.columnreaders.ColumnReader<?> parentStatus, FileSystem fs, Path path, ColumnChunkMetaData columnChunkMetaData)
     throws ExecutionSetupException {
     this.parentColumnReader = parentStatus;
-    allocatedDictionaryBuffers = new ArrayList<ByteBuf>();
-    codecFactory = parentColumnReader.parentReader.getCodecFactory();
+    this.allocatedDictionaryBuffers = new ArrayList<ByteBuf>();
+    this.codecFactory = parentColumnReader.parentReader.getCodecFactory();
     this.stats = parentColumnReader.parentReader.parquetReaderStats;
     this.fileName = path.toString();
     debugName = new StringBuilder()
@@ -239,9 +240,13 @@ class PageReader {
         this.updateStats(pageHeader, "Page Read", start, timeToRead, compressedSize, compressedSize);
         start = dataReader.getPos();
         timer.start();
-        codecFactory.getDecompressor(parentColumnReader.columnChunkMetaData.getCodec())
-            .decompress(compressedData.nioBuffer(0, compressedSize), compressedSize,
-                pageDataBuf.nioBuffer(0, uncompressedSize), uncompressedSize);
+
+        CompressionCodecName codecName = parentColumnReader.columnChunkMetaData.getCodec();
+        BytesInputDecompressor decomp = codecFactory.getDecompressor(codecName);
+        ByteBuffer input = compressedData.nioBuffer(0, compressedSize);
+        ByteBuffer output = pageDataBuf.nioBuffer(0, uncompressedSize);
+
+        decomp.decompress(input, compressedSize, output, uncompressedSize);
         pageDataBuf.writerIndex(uncompressedSize);
         timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
         this.updateStats(pageHeader, "Decompress", start, timeToRead, compressedSize, uncompressedSize);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index 3fbe83c..b9c365c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -36,7 +36,7 @@ import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchStatsContex
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.compression.CompressionCodecFactory;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,7 +52,7 @@ public class ParquetRecordReader extends CommonParquetRecordReader {
   private final FileSystem fileSystem;
   private final long numRecordsToRead; // number of records to read
   private final Path hadoopPath;
-  private final CodecFactory codecFactory;
+  private final CompressionCodecFactory codecFactory;
   private final int rowGroupIndex;
   private final ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus;
 
@@ -77,7 +77,7 @@ public class ParquetRecordReader extends CommonParquetRecordReader {
       int rowGroupIndex,
       long numRecordsToRead,
       FileSystem fs,
-      CodecFactory codecFactory,
+      CompressionCodecFactory codecFactory,
       ParquetMetadata footer,
       List<SchemaPath> columns,
       ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus) {
@@ -88,7 +88,7 @@ public class ParquetRecordReader extends CommonParquetRecordReader {
       Path path,
       int rowGroupIndex,
       FileSystem fs,
-      CodecFactory codecFactory,
+      CompressionCodecFactory codecFactory,
       ParquetMetadata footer,
       List<SchemaPath> columns,
       ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus) {
@@ -102,7 +102,7 @@ public class ParquetRecordReader extends CommonParquetRecordReader {
       Path path,
       int rowGroupIndex,
       FileSystem fs,
-      CodecFactory codecFactory,
+      CompressionCodecFactory codecFactory,
       ParquetMetadata footer,
       List<SchemaPath> columns,
       ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus) {
@@ -135,7 +135,7 @@ public class ParquetRecordReader extends CommonParquetRecordReader {
     return dateCorruptionStatus;
   }
 
-  public CodecFactory getCodecFactory() {
+  public CompressionCodecFactory getCodecFactory() {
     return codecFactory;
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/compression/AirliftBytesInputCompressor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/compression/AirliftBytesInputCompressor.java
new file mode 100644
index 0000000..9170e81
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/compression/AirliftBytesInputCompressor.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.compression;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Stack;
+
+import org.apache.parquet.bytes.ByteBufferAllocator;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.airlift.compress.Compressor;
+import io.airlift.compress.Decompressor;
+import io.airlift.compress.lz4.Lz4Compressor;
+import io.airlift.compress.lz4.Lz4Decompressor;
+import io.airlift.compress.lzo.LzoCompressor;
+import io.airlift.compress.lzo.LzoDecompressor;
+import io.airlift.compress.snappy.SnappyCompressor;
+import io.airlift.compress.snappy.SnappyDecompressor;
+import io.airlift.compress.zstd.ZstdCompressor;
+import io.airlift.compress.zstd.ZstdDecompressor;
+
+/**
+ * A shim making an aircompressor (de)compressor available through the BytesInputCompressor
+ * and BytesInputDecompressor interfaces.
+ */
+public class AirliftBytesInputCompressor implements CompressionCodecFactory.BytesInputCompressor, CompressionCodecFactory.BytesInputDecompressor {
+  private static final Logger logger = LoggerFactory.getLogger(AirliftBytesInputCompressor.class);
+
+  // name of the codec provided by this compressor
+  private CompressionCodecName codecName;
+
+  // backing aircompressor compressor
+  private Compressor airComp;
+
+  // backing aircompressor decompressor
+  private Decompressor airDecomp;
+
+  // the direct memory allocator to be used for (de)compression outputs
+  private ByteBufferAllocator allocator;
+
+  // all the direct memory buffers we've allocated, and must release
+  private Stack<ByteBuffer> allocatedBuffers;
+
+  public AirliftBytesInputCompressor(CompressionCodecName codecName, ByteBufferAllocator allocator) {
+    this.codecName = codecName;
+
+    switch (codecName) {
+    case LZ4:
+      airComp = new Lz4Compressor();
+      airDecomp = new Lz4Decompressor();
+      break;
+    case LZO:
+      airComp = new LzoCompressor();
+      airDecomp = new LzoDecompressor();
+      break;
+    case SNAPPY:
+      airComp = new SnappyCompressor();
+      airDecomp = new SnappyDecompressor();
+      break;
+    case ZSTD:
+      airComp = new ZstdCompressor();
+      airDecomp = new ZstdDecompressor();
+      break;
+    default:
+      throw new UnsupportedOperationException("Parquet compression codec is not supported: " + codecName);
+    }
+
+    this.allocator = allocator;
+    this.allocatedBuffers = new Stack<>();
+
+    logger.debug(
+        "constructed a {} using a backing compressor of {}",
+        getClass().getName(),
+        airComp.getClass().getName()
+    );
+  }
+
+  @Override
+  public BytesInput compress(BytesInput bytes) throws IOException {
+    ByteBuffer inBuf = bytes.toByteBuffer();
+
+    logger.trace(
+        "will use aircompressor to compress {} bytes from a {} containing a {}",
+        bytes.size(),
+        bytes.getClass().getName(),
+        inBuf.getClass().getName()
+    );
+
+    // aircompressor tells us the maximum amount of output buffer we could need
+    int maxOutLen = airComp.maxCompressedLength((int) bytes.size());
+    ByteBuffer outBuf = allocator.allocate(maxOutLen);
+    // track our allocation for later release in release()
+    this.allocatedBuffers.push(outBuf);
+
+    airComp.compress(inBuf, outBuf);
+
+    // flip: callers expect the output buffer positioned at the start of data
+    return BytesInput.from((ByteBuffer) outBuf.flip());
+  }
+
+  @Override
+  public CompressionCodecName getCodecName() {
+    return codecName;
+  }
+
+  @Override
+  public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException {
+    ByteBuffer inBuf = bytes.toByteBuffer();
+
+    logger.trace(
+        "will use aircompressor to decompress {} bytes from a {} containing a {}.",
+        uncompressedSize,
+        bytes.getClass().getName(),
+        inBuf.getClass().getName()
+    );
+
+    ByteBuffer outBuf = allocator.allocate(uncompressedSize);
+    // track our allocation for later release in release()
+    this.allocatedBuffers.push(outBuf);
+
+    airDecomp.decompress(inBuf, outBuf);
+
+    // flip: callers expect the output buffer positioned at the start of data
+    return BytesInput.from((ByteBuffer) outBuf.flip());
+  }
+
+  @Override
+  public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize)
+      throws IOException {
+    logger.trace(
+        "will use aircompressor to decompress {} bytes from a {} to a {}.",
+        uncompressedSize,
+        input.getClass().getName(),
+        output.getClass().getName()
+    );
+
+    airDecomp.decompress(input, output);
+  }
+
+  @Override
+  public void release() {
+    logger.debug(
+        "will release {} allocated buffers.",
+        this.allocatedBuffers.size()
+    );
+
+    while (!this.allocatedBuffers.isEmpty()) {
+      this.allocator.release(allocatedBuffers.pop());
+    }
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/compression/DrillCompressionCodecFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/compression/DrillCompressionCodecFactory.java
new file mode 100644
index 0000000..53eee64
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/compression/DrillCompressionCodecFactory.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.compression;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.bytes.ByteBufferAllocator;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A delegating compression codec factory that returns (de)compressors based on
+ * https://github.com/airlift/aircompressor when possible and falls back to
+ * parquet-mr otherwise.  The aircompressor lib was introduced into Drill
+ * because of difficulties encountered with the JNI-based implementations of
+ * lzo, lz4 and zstd in parquet-mr.
+ *
+ * By modifying the constant AIRCOMPRESSOR_CODECS it is possible to choose
+ * which codecs should be routed to which lib.  In addition, this class
+ * implements parquet-mr's CompressionCodecFactory interface meaning that
+ * swapping this factory for e.g. one in parquet-mr will have minimal impact
+ * on code in Drill relying on a CompressionCodecFactory.
+ *
+ */
+public class DrillCompressionCodecFactory implements CompressionCodecFactory {
+  private static final Logger logger = LoggerFactory.getLogger(DrillCompressionCodecFactory.class);
+
+  // The set of codecs to be handled by aircompressor
+  private static final Set<CompressionCodecName> AIRCOMPRESSOR_CODECS = new HashSet<>(
+      Arrays.asList(CompressionCodecName.LZ4, CompressionCodecName.LZO,
+          CompressionCodecName.SNAPPY, CompressionCodecName.ZSTD));
+
+  // pool of reused aircompressor compressors (parquet-mr's factory has its own)
+  private final Map<CompressionCodecName, AirliftBytesInputCompressor> airCompressors = new HashMap<>();
+
+  // fallback parquet-mr compression codec factory
+  private CompressionCodecFactory parqCodecFactory;
+
+  // direct memory allocator to be used during (de)compression
+  private ByteBufferAllocator allocator;
+
+  // static builder method, solely to mimic the parquet-mr API as closely as possible
+  public static CompressionCodecFactory createDirectCodecFactory(Configuration config, ByteBufferAllocator allocator,
+      int pageSize) {
+    return new DrillCompressionCodecFactory(config, allocator, pageSize);
+  }
+
+  public DrillCompressionCodecFactory(Configuration config, ByteBufferAllocator allocator, int pageSize) {
+    this.allocator = allocator;
+    this.parqCodecFactory = CodecFactory.createDirectCodecFactory(config, allocator, pageSize);
+
+    logger.debug(
+        "constructed a {} using a fallback factory of {}",
+        getClass().getName(),
+        parqCodecFactory.getClass().getName()
+    );
+  }
+
+  @Override
+  public BytesInputCompressor getCompressor(CompressionCodecName codecName) {
+    if (AIRCOMPRESSOR_CODECS.contains(codecName)) {
+      return airCompressors.computeIfAbsent(
+          codecName,
+          c -> new AirliftBytesInputCompressor(codecName, allocator)
+      );
+    } else {
+      return parqCodecFactory.getCompressor(codecName);
+    }
+  }
+
+  @Override
+  public BytesInputDecompressor getDecompressor(CompressionCodecName codecName) {
+    if (AIRCOMPRESSOR_CODECS.contains(codecName)) {
+      return airCompressors.computeIfAbsent(
+          codecName,
+          c -> new AirliftBytesInputCompressor(codecName, allocator)
+      );
+    } else {
+      return parqCodecFactory.getDecompressor(codecName);
+    }
+  }
+
+  @Override
+  public void release() {
+    parqCodecFactory.release();
+    logger.debug("released {}", parqCodecFactory);
+
+    for (AirliftBytesInputCompressor abic : airCompressors.values()) {
+      abic.release();
+      logger.debug("released {}", abic);
+    }
+    airCompressors.clear();
+  }
+}
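
For orientation, the routing the factory above performs, sketched with a
hypothetical caller (conf, allocator and pageSize stand in for values the real
callers in this commit already have):

    CompressionCodecFactory ccf = DrillCompressionCodecFactory.createDirectCodecFactory(
        conf, new ParquetDirectByteBufferAllocator(allocator), pageSize);
    ccf.getCompressor(CompressionCodecName.ZSTD);   // AirliftBytesInputCompressor (aircompressor)
    ccf.getCompressor(CompressionCodecName.LZ4);    // AirliftBytesInputCompressor (aircompressor)
    ccf.getCompressor(CompressionCodecName.GZIP);   // delegated to parquet-mr's CodecFactory
    ccf.getCompressor(CompressionCodecName.BROTLI); // delegated to parquet-mr (brotli-codec, where packaged)
    ccf.release();  // releases aircompressor buffers and the parquet-mr factory
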
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
index 1c0490c..8fe61aa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
@@ -23,6 +23,7 @@ import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
@@ -32,12 +33,13 @@ import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator;
 import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
 import org.apache.drill.exec.store.parquet.RowGroupReadEntry;
+import org.apache.drill.exec.store.parquet.compression.DrillCompressionCodecFactory;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.NullableIntVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.compression.CompressionCodecFactory;
 import org.apache.parquet.hadoop.ColumnChunkIncReadStore;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
@@ -331,10 +333,21 @@ public class DrillParquetReader extends CommonParquetRecordReader {
           Function.identity(),
           (o, n) -> n));
 
-      pageReadStore = new ColumnChunkIncReadStore(numRecordsToRead,
-        CodecFactory.createDirectCodecFactory(drillFileSystem.getConf(),
-          new ParquetDirectByteBufferAllocator(operatorContext.getAllocator()), 0),
-        operatorContext.getAllocator(), drillFileSystem, entry.getPath());
+      BufferAllocator allocator = operatorContext.getAllocator();
+
+      CompressionCodecFactory ccf = DrillCompressionCodecFactory.createDirectCodecFactory(
+        drillFileSystem.getConf(),
+        new ParquetDirectByteBufferAllocator(allocator),
+        0
+      );
+
+      pageReadStore = new ColumnChunkIncReadStore(
+        numRecordsToRead,
+        ccf,
+        allocator,
+        drillFileSystem,
+        entry.getPath()
+      );
 
       for (String[] path : schema.getPaths()) {
         Type type = schema.getType(path);
diff --git a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java
index 4bb1a22..c0d6a88 100644
--- a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java
+++ b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java
@@ -40,11 +40,12 @@ import org.apache.parquet.column.page.DataPageV2;
 import org.apache.parquet.column.page.DictionaryPage;
 import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
 import org.apache.parquet.format.DataPageHeaderV2;
 import org.apache.parquet.format.PageHeader;
 import org.apache.parquet.format.Util;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
-import org.apache.parquet.hadoop.CodecFactory.BytesDecompressor;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.util.HadoopStreams;
 
@@ -55,14 +56,14 @@ public class ColumnChunkIncReadStore implements PageReadStore {
 
   private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
 
-  private CodecFactory codecFactory;
+  private CompressionCodecFactory codecFactory;
   private BufferAllocator allocator;
   private FileSystem fs;
   private Path path;
   private long rowCount;
   private List<FSDataInputStream> streams = new ArrayList<>();
 
-  public ColumnChunkIncReadStore(long rowCount, CodecFactory codecFactory, BufferAllocator allocator,
+  public ColumnChunkIncReadStore(long rowCount, CompressionCodecFactory codecFactory, BufferAllocator allocator,
       FileSystem fs, Path path) {
     this.codecFactory = codecFactory;
     this.allocator = allocator;
@@ -82,7 +83,7 @@ public class ColumnChunkIncReadStore implements PageReadStore {
 
     private DictionaryPage dictionaryPage;
     private FSDataInputStream in;
-    private BytesDecompressor decompressor;
+    private BytesInputDecompressor decompressor;
 
     private ByteBuf lastPage;
 
@@ -100,19 +101,26 @@ public class ColumnChunkIncReadStore implements PageReadStore {
       if (dictionaryPage == null) {
         PageHeader pageHeader = new PageHeader();
         long pos = 0;
+
         try {
           pos = in.getPos();
           pageHeader = Util.readPageHeader(in);
+
           if (pageHeader.getDictionary_page_header() == null) {
             in.seek(pos);
             return null;
           }
-          dictionaryPage =
-                  new DictionaryPage(
-                          decompressor.decompress(BytesInput.from(in, pageHeader.compressed_page_size), pageHeader.getUncompressed_page_size()),
-                          pageHeader.getDictionary_page_header().getNum_values(),
-                          parquetMetadataConverter.getEncoding(pageHeader.dictionary_page_header.encoding)
-                  );
+
+          BytesInput dictPageBytes = decompressor.decompress(
+              BytesInput.from(in, pageHeader.compressed_page_size),
+              pageHeader.getUncompressed_page_size()
+          );
+
+          dictionaryPage = new DictionaryPage(
+              dictPageBytes,
+              pageHeader.getDictionary_page_header().getNum_values(),
+              parquetMetadataConverter.getEncoding(pageHeader.dictionary_page_header.encoding)
+          );
         } catch (Exception e) {
           throw new DrillRuntimeException("Error reading dictionary page." +
             "\nFile path: " + path.toUri().getPath() +
@@ -145,15 +153,20 @@ public class ColumnChunkIncReadStore implements PageReadStore {
           pageHeader = Util.readPageHeader(in);
           int uncompressedPageSize = pageHeader.getUncompressed_page_size();
           int compressedPageSize = pageHeader.getCompressed_page_size();
+
           switch (pageHeader.type) {
             case DICTIONARY_PAGE:
               if (dictionaryPage == null) {
-                dictionaryPage =
-                        new DictionaryPage(
-                                decompressor.decompress(BytesInput.from(in, pageHeader.compressed_page_size), pageHeader.getUncompressed_page_size()),
-                                pageHeader.uncompressed_page_size,
-                                parquetMetadataConverter.getEncoding(pageHeader.dictionary_page_header.encoding)
-                        );
+                BytesInput pageBytes = decompressor.decompress(
+                    BytesInput.from(in, pageHeader.compressed_page_size),
+                    pageHeader.getUncompressed_page_size()
+                );
+
+                dictionaryPage = new DictionaryPage(
+                    pageBytes,
+                    pageHeader.uncompressed_page_size,
+                    parquetMetadataConverter.getEncoding(pageHeader.dictionary_page_header.encoding)
+                );
               } else {
                 in.skip(pageHeader.compressed_page_size);
               }
@@ -165,14 +178,20 @@ public class ColumnChunkIncReadStore implements PageReadStore {
               ByteBuffer buffer = buf.nioBuffer(0, pageHeader.compressed_page_size);
               HadoopStreams.wrap(in).readFully(buffer);
               buffer.flip();
+
+              BytesInput pageBytes = decompressor.decompress(
+                  BytesInput.from(buffer),
+                  pageHeader.getUncompressed_page_size()
+              );
+
               return new DataPageV1(
-                      decompressor.decompress(BytesInput.from(buffer), pageHeader.getUncompressed_page_size()),
-                      pageHeader.data_page_header.num_values,
-                      pageHeader.uncompressed_page_size,
-                      fromParquetStatistics(pageHeader.data_page_header.statistics, columnDescriptor.getType()),
-                      parquetMetadataConverter.getEncoding(pageHeader.data_page_header.repetition_level_encoding),
-                      parquetMetadataConverter.getEncoding(pageHeader.data_page_header.definition_level_encoding),
-                      parquetMetadataConverter.getEncoding(pageHeader.data_page_header.encoding)
+                  pageBytes,
+                  pageHeader.data_page_header.num_values,
+                  pageHeader.uncompressed_page_size,
+                  fromParquetStatistics(pageHeader.data_page_header.statistics, columnDescriptor.getType()),
+                  parquetMetadataConverter.getEncoding(pageHeader.data_page_header.repetition_level_encoding),
+                  parquetMetadataConverter.getEncoding(pageHeader.data_page_header.definition_level_encoding),
+                  parquetMetadataConverter.getEncoding(pageHeader.data_page_header.encoding)
               );
             // TODO - finish testing this with more files
             case DATA_PAGE_V2:
@@ -184,10 +203,11 @@ public class ColumnChunkIncReadStore implements PageReadStore {
               buffer.flip();
               DataPageHeaderV2 dataHeaderV2 = pageHeader.getData_page_header_v2();
               int dataSize = compressedPageSize - dataHeaderV2.getRepetition_levels_byte_length() - dataHeaderV2.getDefinition_levels_byte_length();
-              BytesInput decompressedPageData =
-                  decompressor.decompress(
-                      BytesInput.from(buffer),
-                      pageHeader.uncompressed_page_size);
+              BytesInput decompressedPageData = decompressor.decompress(
+                  BytesInput.from(buffer),
+                  pageHeader.uncompressed_page_size
+              );
+
               ByteBuffer byteBuffer = decompressedPageData.toByteBuffer();
               int limit = byteBuffer.limit();
               byteBuffer.limit(dataHeaderV2.getRepetition_levels_byte_length());
@@ -200,17 +220,17 @@ public class ColumnChunkIncReadStore implements PageReadStore {
               BytesInput data = BytesInput.from(byteBuffer.slice());
 
               return new DataPageV2(
-                      dataHeaderV2.getNum_rows(),
-                      dataHeaderV2.getNum_nulls(),
-                      dataHeaderV2.getNum_values(),
-                      repetitionLevels,
-                      definitionLevels,
-                      parquetMetadataConverter.getEncoding(dataHeaderV2.getEncoding()),
-                      data,
-                      uncompressedPageSize,
-                      fromParquetStatistics(dataHeaderV2.getStatistics(), columnDescriptor.getType()),
-                      dataHeaderV2.isIs_compressed()
-                  );
+                  dataHeaderV2.getNum_rows(),
+                  dataHeaderV2.getNum_nulls(),
+                  dataHeaderV2.getNum_values(),
+                  repetitionLevels,
+                  definitionLevels,
+                  parquetMetadataConverter.getEncoding(dataHeaderV2.getEncoding()),
+                  data,
+                  uncompressedPageSize,
+                  fromParquetStatistics(dataHeaderV2.getStatistics(), columnDescriptor.getType()),
+                  dataHeaderV2.isIs_compressed()
+              );
             default:
               in.skip(pageHeader.compressed_page_size);
               break;
@@ -233,6 +253,8 @@ public class ColumnChunkIncReadStore implements PageReadStore {
     }
 
     void close() {
+      decompressor.release();
+
       if (lastPage != null) {
         lastPage.release();
         lastPage = null;
diff --git a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java
index 790e3c3..31a304e 100644
--- a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java
+++ b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java
@@ -39,13 +39,13 @@ import org.apache.parquet.column.statistics.Statistics;
 import org.apache.parquet.column.values.bloomfilter.BloomFilter;
 import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
 import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
+import org.apache.parquet.compression.CompressionCodecFactory.BytesInputCompressor;
 import org.apache.parquet.crypto.AesCipher;
 import org.apache.parquet.crypto.InternalColumnEncryptionSetup;
 import org.apache.parquet.crypto.InternalFileEncryptor;
 import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
 import org.apache.parquet.format.BlockCipher;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
-import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder;
 import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder;
@@ -65,8 +65,7 @@ public class ParquetColumnChunkPageWriteStore implements PageWriteStore, BloomFi
   private static final class ColumnChunkPageWriter implements PageWriter, BloomFilterWriter, Closeable {
 
     private final ColumnDescriptor path;
-    private final BytesCompressor compressor;
-
+    private final BytesInputCompressor compressor;
     private final CapacityByteArrayOutputStream tempOutputStream;
     private final CapacityByteArrayOutputStream buf;
     private DictionaryPage dictionaryPage;
@@ -100,7 +99,7 @@ public class ParquetColumnChunkPageWriteStore implements PageWriteStore, BloomFi
     private final byte[] fileAAD;
 
     private ColumnChunkPageWriter(ColumnDescriptor path,
-                                  BytesCompressor compressor,
+                                  BytesInputCompressor compressor,
                                   int initialSlabSize,
                                   int maxCapacityHint,
                                   ByteBufferAllocator allocator,
@@ -402,14 +401,14 @@ public class ParquetColumnChunkPageWriteStore implements PageWriteStore, BloomFi
   private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers = new HashMap<ColumnDescriptor, ColumnChunkPageWriter>();
   private final MessageType schema;
 
-  public ParquetColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, int initialSlabSize,
+  public ParquetColumnChunkPageWriteStore(BytesInputCompressor compressor, MessageType schema, int initialSlabSize,
                                           int maxCapacityHint, ByteBufferAllocator allocator,
                                           int columnIndexTruncateLength) {
     this(compressor, schema, initialSlabSize, maxCapacityHint, allocator, columnIndexTruncateLength,
             ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED);
   }
 
-  public ParquetColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, int initialSlabSize,
+  public ParquetColumnChunkPageWriteStore(BytesInputCompressor compressor, MessageType schema, int initialSlabSize,
                                           int maxCapacityHint, ByteBufferAllocator allocator,
                                           int columnIndexTruncateLength, boolean pageWriteChecksumEnabled) {
     this.schema = schema;
@@ -419,7 +418,7 @@ public class ParquetColumnChunkPageWriteStore implements PageWriteStore, BloomFi
     }
   }
 
-  public ParquetColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, int initialSlabSize,
+  public ParquetColumnChunkPageWriteStore(BytesInputCompressor compressor, MessageType schema, int initialSlabSize,
                                           int maxCapacityHint, ByteBufferAllocator allocator,
                                           int columnIndexTruncateLength, boolean pageWriteChecksumEnabled,
                                           InternalFileEncryptor fileEncryptor, int rowGroupOrdinal) {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
index 170bf3d..dbae776 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
@@ -47,6 +47,8 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
+import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
 
 import java.io.File;
 import java.io.FileWriter;
@@ -967,6 +969,21 @@ public class TestParquetWriter extends BaseTestQuery {
     for (int i = 1; i <= repeat; i++) {
       testTPCHReadWriteGzip();
       testTPCHReadWriteSnappy();
+      testTPCHReadWriteBrotli();
+      testTPCHReadWriteLz4();
+      testTPCHReadWriteLzo();
+      testTPCHReadWriteZstd();
+    }
+  }
+
+  @Test
+  public void testTPCHReadWriteSnappy() throws Exception {
+    try {
+      alterSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, "snappy");
+      String inputTable = "cp.`tpch/supplier.parquet`";
+      runTestAndValidate("*", "*", inputTable, "suppkey_parquet_dict_snappy");
+    } finally {
+      resetSessionOption(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE);
     }
   }
 
@@ -974,19 +991,65 @@ public class TestParquetWriter extends BaseTestQuery {
   public void testTPCHReadWriteGzip() throws Exception {
     try {
       alterSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, "gzip");
-      String inputTable = "cp.`tpch/supplier.parquet`";
+      String inputTable = "cp.`supplier_gzip.parquet`";
         runTestAndValidate("*", "*", inputTable, "suppkey_parquet_dict_gzip");
     } finally {
       resetSessionOption(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE);
     }
   }
 
+  // We currently bundle the JNI-based com.rdblue.brotli-codec and it only provides
+  // natives for Mac and Linux on AMD64.  See PARQUET-1975.
   @Test
-  public void testTPCHReadWriteSnappy() throws Exception {
+  @DisabledIfSystemProperty(named = "os.name", matches = "Windows.*")
+  // os.arch is reported as amd64 on Linux and as x86_64 on OS X, both on AMD64 hardware
+  @EnabledIfSystemProperty(named = "os.arch", matches = "amd64|x86_64")
+  public void testTPCHReadWriteBrotli() throws Exception {
     try {
-      alterSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, "snappy");
-      String inputTable = "cp.`supplier_snappy.parquet`";
-      runTestAndValidate("*", "*", inputTable, "suppkey_parquet_dict_snappy");
+      alterSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, "brotli");
+      // exercise the new Parquet record reader with this parquet-mr-backed codec
+      alterSession(ExecConstants.PARQUET_NEW_RECORD_READER, true);
+      String inputTable = "cp.`supplier_brotli.parquet`";
+      runTestAndValidate("*", "*", inputTable, "suppkey_parquet_dict_brotli");
+    } finally {
+      resetSessionOption(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE);
+    }
+  }
+
+  @Test
+  public void testTPCHReadWriteLz4() throws Exception {
+    try {
+      alterSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, "lz4");
+      // exercise the async Parquet column reader with this aircompressor-backed codec
+      alterSession(ExecConstants.PARQUET_COLUMNREADER_ASYNC, true);
+      String inputTable = "cp.`supplier_lz4.parquet`";
+      runTestAndValidate("*", "*", inputTable, "suppkey_parquet_dict_lz4");
+    } finally {
+      resetSessionOption(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE);
+    }
+  }
+
+  @Test
+  public void testTPCHReadWriteLzo() throws Exception {
+    try {
+      alterSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, "lzo");
+      // exercise the async Parquet page reader with this aircompressor-backed codec
+      alterSession(ExecConstants.PARQUET_PAGEREADER_ASYNC, true);
+      String inputTable = "cp.`supplier_lzo.parquet`";
+      runTestAndValidate("*", "*", inputTable, "suppkey_parquet_dict_lzo");
+    } finally {
+      resetSessionOption(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE);
+    }
+  }
+
+  @Test
+  public void testTPCHReadWriteZstd() throws Exception {
+    try {
+      alterSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, "zstd");
+      // exercise the new Parquet record reader with this aircompressor-backed codec
+      alterSession(ExecConstants.PARQUET_NEW_RECORD_READER, true);
+      String inputTable = "cp.`supplier_zstd.parquet`";
+      runTestAndValidate("*", "*", inputTable, "suppkey_parquet_dict_zstd");
     } finally {
       resetSessionOption(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE);
     }
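
For context on how these codecs are selected at runtime, the sketch below mirrors the
session-option pattern used in the tests above, but from a plain JDBC client. It is a
minimal, untested example: the localhost connection string, the writable dfs.tmp
workspace and the output table name are assumptions, and `store.parquet.compression`
is the option key that ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE resolves to.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class ParquetZstdCtasExample {
      public static void main(String[] args) throws Exception {
        // Assumes a Drillbit listening on localhost and the Drill JDBC driver
        // on the classpath.
        try (Connection conn = DriverManager.getConnection("jdbc:drill:drillbit=localhost");
             Statement stmt = conn.createStatement()) {
          // Session option backing ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE.
          stmt.execute("ALTER SESSION SET `store.parquet.compression` = 'zstd'");
          // CTAS output is written as Parquet pages compressed with the selected
          // codec; dfs.tmp is assumed to be a writable workspace.
          stmt.execute("CREATE TABLE dfs.tmp.`supplier_zstd_example` AS "
              + "SELECT * FROM cp.`tpch/supplier.parquet`");
        }
      }
    }
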
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
index ca948f9..3ea25b2 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
@@ -32,10 +32,11 @@ import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator;
 import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
 import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
+import org.apache.drill.exec.store.parquet.compression.DrillCompressionCodecFactory;
 import org.apache.drill.test.LegacyOperatorTestBuilder;
 import org.apache.drill.test.PhysicalOpUnitTestBase;
 import org.apache.hadoop.fs.Path;
-import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.compression.CompressionCodecFactory;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 
@@ -447,12 +448,16 @@ public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase {
         ParquetMetadata footer = ParquetFileReader.readFooter(fs.getConf(), path);
 
         for (int i = 0; i < footer.getBlocks().size(); i++) {
+          CompressionCodecFactory ccf = DrillCompressionCodecFactory.createDirectCodecFactory(
+            fs.getConf(),
+            new ParquetDirectByteBufferAllocator(opContext.getAllocator()),
+            0
+          );
           readers.add(new ParquetRecordReader(fragContext,
               path,
               i,
               fs,
-              CodecFactory.createDirectCodecFactory(fs.getConf(),
-                  new ParquetDirectByteBufferAllocator(opContext.getAllocator()), 0),
+              ccf,
               footer,
               columnsToRead,
               ParquetReaderUtility.DateCorruptionStatus.META_SHOWS_NO_CORRUPTION));
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
index 16b25ce..4526381 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
@@ -66,7 +66,8 @@ import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.column.page.DataPageV1;
 import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.column.page.PageReader;
-import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.drill.exec.store.parquet.compression.DrillCompressionCodecFactory;
+import org.apache.parquet.compression.CompressionCodecFactory;
 import org.apache.parquet.hadoop.Footer;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
@@ -632,8 +633,13 @@ public class ParquetRecordReaderTest extends BaseTestQuery {
     final FileSystem fs = new CachedSingleFileSystem(fileName);
     final BufferAllocator allocator = RootAllocatorFactory.newRoot(c);
     for(int i = 0; i < 25; i++) {
+      CompressionCodecFactory ccf = DrillCompressionCodecFactory.createDirectCodecFactory(
+        dfsConfig,
+        new ParquetDirectByteBufferAllocator(allocator),
+        0
+      );
       final ParquetRecordReader rr = new ParquetRecordReader(context, fileName, 0, fs,
-          CodecFactory.createDirectCodecFactory(dfsConfig, new ParquetDirectByteBufferAllocator(allocator), 0),
+          ccf,
           f.getParquetMetadata(), columns, ParquetReaderUtility.DateCorruptionStatus.META_SHOWS_CORRUPTION);
       final TestOutputMutator mutator = new TestOutputMutator(allocator);
       rr.setup(null, mutator);
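
The two call sites above swap CodecFactory for the new delegating factory in the same
way; the sketch below isolates that pattern and then drives a compress/decompress
round trip through the standard parquet-mr CompressionCodecFactory interface. It is a
minimal, untested sketch: the standalone allocator setup, the ZSTD round trip and the
zero page-size hint (copied from the tests above) are illustrative assumptions rather
than part of this commit.

    import java.nio.charset.StandardCharsets;

    import org.apache.drill.common.config.DrillConfig;
    import org.apache.drill.exec.memory.BufferAllocator;
    import org.apache.drill.exec.memory.RootAllocatorFactory;
    import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator;
    import org.apache.drill.exec.store.parquet.compression.DrillCompressionCodecFactory;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.parquet.bytes.BytesInput;
    import org.apache.parquet.compression.CompressionCodecFactory;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;

    public class DrillCodecFactoryRoundTrip {
      public static void main(String[] args) throws Exception {
        BufferAllocator allocator = RootAllocatorFactory.newRoot(DrillConfig.create());
        // Same construction as in the tests: a Hadoop conf, Drill's direct-buffer
        // allocator bridge, and a page-size hint of 0.
        CompressionCodecFactory ccf = DrillCompressionCodecFactory.createDirectCodecFactory(
            new Configuration(),
            new ParquetDirectByteBufferAllocator(allocator),
            0);
        try {
          byte[] raw = "a page worth of bytes".getBytes(StandardCharsets.UTF_8);
          // The delegating factory decides internally whether aircompressor or
          // parquet-mr handles the requested codec.
          BytesInput compressed = ccf.getCompressor(CompressionCodecName.ZSTD)
              .compress(BytesInput.from(raw));
          BytesInput restored = ccf.getDecompressor(CompressionCodecName.ZSTD)
              .decompress(BytesInput.from(compressed.toByteArray()), raw.length);
          System.out.println(new String(restored.toByteArray(), StandardCharsets.UTF_8));
        } finally {
          ccf.release();
          allocator.close();
        }
      }
    }
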
diff --git a/exec/java-exec/src/test/resources/supplier_brotli.parquet b/exec/java-exec/src/test/resources/supplier_brotli.parquet
new file mode 100644
index 0000000..5ae2721
Binary files /dev/null and b/exec/java-exec/src/test/resources/supplier_brotli.parquet differ
diff --git a/exec/java-exec/src/test/resources/supplier_gzip.parquet b/exec/java-exec/src/test/resources/supplier_gzip.parquet
new file mode 100644
index 0000000..e57bf05
Binary files /dev/null and b/exec/java-exec/src/test/resources/supplier_gzip.parquet differ
diff --git a/exec/java-exec/src/test/resources/supplier_lz4.parquet b/exec/java-exec/src/test/resources/supplier_lz4.parquet
new file mode 100644
index 0000000..1200590
Binary files /dev/null and b/exec/java-exec/src/test/resources/supplier_lz4.parquet differ
diff --git a/exec/java-exec/src/test/resources/supplier_lzo.parquet b/exec/java-exec/src/test/resources/supplier_lzo.parquet
new file mode 100644
index 0000000..e05283d
Binary files /dev/null and b/exec/java-exec/src/test/resources/supplier_lzo.parquet differ
diff --git a/exec/java-exec/src/test/resources/supplier_snappy.parquet b/exec/java-exec/src/test/resources/supplier_snappy.parquet
deleted file mode 100644
index 5a01d9a..0000000
Binary files a/exec/java-exec/src/test/resources/supplier_snappy.parquet and /dev/null differ
diff --git a/exec/java-exec/src/test/resources/supplier_zstd.parquet b/exec/java-exec/src/test/resources/supplier_zstd.parquet
new file mode 100644
index 0000000..89cb283
Binary files /dev/null and b/exec/java-exec/src/test/resources/supplier_zstd.parquet differ
diff --git a/exec/jdbc-all/pom.xml b/exec/jdbc-all/pom.xml
index a69c4aa..17a5182 100644
--- a/exec/jdbc-all/pom.xml
+++ b/exec/jdbc-all/pom.xml
@@ -188,6 +188,14 @@
           <groupId>org.apache.commons</groupId>
           <artifactId>commons-compress</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>io.airlift</groupId>
+          <artifactId>aircompressor</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.github.rdblue</groupId>
+          <artifactId>brotli-codec</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
     <dependency>
diff --git a/pom.xml b/pom.xml
index ed2968b..31bbc9e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -135,6 +135,8 @@
     <httpdlog-parser.version>5.7</httpdlog-parser.version>
     <yauaa.version>5.20</yauaa.version>
     <lombok.version>1.18.20</lombok.version>
+    <brotli-codec.version>0.1.1</brotli-codec.version>
+    <aircompressor.version>0.20</aircompressor.version>
   </properties>
 
   <scm>