Posted to commits@spark.apache.org by su...@apache.org on 2021/11/06 16:36:11 UTC

[spark] branch master updated: [SPARK-37220][SQL] Do not split input file for Parquet reader with aggregate push down

This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d3a1337  [SPARK-37220][SQL] Do not split input file for Parquet reader with aggregate push down
d3a1337 is described below

commit d3a133750c94af89e01d5f1b88bc1c49ec509fd4
Author: Cheng Su <ch...@fb.com>
AuthorDate: Sat Nov 6 09:35:16 2021 -0700

    [SPARK-37220][SQL] Do not split input file for Parquet reader with aggregate push down
    
    ### What changes were proposed in this pull request?
    
    As a followup of https://github.com/apache/spark/pull/34298/files#r734795801, and similar to ORC aggregate push down, we can disallow splitting input files for the Parquet reader as well. See the original comment for more details on the motivation. Also fix the string output of `RowDataSourceScanExec` to print only `PushedAggregates` and `PushedGroupby`, to be aligned with `PushedLimit` and `PushedSample`; not many queries can benefit from aggregate push down, so we don't need to print those u [...]
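
    For context, here is a minimal sketch of the kind of query this code
    path serves. The config key `spark.sql.parquet.aggregatePushDown` and
    the requirement to use the DSv2 Parquet source are assumptions based
    on the surrounding codebase, not something this commit states:

        import org.apache.spark.sql.functions.{min, max, count}

        // Hypothetical demo; the path and data volume are made up.
        spark.conf.set("spark.sql.parquet.aggregatePushDown", "true")
        // Aggregate push down only applies to the DSv2 Parquet source.
        spark.conf.set("spark.sql.sources.useV1SourceList", "")

        spark.range(0, 1000000).toDF("id").write.parquet("/tmp/agg_demo")
        val agg = spark.read.parquet("/tmp/agg_demo")
          .agg(min("id"), max("id"), count("*"))
        // MIN/MAX/COUNT are answered from footer statistics alone, so
        // each task needs only the footer; splitting the file buys nothing.
        agg.show()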
    
    ### Why are the changes needed?
    
    Avoid unnecessary file splits across multiple tasks for the Parquet reader with aggregate push down.
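
    As a back-of-the-envelope illustration (the 1 GB file size is made
    up; 128 MB is the default `spark.sql.files.maxPartitionBytes`,
    ignoring the open-cost adjustment the planner also applies):

        val fileSizeBytes = 1024L * 1024 * 1024     // hypothetical 1 GB file
        val maxPartitionBytes = 128L * 1024 * 1024  // Spark default: 128 MB
        val splits = math.ceil(fileSizeBytes.toDouble / maxPartitionBytes).toInt
        // Before this patch: 8 tasks, 7 of which did no useful work,
        // since only the split starting at offset 0 read the footer.
        // After this patch: isSplitable is false, so the planner emits 1 task.
        println(s"splits planned for a splitable file: $splits")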
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing unit test in `FileSourceAggregatePushDownSuite.scala`.
    
    Closes #34498 from c21/agg-fix.
    
    Authored-by: Cheng Su <ch...@fb.com>
    Signed-off-by: Chao Sun <su...@apple.com>
---
 .../org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala | 2 +-
 .../datasources/v2/parquet/ParquetPartitionReaderFactory.scala      | 5 -----
 .../spark/sql/execution/datasources/v2/parquet/ParquetScan.scala    | 6 +++++-
 3 files changed, 6 insertions(+), 7 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
index 6b9d181..baf30725 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
@@ -44,7 +44,7 @@ case class OrcScan(
     dataFilters: Seq[Expression] = Seq.empty) extends FileScan {
   override def isSplitable(path: Path): Boolean = {
     // If aggregate is pushed down, only the file footer will be read once,
-    // so file should be not split across multiple tasks.
+    // so file should not be split across multiple tasks.
     pushedAggregate.isEmpty
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
index 6f021ff..516d8cd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
@@ -92,11 +92,6 @@ case class ParquetPartitionReaderFactory(
       ParquetFooterReader.readFooter(conf, filePath, SKIP_ROW_GROUPS)
     } else {
       // For aggregate push down, we will get max/min/count from footer statistics.
-      // We want to read the footer for the whole file instead of reading multiple
-      // footers for every split of the file. Basically if the start (the beginning of)
-      // the offset in PartitionedFile is 0, we will read the footer. Otherwise, it means
-      // that we have already read footer for that file, so we will skip reading again.
-      if (file.start != 0) return null
       ParquetFooterReader.readFooter(conf, filePath, NO_FILTER)
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
index b92ed82..617faad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
@@ -47,7 +47,11 @@ case class ParquetScan(
     pushedAggregate: Option[Aggregation] = None,
     partitionFilters: Seq[Expression] = Seq.empty,
     dataFilters: Seq[Expression] = Seq.empty) extends FileScan {
-  override def isSplitable(path: Path): Boolean = true
+  override def isSplitable(path: Path): Boolean = {
+    // If aggregate is pushed down, only the file footer will be read once,
+    // so file should not be split across multiple tasks.
+    pushedAggregate.isEmpty
+  }
 
   override def readSchema(): StructType = {
     // If aggregate is pushed down, schema has already been pruned in `ParquetScanBuilder`

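For readers skimming the diff, a self-contained sketch of the pattern being applied; the trait and case class below are simplified stand-ins for Spark's `FileScan` and `Aggregation`, not the real definitions:

    import org.apache.hadoop.fs.Path

    // Simplified stand-in for Spark's pushed-down aggregation descriptor.
    case class Aggregation(aggregateExpressions: Seq[String])

    trait FileScan {
      // true: the planner may carve one file into several byte-range
      // splits, each read by its own task. false: one task per file.
      def isSplitable(path: Path): Boolean
    }

    class ParquetLikeScan(pushedAggregate: Option[Aggregation]) extends FileScan {
      override def isSplitable(path: Path): Boolean = {
        // With an aggregate pushed down, only footer statistics are read,
        // so splitting would just make several tasks fetch the same footer.
        pushedAggregate.isEmpty
      }
    }
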
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org