Posted to dev@parquet.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2018/08/03 07:14:00 UTC

[jira] [Created] (PARQUET-1368) ParquetFileReader should close its input stream for the failure in constructor

Hyukjin Kwon created PARQUET-1368:
-------------------------------------

             Summary: ParquetFileReader should close its input stream for the failure in constructor
                 Key: PARQUET-1368
                 URL: https://issues.apache.org/jira/browse/PARQUET-1368
             Project: Parquet
          Issue Type: Bug
          Components: parquet-mr
    Affects Versions: 1.10.0
            Reporter: Hyukjin Kwon


I was trying to replace the deprecated {{readFooter}} usage with {{ParquetFileReader.open}}, following these deprecation warnings:

{code}

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala:368: method readFooter in object ParquetFileReader is deprecated: see corresponding Javadoc for more information.
[warn]         ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
[warn]                           ^

[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala:545: method readFooter in object ParquetFileReader is deprecated: see corresponding Javadoc for more information.
[warn]             ParquetFileReader.readFooter(
[warn]                               ^
{code}

Then I noticed that some test suites report a resource leak:

{code}
java.lang.Throwable
	at org.apache.spark.DebugFilesystem$.addOpenStream(DebugFilesystem.scala:36)
	at org.apache.spark.DebugFilesystem.open(DebugFilesystem.scala:70)
	at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:766)
	at org.apache.parquet.hadoop.util.HadoopInputFile.newStream(HadoopInputFile.java:65)
	at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:687)
	at org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:595)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetUtils$.createParquetReader(ParquetUtils.scala:67)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetUtils$.readFooter(ParquetUtils.scala:46)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:544)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:539)
	at scala.collection.parallel.AugmentedIterableIterator$class.flatmap2combiner(RemainsIterator.scala:132)
	at scala.collection.parallel.immutable.ParVector$ParVectorIterator.flatmap2combiner(ParVector.scala:62)
	at scala.collection.parallel.ParIterableLike$FlatMap.leaf(ParIterableLike.scala:1072)
	at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
	at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
	at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
	at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
	at scala.collection.parallel.ParIterableLike$FlatMap.tryLeaf(ParIterableLike.scala:1068)
	at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:159)
	at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:443)
	at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:149)
	at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
	at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinTask.doJoin(ForkJoinTask.java:341)
	at scala.concurrent.forkjoin.ForkJoinTask.join(ForkJoinTask.java:673)
	at scala.collection.parallel.ForkJoinTasks$WrappedTask$class.sync(Tasks.scala:378)
	at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.sync(Tasks.scala:443)
	at scala.collection.parallel.ForkJoinTasks$class.executeAndWaitResult(Tasks.scala:426)
	at scala.collection.parallel.ForkJoinTaskSupport.executeAndWaitResult(TaskSupport.scala:56)
	at scala.collection.parallel.ParIterableLike$ResultMapping.leaf(ParIterableLike.scala:958)
	at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
	at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
	at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
	at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
	at scala.collection.parallel.ParIterableLike$ResultMapping.tryLeaf(ParIterableLike.scala:953)
	at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
	at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
	at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{code}

The root cause seems to be that the test case intentionally tries to read a malformed Parquet file to check whether the error is handled correctly.

In that case, the error is thrown inside the {{ParquetFileReader}} constructor:

{code}
java.lang.RuntimeException: file:/private/var/folders/71/484zt4z10ks1vydt03bhp6hr0000gp/T/spark-c102dafc-b3f7-4c7e-90ee-33d8ecbcd225/second/_SUCCESS is not a Parquet file (too small length: 0)
	at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:514)
	at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:689)
	at org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:595)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetUtils$.createParquetReader(ParquetUtils.scala:67)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetUtils$.readFooter(ParquetUtils.scala:46)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:544)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:539)
	at scala.collection.parallel.AugmentedIterableIterator$class.flatmap2combiner(RemainsIterator.scala:132)
	at scala.collection.parallel.immutable.ParVector$ParVectorIterator.flatmap2combiner(ParVector.scala:62)
	at scala.collection.parallel.ParIterableLike$FlatMap.leaf(ParIterableLike.scala:1072)
	at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
	at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
	at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
	at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
	at scala.collection.parallel.ParIterableLike$FlatMap.tryLeaf(ParIterableLike.scala:1068)
	at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:159)
	at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:443)
	at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:149)
	at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
	at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{code}

So, in this case, 

{code}
  public ParquetFileReader(InputFile file, ParquetReadOptions options) throws IOException {
    this.converter = new ParquetMetadataConverter(options);
    this.file = file;
    this.f = file.newStream();
    this.options = options;
    this.footer = readFooter(file, options, f, converter);
    this.fileMetaData = footer.getFileMetaData();
    this.blocks = filterRowGroups(footer.getBlocks());
    for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) {
      paths.put(ColumnPath.get(col.getPath()), col);
    }
  }
{code}

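the open stream {{this.f = file.newStream()}} cannot be closed, because {{readFooter}} throws before the constructor returns and the caller never gets a reader to call {{close()}} on.

That is why the test case reports the resource leak.

The old deprecated {{readFooter}} handles the stream as below: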

{code}
  @Deprecated
  public static final ParquetMetadata readFooter(InputFile file, MetadataFilter filter) throws IOException {
    ParquetReadOptions options;
    if (file instanceof HadoopInputFile) {
      options = HadoopReadOptions.builder(((HadoopInputFile) file).getConfiguration())
          .withMetadataFilter(filter).build();
    } else {
      options = ParquetReadOptions.builder().withMetadataFilter(filter).build();
    }

    try (SeekableInputStream in = file.newStream()) {
      return readFooter(file, options, in);
    }
  }
{code}

So the deprecated method looks fine: the try-with-resources block closes the stream even when {{readFooter}} throws.
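
One possible shape for a fix is to close the stream inside the constructor when footer reading fails, then rethrow. The sketch below only illustrates that pattern and is not the actual parquet-mr change: {{TrackedStream}} and {{SafeReader}} are hypothetical stand-ins for {{SeekableInputStream}} and {{ParquetFileReader}}, and the {{malformed}} flag is an assumption used to simulate a bad file.

```java
import java.io.Closeable;
import java.io.IOException;

// Hypothetical stand-in for SeekableInputStream; it only records whether close() ran.
class TrackedStream implements Closeable {
  boolean closed = false;

  @Override
  public void close() throws IOException {
    closed = true;
  }
}

// Hypothetical reader that mirrors the shape of the constructor quoted above.
class SafeReader implements Closeable {
  private final TrackedStream f;
  private String footer;

  SafeReader(TrackedStream stream, boolean malformed) throws IOException {
    this.f = stream;
    try {
      this.footer = readFooter(f, malformed);
    } catch (IOException | RuntimeException e) {
      // The caller never receives this object, so the stream must be released here.
      try {
        f.close();
      } catch (IOException closeFailure) {
        // Keep the original failure as the primary exception.
        e.addSuppressed(closeFailure);
      }
      throw e;
    }
  }

  // Simulates footer parsing; a malformed file fails the same way as in the trace above.
  private static String readFooter(TrackedStream in, boolean malformed) throws IOException {
    if (malformed) {
      throw new RuntimeException("is not a Parquet file (too small length: 0)");
    }
    return "footer";
  }

  @Override
  public void close() throws IOException {
    f.close();
  }
}
```

With this shape, a failed construction leaves no open stream behind, while a successful one still leaves the stream open until the caller invokes {{close()}}.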



