Posted to dev@parquet.apache.org by "Yang Jie (Jira)" <ji...@apache.org> on 2022/06/08 08:13:00 UTC
[jira] [Created] (PARQUET-2154) ParquetFileReader should close its input stream when `filterRowGroups` throw Exception in constructor
Yang Jie created PARQUET-2154:
---------------------------------
Summary: ParquetFileReader should close its input stream when `filterRowGroups` throw Exception in constructor
Key: PARQUET-2154
URL: https://issues.apache.org/jira/browse/PARQUET-2154
Project: Parquet
Issue Type: Bug
Reporter: Yang Jie
Parquet currently supports predicate push-down only for non-repeated primitive types (PARQUET-34), so attempting to push down a filter on a repeated primitive type makes the constructor of ParquetFileReader throw `java.lang.IllegalArgumentException`, as follows:
{code:java}
21:57:24.190 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.IllegalArgumentException: FilterPredicates do not currently support repeated columns. Column f is repeated.
at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:195)
at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:164)
at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:92)
at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:59)
at org.apache.parquet.filter2.predicate.Operators$NotEq.accept(Operators.java:195)
at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:64)
at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:95)
at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:45)
at org.apache.parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:149)
at org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:72)
at org.apache.parquet.hadoop.ParquetFileReader.filterRowGroups(ParquetFileReader.java:871)
at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:790)
at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:100)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:173)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.$anonfun$buildReaderWithPartitionValues$1(ParquetFileFormat.scala:340)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:211)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:272)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:118)
at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:580)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1908)
at org.apache.spark.rdd.RDD.$anonfun$count$1(RDD.scala:1268)
at org.apache.spark.rdd.RDD.$anonfun$count$1$adapted(RDD.scala:1268)
at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2267)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
at org.apache.spark.scheduler.Task.run(Task.scala:139)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1481)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750) {code}
This exception causes a resource leak, because the constructor has already opened an input stream by the time it is thrown:
{code:java}
public ParquetFileReader(InputFile file, ParquetReadOptions options) throws IOException {
  this.converter = new ParquetMetadataConverter(options);
  this.file = file;
  this.f = file.newStream();
  this.options = options;
  try {
    this.footer = readFooter(file, options, f, converter);
  } catch (Exception e) {
    // In case that reading footer throws an exception in the constructor, the new stream
    // should be closed. Otherwise, there's no way to close this outside.
    f.close();
    throw e;
  }
  this.fileMetaData = footer.getFileMetaData();
  this.fileDecryptor = fileMetaData.getFileDecryptor(); // must be called before filterRowGroups!
  if (null != fileDecryptor && fileDecryptor.plaintextFile()) {
    this.fileDecryptor = null; // Plaintext file. No need in decryptor
  }
  this.blocks = filterRowGroups(footer.getBlocks());
  this.blockIndexStores = listWithNulls(this.blocks.size());
  this.blockRowRanges = listWithNulls(this.blocks.size());
  for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) {
    paths.put(ColumnPath.get(col.getPath()), col);
  }
  this.crc = options.usePageChecksumVerification() ? new CRC32() : null;
}
{code}
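In the above code, when `filterRowGroups(footer.getBlocks())` throws an exception, the stream opened by `this.f = file.newStream()` looks impossible to close: only the `readFooter` call is guarded by the try/catch, and because the constructor fails, the caller never gets a reader instance through which to close the stream.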
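One possible direction, sketched below without any Parquet dependency, is to guard all of the initialization that follows the stream being opened, not only the footer read. The classes `TrackingStream` and `Reader` and the method `initialize` are hypothetical stand-ins for the input stream, ParquetFileReader, and the `readFooter`/`filterRowGroups` steps; this is an illustration of the pattern, not the actual patch.

```java
import java.io.Closeable;
import java.io.IOException;

public class CloseOnConstructorFailure {

    /** Hypothetical stand-in for the input stream; records whether close() was called. */
    static class TrackingStream implements Closeable {
        boolean closed = false;

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Hypothetical stand-in for ParquetFileReader. */
    static class Reader implements Closeable {
        private final TrackingStream f;

        Reader(TrackingStream stream, boolean failInFilter) throws IOException {
            this.f = stream;
            try {
                // Stands in for everything after newStream(): footer read, row-group filtering, etc.
                initialize(failInFilter);
            } catch (Exception e) {
                // The caller never receives a Reader, so the stream must be closed here.
                try {
                    f.close();
                } catch (Exception closeFailure) {
                    e.addSuppressed(closeFailure);
                }
                throw e;
            }
        }

        /** Simulates a validation failure such as the one raised from filterRowGroups. */
        private void initialize(boolean fail) throws IOException {
            if (fail) {
                throw new IllegalArgumentException(
                    "FilterPredicates do not currently support repeated columns.");
            }
        }

        @Override
        public void close() {
            f.close();
        }
    }

    /** Returns true if the stream was closed after the constructor threw. */
    static boolean streamClosedAfterFailure() {
        TrackingStream s = new TrackingStream();
        try {
            new Reader(s, true);
        } catch (Exception expected) {
            // The failure itself is expected; only the stream state matters here.
        }
        return s.closed;
    }

    /** Returns true if the stream stays open when construction succeeds. */
    static boolean streamOpenAfterSuccess() throws IOException {
        TrackingStream s = new TrackingStream();
        new Reader(s, false);
        return !s.closed;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(streamClosedAfterFailure());
        System.out.println(streamOpenAfterSuccess());
    }
}
```

Catching `Exception` here also covers unchecked exceptions such as the `IllegalArgumentException` in the stack trace above, and `addSuppressed` keeps the original failure as the primary one if closing the stream fails too.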