You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by su...@apache.org on 2021/11/06 16:36:11 UTC
[spark] branch master updated: [SPARK-37220][SQL] Do not split
input file for Parquet reader with aggregate push down
This is an automated email from the ASF dual-hosted git repository.
sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d3a1337 [SPARK-37220][SQL] Do not split input file for Parquet reader with aggregate push down
d3a1337 is described below
commit d3a133750c94af89e01d5f1b88bc1c49ec509fd4
Author: Cheng Su <ch...@fb.com>
AuthorDate: Sat Nov 6 09:35:16 2021 -0700
[SPARK-37220][SQL] Do not split input file for Parquet reader with aggregate push down
### What changes were proposed in this pull request?
As a followup of https://github.com/apache/spark/pull/34298/files#r734795801, similar to ORC aggregate push down, we can disallow splitting input files for the Parquet reader as well. See the original comment for more details on the motivation. Also fix the string of `RowDataSourceScanExec` to only print out `PushedAggregates` and `PushedGroupby`, to be aligned with `PushedLimit` and `PushedSample`, as there are not so many queries that can benefit from aggregate push down, so we don't need to print those u [...]
### Why are the changes needed?
Avoid unnecessary file splits in multiple tasks for Parquet reader with aggregate push down.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing unit test in `FileSourceAggregatePushDownSuite.scala`.
Closes #34498 from c21/agg-fix.
Authored-by: Cheng Su <ch...@fb.com>
Signed-off-by: Chao Sun <su...@apple.com>
---
.../org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala | 2 +-
.../datasources/v2/parquet/ParquetPartitionReaderFactory.scala | 5 -----
.../spark/sql/execution/datasources/v2/parquet/ParquetScan.scala | 6 +++++-
3 files changed, 6 insertions(+), 7 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
index 6b9d181..baf30725 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScan.scala
@@ -44,7 +44,7 @@ case class OrcScan(
dataFilters: Seq[Expression] = Seq.empty) extends FileScan {
override def isSplitable(path: Path): Boolean = {
// If aggregate is pushed down, only the file footer will be read once,
- // so file should be not split across multiple tasks.
+ // so file should not be split across multiple tasks.
pushedAggregate.isEmpty
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
index 6f021ff..516d8cd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
@@ -92,11 +92,6 @@ case class ParquetPartitionReaderFactory(
ParquetFooterReader.readFooter(conf, filePath, SKIP_ROW_GROUPS)
} else {
// For aggregate push down, we will get max/min/count from footer statistics.
- // We want to read the footer for the whole file instead of reading multiple
- // footers for every split of the file. Basically if the start (the beginning of)
- // the offset in PartitionedFile is 0, we will read the footer. Otherwise, it means
- // that we have already read footer for that file, so we will skip reading again.
- if (file.start != 0) return null
ParquetFooterReader.readFooter(conf, filePath, NO_FILTER)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
index b92ed82..617faad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
@@ -47,7 +47,11 @@ case class ParquetScan(
pushedAggregate: Option[Aggregation] = None,
partitionFilters: Seq[Expression] = Seq.empty,
dataFilters: Seq[Expression] = Seq.empty) extends FileScan {
- override def isSplitable(path: Path): Boolean = true
+ override def isSplitable(path: Path): Boolean = {
+ // If aggregate is pushed down, only the file footer will be read once,
+ // so file should not be split across multiple tasks.
+ pushedAggregate.isEmpty
+ }
override def readSchema(): StructType = {
// If aggregate is pushed down, schema has already been pruned in `ParquetScanBuilder`
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org