You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/08/29 16:46:56 UTC

[GitHub] [spark] peter-toth opened a new pull request, #37711: [SPARK-40259][SQL] Support Parquet DSv2 in subquery plan merge

peter-toth opened a new pull request, #37711:
URL: https://github.com/apache/spark/pull/37711

   ### What changes were proposed in this pull request?
   After https://github.com/apache/spark/pull/32298 we were able to merge scalar subquery plans, but DSv2 sources couldn't benefit from that improvement.
   The reason for DSv2 sources were not supported by default is that `SparkOptimizer.earlyScanPushDownRules` build different `Scan`s in logical plans before `MergeScalarSubqueries` is executed. Those `Scan`s can have different pushed-down filters and aggregates and different column pruning defined, which prevents merging the plans.
   I would not alter the order of optimization rules as `MergeScalarSubqueries` works better when logical plans are better optimized (a plan is closer to its final logical form, e.g. `InjectRuntimeFilter` already executed). But instead, I would propose introducing a new interface that a `Scan` can implement to indicate if merge is possible with another `Scan` and do the merge if it make sense depending on the `Scan`'s actual parameters.
   
   This PR:
   - adds a new interface `SupportsMerge` that `Scan`s can implement to indicate if 2 `Scan`s can be merged and
   - adds implementation of `SupportsMerge` to `ParquetScan` as the first DSv2 source. The merge only happens if pushed-down data and partition filters, pushed-down aggregates and column pruning match.
   
   ### Why are the changes needed?
   Scalar subquery merge can bring considerable performance improvement (see the original https://github.com/apache/spark/pull/32298 for the benchmarks) so DSv2 sources should also benefit from that feature.
   
   ### Does this PR introduce _any_ user-facing change?
   No.
   
   ### How was this patch tested?
   Added new UT.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] github-actions[bot] commented on pull request #37711: [SPARK-40259][SQL] Support Parquet DSv2 in subquery plan merge

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #37711:
URL: https://github.com/apache/spark/pull/37711#issuecomment-1352393959

   We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
   If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] peter-toth commented on pull request #37711: [SPARK-40259][SQL] Support Parquet DSv2 in subquery plan merge

Posted by GitBox <gi...@apache.org>.
peter-toth commented on PR #37711:
URL: https://github.com/apache/spark/pull/37711#issuecomment-1367249807

   Thanks for the comments @singhpk234!
   Unfortunately this PR got closed due to lack of reviews and can't be reopened. I'm happy to open a new one and take into account your suggestions but first it would be great if a Spark committer would confirm that the proposed `SupportsMerge` scan interface makes sense and somone have willingness to give some feedback about the change. Any feedback is much appreciated, really.
   
   Maybe @cloud-fan or @gengliangwang are you interested in this PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] peter-toth commented on pull request #37711: [SPARK-40259][SQL] Support Parquet DSv2 in subquery plan merge

Posted by GitBox <gi...@apache.org>.
peter-toth commented on PR #37711:
URL: https://github.com/apache/spark/pull/37711#issuecomment-1230582267

   cc @cloud-fan, @sigmod, @singhpk234


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] singhpk234 commented on a diff in pull request #37711: [SPARK-40259][SQL] Support Parquet DSv2 in subquery plan merge

Posted by GitBox <gi...@apache.org>.
singhpk234 commented on code in PR #37711:
URL: https://github.com/apache/spark/pull/37711#discussion_r1058577594


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala:
##########
@@ -138,4 +144,29 @@ case class ParquetScan(
       Map("PushedAggregation" -> pushedAggregationsStr) ++
       Map("PushedGroupBy" -> pushedGroupByStr)
   }
