Posted to dev@parquet.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/08/07 16:36:00 UTC

[jira] [Updated] (PARQUET-1368) ParquetFileReader should close its input stream for the failure in constructor

     [ https://issues.apache.org/jira/browse/PARQUET-1368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated PARQUET-1368:
------------------------------------
    Labels: pull-request-available  (was: )

> ParquetFileReader should close its input stream for the failure in constructor
> ------------------------------------------------------------------------------
>
>                 Key: PARQUET-1368
>                 URL: https://issues.apache.org/jira/browse/PARQUET-1368
>             Project: Parquet
>          Issue Type: Bug
>          Components: parquet-mr
>    Affects Versions: 1.10.0
>            Reporter: Hyukjin Kwon
>            Priority: Major
>              Labels: pull-request-available
>
> I was trying to replace the deprecated {{readFooter}} usage with {{ParquetFileReader.open}}, following the deprecation warning:
> {code}
> [warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala:368: method readFooter in object ParquetFileReader is deprecated: see corresponding Javadoc for more information.
> [warn]         ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
> [warn]                           ^
> [warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala:545: method readFooter in object ParquetFileReader is deprecated: see corresponding Javadoc for more information.
> [warn]             ParquetFileReader.readFooter(
> [warn]                               ^
> {code}
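> For reference, the non-deprecated path would look roughly like the following. This is only a sketch; the helper name {{readFileMetaData}} and the surrounding setup are illustrative, not taken from the Spark code:
> {code}
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.Path;
> import org.apache.parquet.HadoopReadOptions;
> import org.apache.parquet.ParquetReadOptions;
> import org.apache.parquet.format.converter.ParquetMetadataConverter;
> import org.apache.parquet.hadoop.ParquetFileReader;
> import org.apache.parquet.hadoop.metadata.FileMetaData;
> import org.apache.parquet.hadoop.util.HadoopInputFile;
>
>   static FileMetaData readFileMetaData(Configuration conf, Path filePath) throws java.io.IOException {
>     ParquetReadOptions readOptions = HadoopReadOptions.builder(conf)
>         .withMetadataFilter(ParquetMetadataConverter.SKIP_ROW_GROUPS)
>         .build();
>     // ParquetFileReader is Closeable, so try-with-resources closes it on success
>     try (ParquetFileReader reader = ParquetFileReader.open(
>         HadoopInputFile.fromPath(filePath, conf), readOptions)) {
>       return reader.getFooter().getFileMetaData();
>     }
>   }
> {code}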
> Then, I realised some test suites report a resource leak:
> {code}
> java.lang.Throwable
> 	at org.apache.spark.DebugFilesystem$.addOpenStream(DebugFilesystem.scala:36)
> 	at org.apache.spark.DebugFilesystem.open(DebugFilesystem.scala:70)
> 	at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:766)
> 	at org.apache.parquet.hadoop.util.HadoopInputFile.newStream(HadoopInputFile.java:65)
> 	at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:687)
> 	at org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:595)
> 	at org.apache.spark.sql.execution.datasources.parquet.ParquetUtils$.createParquetReader(ParquetUtils.scala:67)
> 	at org.apache.spark.sql.execution.datasources.parquet.ParquetUtils$.readFooter(ParquetUtils.scala:46)
> 	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:544)
> 	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:539)
> 	at scala.collection.parallel.AugmentedIterableIterator$class.flatmap2combiner(RemainsIterator.scala:132)
> 	at scala.collection.parallel.immutable.ParVector$ParVectorIterator.flatmap2combiner(ParVector.scala:62)
> 	at scala.collection.parallel.ParIterableLike$FlatMap.leaf(ParIterableLike.scala:1072)
> 	at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
> 	at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
> 	at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
> 	at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
> 	at scala.collection.parallel.ParIterableLike$FlatMap.tryLeaf(ParIterableLike.scala:1068)
> 	at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:159)
> 	at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:443)
> 	at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:149)
> 	at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
> 	at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at scala.concurrent.forkjoin.ForkJoinTask.doJoin(ForkJoinTask.java:341)
> 	at scala.concurrent.forkjoin.ForkJoinTask.join(ForkJoinTask.java:673)
> 	at scala.collection.parallel.ForkJoinTasks$WrappedTask$class.sync(Tasks.scala:378)
> 	at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.sync(Tasks.scala:443)
> 	at scala.collection.parallel.ForkJoinTasks$class.executeAndWaitResult(Tasks.scala:426)
> 	at scala.collection.parallel.ForkJoinTaskSupport.executeAndWaitResult(TaskSupport.scala:56)
> 	at scala.collection.parallel.ParIterableLike$ResultMapping.leaf(ParIterableLike.scala:958)
> 	at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
> 	at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
> 	at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
> 	at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
> 	at scala.collection.parallel.ParIterableLike$ResultMapping.tryLeaf(ParIterableLike.scala:953)
> 	at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
> 	at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
> 	at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}
> The root cause seems to be that the test case intentionally reads a malformed Parquet file to check that the error is handled correctly.
> In that case, the error is thrown in the {{ParquetFileReader}} constructor:
> {code}
> java.lang.RuntimeException: file:/private/var/folders/71/484zt4z10ks1vydt03bhp6hr0000gp/T/spark-c102dafc-b3f7-4c7e-90ee-33d8ecbcd225/second/_SUCCESS is not a Parquet file (too small length: 0)
> 	at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:514)
> 	at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:689)
> 	at org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:595)
> 	at org.apache.spark.sql.execution.datasources.parquet.ParquetUtils$.createParquetReader(ParquetUtils.scala:67)
> 	at org.apache.spark.sql.execution.datasources.parquet.ParquetUtils$.readFooter(ParquetUtils.scala:46)
> 	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:544)
> 	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:539)
> 	at scala.collection.parallel.AugmentedIterableIterator$class.flatmap2combiner(RemainsIterator.scala:132)
> 	at scala.collection.parallel.immutable.ParVector$ParVectorIterator.flatmap2combiner(ParVector.scala:62)
> 	at scala.collection.parallel.ParIterableLike$FlatMap.leaf(ParIterableLike.scala:1072)
> 	at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
> 	at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
> 	at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
> 	at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
> 	at scala.collection.parallel.ParIterableLike$FlatMap.tryLeaf(ParIterableLike.scala:1068)
> 	at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:159)
> 	at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:443)
> 	at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:149)
> 	at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
> 	at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}
> So, in this case, 
> {code}
>   public ParquetFileReader(InputFile file, ParquetReadOptions options) throws IOException {
>     this.converter = new ParquetMetadataConverter(options);
>     this.file = file;
>     this.f = file.newStream();
>     this.options = options;
>     this.footer = readFooter(file, options, f, converter);
>     this.fileMetaData = footer.getFileMetaData();
>     this.blocks = filterRowGroups(footer.getBlocks());
>     for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) {
>       paths.put(ColumnPath.get(col.getPath()), col);
>     }
>   }
> {code}
> the stream opened by {{this.f = file.newStream()}} is never closed when {{readFooter}} throws,
> which is why the test case reports the resource leak.
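> Note that wrapping the call site in try-with-resources does not help either, because the stream is opened and lost inside the constructor before a reader instance is ever returned. A minimal, hypothetical reproduction (the path is illustrative):
> {code}
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.Path;
> import org.apache.parquet.hadoop.ParquetFileReader;
> import org.apache.parquet.hadoop.util.HadoopInputFile;
>
>     Configuration conf = new Configuration();
>     Path path = new Path("/tmp/second/_SUCCESS");  // an empty, non-Parquet file
>     try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))) {
>       // never reached: the constructor throws "... is not a Parquet file (too small length: 0)"
>     } catch (RuntimeException e) {
>       // the exception is handled, but the SeekableInputStream opened inside the
>       // constructor has already leaked; try-with-resources only closes resources
>       // whose construction completed
>     }
> {code}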
> In the old, deprecated {{readFooter}}, the stream is closed via try-with-resources:
> {code}
>   @Deprecated
>   public static final ParquetMetadata readFooter(InputFile file, MetadataFilter filter) throws IOException {
>     ParquetReadOptions options;
>     if (file instanceof HadoopInputFile) {
>       options = HadoopReadOptions.builder(((HadoopInputFile) file).getConfiguration())
>           .withMetadataFilter(filter).build();
>     } else {
>       options = ParquetReadOptions.builder().withMetadataFilter(filter).build();
>     }
>     try (SeekableInputStream in = file.newStream()) {
>       return readFooter(file, options, in);
>     }
>   }
> {code}
> So it looks like we are fine with the deprecated method.
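> For the new code path, one possible fix (just a sketch of the idea, not the actual patch) would be to close the freshly opened stream in the constructor before rethrowing when reading the footer fails:
> {code}
>   public ParquetFileReader(InputFile file, ParquetReadOptions options) throws IOException {
>     this.converter = new ParquetMetadataConverter(options);
>     this.file = file;
>     this.f = file.newStream();
>     this.options = options;
>     try {
>       this.footer = readFooter(file, options, f, converter);
>     } catch (Exception e) {
>       // close the stream that was just opened, otherwise it leaks when the
>       // footer cannot be read (e.g. "not a Parquet file (too small length: 0)")
>       f.close();
>       throw e;
>     }
>     this.fileMetaData = footer.getFileMetaData();
>     this.blocks = filterRowGroups(footer.getBlocks());
>     for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) {
>       paths.put(ColumnPath.get(col.getPath()), col);
>     }
>   }
> {code}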



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)