Posted to reviews@spark.apache.org by "yabola (via GitHub)" <gi...@apache.org> on 2023/03/01 16:08:29 UTC

[GitHub] [spark] yabola commented on a diff in pull request #39950: [SPARK-42388][SQL] Avoid parquet footer reads twice when no filters in vectorized reader

yabola commented on code in PR #39950:
URL: https://github.com/apache/spark/pull/39950#discussion_r1121979988


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala:
##########
@@ -296,41 +309,45 @@ class ParquetFileFormat
             throw e
         }
       } else {
-        logDebug(s"Falling back to parquet-mr")
-        // ParquetRecordReader returns InternalRow
-        val readSupport = new ParquetReadSupport(
-          convertTz,
-          enableVectorizedReader = false,
-          datetimeRebaseSpec,
-          int96RebaseSpec)
-        val reader = if (pushed.isDefined && enableRecordFilter) {
-          val parquetFilter = FilterCompat.get(pushed.get, null)
-          new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
-        } else {
-          new ParquetRecordReader[InternalRow](readSupport)
-        }
-        val readerWithRowIndexes = ParquetRowIndexUtil.addRowIndexToRecordReaderIfNeeded(reader,
-            requiredSchema)
-        val iter = new RecordReaderIterator[InternalRow](readerWithRowIndexes)
         try {
-          readerWithRowIndexes.initialize(split, hadoopAttemptContext)
-
-          val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
-          val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
-
-          if (partitionSchema.length == 0) {
-            // There is no partition columns
-            iter.map(unsafeProjection)
+          logDebug(s"Falling back to parquet-mr")
+          // ParquetRecordReader returns InternalRow
+          val readSupport = new ParquetReadSupport(
+            convertTz,
+            enableVectorizedReader = false,
+            datetimeRebaseSpec,
+            int96RebaseSpec)
+          val reader = if (pushed.isDefined && enableRecordFilter) {
+            val parquetFilter = FilterCompat.get(pushed.get, null)
+            new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
           } else {
-            val joinedRow = new JoinedRow()
-            iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues)))
+            new ParquetRecordReader[InternalRow](readSupport)
           }
-        } catch {
-          case e: Throwable =>
-            // SPARK-23457: In case there is an exception in initialization, close the iterator to
-            // avoid leaking resources.
-            iter.close()
-            throw e
+          val readerWithRowIndexes = ParquetRowIndexUtil.addRowIndexToRecordReaderIfNeeded(reader,
+            requiredSchema)
+          val iter = new RecordReaderIterator[InternalRow](readerWithRowIndexes)
+          try {
+            readerWithRowIndexes.initialize(split, hadoopAttemptContext)
+
+            val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
+            val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+
+            if (partitionSchema.length == 0) {
+              // There is no partition columns
+              iter.map(unsafeProjection)
+            } else {
+              val joinedRow = new JoinedRow()
+              iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues)))
+            }
+          } catch {
+            case e: Throwable =>
+              // SPARK-23457: In case there is an exception in initialization, close the iterator to
+              // avoid leaking resources.
+              iter.close()
+              throw e
+          }
+        } finally {
+          parquetReader.close()

Review Comment:
   I haven't changed the code logic; I only added a call to close `parquetReader`.
   Previously the reader was also closed on the non-vectorized code path, so this keeps the behavior consistent. Please see
   [the original implementation code](https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReader.java#L53)
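   
   For context, a minimal sketch of the pattern this change follows: open the file reader once, reuse its already-parsed footer, and release the file handle in a `finally` block, mirroring the `parquetReader.close()` added above. The parquet-mr calls (`ParquetFileReader.open`, `getFooter`) are real APIs, but the surrounding method is illustrative only, not Spark's actual code:
   
   ```scala
   import org.apache.hadoop.conf.Configuration
   import org.apache.hadoop.fs.Path
   import org.apache.parquet.hadoop.ParquetFileReader
   import org.apache.parquet.hadoop.util.HadoopInputFile
   
   // Illustrative only: open the file once, reuse the footer, and always close
   // the reader so the footer is not parsed a second time and no handle leaks.
   def readFooterOnce(path: Path, conf: Configuration): Long = {
     val reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))
     try {
       val footer = reader.getFooter    // footer already parsed by open()
       footer.getBlocks.size().toLong   // e.g. number of row groups
     } finally {
       reader.close()                   // mirrors the added parquetReader.close()
     }
   }
   ```
   
   The point is the `finally`: whichever branch builds the record iterator, the reader opened to obtain the footer is closed exactly once.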
   


