Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/10/22 14:47:15 UTC

[GitHub] [spark] ankurdave commented on a change in pull request #34369: [SPARK-37089][SQL] Do not register ParquetFileFormat completion listener lazily

ankurdave commented on a change in pull request #34369:
URL: https://github.com/apache/spark/pull/34369#discussion_r734608343



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
##########
@@ -327,18 +327,31 @@ class ParquetFileFormat
           int96RebaseMode.toString,
           enableOffHeapColumnVector && taskContext.isDefined,
           capacity)
+        // SPARK-37089: We cannot register a task completion listener to close this iterator here
+        // because downstream exec nodes have already registered their listeners. Since listeners
+        // are executed in reverse order of registration, a listener registered here would close the
+        // iterator while downstream exec nodes are still running. When off-heap column vectors are
+        // enabled, this can cause a use-after-free bug leading to a segfault.
+        //
+        // Instead, we use FileScanRDD's task completion listener to close this iterator.
         val iter = new RecordReaderIterator(vectorizedReader)
-        // SPARK-23457 Register a task completion listener before `initialization`.
-        taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
-        vectorizedReader.initialize(split, hadoopAttemptContext)
-        logDebug(s"Appending $partitionSchema ${file.partitionValues}")
-        vectorizedReader.initBatch(partitionSchema, file.partitionValues)
-        if (returningBatch) {
-          vectorizedReader.enableReturningBatches()
-        }
+        try {
+          vectorizedReader.initialize(split, hadoopAttemptContext)
+          logDebug(s"Appending $partitionSchema ${file.partitionValues}")
+          vectorizedReader.initBatch(partitionSchema, file.partitionValues)
+          if (returningBatch) {
+            vectorizedReader.enableReturningBatches()
+          }
 
-        // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
-        iter.asInstanceOf[Iterator[InternalRow]]
+          // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
+          iter.asInstanceOf[Iterator[InternalRow]]
+        } catch {
+          case e: Throwable =>
+            // SPARK-23457: In case there is an exception in initialization, close the iterator to

Review comment:
       In general, I think FileScanRDD does close the iterator when hitting exceptions, because it uses a task completion listener to do so. The only case where it will not close the iterator is when the exception prevents FileScanRDD from getting a reference to the iterator, as is the case here.
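
       To illustrate the point, here is a minimal, self-contained sketch of the pattern. It is not Spark code: `ToyTaskContext`, `ToyReader`, `openReader`, and `scan` are hypothetical stand-ins for TaskContext, the vectorized reader with its iterator, the reader-construction code in ParquetFileFormat, and FileScanRDD respectively. It assumes listeners run in reverse order of registration, as the diff comment states, and that the caller can only close a reader it has a reference to.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for TaskContext: completion listeners run in
// reverse order of registration.
final class ToyTaskContext {
  private val listeners = ArrayBuffer.empty[() => Unit]
  def addCompletionListener(f: () => Unit): Unit = { listeners += f }
  def markTaskCompleted(): Unit = listeners.reverseIterator.foreach(f => f())
}

// Hypothetical stand-in for the reader and the iterator wrapping it.
final class ToyReader(failOnInit: Boolean) extends AutoCloseable {
  var closed = false
  def initialize(): Unit =
    if (failOnInit) throw new IllegalStateException("init failed")
  override def close(): Unit = { closed = true }
}

object ListenerSketch {
  // Same shape as the patched code: no listener is registered here. The
  // reader is closed locally only when initialization throws, because in
  // that case the caller never receives a reference to it.
  def openReader(reader: ToyReader): ToyReader =
    try {
      reader.initialize()
      reader
    } catch {
      case e: Throwable =>
        reader.close()
        throw e
    }

  // Stand-in for the caller: its listener closes whatever reader it
  // currently holds, which is nothing if openReader threw.
  def scan(ctx: ToyTaskContext, reader: ToyReader): Unit = {
    var current: Option[ToyReader] = None
    ctx.addCompletionListener(() => current.foreach(_.close()))
    current = Some(openReader(reader))
  }
}
```

       In the success path the reader stays open until the caller's listener runs at task completion. In the failure path `current` is still `None` when the exception propagates, so without the `catch` block the reader would never be closed.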




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


