Posted to commits@spark.apache.org by we...@apache.org on 2018/03/05 19:53:27 UTC

spark git commit: [SPARK-23457][SQL][BRANCH-2.3] Register task completion listeners first in ParquetFileFormat

Repository: spark
Updated Branches:
  refs/heads/branch-2.3 4550673b1 -> 911b83da4


[SPARK-23457][SQL][BRANCH-2.3] Register task completion listeners first in ParquetFileFormat

## What changes were proposed in this pull request?

ParquetFileFormat leaks opened files in some cases. This PR prevents that by registering the task completion listener first, before initialization. The leak shows up as open-stream assertion failures in the following Jenkins runs:

- [spark-branch-2.3-test-sbt-hadoop-2.7](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-branch-2.3-test-sbt-hadoop-2.7/205/testReport/org.apache.spark.sql/FileBasedDataSourceSuite/_It_is_not_a_test_it_is_a_sbt_testing_SuiteSelector_/)
- [spark-master-test-sbt-hadoop-2.6](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-2.6/4228/testReport/junit/org.apache.spark.sql.execution.datasources.parquet/ParquetQuerySuite/_It_is_not_a_test_it_is_a_sbt_testing_SuiteSelector_/)

```
Caused by: sbt.ForkMain$ForkError: java.lang.Throwable: null
	at org.apache.spark.DebugFilesystem$.addOpenStream(DebugFilesystem.scala:36)
	at org.apache.spark.DebugFilesystem.open(DebugFilesystem.scala:70)
	at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
	at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:538)
	at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:149)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:133)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReaderWithPartitionValues$1.apply(ParquetFileFormat.scala:400)
	at
```
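
The fix is purely about ordering: create the `RecordReaderIterator` and register the task completion listener before calling `initialize()`, so the stream opened by `ParquetFileReader` is closed at task end even if initialization fails partway through. A minimal, self-contained sketch of that ordering (the `ToyReader` below is illustrative only; the real change is in the diff at the end of this message):

```scala
// Illustrative sketch only (not Spark's ParquetFileFormat code): the point is the
// ordering of addTaskCompletionListener relative to a failing initialize().
import java.io.Closeable
import org.apache.spark.TaskContext

final class ToyReader extends Closeable {
  private var opened = false
  def initialize(): Unit = {
    opened = true                  // the underlying file is now open...
    throw new OutOfMemoryError()   // ...and initialization still fails afterwards
  }
  override def close(): Unit = if (opened) opened = false
}

def buildToyIterator(): Unit = {
  val reader = new ToyReader
  // SPARK-23457: register the completion listener FIRST, so the task still closes
  // the reader even though initialize() throws after opening the file.
  Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => reader.close()))
  reader.initialize()
  // With the previous ordering (listener registered only after initialize() returned),
  // the throw above meant no listener was ever added and the open file leaked.
}
```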

## How was this patch tested?

Manual. The following test case reproduces the same leak: with the vectorized reader batch size set to `Int.MaxValue`, allocating the column batch fails with an `OutOfMemoryError` after the Parquet file has already been opened, so without this fix no listener has been registered yet and the open stream leaks.

```scala
  test("SPARK-23457 Register task completion listeners first in ParquetFileFormat") {
    withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_BATCH_SIZE.key -> s"${Int.MaxValue}") {
      withTempDir { dir =>
        val basePath = dir.getCanonicalPath
        Seq(0).toDF("a").write.format("parquet").save(new Path(basePath, "first").toString)
        Seq(1).toDF("a").write.format("parquet").save(new Path(basePath, "second").toString)
        val df = spark.read.parquet(
          new Path(basePath, "first").toString,
          new Path(basePath, "second").toString)
        val e = intercept[SparkException] {
          df.collect()
        }
        assert(e.getCause.isInstanceOf[OutOfMemoryError])
      }
    }
  }
```
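
For reference, the Jenkins failures linked above are not raised by this test itself but by the suite-level leak check in Spark's shared test sessions: the tests run against `org.apache.spark.DebugFilesystem`, a test-only filesystem that records a stack trace for every stream it opens, and after each test the suite asserts that no tracked streams remain open; the `Caused by` trace shown earlier is the one recorded at `open()` time. A rough sketch of that check, assuming a ScalaTest suite whose SparkSession routes `file://` access through `DebugFilesystem` the way the shared test sessions do:

```scala
// Rough sketch of the suite-level leak check; assumes the test SparkSession is
// configured to use org.apache.spark.DebugFilesystem (Spark's test-only filesystem
// that tracks open streams), as the shared test sessions do.
import org.apache.spark.DebugFilesystem
import org.scalatest.{BeforeAndAfterEach, FunSuite}

abstract class LeakCheckingSuite extends FunSuite with BeforeAndAfterEach {
  override protected def beforeEach(): Unit = {
    super.beforeEach()
    DebugFilesystem.clearOpenStreams()   // start each test with a clean slate
  }

  override protected def afterEach(): Unit = {
    try {
      // Throws with the stack trace recorded when the leaked stream was opened
      // (the "Caused by" trace shown above) if anything was left open.
      DebugFilesystem.assertNoOpenStreams()
    } finally {
      super.afterEach()
    }
  }
}
```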

Author: Dongjoon Hyun <do...@apache.org>

Closes #20714 from dongjoon-hyun/SPARK-23457-2.3.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/911b83da
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/911b83da
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/911b83da

Branch: refs/heads/branch-2.3
Commit: 911b83da42fa850eb3ae419687c204cb2e25767b
Parents: 4550673
Author: Dongjoon Hyun <do...@apache.org>
Authored: Mon Mar 5 11:53:23 2018 -0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Mon Mar 5 11:53:23 2018 -0800

----------------------------------------------------------------------
 .../datasources/parquet/ParquetFileFormat.scala | 22 +++++++++-----------
 1 file changed, 10 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/911b83da/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index f53a97b..a6129da 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -394,16 +394,21 @@ class ParquetFileFormat
         ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
       }
       val taskContext = Option(TaskContext.get())
-      val parquetReader = if (enableVectorizedReader) {
+      if (enableVectorizedReader) {
         val vectorizedReader = new VectorizedParquetRecordReader(
           convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined)
+        val iter = new RecordReaderIterator(vectorizedReader)
+        // SPARK-23457 Register a task completion listener before `initialization`.
+        taskContext.foreach(_.addTaskCompletionListener(_ => iter.close()))
         vectorizedReader.initialize(split, hadoopAttemptContext)
         logDebug(s"Appending $partitionSchema ${file.partitionValues}")
         vectorizedReader.initBatch(partitionSchema, file.partitionValues)
         if (returningBatch) {
           vectorizedReader.enableReturningBatches()
         }
-        vectorizedReader
+
+        // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
+        iter.asInstanceOf[Iterator[InternalRow]]
       } else {
         logDebug(s"Falling back to parquet-mr")
         // ParquetRecordReader returns UnsafeRow
@@ -413,18 +418,11 @@ class ParquetFileFormat
         } else {
           new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz))
         }
+        val iter = new RecordReaderIterator(reader)
+        // SPARK-23457 Register a task completion listener before `initialization`.
+        taskContext.foreach(_.addTaskCompletionListener(_ => iter.close()))
         reader.initialize(split, hadoopAttemptContext)
-        reader
-      }
 
-      val iter = new RecordReaderIterator(parquetReader)
-      taskContext.foreach(_.addTaskCompletionListener(_ => iter.close()))
-
-      // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
-      if (parquetReader.isInstanceOf[VectorizedParquetRecordReader] &&
-          enableVectorizedReader) {
-        iter.asInstanceOf[Iterator[InternalRow]]
-      } else {
         val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
         val joinedRow = new JoinedRow()
         val appendPartitionColumns = GenerateUnsafeProjection.generate(fullSchema, fullSchema)

