You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "wangyum (via GitHub)" <gi...@apache.org> on 2023/02/05 11:52:40 UTC

[GitHub] [spark] wangyum commented on a diff in pull request #39887: [SPARK-42346][SQL] Rewrite distinct aggregates after subquery merge

wangyum commented on code in PR #39887:
URL: https://github.com/apache/spark/pull/39887#discussion_r1096671489


##########
sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala:
##########
@@ -2598,4 +2598,29 @@ class SubquerySuite extends QueryTest
         Row("aa"))
     }
   }
+
+  test("SPARK-42346: Rewrite distinct aggregates after merging subqueries") {
+    withTempView("t1") {
+      Seq((1, 2), (3, 4)).toDF("c1", "c2").createOrReplaceTempView("t1")
+
+      checkAnswer(sql(
+        """
+          |SELECT
+          |  (SELECT count(distinct c1) FROM t1),
+          |  (SELECT count(distinct c2) FROM t1)
+          |""".stripMargin),
+        Row(2, 2))

Review Comment:
   It seems `MergeScalarSubqueries` can not improve this case:
   Spark 3.2:
   ```
   == Physical Plan ==
   AdaptiveSparkPlan isFinalPlan=false
   +- Project [Subquery subquery#185, [id=#223] AS scalarsubquery()#198L, Subquery subquery#186, [id=#242] AS scalarsubquery()#199L]
      :  :- Subquery subquery#185, [id=#223]
      :  :  +- AdaptiveSparkPlan isFinalPlan=false
      :  :     +- HashAggregate(keys=[], functions=[count(distinct c1#153)], output=[count(DISTINCT c1)#193L])
      :  :        +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=221]
      :  :           +- HashAggregate(keys=[], functions=[partial_count(distinct c1#153)], output=[count#202L])
      :  :              +- HashAggregate(keys=[c1#153], functions=[], output=[c1#153])
      :  :                 +- Exchange hashpartitioning(c1#153, 200), ENSURE_REQUIREMENTS, [plan_id=217]
      :  :                    +- HashAggregate(keys=[c1#153], functions=[], output=[c1#153])
      :  :                       +- FileScan parquet default.t1[c1#153] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/Downloads/spark-3.2.3-bin-hadoop2.7/spark-warehous..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c1:int>
      :  +- Subquery subquery#186, [id=#242]
      :     +- AdaptiveSparkPlan isFinalPlan=false
      :        +- HashAggregate(keys=[], functions=[count(distinct c2#197)], output=[count(DISTINCT c2)#195L])
      :           +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=240]
      :              +- HashAggregate(keys=[], functions=[partial_count(distinct c2#197)], output=[count#206L])
      :                 +- HashAggregate(keys=[c2#197], functions=[], output=[c2#197])
      :                    +- Exchange hashpartitioning(c2#197, 200), ENSURE_REQUIREMENTS, [plan_id=236]
      :                       +- HashAggregate(keys=[c2#197], functions=[], output=[c2#197])
      :                          +- FileScan parquet default.t1[c2#197] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/Downloads/spark-3.2.3-bin-hadoop2.7/spark-warehous..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c2:int>
      +- Scan OneRowRelation[]
   ```
   
   After `MergeScalarSubqueries` and this PR:
   ```
   == Physical Plan ==
   AdaptiveSparkPlan isFinalPlan=false
   +- Project [Subquery subquery#0, [id=#36].count(DISTINCT c1) AS scalarsubquery()#11L, Subquery subquery#1, [id=#63].count(DISTINCT c2) AS scalarsubquery()#12L]
      :  :- Subquery subquery#0, [id=#36]
      :  :  +- AdaptiveSparkPlan isFinalPlan=false
      :  :     +- Project [named_struct(count(DISTINCT c1), count(DISTINCT c1)#5L, count(DISTINCT c2), count(DISTINCT c2)#8L) AS mergedValue#17]
      :  :        +- HashAggregate(keys=[], functions=[count(spark_catalog.default.t1.c1#19), count(c2#20)], output=[count(DISTINCT c1)#5L, count(DISTINCT c2)#8L])
      :  :           +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=33]
      :  :              +- HashAggregate(keys=[], functions=[partial_count(spark_catalog.default.t1.c1#19) FILTER (WHERE (gid#18 = 1)), partial_count(c2#20) FILTER (WHERE (gid#18 = 2))], output=[count#23L, count#24L])
      :  :                 +- HashAggregate(keys=[spark_catalog.default.t1.c1#19, c2#20, gid#18], functions=[], output=[spark_catalog.default.t1.c1#19, c2#20, gid#18])
      :  :                    +- Exchange hashpartitioning(spark_catalog.default.t1.c1#19, c2#20, gid#18, 5), ENSURE_REQUIREMENTS, [plan_id=29]
      :  :                       +- HashAggregate(keys=[spark_catalog.default.t1.c1#19, c2#20, gid#18], functions=[], output=[spark_catalog.default.t1.c1#19, c2#20, gid#18])
      :  :                          +- Expand [[c1#2, null, 1], [null, c2#3, 2]], [spark_catalog.default.t1.c1#19, c2#20, gid#18]
      :  :                             +- FileScan parquet spark_catalog.default.t1[c1#2,c2#3] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/opensource/spark/spark-warehouse/org.apache.spark...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c1:int,c2:int>
      :  +- Subquery subquery#1, [id=#63]
      :     +- AdaptiveSparkPlan isFinalPlan=false
      :        +- Project [named_struct(count(DISTINCT c1), count(DISTINCT c1)#5L, count(DISTINCT c2), count(DISTINCT c2)#8L) AS mergedValue#17]
      :           +- HashAggregate(keys=[], functions=[count(spark_catalog.default.t1.c1#19), count(c2#20)], output=[count(DISTINCT c1)#5L, count(DISTINCT c2)#8L])
      :              +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=60]
      :                 +- HashAggregate(keys=[], functions=[partial_count(spark_catalog.default.t1.c1#19) FILTER (WHERE (gid#18 = 1)), partial_count(c2#20) FILTER (WHERE (gid#18 = 2))], output=[count#23L, count#24L])
      :                    +- HashAggregate(keys=[spark_catalog.default.t1.c1#19, c2#20, gid#18], functions=[], output=[spark_catalog.default.t1.c1#19, c2#20, gid#18])
      :                       +- Exchange hashpartitioning(spark_catalog.default.t1.c1#19, c2#20, gid#18, 5), ENSURE_REQUIREMENTS, [plan_id=56]
      :                          +- HashAggregate(keys=[spark_catalog.default.t1.c1#19, c2#20, gid#18], functions=[], output=[spark_catalog.default.t1.c1#19, c2#20, gid#18])
      :                             +- Expand [[c1#2, null, 1], [null, c2#3, 2]], [spark_catalog.default.t1.c1#19, c2#20, gid#18]
      :                                +- FileScan parquet spark_catalog.default.t1[c1#2,c2#3] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/opensource/spark/spark-warehouse/org.apache.spark...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c1:int,c2:int>
      +- Scan OneRowRelation[]
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org