Posted to commits@pinot.apache.org by sn...@apache.org on 2022/12/08 20:43:22 UTC

[pinot] branch avro-gz-improvment created (now 667128a797)

This is an automated email from the ASF dual-hosted git repository.

snlee pushed a change to branch avro-gz-improvment
in repository https://gitbox.apache.org/repos/asf/pinot.git


      at 667128a797 Improving gz support for avro record readers

This branch includes the following new commits:

     new 667128a797 Improving gz support for avro record readers

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[pinot] 01/01: Improving gz support for avro record readers

Posted by sn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch avro-gz-improvment
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 667128a797e332a4b5027b392b75cbe70fe7f4e4
Author: Seunghyun Lee <se...@startree.ai>
AuthorDate: Thu Dec 8 12:41:19 2022 -0800

    Improving gz support for avro record readers
    
    - Improve gz support for the Avro record reader to
      accept gzip-compressed input without the ".gz"
      suffix in the file name
    - Improve all record reader tests to cover gzipped
      input files.
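    
    The detection now relies on content sniffing instead of the
    file name: RecordReaderUtils.isGZippedFile() reads the first
    two bytes of the file and compares them against the gzip
    magic number. A minimal sketch of that check (the wrapper
    class is illustrative; the constant is the JDK's):
    
        import java.io.File;
        import java.io.IOException;
        import java.io.RandomAccessFile;
        import java.util.zip.GZIPInputStream;
    
        final class GzipSniffer {
          // gzip streams start with the two bytes 0x1f 0x8b; the JDK
          // exposes them little-endian as GZIPInputStream.GZIP_MAGIC (0x8b1f)
          static boolean isGZippedFile(File file) throws IOException {
            try (RandomAccessFile raf = new RandomAccessFile(file, "r")) {
              int magic = raf.read() & 0xff | ((raf.read() << 8) & 0xff00);
              return magic == GZIPInputStream.GZIP_MAGIC;
            }
          }
        }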
---
 .../pinot/plugin/inputformat/avro/AvroUtils.java   |   3 +-
 .../inputformat/avro/AvroRecordReaderTest.java     |   8 +-
 .../inputformat/csv/CSVRecordReaderTest.java       |  11 +-
 .../json/GzippedJSONRecordReaderTest.java          |  54 ------
 .../inputformat/json/JSONRecordReaderTest.java     |  13 +-
 .../plugin/inputformat/orc/ORCRecordReader.java    |   4 +-
 .../inputformat/orc/ORCRecordReaderTest.java       |  27 +--
 .../parquet/ParquetAvroRecordReader.java           |   4 +-
 .../parquet/ParquetNativeRecordReader.java         |   4 +-
 .../inputformat/parquet/ParquetRecordReader.java   |   4 +-
 .../parquet/ParquetNativeRecordReaderTest.java     |  10 +-
 .../parquet/ParquetRecordReaderTest.java           |  98 +++++-----
 .../protobuf/ProtoBufRecordReaderTest.java         |  31 ++--
 .../inputformat/thrift/ThriftRecordReaderTest.java | 197 +++++++++++----------
 .../spi/data/readers/BaseRecordExtractor.java      |   3 +
 .../pinot/spi/data/readers/RecordReaderUtils.java  |  10 +-
 .../spi/data/readers/AbstractRecordReaderTest.java |  67 ++++++-
 17 files changed, 292 insertions(+), 256 deletions(-)

diff --git a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroUtils.java b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroUtils.java
index 8c98f57a3f..7fbc087fcb 100644
--- a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroUtils.java
+++ b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroUtils.java
@@ -39,6 +39,7 @@ import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.MetricFieldSpec;
 import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.RecordReaderUtils;
 
 
 /**
@@ -214,7 +215,7 @@ public class AvroUtils {
    */
   public static DataFileStream<GenericRecord> getAvroReader(File avroFile)
       throws IOException {
-    if (avroFile.getName().endsWith(".gz")) {
+    if (RecordReaderUtils.isGZippedFile(avroFile)) {
       return new DataFileStream<>(new GZIPInputStream(new FileInputStream(avroFile)), new GenericDatumReader<>());
     } else {
       return new DataFileStream<>(new FileInputStream(avroFile), new GenericDatumReader<>());
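
With this change, AvroUtils.getAvroReader() opens a gzip-compressed
Avro file no matter what the file is named. A hypothetical read loop
over such a file (path and demo class illustrative):

    import java.io.File;
    import org.apache.avro.file.DataFileStream;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.pinot.plugin.inputformat.avro.AvroUtils;

    class AvroReadDemo {
      static void demo() throws Exception {
        try (DataFileStream<GenericRecord> reader =
            AvroUtils.getAvroReader(new File("/tmp/data.avro"))) {
          while (reader.hasNext()) {
            GenericRecord record = reader.next();
            // process the record
          }
        }
      }
    }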
diff --git a/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordReaderTest.java
index 3d075a6ee5..9d42e9ce96 100644
--- a/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordReaderTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordReaderTest.java
@@ -33,10 +33,9 @@ import org.apache.pinot.spi.data.readers.RecordReader;
 
 
 public class AvroRecordReaderTest extends AbstractRecordReaderTest {
-  private final File _dataFile = new File(_tempDir, "data.avro");
 
   @Override
-  protected RecordReader createRecordReader()
+  protected RecordReader createRecordReader(File file)
       throws Exception {
     AvroRecordReader avroRecordReader = new AvroRecordReader();
-    avroRecordReader.init(_dataFile, _sourceFields, null);
+    avroRecordReader.init(file, _sourceFields, null);
@@ -59,4 +58,9 @@ public class AvroRecordReaderTest extends AbstractRecordReaderTest {
       }
     }
   }
+
+  @Override
+  protected String getDataFileName() {
+    return "data.avro";
+  }
 }
diff --git a/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java
index e4b26d980d..0bbbcef26c 100644
--- a/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java
@@ -38,22 +38,20 @@ import org.testng.annotations.Test;
 
 public class CSVRecordReaderTest extends AbstractRecordReaderTest {
   private static final char CSV_MULTI_VALUE_DELIMITER = '\t';
-  private final File _dataFile = new File(_tempDir, "data.csv");
 
   @Override
-  protected RecordReader createRecordReader()
+  protected RecordReader createRecordReader(File file)
       throws Exception {
     CSVRecordReaderConfig csvRecordReaderConfig = new CSVRecordReaderConfig();
     csvRecordReaderConfig.setMultiValueDelimiter(CSV_MULTI_VALUE_DELIMITER);
     CSVRecordReader csvRecordReader = new CSVRecordReader();
-    csvRecordReader.init(_dataFile, _sourceFields, csvRecordReaderConfig);
+    csvRecordReader.init(file, _sourceFields, csvRecordReaderConfig);
     return csvRecordReader;
   }
 
   @Override
   protected void writeRecordsToFile(List<Map<String, Object>> recordsToWrite)
       throws Exception {
-
     Schema pinotSchema = getPinotSchema();
     String[] columns = pinotSchema.getColumnNames().toArray(new String[0]);
     try (FileWriter fileWriter = new FileWriter(_dataFile);
@@ -73,6 +71,11 @@ public class CSVRecordReaderTest extends AbstractRecordReaderTest {
     }
   }
 
+  @Override
+  protected String getDataFileName() {
+    return "data.csv";
+  }
+
   @Override
   protected void checkValue(RecordReader recordReader, List<Map<String, Object>> expectedRecordsMap,
       List<Object[]> expectedPrimaryKeys)
diff --git a/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/GzippedJSONRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/GzippedJSONRecordReaderTest.java
deleted file mode 100644
index 997c1c0495..0000000000
--- a/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/GzippedJSONRecordReaderTest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.plugin.inputformat.json;
-
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-import java.util.Map;
-import java.util.zip.GZIPOutputStream;
-import org.apache.pinot.spi.utils.JsonUtils;
-
-
-/**
- * Unit test for {@link JSONRecordReader} for a Gzipped JSON file.
- * Relies on the {@link JSONRecordReaderTest} for actual tests, by simply overriding
- * the JSON file generation to generate a gzipped JSON file.
- */
-public class GzippedJSONRecordReaderTest extends JSONRecordReaderTest {
-  private final File _dateFile = new File(_tempDir, "data.json");
-
-  protected void writeRecordsToFile(List<Map<String, Object>> recordsToWrite)
-      throws Exception {
-    try (Writer writer = new OutputStreamWriter(new GZIPOutputStream(new FileOutputStream(_dateFile)),
-        StandardCharsets.UTF_8)) {
-      for (Map<String, Object> r : recordsToWrite) {
-        ObjectNode jsonRecord = JsonUtils.newObjectNode();
-        for (String key : r.keySet()) {
-          jsonRecord.set(key, JsonUtils.objectToJsonNode(r.get(key)));
-        }
-        writer.write(jsonRecord.toString());
-      }
-    }
-  }
-}
diff --git a/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/JSONRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/JSONRecordReaderTest.java
index 81ab49983c..ee541d1054 100644
--- a/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/JSONRecordReaderTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/JSONRecordReaderTest.java
@@ -30,23 +30,23 @@ import org.apache.pinot.spi.data.readers.PrimaryKey;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.testng.Assert;
+import org.testng.annotations.Test;
 
 
 public class JSONRecordReaderTest extends AbstractRecordReaderTest {
-  private final File _dateFile = new File(_tempDir, "data.json");
 
   @Override
-  protected RecordReader createRecordReader()
+  protected RecordReader createRecordReader(File file)
       throws Exception {
     JSONRecordReader recordReader = new JSONRecordReader();
-    recordReader.init(_dateFile, _sourceFields, null);
+    recordReader.init(file, _sourceFields, null);
     return recordReader;
   }
 
   @Override
   protected void writeRecordsToFile(List<Map<String, Object>> recordsToWrite)
       throws Exception {
-    try (FileWriter fileWriter = new FileWriter(_dateFile)) {
+    try (FileWriter fileWriter = new FileWriter(_dataFile)) {
       for (Map<String, Object> r : recordsToWrite) {
         ObjectNode jsonRecord = JsonUtils.newObjectNode();
         for (String key : r.keySet()) {
@@ -57,6 +57,11 @@ public class JSONRecordReaderTest extends AbstractRecordReaderTest {
     }
   }
 
+  @Override
+  protected String getDataFileName() {
+    return "data.json";
+  }
+
   @Override
   protected void checkValue(RecordReader recordReader, List<Map<String, Object>> expectedRecordsMap,
       List<Object[]> expectedPrimaryKeys)
diff --git a/pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java b/pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java
index f98ab58921..8a4d3fd709 100644
--- a/pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java
+++ b/pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java
@@ -66,6 +66,8 @@ import static java.nio.charset.StandardCharsets.UTF_8;
  * </ul>
  */
 public class ORCRecordReader implements RecordReader {
+  private static final String EXTENSION = "orc";
+
   private List<String> _orcFields;
   private List<TypeDescription> _orcFieldTypes;
   private boolean[] _includeOrcFields;
@@ -78,7 +80,7 @@ public class ORCRecordReader implements RecordReader {
   public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig)
       throws IOException {
     Configuration configuration = new Configuration();
-    File orcFile = RecordReaderUtils.unpackIfRequired(dataFile, "orc");
+    File orcFile = RecordReaderUtils.unpackIfRequired(dataFile, EXTENSION);
     Reader orcReader = OrcFile.createReader(new Path(orcFile.getAbsolutePath()),
         OrcFile.readerOptions(configuration).filesystem(FileSystem.getLocal(configuration)));
     TypeDescription orcSchema = orcReader.getSchema();
diff --git a/pinot-plugins/pinot-input-format/pinot-orc/src/test/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-orc/src/test/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReaderTest.java
index dd9b12a78c..042ba4b762 100644
--- a/pinot-plugins/pinot-input-format/pinot-orc/src/test/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReaderTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-orc/src/test/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReaderTest.java
@@ -17,7 +17,6 @@ package org.apache.pinot.plugin.inputformat.orc;
  * specific language governing permissions and limitations
  * under the License.
  */
-
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -44,20 +43,12 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 
 
 public class ORCRecordReaderTest extends AbstractRecordReaderTest {
-  private final File _dataFile = new File(_tempDir, "data.orc");
-
-  private void compressGzip(String sourcePath, String targetPath)
-      throws IOException {
-    try (GZIPOutputStream gos = new GZIPOutputStream(new FileOutputStream(Paths.get(targetPath).toFile()))) {
-      Files.copy(Paths.get(sourcePath), gos);
-    }
-  }
 
   @Override
-  protected RecordReader createRecordReader()
+  protected RecordReader createRecordReader(File file)
       throws Exception {
     ORCRecordReader orcRecordReader = new ORCRecordReader();
-    orcRecordReader.init(_dataFile, _sourceFields, null);
+    orcRecordReader.init(file, _sourceFields, null);
     return orcRecordReader;
   }
 
@@ -158,16 +149,8 @@ public class ORCRecordReaderTest extends AbstractRecordReaderTest {
     writer.close();
   }
 
-  @Test
-  public void testGzipORCRecordReader()
-      throws Exception {
-    String gzipFileName = "data.orc.gz";
-    compressGzip(_dataFile.getAbsolutePath(), String.format("%s/%s", _tempDir, gzipFileName));
-    final File gzDataFile = new File(_tempDir, gzipFileName);
-    ORCRecordReader orcRecordReader = new ORCRecordReader();
-    orcRecordReader.init(gzDataFile, _sourceFields, null);
-    checkValue(orcRecordReader, _records, _primaryKeys);
-    orcRecordReader.rewind();
-    checkValue(orcRecordReader, _records, _primaryKeys);
+  @Override
+  protected String getDataFileName() {
+    return "data.orc";
   }
 }
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java
index 787ddc25e7..8caf384630 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java
@@ -41,6 +41,8 @@ import org.apache.pinot.spi.data.readers.RecordReaderUtils;
  *   https://javadoc.io/doc/org.apache.parquet/parquet-avro/latest/index.html</a>
  */
 public class ParquetAvroRecordReader implements RecordReader {
+  private static final String EXTENSION = "parquet";
+
   private Path _dataFilePath;
   private AvroRecordExtractor _recordExtractor;
   private ParquetReader<GenericRecord> _parquetReader;
@@ -49,7 +51,7 @@ public class ParquetAvroRecordReader implements RecordReader {
   @Override
   public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig)
       throws IOException {
-    File parquetFile = RecordReaderUtils.unpackIfRequired(dataFile, "parquet");
+    File parquetFile = RecordReaderUtils.unpackIfRequired(dataFile, EXTENSION);
     _dataFilePath = new Path(parquetFile.getAbsolutePath());
     _parquetReader = ParquetUtils.getParquetAvroReader(_dataFilePath);
     _recordExtractor = new AvroRecordExtractor();
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java
index da89c8a382..247bd6c82a 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java
@@ -44,6 +44,8 @@ import org.apache.pinot.spi.data.readers.RecordReaderUtils;
  * Record reader for Native Parquet file.
  */
 public class ParquetNativeRecordReader implements RecordReader {
+  private static final String EXTENSION = "parquet";
+
   private Path _dataFilePath;
   private ParquetNativeRecordExtractor _recordExtractor;
   private MessageType _schema;
@@ -59,7 +61,7 @@ public class ParquetNativeRecordReader implements RecordReader {
   @Override
   public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig)
       throws IOException {
-    File parquetFile = RecordReaderUtils.unpackIfRequired(dataFile, "parquet");
+    File parquetFile = RecordReaderUtils.unpackIfRequired(dataFile, EXTENSION);
     _dataFilePath = new Path(parquetFile.getAbsolutePath());
     _hadoopConf = ParquetUtils.getParquetHadoopConfiguration();
     _recordExtractor = new ParquetNativeRecordExtractor();
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java
index 60886b3b30..0b119baad3 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java
@@ -34,13 +34,15 @@ import org.apache.pinot.spi.data.readers.RecordReaderUtils;
  * It has two implementations: {@link ParquetAvroRecordReader} (Default) and {@link ParquetNativeRecordReader}.
  */
 public class ParquetRecordReader implements RecordReader {
+  private static final String EXTENSION = "parquet";
+
   private RecordReader _internalParquetRecordReader;
   private boolean _useAvroParquetRecordReader = true;
 
   @Override
   public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig)
       throws IOException {
-    File parquetFile = RecordReaderUtils.unpackIfRequired(dataFile, "parquet");
+    File parquetFile = RecordReaderUtils.unpackIfRequired(dataFile, EXTENSION);
     if (recordReaderConfig != null && ((ParquetRecordReaderConfig) recordReaderConfig).useParquetAvroRecordReader()) {
       _internalParquetRecordReader = new ParquetAvroRecordReader();
     } else if (recordReaderConfig != null
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReaderTest.java
index 378195d976..58177ee38c 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReaderTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReaderTest.java
@@ -34,13 +34,12 @@ import org.apache.pinot.spi.data.readers.RecordReader;
 
 
 public class ParquetNativeRecordReaderTest extends AbstractRecordReaderTest {
-  private final File _dataFile = new File(_tempDir, "data.parquet");
 
   @Override
-  protected RecordReader createRecordReader()
+  protected RecordReader createRecordReader(File file)
       throws Exception {
     ParquetNativeRecordReader recordReader = new ParquetNativeRecordReader();
-    recordReader.init(_dataFile, _sourceFields, null);
+    recordReader.init(file, _sourceFields, null);
     return recordReader;
   }
 
@@ -63,4 +62,9 @@ public class ParquetNativeRecordReaderTest extends AbstractRecordReaderTest {
       }
     }
   }
+
+  @Override
+  protected String getDataFileName() {
+    return "data.parquet";
+  }
 }
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
index 345cb1cdbd..64f39ed89d 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
@@ -20,14 +20,10 @@ package org.apache.pinot.plugin.inputformat.parquet;
 
 import com.google.common.collect.ImmutableSet;
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.zip.GZIPOutputStream;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
@@ -43,16 +39,16 @@ import org.testng.annotations.Test;
 
 
 public class ParquetRecordReaderTest extends AbstractRecordReaderTest {
-  private final File _dataFile = new File(_tempDir, "data.parquet");
-  private final String _gzipFileName = "data.parquet.gz";
   private final File _testParquetFileWithInt96AndDecimal =
       new File(getClass().getClassLoader().getResource("test-file-with-int96-and-decimal.snappy.parquet").getFile());
 
+  private static final int NUM_RECORDS_TEST_PARQUET_WITH_INT96 = 1965;
+
   @Override
-  protected RecordReader createRecordReader()
+  protected RecordReader createRecordReader(File file)
       throws Exception {
     ParquetRecordReader recordReader = new ParquetRecordReader();
-    recordReader.init(_dataFile, _sourceFields, null);
+    recordReader.init(file, _sourceFields, null);
     return recordReader;
   }
 
@@ -76,11 +72,9 @@ public class ParquetRecordReaderTest extends AbstractRecordReaderTest {
     }
   }
 
-  private void compressGzip(String sourcePath, String targetPath)
-      throws IOException {
-    try (GZIPOutputStream gos = new GZIPOutputStream(new FileOutputStream(Paths.get(targetPath).toFile()))) {
-      Files.copy(Paths.get(sourcePath), gos);
-    }
+  @Override
+  protected String getDataFileName() {
+    return "data.parquet";
   }
 
   @Test
@@ -122,9 +116,9 @@ public class ParquetRecordReaderTest extends AbstractRecordReaderTest {
 
     final ParquetRecordReader parquetRecordReader2 = new ParquetRecordReader();
     File nativeParquetFile = new File(getClass().getClassLoader().getResource("users.parquet").getFile());
-    parquetRecordReader.init(nativeParquetFile, null, null);
+    parquetRecordReader2.init(nativeParquetFile, null, null);
     // Should be native since file metadata does not have avro schema
-    Assert.assertFalse(parquetRecordReader.useAvroParquetRecordReader());
+    Assert.assertFalse(parquetRecordReader2.useAvroParquetRecordReader());
   }
 
   @Test
@@ -171,39 +165,61 @@ public class ParquetRecordReaderTest extends AbstractRecordReaderTest {
 
   @Test
   public void testGzipParquetRecordReader()
-      throws IOException {
-    compressGzip(_dataFile.getAbsolutePath(), String.format("%s/%s", _tempDir, _gzipFileName));
-    final File gzDataFile = new File(_tempDir, _gzipFileName);
-    ParquetRecordReader recordReader = new ParquetRecordReader();
-    recordReader.init(gzDataFile, _sourceFields, null);
-    testReadParquetFile(recordReader, SAMPLE_RECORDS_SIZE);
+      throws Exception {
+    // Test the gzip file that ends with ".gz"
+    File gzipDataFile = new File(_tempDir, _dataFile.getName() + ".gz");
+    compressFileWithGzip(_dataFile.getAbsolutePath(), gzipDataFile.getAbsolutePath());
+    ParquetRecordReader parquetRecordReader = new ParquetRecordReader();
+    parquetRecordReader.init(gzipDataFile, _sourceFields, null);
+    checkValue(parquetRecordReader, _records, _primaryKeys);
+    parquetRecordReader.rewind();
+    checkValue(parquetRecordReader, _records, _primaryKeys);
+
+    // Test the gzip file that doesn't end with '.gz'.
+    File gzipDataFile2 = new File(_tempDir, _dataFile.getName() + ".test");
+    compressFileWithGzip(_dataFile.getAbsolutePath(), gzipDataFile2.getAbsolutePath());
+    parquetRecordReader = new ParquetRecordReader();
+    parquetRecordReader.init(gzipDataFile2, _sourceFields, null);
+    checkValue(parquetRecordReader, _records, _primaryKeys);
   }
 
   @Test
   public void testGzipParquetAvroRecordReader()
-      throws IOException {
-    ParquetAvroRecordReader avroRecordReader = new ParquetAvroRecordReader();
-    compressGzip(_dataFile.getAbsolutePath(), String.format("%s/%s", _tempDir, _gzipFileName));
-    final File gzDataFile = new File(_tempDir, _gzipFileName);
-    avroRecordReader.init(gzDataFile, null, new ParquetRecordReaderConfig());
-    testReadParquetFile(avroRecordReader, SAMPLE_RECORDS_SIZE);
+      throws Exception {
+    // Test the gzip file that ends with ".gz"
+    File gzipDataFile = new File(_tempDir, _dataFile.getName() + ".gz");
+    compressFileWithGzip(_dataFile.getAbsolutePath(), gzipDataFile.getAbsolutePath());
+    ParquetAvroRecordReader parquetRecordReader = new ParquetAvroRecordReader();
+    parquetRecordReader.init(gzipDataFile, _sourceFields, null);
+    checkValue(parquetRecordReader, _records, _primaryKeys);
+    parquetRecordReader.rewind();
+    checkValue(parquetRecordReader, _records, _primaryKeys);
+
+    // Test the gzip file that doesn't end with '.gz'.
+    File gzipDataFile2 = new File(_tempDir, _dataFile.getName() + ".test");
+    compressFileWithGzip(_dataFile.getAbsolutePath(), gzipDataFile2.getAbsolutePath());
+    parquetRecordReader = new ParquetAvroRecordReader();
+    parquetRecordReader.init(gzipDataFile2, _sourceFields, null);
+    checkValue(parquetRecordReader, _records, _primaryKeys);
   }
 
   @Test
   public void testGzipParquetNativeRecordReader()
-      throws IOException {
-    ParquetNativeRecordReader nativeRecordReader = new ParquetNativeRecordReader();
-
-    final String gzParquetFileWithInt96AndDecimal =
-        String.format("%s.gz", _testParquetFileWithInt96AndDecimal.getAbsolutePath());
-    compressGzip(_testParquetFileWithInt96AndDecimal.getAbsolutePath(), gzParquetFileWithInt96AndDecimal);
-    final File gzTestParquetFileWithInt96AndDecimal = new File(gzParquetFileWithInt96AndDecimal);
-    nativeRecordReader.init(gzTestParquetFileWithInt96AndDecimal, ImmutableSet.of(), new ParquetRecordReaderConfig());
-    testReadParquetFile(nativeRecordReader, 1965);
-
-    compressGzip(_dataFile.getAbsolutePath(), String.format("%s/%s", _tempDir, _gzipFileName));
-    final File gzDataFile = new File(_tempDir, _gzipFileName);
-    nativeRecordReader.init(gzDataFile, ImmutableSet.of(), new ParquetRecordReaderConfig());
-    testReadParquetFile(nativeRecordReader, SAMPLE_RECORDS_SIZE);
+      throws Exception {
+    // Test the gzip file that ends with ".gz"
+    File gzipDataFile = new File(_tempDir, _testParquetFileWithInt96AndDecimal.getName() + ".gz");
+    compressFileWithGzip(_testParquetFileWithInt96AndDecimal.getAbsolutePath(), gzipDataFile.getAbsolutePath());
+    ParquetNativeRecordReader parquetRecordReader = new ParquetNativeRecordReader();
+    parquetRecordReader.init(gzipDataFile, ImmutableSet.of(), new ParquetRecordReaderConfig());
+    testReadParquetFile(parquetRecordReader, NUM_RECORDS_TEST_PARQUET_WITH_INT96);
+    parquetRecordReader.rewind();
+    testReadParquetFile(parquetRecordReader, NUM_RECORDS_TEST_PARQUET_WITH_INT96);
+
+    // Test the gzip file that doesn't end with '.gz'.
+    File gzipDataFile2 = new File(_tempDir, _testParquetFileWithInt96AndDecimal.getName() + ".test");
+    compressFileWithGzip(_testParquetFileWithInt96AndDecimal.getAbsolutePath(), gzipDataFile2.getAbsolutePath());
+    parquetRecordReader = new ParquetNativeRecordReader();
+    parquetRecordReader.init(gzipDataFile2, ImmutableSet.of(), new ParquetRecordReaderConfig());
+    testReadParquetFile(parquetRecordReader, NUM_RECORDS_TEST_PARQUET_WITH_INT96);
   }
 }
diff --git a/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordReaderTest.java
index ce9cd1b346..eac590a803 100644
--- a/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordReaderTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-protobuf/src/test/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordReaderTest.java
@@ -20,7 +20,6 @@ package org.apache.pinot.plugin.inputformat.protobuf;
 
 import java.io.File;
 import java.io.FileOutputStream;
-import java.io.IOException;
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.util.ArrayList;
@@ -35,7 +34,6 @@ import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.AbstractRecordReaderTest;
 import org.apache.pinot.spi.data.readers.RecordReader;
-import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
@@ -44,7 +42,7 @@ public class ProtoBufRecordReaderTest extends AbstractRecordReaderTest {
   private final static Random RANDOM = new Random(System.currentTimeMillis());
   private static final String PROTO_DATA = "_test_sample_proto_data.data";
   private static final String DESCRIPTOR_FILE = "sample.desc";
-  private File _tempFile;
+  private File _dataFile;
   private RecordReader _recordReader;
   private final static int SAMPLE_RECORDS_SIZE = 10000;
 
@@ -57,7 +55,7 @@ public class ProtoBufRecordReaderTest extends AbstractRecordReaderTest {
         .addMultiValueDimension("friends", FieldSpec.DataType.STRING).build();
   }
 
-  private static List<Map<String, Object>> generateRandomRecords(Schema pinotSchema) {
+  protected static List<Map<String, Object>> generateRandomRecords(Schema pinotSchema) {
     List<Map<String, Object>> records = new ArrayList<>();
 
     for (int i = 0; i < SAMPLE_RECORDS_SIZE; i++) {
@@ -115,13 +113,6 @@ public class ProtoBufRecordReaderTest extends AbstractRecordReaderTest {
     _recordReader = createRecordReader();
   }
 
-  @AfterClass
-  @Override
-  public void tearDown()
-      throws Exception {
-    FileUtils.forceDelete(_tempFile);
-  }
-
   @Test
   public void testRecordReader()
       throws Exception {
@@ -131,11 +122,11 @@ public class ProtoBufRecordReaderTest extends AbstractRecordReaderTest {
   }
 
   @Override
-  protected RecordReader createRecordReader()
+  protected RecordReader createRecordReader(File file)
       throws Exception {
     RecordReader recordReader = new ProtoBufRecordReader();
     Set<String> sourceFields = getSourceFields(getPinotSchema());
-    recordReader.init(_tempFile, sourceFields, getProtoRecordReaderConfig());
+    recordReader.init(file.getAbsoluteFile(), sourceFields, getProtoRecordReaderConfig());
     return recordReader;
   }
 
@@ -151,17 +142,21 @@ public class ProtoBufRecordReaderTest extends AbstractRecordReaderTest {
       lists.add(sampleRecord);
     }
 
-    _tempFile = getSampleDataPath();
-    try (FileOutputStream output = new FileOutputStream(_tempFile, true)) {
+    _dataFile = getSampleDataPath();
+    try (FileOutputStream output = new FileOutputStream(_dataFile, true)) {
       for (Sample.SampleRecord record : lists) {
         record.writeDelimitedTo(output);
       }
     }
   }
 
-  private File getSampleDataPath()
-      throws IOException {
-    return File.createTempFile(ProtoBufRecordReaderTest.class.getName(), PROTO_DATA);
+  @Override
+  protected String getDataFileName() {
+    return PROTO_DATA;
+  }
+
+  private File getSampleDataPath() {
+    return new File(_tempDir, PROTO_DATA);
   }
 
   private ProtoBufRecordReaderConfig getProtoRecordReaderConfig()
diff --git a/pinot-plugins/pinot-input-format/pinot-thrift/src/test/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-thrift/src/test/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordReaderTest.java
index 085fbf9414..743da3a099 100644
--- a/pinot-plugins/pinot-input-format/pinot-thrift/src/test/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordReaderTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-thrift/src/test/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordReaderTest.java
@@ -21,7 +21,7 @@ package org.apache.pinot.plugin.inputformat.thrift;
 import com.google.common.collect.Sets;
 import java.io.File;
 import java.io.FileWriter;
-import java.io.IOException;
+import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -32,116 +32,79 @@ import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.AbstractRecordReaderTest;
+import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.thrift.TSerializer;
 import org.apache.thrift.protocol.TBinaryProtocol;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
 
 
 /**
  * Test {@code org.apache.pinot.plugin.inputformat.thrift.data.ThriftRecordReader} for a given sample thrift
  * data.
  */
-public class ThriftRecordReaderTest {
+public class ThriftRecordReaderTest extends AbstractRecordReaderTest {
   private static final String THRIFT_DATA = "_test_sample_thrift_data.thrift";
 
-  private File _tempFile;
+  private ThriftRecordReaderConfig getThriftRecordReaderConfig() {
+    ThriftRecordReaderConfig config = new ThriftRecordReaderConfig();
+    config.setThriftClass("org.apache.pinot.plugin.inputformat.thrift.ThriftSampleData");
+    return config;
+  }
 
   @BeforeClass
   public void setUp()
       throws Exception {
-    FileUtils.deleteQuietly(_tempFile);
-
-    ThriftSampleData t1 = new ThriftSampleData();
-    t1.setActive(true);
-    t1.setCreated_at(1515541280L);
-    t1.setId(1);
-    t1.setName("name1");
-    List<Short> t1Groups = new ArrayList<>(2);
-    t1Groups.add((short) 1);
-    t1Groups.add((short) 4);
-    t1.setGroups(t1Groups);
-    Map<String, Long> mapValues = new HashMap<>();
-    mapValues.put("name1", 1L);
-    t1.setMap_values(mapValues);
-    Set<String> namesSet = new HashSet<>();
-    namesSet.add("name1");
-    t1.setSet_values(namesSet);
-
-    ThriftSampleData t2 = new ThriftSampleData();
-    t2.setActive(false);
-    t2.setCreated_at(1515541290L);
-    t2.setId(2);
-    t2.setName("name2");
-    List<Short> t2Groups = new ArrayList<>(2);
-    t2Groups.add((short) 2);
-    t2Groups.add((short) 3);
-    t2.setGroups(t2Groups);
-    List<ThriftSampleData> lists = new ArrayList<>(2);
-    lists.add(t1);
-    lists.add(t2);
-    TSerializer binarySerializer = new TSerializer(new TBinaryProtocol.Factory());
-    _tempFile = getSampleDataPath();
-    FileWriter writer = new FileWriter(_tempFile);
-    for (ThriftSampleData d : lists) {
-      IOUtils.write(binarySerializer.serialize(d), writer);
-    }
-    writer.close();
-  }
-
-  @Test
-  public void testReadData()
-      throws IOException {
-    ThriftRecordReader recordReader = new ThriftRecordReader();
-    recordReader.init(_tempFile, getSourceFields(), getThriftRecordReaderConfig());
-    List<GenericRow> genericRows = new ArrayList<>();
-    while (recordReader.hasNext()) {
-      genericRows.add(recordReader.next());
-    }
-    recordReader.close();
-    Assert.assertEquals(genericRows.size(), 2, "The number of rows return is incorrect");
-    int id = 1;
-    for (GenericRow outputRow : genericRows) {
-      Assert.assertEquals(outputRow.getValue("id"), id);
-      Assert.assertNull(outputRow.getValue("map_values"));
-      id++;
+    if (_tempDir.exists()) {
+      FileUtils.cleanDirectory(_tempDir);
     }
+    FileUtils.forceMkdir(_tempDir);
+    // Generate Pinot schema
+    _pinotSchema = getPinotSchema();
+    _sourceFields = getSourceFields(_pinotSchema);
+    // Generate random records based on Pinot schema
+    _records = generateRandomRecords();
+    _primaryKeys = generatePrimaryKeys(_records, getPrimaryKeyColumns());
+    // Write generated random records to file
+    writeRecordsToFile(_records);
+    // Create and init RecordReader
+    _recordReader = createRecordReader();
   }
 
-  @Test
-  public void testRewind()
-      throws IOException {
-    ThriftRecordReader recordReader = new ThriftRecordReader();
-    recordReader.init(_tempFile, getSourceFields(), getThriftRecordReaderConfig());
-    List<GenericRow> genericRows = new ArrayList<>();
-    while (recordReader.hasNext()) {
-      genericRows.add(recordReader.next());
-    }
-
-    recordReader.rewind();
-
-    while (recordReader.hasNext()) {
-      genericRows.add(recordReader.next());
-    }
-    recordReader.close();
-    Assert.assertEquals(genericRows.size(), 4, "The number of rows return after the rewind is incorrect");
+  private List<Map<String, Object>> generateRandomRecords() {
+    // TODO: instead of hardcoding some rows, change this to work with the AbstractRecordReader's random value generator
+    List<Map<String, Object>> records = new ArrayList<>();
+    Map<String, Object> record1 = new HashMap<>();
+    record1.put("active", "true");
+    record1.put("created_at", 1515541280L);
+    record1.put("id", 1);
+    List<Integer> groups1 = new ArrayList<>();
+    groups1.add(1);
+    groups1.add(4);
+    record1.put("groups", groups1);
+    Map<String, Long> mapValues1 = new HashMap<>();
+    mapValues1.put("name1", 1L);
+    record1.put("map_values", mapValues1);
+    List<String> setValues1 = new ArrayList<>();
+    setValues1.add("name1");
+    record1.put("set_values", setValues1);
+    records.add(record1);
+
+    Map<String, Object> record2 = new HashMap<>();
+    record2.put("active", "false");
+    record2.put("created_at", 1515541290L);
+    record2.put("id", 1);
+    List<Integer> groups2 = new ArrayList<>();
+    groups2.add(2);
+    groups2.add(3);
+    record2.put("groups", groups2);
+    records.add(record2);
+
+    return records;
   }
 
-  private File getSampleDataPath()
-      throws IOException {
-    return File.createTempFile(ThriftRecordReaderTest.class.getName(), THRIFT_DATA);
-  }
-
-  private ThriftRecordReaderConfig getThriftRecordReaderConfig() {
-    ThriftRecordReaderConfig config = new ThriftRecordReaderConfig();
-    config.setThriftClass("org.apache.pinot.plugin.inputformat.thrift.ThriftSampleData");
-    return config;
-  }
-
-  private Schema getSchema() {
+  @Override
+  protected org.apache.pinot.spi.data.Schema getPinotSchema() {
     return new Schema.SchemaBuilder().setSchemaName("ThriftSampleData")
         .addSingleValueDimension("id", FieldSpec.DataType.INT)
         .addSingleValueDimension("name", FieldSpec.DataType.STRING)
@@ -155,8 +118,54 @@ public class ThriftRecordReaderTest {
     return Sets.newHashSet("id", "name", "created_at", "active", "groups", "set_values");
   }
 
-  @AfterClass
-  public void tearDown() {
-    FileUtils.deleteQuietly(_tempFile);
+  @Override
+  protected RecordReader createRecordReader(File file)
+      throws Exception {
+    ThriftRecordReader recordReader = new ThriftRecordReader();
+    recordReader.init(file, getSourceFields(), getThriftRecordReaderConfig());
+    return recordReader;
+  }
+
+  @Override
+  protected void writeRecordsToFile(List<Map<String, Object>> recordsToWrite)
+      throws Exception {
+    List<ThriftSampleData> dataList = new ArrayList<>(recordsToWrite.size());
+    for (Map<String, Object> record: recordsToWrite) {
+      ThriftSampleData data = new ThriftSampleData();
+      data.setActive(Boolean.parseBoolean(record.get("active").toString()));
+      data.setCreated_at(Math.abs(((Long)record.get("created_at")).longValue()));
+      int i = Math.abs(((Integer)record.get("id")).intValue());
+      data.setId(i);
+      data.setName((String) record.get("name"));
+      List<Integer> groupsList = (List<Integer>)record.get("groups");
+      if (groupsList != null) {
+        List<Short> groupsResult = new ArrayList<>(groupsList.size());
+        for (Integer num : groupsList) {
+          groupsResult.add(num.shortValue());
+        }
+        data.setGroups(groupsResult);
+      }
+      List<String> setValuesList = (List<String>) record.get("set_values");
+      if (setValuesList != null) {
+        Set<String> setValuesResult = new HashSet<>(setValuesList.size());
+        for (String s : setValuesList) {
+          setValuesResult.add(s);
+        }
+        data.setSet_values(setValuesResult);
+      }
+      dataList.add(data);
+    }
+
+    TSerializer binarySerializer = new TSerializer(new TBinaryProtocol.Factory());
+    FileWriter writer = new FileWriter(_dataFile);
+    for (ThriftSampleData d : dataList) {
+      IOUtils.write(binarySerializer.serialize(d), writer, Charset.defaultCharset());
+    }
+    writer.close();
+  }
+
+  @Override
+  protected String getDataFileName() {
+    return THRIFT_DATA;
   }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/BaseRecordExtractor.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/BaseRecordExtractor.java
index 90a29dbd5d..f4efdb6d2d 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/BaseRecordExtractor.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/BaseRecordExtractor.java
@@ -184,6 +184,9 @@ public abstract class BaseRecordExtractor<T> implements RecordExtractor<T> {
       return bytesValue;
     }
     if (value instanceof Number || value instanceof byte[]) {
+      if (value instanceof Short) {
+        return Integer.valueOf(value.toString());
+      }
       return value;
     }
     return value.toString();
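
For context on the extractor change above: the reworked Thrift test
writes i16 fields, which deserialize to java.lang.Short, and Pinot
schemas have no SHORT type, so Short values are now widened to
Integer (presumably so INT columns and the shared test assertions
see a consistent boxed type). A minimal mirror of that branch (the
class name is illustrative):

    class ShortWideningDemo {
      static Object convert(Object value) {
        // widen Short to Integer; other Numbers still pass through as-is
        if (value instanceof Short) {
          return Integer.valueOf(value.toString());
        }
        return value;
      }
    }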
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderUtils.java
index fd84c5f306..780eeeb5ce 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderUtils.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderUtils.java
@@ -65,7 +65,13 @@ public class RecordReaderUtils {
       throws IOException {
     if (isGZippedFile(dataFile)) {
       try (final InputStream inputStream = getInputStream(dataFile)) {
-        File targetFile = new File(String.format("%s.%s", dataFile.getAbsolutePath(), extension));
+        String targetFileName = dataFile.getName();
+        if (targetFileName.endsWith(".gz")) {
+          targetFileName = targetFileName.substring(0, targetFileName.length() - 3);
+        } else {
+          targetFileName = targetFileName + "." + extension;
+        }
+        File targetFile = new File(dataFile.getParentFile(), targetFileName);
         Files.copy(inputStream, targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
         return targetFile;
       }
@@ -74,7 +80,7 @@ public class RecordReaderUtils {
     }
   }
 
-  private static boolean isGZippedFile(File file)
+  public static boolean isGZippedFile(File file)
       throws IOException {
     int magic = 0;
     try (RandomAccessFile raf = new RandomAccessFile(file, "r")) {
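
The renaming logic above changes where unpacked files land. A
hypothetical walk-through (paths and demo class illustrative):

    import java.io.File;
    import java.io.IOException;
    import org.apache.pinot.spi.data.readers.RecordReaderUtils;

    class UnpackDemo {
      static void demo() throws IOException {
        // "data.parquet.gz" is unpacked next to the source as "data.parquet"
        File a = RecordReaderUtils.unpackIfRequired(new File("/tmp/data.parquet.gz"), "parquet");
        // gzip content without the ".gz" suffix is unpacked as "data.bin.parquet"
        File b = RecordReaderUtils.unpackIfRequired(new File("/tmp/data.bin"), "parquet");
        // non-gzip input is presumably handed back unchanged by the else branch
        File c = RecordReaderUtils.unpackIfRequired(new File("/tmp/data.parquet"), "parquet");
      }
    }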
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordReaderTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordReaderTest.java
index b4821bb54c..d39f315698 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordReaderTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordReaderTest.java
@@ -20,12 +20,17 @@ package org.apache.pinot.spi.data.readers;
 
 import com.google.common.collect.Sets;
 import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.zip.GZIPOutputStream;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -42,13 +47,14 @@ public abstract class AbstractRecordReaderTest {
   protected final static int SAMPLE_RECORDS_SIZE = 10000;
 
   protected final File _tempDir = new File(FileUtils.getTempDirectory(), "RecordReaderTest");
+  protected final File _dataFile = new File(_tempDir, getDataFileName());
   protected List<Map<String, Object>> _records;
   protected List<Object[]> _primaryKeys;
   protected org.apache.pinot.spi.data.Schema _pinotSchema;
   protected Set<String> _sourceFields;
-  private RecordReader _recordReader;
+  protected RecordReader _recordReader;
 
-  private static List<Map<String, Object>> generateRandomRecords(Schema pinotSchema) {
+  protected static List<Map<String, Object>> generateRandomRecords(Schema pinotSchema) {
     List<Map<String, Object>> records = new ArrayList<>();
 
     for (int i = 0; i < SAMPLE_RECORDS_SIZE; i++) {
@@ -99,6 +105,8 @@ public abstract class AbstractRecordReaderTest {
         return RANDOM.nextDouble();
       case STRING:
         return RandomStringUtils.randomAscii(RANDOM.nextInt(50) + 1);
+      case BOOLEAN:
+        return RANDOM.nextBoolean();
       default:
         throw new RuntimeException("Not supported fieldSpec - " + fieldSpec);
     }
@@ -117,9 +125,11 @@ public abstract class AbstractRecordReaderTest {
         } else {
           Object[] actualRecords = (Object[]) actualRecord.getValue(fieldSpecName);
           List expectedRecords = (List) expectedRecord.get(fieldSpecName);
-          Assert.assertEquals(actualRecords.length, expectedRecords.size());
-          for (int j = 0; j < actualRecords.length; j++) {
-            Assert.assertEquals(actualRecords[j], expectedRecords.get(j));
+          if (expectedRecords != null) {
+            Assert.assertEquals(actualRecords.length, expectedRecords.size());
+            for (int j = 0; j < actualRecords.length; j++) {
+              Assert.assertEquals(actualRecords[j], expectedRecords.get(j));
+            }
           }
         }
       }
@@ -155,6 +165,14 @@ public abstract class AbstractRecordReaderTest {
     return sourceFields;
   }
 
+  protected File compressFileWithGzip(String sourcePath, String targetPath)
+      throws IOException {
+    try (GZIPOutputStream gos = new GZIPOutputStream(new FileOutputStream(Paths.get(targetPath).toFile()))) {
+      Files.copy(Paths.get(sourcePath), gos);
+      return new File(targetPath);
+    }
+  }
+
   @BeforeClass
   public void setUp()
       throws Exception {
@@ -188,17 +206,52 @@ public abstract class AbstractRecordReaderTest {
     checkValue(_recordReader, _records, _primaryKeys);
   }
 
+  @Test
+  public void testGzipRecordReader()
+      throws Exception {
+    // Test the gzipped data file that ends with ".gz"
+    File gzipDataFile = new File(_tempDir, _dataFile.getName() + ".gz");
+    compressFileWithGzip(_dataFile.getAbsolutePath(), gzipDataFile.getAbsolutePath());
+    RecordReader recordReader = createRecordReader(gzipDataFile);
+    checkValue(recordReader, _records, _primaryKeys);
+    recordReader.rewind();
+    checkValue(recordReader, _records, _primaryKeys);
+
+    // Test the gzipped data file that doesn't end with ".gz".
+    File gzipDataFile2 = new File(_tempDir, _dataFile.getName() + ".test");
+    compressFileWithGzip(_dataFile.getAbsolutePath(), gzipDataFile2.getAbsolutePath());
+    recordReader = createRecordReader(gzipDataFile2);
+    checkValue(recordReader, _records, _primaryKeys);
+    recordReader.rewind();
+    checkValue(recordReader, _records, _primaryKeys);
+  }
+
   /**
-   * @return an implementation of RecordReader
+   * @return an implementation of RecordReader for the given file
    * @throws Exception
    */
-  protected abstract RecordReader createRecordReader()
+  protected abstract RecordReader createRecordReader(File file)
       throws Exception;
 
+  /**
+   * @return an implementation of RecordReader
+   * @throws Exception
+   */
+  protected RecordReader createRecordReader()
+      throws Exception {
+    return createRecordReader(_dataFile);
+  }
+
   /**
    * Write records into a file
    * @throws Exception
    */
   protected abstract void writeRecordsToFile(List<Map<String, Object>> recordsToWrite)
       throws Exception;
+
+  /**
+   * Get data file name
+   * @return the data file name
+   */
+  protected abstract String getDataFileName();
 }
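
With the refactoring above, the abstract base test owns the data
file, the gzip round-trip test and the compression helper, and a
concrete reader test only supplies the file name, the reader and the
writer. A hypothetical minimal subclass (the MyFormat names are
illustrative):

    import java.io.File;
    import java.util.List;
    import java.util.Map;
    import org.apache.pinot.spi.data.readers.AbstractRecordReaderTest;
    import org.apache.pinot.spi.data.readers.RecordReader;

    public class MyFormatRecordReaderTest extends AbstractRecordReaderTest {
      @Override
      protected RecordReader createRecordReader(File file)
          throws Exception {
        MyFormatRecordReader recordReader = new MyFormatRecordReader();
        recordReader.init(file, _sourceFields, null);
        return recordReader;
      }

      @Override
      protected void writeRecordsToFile(List<Map<String, Object>> recordsToWrite)
          throws Exception {
        // serialize recordsToWrite into _dataFile in the MyFormat layout
      }

      @Override
      protected String getDataFileName() {
        return "data.myformat";
      }
    }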


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org