Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/03/28 17:48:24 UTC

[GitHub] [hudi] alexeykudinkin commented on a change in pull request #5141: [HUDI-3724] Fixing closure of ParquetReader

alexeykudinkin commented on a change in pull request #5141:
URL: https://github.com/apache/hudi/pull/5141#discussion_r836669643



##########
File path: hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetReader.java
##########
@@ -30,14 +32,17 @@
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.util.BaseFileUtils;
 import org.apache.hudi.common.util.ParquetReaderIterator;
+
 import org.apache.parquet.avro.AvroParquetReader;
 import org.apache.parquet.avro.AvroReadSupport;
 import org.apache.parquet.hadoop.ParquetReader;
 
 public class HoodieParquetReader<R extends IndexedRecord> implements HoodieFileReader<R> {
+  
   private final Path path;
   private final Configuration conf;
   private final BaseFileUtils parquetUtils;
+  private List<ParquetReaderIterator> readerIterators = new ArrayList<>();

Review comment:
       @nsivabalan please make it `final`

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
##########
@@ -333,7 +335,13 @@ object HoodieBaseRelation {
     partitionedFile => {
       val extension = FSUtils.getFileExtension(partitionedFile.filePath)
       if (HoodieFileFormat.PARQUET.getFileExtension.equals(extension)) {
-        parquetReader.apply(partitionedFile)
+        val iter = parquetReader.apply(partitionedFile)
+        if (iter.isInstanceOf[Closeable]) {
+          // Register a callback to close the reader, to be executed on task completion:
+          // when the task finishes, the listener fires and the reader's resources are released.
+          Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => iter.asInstanceOf[Closeable].close()))

Review comment:
    While I appreciate the intent here to tie the iterator to the scope of a particular task, I don't think this is the right way to fix it: you're tying the lifespan of the iterator to that of the task (which in this case runs on the executor), but there's no clear invariant guaranteeing that this iterator won't outlive the task.
   
    Instead, we should rely on the RDD to close out the iterator when it's done with iteration. If you take a look at `FileScanRDD` (which we rely on), you can see that it does exactly that. The reason it's broken right now is that we modify the iterator, and the modified iterator no longer inherits from `Closeable`:
   
   ```
   file: PartitionedFile => {
         val iter = readParquetFile(file)
         iter.flatMap {
           case r: InternalRow => Seq(r)
           case b: ColumnarBatch => b.rowIterator().asScala
         }
       }
   ```
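   
    One way to restore that behavior is to keep the flattened iterator `Closeable` and delegate `close()` to the wrapped reader, so `FileScanRDD`'s cleanup can still reach it. A minimal sketch (the wrapper name and structure are mine, not from this PR):
   
    ```
    import java.io.Closeable
   
    import scala.collection.JavaConverters._
   
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.vectorized.ColumnarBatch
   
    // Flattens InternalRow/ColumnarBatch elements like the snippet above, but
    // stays Closeable so FileScanRDD can close the underlying reader once it
    // is done iterating.
    class CloseableRowIterator(underlying: Iterator[Any])
      extends Iterator[InternalRow] with Closeable {
   
      private val rows: Iterator[InternalRow] = underlying.flatMap {
        case r: InternalRow => Seq(r)
        case b: ColumnarBatch => b.rowIterator().asScala
      }
   
      override def hasNext: Boolean = rows.hasNext
      override def next(): InternalRow = rows.next()
   
      // Propagate close() to the wrapped iterator when it owns resources;
      // otherwise closing is a no-op.
      override def close(): Unit = underlying match {
        case c: Closeable => c.close()
        case _ =>
      }
    }
    ```
   
    The read function would then be `file => new CloseableRowIterator(readParquetFile(file))` (reusing `readParquetFile` from the snippet above), and the task-completion listener becomes unnecessary.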




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org