Posted to dev@parquet.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2018/08/03 07:14:00 UTC
[jira] [Created] (PARQUET-1368) ParquetFileReader should close its input stream for the failure in constructor
Hyukjin Kwon created PARQUET-1368:
-------------------------------------
Summary: ParquetFileReader should close its input stream for the failure in constructor
Key: PARQUET-1368
URL: https://issues.apache.org/jira/browse/PARQUET-1368
Project: Parquet
Issue Type: Bug
Components: parquet-mr
Affects Versions: 1.10.0
Reporter: Hyukjin Kwon
I was trying to replace the deprecated {{readFooter}} usage with {{ParquetFileReader.open}} according to the note:
{code}
[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala:368: method readFooter in object ParquetFileReader is deprecated: see corresponding Javadoc for more information.
[warn] ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
[warn] ^
[warn] /home/jenkins/workspace/spark-master-test-maven-hadoop-2.7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala:545: method readFooter in object ParquetFileReader is deprecated: see corresponding Javadoc for more information.
[warn] ParquetFileReader.readFooter(
[warn] ^
{code}
Then, I realised some test suites report a resource leak:
{code}
java.lang.Throwable
at org.apache.spark.DebugFilesystem$.addOpenStream(DebugFilesystem.scala:36)
at org.apache.spark.DebugFilesystem.open(DebugFilesystem.scala:70)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:766)
at org.apache.parquet.hadoop.util.HadoopInputFile.newStream(HadoopInputFile.java:65)
at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:687)
at org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:595)
at org.apache.spark.sql.execution.datasources.parquet.ParquetUtils$.createParquetReader(ParquetUtils.scala:67)
at org.apache.spark.sql.execution.datasources.parquet.ParquetUtils$.readFooter(ParquetUtils.scala:46)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:544)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:539)
at scala.collection.parallel.AugmentedIterableIterator$class.flatmap2combiner(RemainsIterator.scala:132)
at scala.collection.parallel.immutable.ParVector$ParVectorIterator.flatmap2combiner(ParVector.scala:62)
at scala.collection.parallel.ParIterableLike$FlatMap.leaf(ParIterableLike.scala:1072)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
at scala.collection.parallel.ParIterableLike$FlatMap.tryLeaf(ParIterableLike.scala:1068)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:159)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:443)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:149)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinTask.doJoin(ForkJoinTask.java:341)
at scala.concurrent.forkjoin.ForkJoinTask.join(ForkJoinTask.java:673)
at scala.collection.parallel.ForkJoinTasks$WrappedTask$class.sync(Tasks.scala:378)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.sync(Tasks.scala:443)
at scala.collection.parallel.ForkJoinTasks$class.executeAndWaitResult(Tasks.scala:426)
at scala.collection.parallel.ForkJoinTaskSupport.executeAndWaitResult(TaskSupport.scala:56)
at scala.collection.parallel.ParIterableLike$ResultMapping.leaf(ParIterableLike.scala:958)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
at scala.collection.parallel.ParIterableLike$ResultMapping.tryLeaf(ParIterableLike.scala:953)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{code}
The root cause seems to be that the test case intentionally reads a malformed Parquet file to check that the error is handled correctly.
In that case, the error is thrown in the constructor:
{code}
java.lang.RuntimeException: file:/private/var/folders/71/484zt4z10ks1vydt03bhp6hr0000gp/T/spark-c102dafc-b3f7-4c7e-90ee-33d8ecbcd225/second/_SUCCESS is not a Parquet file (too small length: 0)
at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:514)
at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:689)
at org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:595)
at org.apache.spark.sql.execution.datasources.parquet.ParquetUtils$.createParquetReader(ParquetUtils.scala:67)
at org.apache.spark.sql.execution.datasources.parquet.ParquetUtils$.readFooter(ParquetUtils.scala:46)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:544)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:539)
at scala.collection.parallel.AugmentedIterableIterator$class.flatmap2combiner(RemainsIterator.scala:132)
at scala.collection.parallel.immutable.ParVector$ParVectorIterator.flatmap2combiner(ParVector.scala:62)
at scala.collection.parallel.ParIterableLike$FlatMap.leaf(ParIterableLike.scala:1072)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
at scala.collection.parallel.ParIterableLike$FlatMap.tryLeaf(ParIterableLike.scala:1068)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:159)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:443)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:149)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{code}
So, in this case,
{code}
public ParquetFileReader(InputFile file, ParquetReadOptions options) throws IOException {
  this.converter = new ParquetMetadataConverter(options);
  this.file = file;
  this.f = file.newStream();
  this.options = options;
  this.footer = readFooter(file, options, f, converter);
  this.fileMetaData = footer.getFileMetaData();
  this.blocks = filterRowGroups(footer.getBlocks());
  for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) {
    paths.put(ColumnPath.get(col.getPath()), col);
  }
}
{code}
the stream opened by {{this.f = file.newStream()}} is never closed when {{readFooter}} throws in the middle of the constructor.
That is why the test case reports a resource leak.
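One possible fix is to close the freshly opened stream whenever a later constructor step fails. The sketch below is not the actual parquet-mr code; the class names ({{TrackingStream}}, {{SafeReader}}) are hypothetical and exist only to illustrate the close-on-failure pattern:

```java
import java.io.Closeable;
import java.io.IOException;

// Hypothetical stand-in for SeekableInputStream: records whether it was closed.
class TrackingStream implements Closeable {
    boolean closed = false;
    @Override public void close() { closed = true; }
}

// Hypothetical reader demonstrating the pattern: if any step after opening the
// stream throws, close the stream before propagating the exception.
class SafeReader implements Closeable {
    private final TrackingStream f;

    SafeReader(TrackingStream stream, boolean failInFooter) throws IOException {
        this.f = stream;
        try {
            // stands in for readFooter(...) throwing on a malformed file
            if (failInFooter) {
                throw new IOException("not a Parquet file (too small length: 0)");
            }
        } catch (IOException e) {
            f.close();  // release the stream before the constructor fails
            throw e;
        }
    }

    @Override public void close() { f.close(); }
}
```

With this pattern the underlying stream is released even when footer parsing fails on a malformed file, so the constructor failure no longer leaks the open stream.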
In the old deprecated {{readFooter}}, by contrast, the stream is closed via try-with-resources:
{code}
@Deprecated
public static final ParquetMetadata readFooter(InputFile file, MetadataFilter filter) throws IOException {
  ParquetReadOptions options;
  if (file instanceof HadoopInputFile) {
    options = HadoopReadOptions.builder(((HadoopInputFile) file).getConfiguration())
        .withMetadataFilter(filter).build();
  } else {
    options = ParquetReadOptions.builder().withMetadataFilter(filter).build();
  }

  try (SeekableInputStream in = file.newStream()) {
    return readFooter(file, options, in);
  }
}
{code}
So it looks like we are fine with the deprecated method.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)