Posted to dev@parquet.apache.org by "Yang Jie (Jira)" <ji...@apache.org> on 2022/06/08 08:13:00 UTC

[jira] [Created] (PARQUET-2154) ParquetFileReader should close its input stream when `filterRowGroups` throws an exception in the constructor

Yang Jie created PARQUET-2154:
---------------------------------

             Summary: ParquetFileReader should close its input stream when `filterRowGroups` throws an exception in the constructor
                 Key: PARQUET-2154
                 URL: https://issues.apache.org/jira/browse/PARQUET-2154
             Project: Parquet
          Issue Type: Bug
            Reporter: Yang Jie


Parquet currently only supports predicate push-down for non-repeated primitive types (PARQUET-34), so if you try to push down a filter for a repeated primitive type, the constructor of ParquetFileReader throws a `java.lang.IllegalArgumentException` as follows:

 
{code:java}
21:57:24.190 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.IllegalArgumentException: FilterPredicates do not currently support repeated columns. Column f is repeated.
    at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumn(SchemaCompatibilityValidator.java:195)
    at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validateColumnFilterPredicate(SchemaCompatibilityValidator.java:164)
    at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:92)
    at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.visit(SchemaCompatibilityValidator.java:59)
    at org.apache.parquet.filter2.predicate.Operators$NotEq.accept(Operators.java:195)
    at org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validate(SchemaCompatibilityValidator.java:64)
    at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:95)
    at org.apache.parquet.filter2.compat.RowGroupFilter.visit(RowGroupFilter.java:45)
    at org.apache.parquet.filter2.compat.FilterCompat$FilterPredicateCompat.accept(FilterCompat.java:149)
    at org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups(RowGroupFilter.java:72)
    at org.apache.parquet.hadoop.ParquetFileReader.filterRowGroups(ParquetFileReader.java:871)
    at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:790)
    at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:100)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:173)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.$anonfun$buildReaderWithPartitionValues$1(ParquetFileFormat.scala:340)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:211)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:272)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:118)
    at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:580)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1908)
    at org.apache.spark.rdd.RDD.$anonfun$count$1(RDD.scala:1268)
    at org.apache.spark.rdd.RDD.$anonfun$count$1$adapted(RDD.scala:1268)
    at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2267)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1481)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750) {code}
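For reference, the same exception can be hit directly through the ParquetFileReader API, without going through Spark. A minimal sketch (the file path /tmp/repeated.parquet and the repeated INT32 field name `f` are assumptions for illustration):
{code:java}
import static org.apache.parquet.filter2.predicate.FilterApi.intColumn;
import static org.apache.parquet.filter2.predicate.FilterApi.notEq;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class RepeatedColumnFilterRepro {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Assumed input: a Parquet file whose schema has a repeated INT32 field "f".
    HadoopInputFile inputFile =
        HadoopInputFile.fromPath(new Path("/tmp/repeated.parquet"), conf);

    // Push-down on a repeated column is not supported (PARQUET-34), so
    // SchemaCompatibilityValidator rejects this predicate during filterRowGroups.
    ParquetReadOptions options = ParquetReadOptions.builder()
        .withRecordFilter(FilterCompat.get(notEq(intColumn("f"), 0)))
        .build();

    // The constructor throws IllegalArgumentException from filterRowGroups; the
    // stream opened by file.newStream() inside the constructor leaks, because the
    // try-with-resources never receives a reader instance to close.
    try (ParquetFileReader reader = new ParquetFileReader(inputFile, options)) {
      // unreachable for this input
    }
  }
}
{code}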
 

The above failure causes a resource leak: because the exception escapes the constructor, the caller never obtains a ParquetFileReader instance that it could close.

 
{code:java}
public ParquetFileReader(InputFile file, ParquetReadOptions options) throws IOException {
  this.converter = new ParquetMetadataConverter(options);
  this.file = file;
  this.f = file.newStream();
  this.options = options;
  try {
    this.footer = readFooter(file, options, f, converter);
  } catch (Exception e) {
    // In case that reading footer throws an exception in the constructor, the new stream
    // should be closed. Otherwise, there's no way to close this outside.
    f.close();
    throw e;
  }
  this.fileMetaData = footer.getFileMetaData();
  this.fileDecryptor = fileMetaData.getFileDecryptor(); // must be called before filterRowGroups!
  if (null != fileDecryptor && fileDecryptor.plaintextFile()) {
    this.fileDecryptor = null; // Plaintext file. No need in decryptor
  }

  this.blocks = filterRowGroups(footer.getBlocks());
  this.blockIndexStores = listWithNulls(this.blocks.size());
  this.blockRowRanges = listWithNulls(this.blocks.size());
  for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) {
    paths.put(ColumnPath.get(col.getPath()), col);
  }
  this.crc = options.usePageChecksumVerification() ? new CRC32() : null;
} {code}
In the above code, when `filterRowGroups(footer.getBlocks())` throws an exception, the stream opened by `this.f = file.newStream()` cannot be closed from outside the constructor.
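One possible direction for a fix, mirroring the existing try/catch around readFooter in the same constructor, is to also close the stream when filterRowGroups fails. This is a sketch of the idea only, not a committed patch:
{code:java}
  // ... footer was read successfully above ...
  this.fileMetaData = footer.getFileMetaData();
  this.fileDecryptor = fileMetaData.getFileDecryptor(); // must be called before filterRowGroups!
  if (null != fileDecryptor && fileDecryptor.plaintextFile()) {
    this.fileDecryptor = null; // Plaintext file. No need in decryptor
  }

  try {
    this.blocks = filterRowGroups(footer.getBlocks());
  } catch (Exception e) {
    // Same reasoning as the readFooter case: if filterRowGroups throws inside
    // the constructor, nothing outside holds the stream, so close it here.
    f.close();
    throw e;
  }
  this.blockIndexStores = listWithNulls(this.blocks.size());
  // ... rest of the constructor unchanged ...
{code}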

 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)