You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2022/12/08 05:59:08 UTC

[pinot] branch master updated: feat: add compressed file support for ORCRecordReader (#9884)

This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a2fbf9caf feat: add compressed file support for ORCRecordReader (#9884)
8a2fbf9caf is described below

commit 8a2fbf9caff8cb1bc7ec4bda94827fc556fc3866
Author: Eugene Tolbakov <ev...@gmail.com>
AuthorDate: Thu Dec 8 05:58:59 2022 +0000

    feat: add compressed file support for ORCRecordReader (#9884)
    
    * feat: add compressed file support for ORCRecordReader
    
    * feat: fix code review remarks, add gz support for parquet files
    
    * feat: move unpackIfRequired method in RecordReaderUtils
    
    * feat: apply Pinot code style
    
    * chore: fix code styles
---
 .../plugin/inputformat/orc/ORCRecordReader.java    | 18 +++----
 .../inputformat/orc/ORCRecordReaderTest.java       | 27 +++++++++++
 .../parquet/ParquetAvroRecordReader.java           |  4 +-
 .../parquet/ParquetNativeRecordReader.java         | 17 ++++---
 .../inputformat/parquet/ParquetRecordReader.java   |  6 ++-
 .../plugin/inputformat/parquet/ParquetUtils.java   |  7 +--
 .../parquet/ParquetNativeRecordReaderTest.java     |  4 +-
 .../parquet/ParquetRecordReaderTest.java           | 55 ++++++++++++++++++++--
 .../pinot/spi/data/readers/RecordReaderUtils.java  | 25 ++++++++++
 9 files changed, 134 insertions(+), 29 deletions(-)

diff --git a/pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java b/pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java
index 2ecd7ad6d2..f98ab58921 100644
--- a/pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java
+++ b/pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java
@@ -45,6 +45,7 @@ import org.apache.orc.TypeDescription;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.data.readers.RecordReaderConfig;
+import org.apache.pinot.spi.data.readers.RecordReaderUtils;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
@@ -77,11 +78,12 @@ public class ORCRecordReader implements RecordReader {
   public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig)
       throws IOException {
     Configuration configuration = new Configuration();
-    Reader orcReader = OrcFile.createReader(new Path(dataFile.getAbsolutePath()),
+    File orcFile = RecordReaderUtils.unpackIfRequired(dataFile, "orc");
+    Reader orcReader = OrcFile.createReader(new Path(orcFile.getAbsolutePath()),
         OrcFile.readerOptions(configuration).filesystem(FileSystem.getLocal(configuration)));
     TypeDescription orcSchema = orcReader.getSchema();
-    Preconditions
-        .checkState(orcSchema.getCategory() == TypeDescription.Category.STRUCT, "ORC schema must be of type: STRUCT");
+    Preconditions.checkState(orcSchema.getCategory() == TypeDescription.Category.STRUCT,
+        "ORC schema must be of type: STRUCT");
     _orcFields = orcSchema.getFieldNames();
     _orcFieldTypes = orcSchema.getChildren();
 
@@ -128,9 +130,8 @@ public class ORCRecordReader implements RecordReader {
       // Maps always have two child columns for its keys and values
       List<TypeDescription> children = fieldType.getChildren();
       TypeDescription.Category keyCategory = children.get(0).getCategory();
-      Preconditions
-          .checkState(isSupportedSingleValueType(keyCategory), "Illegal map key field type: %s (field %s)", keyCategory,
-              field);
+      Preconditions.checkState(isSupportedSingleValueType(keyCategory), "Illegal map key field type: %s (field %s)",
+          keyCategory, field);
       initFieldsToRead(orcReaderInclude, children.get(1), field);
     } else if (category == TypeDescription.Category.STRUCT) {
       List<String> childrenFieldNames = fieldType.getFieldNames();
@@ -141,9 +142,8 @@ public class ORCRecordReader implements RecordReader {
       }
     } else {
       // Single-value field
-      Preconditions
-          .checkState(isSupportedSingleValueType(category), "Illegal single-value field type: %s (field %s)", category,
-              field);
+      Preconditions.checkState(isSupportedSingleValueType(category), "Illegal single-value field type: %s (field %s)",
+          category, field);
     }
   }
 
diff --git a/pinot-plugins/pinot-input-format/pinot-orc/src/test/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-orc/src/test/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReaderTest.java
index c244d93417..dd9b12a78c 100644
--- a/pinot-plugins/pinot-input-format/pinot-orc/src/test/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReaderTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-orc/src/test/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReaderTest.java
@@ -17,9 +17,15 @@ package org.apache.pinot.plugin.inputformat.orc;
  * specific language governing permissions and limitations
  * under the License.
  */
+
 import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.List;
 import java.util.Map;
+import java.util.zip.GZIPOutputStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
@@ -32,6 +38,7 @@ import org.apache.orc.TypeDescription;
 import org.apache.orc.Writer;
 import org.apache.pinot.spi.data.readers.AbstractRecordReaderTest;
 import org.apache.pinot.spi.data.readers.RecordReader;
+import org.testng.annotations.Test;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
@@ -39,6 +46,13 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 public class ORCRecordReaderTest extends AbstractRecordReaderTest {
   private final File _dataFile = new File(_tempDir, "data.orc");
 
+  private void compressGzip(String sourcePath, String targetPath)
+      throws IOException {
+    try (GZIPOutputStream gos = new GZIPOutputStream(new FileOutputStream(Paths.get(targetPath).toFile()))) {
+      Files.copy(Paths.get(sourcePath), gos);
+    }
+  }
+
   @Override
   protected RecordReader createRecordReader()
       throws Exception {
@@ -143,4 +157,22 @@ public class ORCRecordReaderTest extends AbstractRecordReaderTest {
     }
     writer.close();
   }
+
+  /**
+   * Verifies that a gzip-compressed copy of the ORC data file can be read by {@link ORCRecordReader}: the records
+   * are checked against {@code _records} on the first pass and again after a rewind.
+   */
+  @Test
+  public void testGzipORCRecordReader()
+      throws Exception {
+    File gzDataFile = new File(_tempDir, "data.orc.gz");
+    compressGzip(_dataFile.getAbsolutePath(), gzDataFile.getAbsolutePath());
+    // Close the reader even when an assertion fails so that it does not stay open after the test.
+    try (ORCRecordReader orcRecordReader = new ORCRecordReader()) {
+      orcRecordReader.init(gzDataFile, _sourceFields, null);
+      checkValue(orcRecordReader, _records, _primaryKeys);
+      orcRecordReader.rewind();
+      checkValue(orcRecordReader, _records, _primaryKeys);
+    }
+  }
 }
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java
index 9c494f0baf..787ddc25e7 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java
@@ -29,6 +29,7 @@ import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.data.readers.RecordReaderConfig;
+import org.apache.pinot.spi.data.readers.RecordReaderUtils;
 
 
 /**
@@ -48,7 +49,8 @@ public class ParquetAvroRecordReader implements RecordReader {
   @Override
   public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig)
       throws IOException {
-    _dataFilePath = new Path(dataFile.getAbsolutePath());
+    File parquetFile = RecordReaderUtils.unpackIfRequired(dataFile, "parquet");
+    _dataFilePath = new Path(parquetFile.getAbsolutePath());
     _parquetReader = ParquetUtils.getParquetAvroReader(_dataFilePath);
     _recordExtractor = new AvroRecordExtractor();
     _recordExtractor.init(fieldsToRead, null);
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java
index 3f413b9a68..da89c8a382 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java
@@ -37,6 +37,7 @@ import org.apache.parquet.schema.MessageType;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.data.readers.RecordReaderConfig;
+import org.apache.pinot.spi.data.readers.RecordReaderUtils;
 
 
 /**
@@ -58,17 +59,16 @@ public class ParquetNativeRecordReader implements RecordReader {
   @Override
   public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig)
       throws IOException {
-    _dataFilePath = new Path(dataFile.getAbsolutePath());
+    File parquetFile = RecordReaderUtils.unpackIfRequired(dataFile, "parquet");
+    _dataFilePath = new Path(parquetFile.getAbsolutePath());
     _hadoopConf = ParquetUtils.getParquetHadoopConfiguration();
     _recordExtractor = new ParquetNativeRecordExtractor();
     _recordExtractor.init(fieldsToRead, null);
 
-    _parquetReadOptions = ParquetReadOptions.builder()
-        .withMetadataFilter(ParquetMetadataConverter.NO_FILTER)
-        .build();
+    _parquetReadOptions = ParquetReadOptions.builder().withMetadataFilter(ParquetMetadataConverter.NO_FILTER).build();
 
-    _parquetFileReader = ParquetFileReader.open(HadoopInputFile.fromPath(_dataFilePath, _hadoopConf),
-        _parquetReadOptions);
+    _parquetFileReader =
+        ParquetFileReader.open(HadoopInputFile.fromPath(_dataFilePath, _hadoopConf), _parquetReadOptions);
     _schema = _parquetFileReader.getFooter().getFileMetaData().getSchema();
     _pageReadStore = _parquetFileReader.readNextRowGroup();
     _columnIO = new ColumnIOFactory().getColumnIO(_schema);
@@ -76,7 +76,6 @@ public class ParquetNativeRecordReader implements RecordReader {
     _currentPageIdx = 0;
   }
 
-
   @Override
   public boolean hasNext() {
     if (_pageReadStore == null) {
@@ -119,8 +118,8 @@ public class ParquetNativeRecordReader implements RecordReader {
   public void rewind()
       throws IOException {
     _parquetFileReader.close();
-    _parquetFileReader = ParquetFileReader.open(HadoopInputFile.fromPath(_dataFilePath, _hadoopConf),
-        _parquetReadOptions);
+    _parquetFileReader =
+        ParquetFileReader.open(HadoopInputFile.fromPath(_dataFilePath, _hadoopConf), _parquetReadOptions);
     _pageReadStore = _parquetFileReader.readNextRowGroup();
     _parquetRecordReader = _columnIO.getRecordReader(_pageReadStore, new GroupRecordConverter(_schema));
     _currentPageIdx = 0;
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java
index 6be22851fc..60886b3b30 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.data.readers.RecordReaderConfig;
+import org.apache.pinot.spi.data.readers.RecordReaderUtils;
 
 
 /**
@@ -39,6 +40,7 @@ public class ParquetRecordReader implements RecordReader {
   @Override
   public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig)
       throws IOException {
+    File parquetFile = RecordReaderUtils.unpackIfRequired(dataFile, "parquet");
     if (recordReaderConfig != null && ((ParquetRecordReaderConfig) recordReaderConfig).useParquetAvroRecordReader()) {
       _internalParquetRecordReader = new ParquetAvroRecordReader();
     } else if (recordReaderConfig != null
@@ -47,14 +49,14 @@ public class ParquetRecordReader implements RecordReader {
       _internalParquetRecordReader = new ParquetNativeRecordReader();
     } else {
       // No reader type specified. Determine using file metadata
-      if (ParquetUtils.hasAvroSchemaInFileMetadata(new Path(dataFile.getAbsolutePath()))) {
+      if (ParquetUtils.hasAvroSchemaInFileMetadata(new Path(parquetFile.getAbsolutePath()))) {
         _internalParquetRecordReader = new ParquetAvroRecordReader();
       } else {
         _useAvroParquetRecordReader = false;
         _internalParquetRecordReader = new ParquetNativeRecordReader();
       }
     }
-    _internalParquetRecordReader.init(dataFile, fieldsToRead, recordReaderConfig);
+    _internalParquetRecordReader.init(parquetFile, fieldsToRead, recordReaderConfig);
   }
 
   @Override
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java
index a55e3fa702..f576a0a325 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java
@@ -59,8 +59,8 @@ public class ParquetUtils {
    */
   public static ParquetWriter<GenericRecord> getParquetAvroWriter(Path path, Schema schema)
       throws IOException {
-    return AvroParquetWriter.<GenericRecord>builder(path).withSchema(schema)
-        .withConf(getParquetHadoopConfiguration()).build();
+    return AvroParquetWriter.<GenericRecord>builder(path).withSchema(schema).withConf(getParquetHadoopConfiguration())
+        .build();
   }
 
   /**
@@ -85,7 +85,8 @@ public class ParquetUtils {
     }
   }
 
-  public static boolean hasAvroSchemaInFileMetadata(Path path) throws IOException {
+  public static boolean hasAvroSchemaInFileMetadata(Path path)
+      throws IOException {
     ParquetMetadata footer =
         ParquetFileReader.readFooter(getParquetHadoopConfiguration(), path, ParquetMetadataConverter.NO_FILTER);
     Map<String, String> metaData = footer.getFileMetaData().getKeyValueMetaData();
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReaderTest.java
index ffc80a98bb..378195d976 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReaderTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReaderTest.java
@@ -56,8 +56,8 @@ public class ParquetNativeRecordReaderTest extends AbstractRecordReaderTest {
       }
       records.add(record);
     }
-    try (ParquetWriter<GenericRecord> writer = ParquetUtils
-        .getParquetAvroWriter(new Path(_dataFile.getAbsolutePath()), schema)) {
+    try (ParquetWriter<GenericRecord> writer = ParquetUtils.getParquetAvroWriter(new Path(_dataFile.getAbsolutePath()),
+        schema)) {
       for (GenericRecord record : records) {
         writer.write(record);
       }
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
index 14dda0e5d1..345cb1cdbd 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
@@ -20,10 +20,14 @@ package org.apache.pinot.plugin.inputformat.parquet;
 
 import com.google.common.collect.ImmutableSet;
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.zip.GZIPOutputStream;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
@@ -40,6 +44,7 @@ import org.testng.annotations.Test;
 
 public class ParquetRecordReaderTest extends AbstractRecordReaderTest {
   private final File _dataFile = new File(_tempDir, "data.parquet");
+  private final String _gzipFileName = "data.parquet.gz";
   private final File _testParquetFileWithInt96AndDecimal =
       new File(getClass().getClassLoader().getResource("test-file-with-int96-and-decimal.snappy.parquet").getFile());
 
@@ -63,14 +68,21 @@ public class ParquetRecordReaderTest extends AbstractRecordReaderTest {
       }
       records.add(record);
     }
-    try (ParquetWriter<GenericRecord> writer = ParquetUtils
-        .getParquetAvroWriter(new Path(_dataFile.getAbsolutePath()), schema)) {
+    try (ParquetWriter<GenericRecord> writer = ParquetUtils.getParquetAvroWriter(new Path(_dataFile.getAbsolutePath()),
+        schema)) {
       for (GenericRecord record : records) {
         writer.write(record);
       }
     }
   }
 
+  private void compressGzip(String sourcePath, String targetPath)
+      throws IOException {
+    try (GZIPOutputStream gos = new GZIPOutputStream(new FileOutputStream(Paths.get(targetPath).toFile()))) {
+      Files.copy(Paths.get(sourcePath), gos);
+    }
+  }
+
   @Test
   public void testParquetAvroRecordReader()
       throws IOException {
@@ -108,7 +120,6 @@ public class ParquetRecordReaderTest extends AbstractRecordReaderTest {
     // Should be avro since file metadata has avro schema
     Assert.assertTrue(parquetRecordReader.useAvroParquetRecordReader());
 
-
     final ParquetRecordReader parquetRecordReader2 = new ParquetRecordReader();
     File nativeParquetFile = new File(getClass().getClassLoader().getResource("users.parquet").getFile());
     parquetRecordReader.init(nativeParquetFile, null, null);
@@ -157,4 +168,56 @@ public class ParquetRecordReaderTest extends AbstractRecordReaderTest {
     Assert.assertEquals(recordsRead, totalRecords,
         "Message read from ParquetRecordReader doesn't match the expected number.");
   }
+
+  /**
+   * Gzips {@code source} into {@code _tempDir} under the name {@code gzipFileName} and returns the compressed file.
+   * Writing there keeps the compressed file out of the directory that holds {@code source}, which for classpath
+   * resources is not a scratch location.
+   */
+  private File gzipIntoTempDir(File source, String gzipFileName)
+      throws IOException {
+    File gzFile = new File(_tempDir, gzipFileName);
+    compressGzip(source.getAbsolutePath(), gzFile.getAbsolutePath());
+    return gzFile;
+  }
+
+  /** Reads the gzipped sample data through {@link ParquetRecordReader} with no reader config. */
+  @Test
+  public void testGzipParquetRecordReader()
+      throws IOException {
+    File gzDataFile = gzipIntoTempDir(_dataFile, _gzipFileName);
+    ParquetRecordReader recordReader = new ParquetRecordReader();
+    recordReader.init(gzDataFile, _sourceFields, null);
+    testReadParquetFile(recordReader, SAMPLE_RECORDS_SIZE);
+  }
+
+  /** Reads the gzipped sample data through {@link ParquetAvroRecordReader}. */
+  @Test
+  public void testGzipParquetAvroRecordReader()
+      throws IOException {
+    File gzDataFile = gzipIntoTempDir(_dataFile, _gzipFileName);
+    ParquetAvroRecordReader avroRecordReader = new ParquetAvroRecordReader();
+    avroRecordReader.init(gzDataFile, null, new ParquetRecordReaderConfig());
+    testReadParquetFile(avroRecordReader, SAMPLE_RECORDS_SIZE);
+  }
+
+  /**
+   * Reads two gzipped files through {@link ParquetNativeRecordReader}: the INT96/decimal classpath resource and the
+   * sample data.
+   */
+  @Test
+  public void testGzipParquetNativeRecordReader()
+      throws IOException {
+    ParquetNativeRecordReader nativeRecordReader = new ParquetNativeRecordReader();
+
+    // Compress the classpath resource into _tempDir instead of next to the resource itself.
+    File gzTestParquetFileWithInt96AndDecimal = gzipIntoTempDir(_testParquetFileWithInt96AndDecimal,
+        String.format("%s.gz", _testParquetFileWithInt96AndDecimal.getName()));
+    nativeRecordReader.init(gzTestParquetFileWithInt96AndDecimal, ImmutableSet.of(), new ParquetRecordReaderConfig());
+    testReadParquetFile(nativeRecordReader, 1965);
+
+    File gzDataFile = gzipIntoTempDir(_dataFile, _gzipFileName);
+    nativeRecordReader.init(gzDataFile, ImmutableSet.of(), new ParquetRecordReaderConfig());
+    testReadParquetFile(nativeRecordReader, SAMPLE_RECORDS_SIZE);
+  }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderUtils.java
index 657686e196..fd84c5f306 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderUtils.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderUtils.java
@@ -25,7 +25,10 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.io.RandomAccessFile;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
 import java.util.zip.GZIPInputStream;
 
 
@@ -57,4 +60,51 @@ public class RecordReaderUtils {
       return new FileInputStream(dataFile);
     }
   }
+
+  /**
+   * Returns a file that can be read directly: {@code dataFile} itself when it is not gzip-compressed, or a
+   * decompressed copy of it when it is.
+   *
+   * <p>Compression is detected from the gzip magic number in the first two bytes, not from the file name. The
+   * decompressed copy is written next to the input, named after the input's absolute path with
+   * {@code "." + extension} appended, and replaces any existing file of that name. The copy is not deleted by this
+   * method; cleaning it up is left to the caller.
+   *
+   * @param dataFile the input file, possibly gzip-compressed
+   * @param extension suffix (without the leading dot) appended to the name of the decompressed copy
+   * @return {@code dataFile} when it is not gzip-compressed, otherwise the decompressed copy
+   * @throws IOException if {@code dataFile} cannot be read or the copy cannot be written
+   */
+  public static File unpackIfRequired(File dataFile, String extension)
+      throws IOException {
+    if (!isGZippedFile(dataFile)) {
+      return dataFile;
+    }
+    File targetFile = new File(String.format("%s.%s", dataFile.getAbsolutePath(), extension));
+    // The magic-number check has already established that the content is gzip, so decompress unconditionally
+    // rather than via getInputStream(), whose own detection could disagree with that check.
+    try (InputStream fileIn = new FileInputStream(dataFile);
+        InputStream gzipIn = new GZIPInputStream(fileIn)) {
+      Files.copy(gzipIn, targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
+    }
+    return targetFile;
+  }
+
+  /**
+   * Returns whether {@code file} starts with the gzip magic number ({@link GZIPInputStream#GZIP_MAGIC}). A file
+   * shorter than two bytes is reported as not gzip-compressed.
+   */
+  private static boolean isGZippedFile(File file)
+      throws IOException {
+    try (RandomAccessFile raf = new RandomAccessFile(file, "r")) {
+      int lowByte = raf.read();
+      int highByte = raf.read();
+      // read() returns -1 once the end of the file is reached.
+      if (lowByte < 0 || highByte < 0) {
+        return false;
+      }
+      // The first byte on disk is the low-order byte of the 16-bit magic number.
+      return (lowByte | (highByte << 8)) == GZIPInputStream.GZIP_MAGIC;
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org