+
+  override def mergeWith(other: SupportsMerge, table: SupportsRead): Optional[SupportsMerge] = {
+    if (other.isInstanceOf[ParquetScan]) {
+      val o = other.asInstanceOf[ParquetScan]
+      if (fileIndex == o.fileIndex &&
+          options == o.options &&
+          dataSchema == o.dataSchema &&
+          equivalentFilters(pushedFilters, o.pushedFilters) &&
+          pushedDownAggEqual(o) &&
+          normalizedPartitionFilters == o.normalizedPartitionFilters &&
+          normalizedDataFilters == o.normalizedDataFilters) {

Review Comment:
   [question] should we just disjunct these diff filters from scans and run a boolean simplification on top of it ? to handle the cases with diff partition and data filter on the scans ? 
   
   Are we expecting some heuristic here ? as if when combining the filters will be useful ? 



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala:
##########
@@ -106,15 +109,18 @@ case class ParquetScan(
       new ParquetOptions(options.asCaseSensitiveMap.asScala.toMap, sqlConf))
   }
 
+  private def pushedDownAggEqual(p: ParquetScan) = {
+    if (pushedAggregate.nonEmpty && p.pushedAggregate.nonEmpty) {
+      AggregatePushDownUtils.equivalentAggregations(pushedAggregate.get, p.pushedAggregate.get)
+    } else {
+      pushedAggregate.isEmpty && p.pushedAggregate.isEmpty
+    }
+  }

Review Comment:
   should we move this to FileScan itself ? OrcScan also has some duplicate code 



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala:
##########
@@ -138,4 +144,29 @@ case class ParquetScan(
       Map("PushedAggregation" -> pushedAggregationsStr) ++
       Map("PushedGroupBy" -> pushedGroupByStr)
   }
+
+  override def mergeWith(other: SupportsMerge, table: SupportsRead): Optional[SupportsMerge] = {
+    if (other.isInstanceOf[ParquetScan]) {
+      val o = other.asInstanceOf[ParquetScan]
+      if (fileIndex == o.fileIndex &&
+          options == o.options &&
+          dataSchema == o.dataSchema &&
+          equivalentFilters(pushedFilters, o.pushedFilters) &&
+          pushedDownAggEqual(o) &&
+          normalizedPartitionFilters == o.normalizedPartitionFilters &&
+          normalizedDataFilters == o.normalizedDataFilters) {
+        val builder = table.newScanBuilder(options).asInstanceOf[ParquetScanBuilder]

Review Comment:
   [question] should we add assertion for `table.newScanBuilder` should be a instance of ParquetScanBuilder ?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala:
##########
@@ -138,4 +144,29 @@ case class ParquetScan(
       Map("PushedAggregation" -> pushedAggregationsStr) ++
       Map("PushedGroupBy" -> pushedGroupByStr)
   }
+
+  override def mergeWith(other: SupportsMerge, table: SupportsRead): Optional[SupportsMerge] = {
+    if (other.isInstanceOf[ParquetScan]) {

Review Comment:
   can replace this with case match 



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala:
##########
@@ -279,6 +282,42 @@ object MergeScalarSubqueries extends Rule[LogicalPlan] {
             }
           }
 
+        case (
+          DataSourceV2ScanRelation(newRelation, newScan: SupportsMerge, newOutput,
+            newKeyGroupedPartitioning, newOrdering),
+          DataSourceV2ScanRelation(cachedRelation, cachedScan: SupportsMerge, cachedOutput,
+            cachedKeyGroupedPartitioning, cachedOrdering)) =>
+          checkIdenticalPlans(newRelation, cachedRelation).flatMap { outputMap =>
+            val mappedNewKeyGroupedPartitioning =
+              newKeyGroupedPartitioning.map(_.map(mapAttributes(_, outputMap)))
+            if (mappedNewKeyGroupedPartitioning.map(_.map(_.canonicalized)) ==
+              cachedKeyGroupedPartitioning.map(_.map(_.canonicalized))) {
+              val mappedNewOrdering = newOrdering.map(_.map(mapAttributes(_, outputMap)))
+              if (mappedNewOrdering.map(_.map(_.canonicalized)) ==

Review Comment:
   [minor] can we simplify the if else structure here ? something like 
   
   ``` java
   if (isKeyGroupPartitioningSame && isOrderingSame) { 
    // merge scans and update cachedRelation 
   } else  { 
     None
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] github-actions[bot] closed pull request #37711: [SPARK-40259][SQL] Support Parquet DSv2 in subquery plan merge

Posted by GitBox <gi...@apache.org>.
github-actions[bot] closed pull request #37711: [SPARK-40259][SQL] Support Parquet DSv2 in subquery plan merge
URL: https://github.com/apache/spark/pull/37711


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org