You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2022/12/08 05:59:08 UTC
[pinot] branch master updated: feat: add compressed file support for ORCRecordReader (#9884)
This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 8a2fbf9caf feat: add compressed file support for ORCRecordReader (#9884)
8a2fbf9caf is described below
commit 8a2fbf9caff8cb1bc7ec4bda94827fc556fc3866
Author: Eugene Tolbakov <ev...@gmail.com>
AuthorDate: Thu Dec 8 05:58:59 2022 +0000
feat: add compressed file support for ORCRecordReader (#9884)
* feat: add compressed file support for ORCRecordReader
* feat: address code review remarks, add gzip support for Parquet files
* feat: move unpackIfRequired method into RecordReaderUtils
* feat: apply Pinot code style
* chore: fix code style
---
.../plugin/inputformat/orc/ORCRecordReader.java | 18 +++----
.../inputformat/orc/ORCRecordReaderTest.java | 27 +++++++++++
.../parquet/ParquetAvroRecordReader.java | 4 +-
.../parquet/ParquetNativeRecordReader.java | 17 ++++---
.../inputformat/parquet/ParquetRecordReader.java | 6 ++-
.../plugin/inputformat/parquet/ParquetUtils.java | 7 +--
.../parquet/ParquetNativeRecordReaderTest.java | 4 +-
.../parquet/ParquetRecordReaderTest.java | 55 ++++++++++++++++++++--
.../pinot/spi/data/readers/RecordReaderUtils.java | 25 ++++++++++
9 files changed, 134 insertions(+), 29 deletions(-)
diff --git a/pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java b/pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java
index 2ecd7ad6d2..f98ab58921 100644
--- a/pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java
+++ b/pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java
@@ -45,6 +45,7 @@ import org.apache.orc.TypeDescription;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderConfig;
+import org.apache.pinot.spi.data.readers.RecordReaderUtils;
import static java.nio.charset.StandardCharsets.UTF_8;
@@ -77,11 +78,12 @@ public class ORCRecordReader implements RecordReader {
public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig)
throws IOException {
Configuration configuration = new Configuration();
- Reader orcReader = OrcFile.createReader(new Path(dataFile.getAbsolutePath()),
+ File orcFile = RecordReaderUtils.unpackIfRequired(dataFile, "orc");
+ Reader orcReader = OrcFile.createReader(new Path(orcFile.getAbsolutePath()),
OrcFile.readerOptions(configuration).filesystem(FileSystem.getLocal(configuration)));
TypeDescription orcSchema = orcReader.getSchema();
- Preconditions
- .checkState(orcSchema.getCategory() == TypeDescription.Category.STRUCT, "ORC schema must be of type: STRUCT");
+ Preconditions.checkState(orcSchema.getCategory() == TypeDescription.Category.STRUCT,
+ "ORC schema must be of type: STRUCT");
_orcFields = orcSchema.getFieldNames();
_orcFieldTypes = orcSchema.getChildren();
@@ -128,9 +130,8 @@ public class ORCRecordReader implements RecordReader {
// Maps always have two child columns for its keys and values
List<TypeDescription> children = fieldType.getChildren();
TypeDescription.Category keyCategory = children.get(0).getCategory();
- Preconditions
- .checkState(isSupportedSingleValueType(keyCategory), "Illegal map key field type: %s (field %s)", keyCategory,
- field);
+ Preconditions.checkState(isSupportedSingleValueType(keyCategory), "Illegal map key field type: %s (field %s)",
+ keyCategory, field);
initFieldsToRead(orcReaderInclude, children.get(1), field);
} else if (category == TypeDescription.Category.STRUCT) {
List<String> childrenFieldNames = fieldType.getFieldNames();
@@ -141,9 +142,8 @@ public class ORCRecordReader implements RecordReader {
}
} else {
// Single-value field
- Preconditions
- .checkState(isSupportedSingleValueType(category), "Illegal single-value field type: %s (field %s)", category,
- field);
+ Preconditions.checkState(isSupportedSingleValueType(category), "Illegal single-value field type: %s (field %s)",
+ category, field);
}
}
diff --git a/pinot-plugins/pinot-input-format/pinot-orc/src/test/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-orc/src/test/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReaderTest.java
index c244d93417..dd9b12a78c 100644
--- a/pinot-plugins/pinot-input-format/pinot-orc/src/test/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReaderTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-orc/src/test/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReaderTest.java
@@ -17,9 +17,15 @@ package org.apache.pinot.plugin.inputformat.orc;
* specific language governing permissions and limitations
* under the License.
*/
+
import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
+import java.util.zip.GZIPOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
@@ -32,6 +38,7 @@ import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
import org.apache.pinot.spi.data.readers.AbstractRecordReaderTest;
import org.apache.pinot.spi.data.readers.RecordReader;
+import org.testng.annotations.Test;
import static java.nio.charset.StandardCharsets.UTF_8;
@@ -39,6 +46,13 @@ import static java.nio.charset.StandardCharsets.UTF_8;
public class ORCRecordReaderTest extends AbstractRecordReaderTest {
private final File _dataFile = new File(_tempDir, "data.orc");
+ private void compressGzip(String sourcePath, String targetPath)
+ throws IOException {
+ try (GZIPOutputStream gos = new GZIPOutputStream(new FileOutputStream(Paths.get(targetPath).toFile()))) {
+ Files.copy(Paths.get(sourcePath), gos);
+ }
+ }
+
@Override
protected RecordReader createRecordReader()
throws Exception {
@@ -143,4 +157,17 @@ public class ORCRecordReaderTest extends AbstractRecordReaderTest {
}
writer.close();
}
+
+ @Test
+ public void testGzipORCRecordReader()
+ throws Exception {
+ String gzipFileName = "data.orc.gz";
+ compressGzip(_dataFile.getAbsolutePath(), String.format("%s/%s", _tempDir, gzipFileName));
+ final File gzDataFile = new File(_tempDir, gzipFileName);
+ ORCRecordReader orcRecordReader = new ORCRecordReader();
+ orcRecordReader.init(gzDataFile, _sourceFields, null);
+ checkValue(orcRecordReader, _records, _primaryKeys);
+ orcRecordReader.rewind();
+ checkValue(orcRecordReader, _records, _primaryKeys);
+ }
}
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java
index 9c494f0baf..787ddc25e7 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java
@@ -29,6 +29,7 @@ import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderConfig;
+import org.apache.pinot.spi.data.readers.RecordReaderUtils;
/**
@@ -48,7 +49,8 @@ public class ParquetAvroRecordReader implements RecordReader {
@Override
public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig)
throws IOException {
- _dataFilePath = new Path(dataFile.getAbsolutePath());
+ File parquetFile = RecordReaderUtils.unpackIfRequired(dataFile, "parquet");
+ _dataFilePath = new Path(parquetFile.getAbsolutePath());
_parquetReader = ParquetUtils.getParquetAvroReader(_dataFilePath);
_recordExtractor = new AvroRecordExtractor();
_recordExtractor.init(fieldsToRead, null);
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java
index 3f413b9a68..da89c8a382 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java
@@ -37,6 +37,7 @@ import org.apache.parquet.schema.MessageType;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderConfig;
+import org.apache.pinot.spi.data.readers.RecordReaderUtils;
/**
@@ -58,17 +59,16 @@ public class ParquetNativeRecordReader implements RecordReader {
@Override
public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig)
throws IOException {
- _dataFilePath = new Path(dataFile.getAbsolutePath());
+ File parquetFile = RecordReaderUtils.unpackIfRequired(dataFile, "parquet");
+ _dataFilePath = new Path(parquetFile.getAbsolutePath());
_hadoopConf = ParquetUtils.getParquetHadoopConfiguration();
_recordExtractor = new ParquetNativeRecordExtractor();
_recordExtractor.init(fieldsToRead, null);
- _parquetReadOptions = ParquetReadOptions.builder()
- .withMetadataFilter(ParquetMetadataConverter.NO_FILTER)
- .build();
+ _parquetReadOptions = ParquetReadOptions.builder().withMetadataFilter(ParquetMetadataConverter.NO_FILTER).build();
- _parquetFileReader = ParquetFileReader.open(HadoopInputFile.fromPath(_dataFilePath, _hadoopConf),
- _parquetReadOptions);
+ _parquetFileReader =
+ ParquetFileReader.open(HadoopInputFile.fromPath(_dataFilePath, _hadoopConf), _parquetReadOptions);
_schema = _parquetFileReader.getFooter().getFileMetaData().getSchema();
_pageReadStore = _parquetFileReader.readNextRowGroup();
_columnIO = new ColumnIOFactory().getColumnIO(_schema);
@@ -76,7 +76,6 @@ public class ParquetNativeRecordReader implements RecordReader {
_currentPageIdx = 0;
}
-
@Override
public boolean hasNext() {
if (_pageReadStore == null) {
@@ -119,8 +118,8 @@ public class ParquetNativeRecordReader implements RecordReader {
public void rewind()
throws IOException {
_parquetFileReader.close();
- _parquetFileReader = ParquetFileReader.open(HadoopInputFile.fromPath(_dataFilePath, _hadoopConf),
- _parquetReadOptions);
+ _parquetFileReader =
+ ParquetFileReader.open(HadoopInputFile.fromPath(_dataFilePath, _hadoopConf), _parquetReadOptions);
_pageReadStore = _parquetFileReader.readNextRowGroup();
_parquetRecordReader = _columnIO.getRecordReader(_pageReadStore, new GroupRecordConverter(_schema));
_currentPageIdx = 0;
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java
index 6be22851fc..60886b3b30 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderConfig;
+import org.apache.pinot.spi.data.readers.RecordReaderUtils;
/**
@@ -39,6 +40,7 @@ public class ParquetRecordReader implements RecordReader {
@Override
public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig)
throws IOException {
+ File parquetFile = RecordReaderUtils.unpackIfRequired(dataFile, "parquet");
if (recordReaderConfig != null && ((ParquetRecordReaderConfig) recordReaderConfig).useParquetAvroRecordReader()) {
_internalParquetRecordReader = new ParquetAvroRecordReader();
} else if (recordReaderConfig != null
@@ -47,14 +49,14 @@ public class ParquetRecordReader implements RecordReader {
_internalParquetRecordReader = new ParquetNativeRecordReader();
} else {
// No reader type specified. Determine using file metadata
- if (ParquetUtils.hasAvroSchemaInFileMetadata(new Path(dataFile.getAbsolutePath()))) {
+ if (ParquetUtils.hasAvroSchemaInFileMetadata(new Path(parquetFile.getAbsolutePath()))) {
_internalParquetRecordReader = new ParquetAvroRecordReader();
} else {
_useAvroParquetRecordReader = false;
_internalParquetRecordReader = new ParquetNativeRecordReader();
}
}
- _internalParquetRecordReader.init(dataFile, fieldsToRead, recordReaderConfig);
+ _internalParquetRecordReader.init(parquetFile, fieldsToRead, recordReaderConfig);
}
@Override
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java
index a55e3fa702..f576a0a325 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetUtils.java
@@ -59,8 +59,8 @@ public class ParquetUtils {
*/
public static ParquetWriter<GenericRecord> getParquetAvroWriter(Path path, Schema schema)
throws IOException {
- return AvroParquetWriter.<GenericRecord>builder(path).withSchema(schema)
- .withConf(getParquetHadoopConfiguration()).build();
+ return AvroParquetWriter.<GenericRecord>builder(path).withSchema(schema).withConf(getParquetHadoopConfiguration())
+ .build();
}
/**
@@ -85,7 +85,8 @@ public class ParquetUtils {
}
}
- public static boolean hasAvroSchemaInFileMetadata(Path path) throws IOException {
+ public static boolean hasAvroSchemaInFileMetadata(Path path)
+ throws IOException {
ParquetMetadata footer =
ParquetFileReader.readFooter(getParquetHadoopConfiguration(), path, ParquetMetadataConverter.NO_FILTER);
Map<String, String> metaData = footer.getFileMetaData().getKeyValueMetaData();
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReaderTest.java
index ffc80a98bb..378195d976 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReaderTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReaderTest.java
@@ -56,8 +56,8 @@ public class ParquetNativeRecordReaderTest extends AbstractRecordReaderTest {
}
records.add(record);
}
- try (ParquetWriter<GenericRecord> writer = ParquetUtils
- .getParquetAvroWriter(new Path(_dataFile.getAbsolutePath()), schema)) {
+ try (ParquetWriter<GenericRecord> writer = ParquetUtils.getParquetAvroWriter(new Path(_dataFile.getAbsolutePath()),
+ schema)) {
for (GenericRecord record : records) {
writer.write(record);
}
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
index 14dda0e5d1..345cb1cdbd 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
@@ -20,10 +20,14 @@ package org.apache.pinot.plugin.inputformat.parquet;
import com.google.common.collect.ImmutableSet;
import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.zip.GZIPOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
@@ -40,6 +44,7 @@ import org.testng.annotations.Test;
public class ParquetRecordReaderTest extends AbstractRecordReaderTest {
private final File _dataFile = new File(_tempDir, "data.parquet");
+ private final String _gzipFileName = "data.parquet.gz";
private final File _testParquetFileWithInt96AndDecimal =
new File(getClass().getClassLoader().getResource("test-file-with-int96-and-decimal.snappy.parquet").getFile());
@@ -63,14 +68,21 @@ public class ParquetRecordReaderTest extends AbstractRecordReaderTest {
}
records.add(record);
}
- try (ParquetWriter<GenericRecord> writer = ParquetUtils
- .getParquetAvroWriter(new Path(_dataFile.getAbsolutePath()), schema)) {
+ try (ParquetWriter<GenericRecord> writer = ParquetUtils.getParquetAvroWriter(new Path(_dataFile.getAbsolutePath()),
+ schema)) {
for (GenericRecord record : records) {
writer.write(record);
}
}
}
+ private void compressGzip(String sourcePath, String targetPath)
+ throws IOException {
+ try (GZIPOutputStream gos = new GZIPOutputStream(new FileOutputStream(Paths.get(targetPath).toFile()))) {
+ Files.copy(Paths.get(sourcePath), gos);
+ }
+ }
+
@Test
public void testParquetAvroRecordReader()
throws IOException {
@@ -108,7 +120,6 @@ public class ParquetRecordReaderTest extends AbstractRecordReaderTest {
// Should be avro since file metadata has avro schema
Assert.assertTrue(parquetRecordReader.useAvroParquetRecordReader());
-
final ParquetRecordReader parquetRecordReader2 = new ParquetRecordReader();
File nativeParquetFile = new File(getClass().getClassLoader().getResource("users.parquet").getFile());
parquetRecordReader.init(nativeParquetFile, null, null);
@@ -157,4 +168,42 @@ public class ParquetRecordReaderTest extends AbstractRecordReaderTest {
Assert.assertEquals(recordsRead, totalRecords,
"Message read from ParquetRecordReader doesn't match the expected number.");
}
+
+ @Test
+ public void testGzipParquetRecordReader()
+ throws IOException {
+ compressGzip(_dataFile.getAbsolutePath(), String.format("%s/%s", _tempDir, _gzipFileName));
+ final File gzDataFile = new File(_tempDir, _gzipFileName);
+ ParquetRecordReader recordReader = new ParquetRecordReader();
+ recordReader.init(gzDataFile, _sourceFields, null);
+ testReadParquetFile(recordReader, SAMPLE_RECORDS_SIZE);
+ }
+
+ @Test
+ public void testGzipParquetAvroRecordReader()
+ throws IOException {
+ ParquetAvroRecordReader avroRecordReader = new ParquetAvroRecordReader();
+ compressGzip(_dataFile.getAbsolutePath(), String.format("%s/%s", _tempDir, _gzipFileName));
+ final File gzDataFile = new File(_tempDir, _gzipFileName);
+ avroRecordReader.init(gzDataFile, null, new ParquetRecordReaderConfig());
+ testReadParquetFile(avroRecordReader, SAMPLE_RECORDS_SIZE);
+ }
+
+ @Test
+ public void testGzipParquetNativeRecordReader()
+ throws IOException {
+ ParquetNativeRecordReader nativeRecordReader = new ParquetNativeRecordReader();
+
+ final String gzParquetFileWithInt96AndDecimal =
+ String.format("%s.gz", _testParquetFileWithInt96AndDecimal.getAbsolutePath());
+ compressGzip(_testParquetFileWithInt96AndDecimal.getAbsolutePath(), gzParquetFileWithInt96AndDecimal);
+ final File gzTestParquetFileWithInt96AndDecimal = new File(gzParquetFileWithInt96AndDecimal);
+ nativeRecordReader.init(gzTestParquetFileWithInt96AndDecimal, ImmutableSet.of(), new ParquetRecordReaderConfig());
+ testReadParquetFile(nativeRecordReader, 1965);
+
+ compressGzip(_dataFile.getAbsolutePath(), String.format("%s/%s", _tempDir, _gzipFileName));
+ final File gzDataFile = new File(_tempDir, _gzipFileName);
+ nativeRecordReader.init(gzDataFile, ImmutableSet.of(), new ParquetRecordReaderConfig());
+ testReadParquetFile(nativeRecordReader, SAMPLE_RECORDS_SIZE);
+ }
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderUtils.java
index 657686e196..fd84c5f306 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderUtils.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderUtils.java
@@ -25,7 +25,10 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
import java.util.zip.GZIPInputStream;
@@ -57,4 +60,26 @@ public class RecordReaderUtils {
return new FileInputStream(dataFile);
}
}
+
+ public static File unpackIfRequired(File dataFile, String extension)
+ throws IOException {
+ if (isGZippedFile(dataFile)) {
+ try (final InputStream inputStream = getInputStream(dataFile)) {
+ File targetFile = new File(String.format("%s.%s", dataFile.getAbsolutePath(), extension));
+ Files.copy(inputStream, targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
+ return targetFile;
+ }
+ } else {
+ return dataFile;
+ }
+ }
+
+ private static boolean isGZippedFile(File file)
+ throws IOException {
+ int magic = 0;
+ try (RandomAccessFile raf = new RandomAccessFile(file, "r")) {
+ magic = raf.read() & 0xff | ((raf.read() << 8) & 0xff00);
+ }
+ return magic == GZIPInputStream.GZIP_MAGIC;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org