You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/11/12 13:06:26 UTC

[GitHub] [spark] maropu commented on a change in pull request #30300: [SPARK-33399][SQL] Normalize output partitioning and sortorder with respect to aliases to avoid unneeded exchange/sort nodes

maropu commented on a change in pull request #30300:
URL: https://github.com/apache/spark/pull/30300#discussion_r522091225



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
##########
@@ -895,6 +895,121 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
     }
   }
 
+  // Joins t1 and t2, re-exposes both join keys under aliases (t1id, t2id), then joins that
+  // result with t3 on the alias t1id. Per the test name, the inner join's output partitioning
+  // is expected to be a PartitioningCollection; the outer join should be able to reuse it
+  // through the alias instead of requiring an additional shuffle.
+  test("SPARK-33399: aliases should be handled properly in PartitioningCollection output" +
+    " partitioning") {
+    // A threshold of -1 disables automatic broadcast joins, so neither join is planned as a
+    // broadcast join and the plan's shuffle requirements become visible.
+    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+      withTempView("t1", "t2", "t3") {
+        // Each base table is explicitly hash-repartitioned by `id` before being registered.
+        spark.range(10).repartition($"id").createTempView("t1")
+        spark.range(20).repartition($"id").createTempView("t2")
+        spark.range(30).repartition($"id").createTempView("t3")
+        val planned = sql(
+          """
+            |SELECT t3.id as t3id
+            |FROM (
+            |    SELECT t1.id as t1id, t2.id as t2id
+            |    FROM t1, t2
+            |    WHERE t1.id = t2.id
+            |) t12, t3
+            |WHERE t1id = t3.id
+          """.stripMargin).queryExecution.executedPlan
+        val exchanges = planned.collect { case s: ShuffleExchangeExec => s }
+        // Presumably the only shuffles are the three introduced by the explicit repartition
+        // calls above; a fourth would indicate the aliased key t1id was not matched against
+        // the existing partitioning of the inner join.
+        // NOTE(review): this only counts exchanges; it does not assert which Partitioning the
+        // plan actually uses (e.g. PartitioningCollection) -- consider checking that as well.
+        assert(exchanges.size == 3)
+      }
+    }
+  }
+
+  // LEFT SEMI JOINs t1 with t2 (so only t1's columns survive), exposes t1.id under the alias
+  // t1id, then joins that result with t3 on the alias. Per the test name, the semi join's
+  // output partitioning is expected to be a HashPartitioning on t1.id, which the outer join
+  // should be able to reuse through the alias instead of requiring an additional shuffle.
+  test("SPARK-33399: aliases should be handled properly in HashPartitioning") {
+    // A threshold of -1 disables automatic broadcast joins, so neither join is planned as a
+    // broadcast join and the plan's shuffle requirements become visible.
+    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+      withTempView("t1", "t2", "t3") {
+        // Each base table is explicitly hash-repartitioned by `id` before being registered.
+        spark.range(10).repartition($"id").createTempView("t1")
+        spark.range(20).repartition($"id").createTempView("t2")
+        spark.range(30).repartition($"id").createTempView("t3")
+        // The outer join has no ON clause; its condition is supplied by the WHERE clause.
+        val planned = sql(
+          """
+            |SELECT t1id, t3.id as t3id
+            |FROM (
+            |    SELECT t1.id as t1id
+            |    FROM t1 LEFT SEMI JOIN t2
+            |    ON t1.id = t2.id
+            |) t12 INNER JOIN t3
+            |WHERE t1id = t3.id
+          """.stripMargin).queryExecution.executedPlan
+        val exchanges = planned.collect { case s: ShuffleExchangeExec => s }
+        // Presumably the only shuffles are the three introduced by the explicit repartition
+        // calls above; a fourth would indicate the aliased key t1id was not matched against
+        // the semi join's existing partitioning.
+        // NOTE(review): this only counts exchanges; it does not assert which Partitioning the
+        // plan actually uses (e.g. HashPartitioning) -- consider checking that as well.
+        assert(exchanges.size == 3)
+      }
+    }
+  }
+
+  test("SPARK-33399: alias handling should happen properly for RangePartitioning") {
+    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+      val df = spark.range(1, 100)
+        .select(col("id").as("id1")).groupBy("id1").count()
+      // Plan for this will be Range -> ProjectWithAlias -> HashAggregate -> HashAggregate
+      // if Project normalizes alias in its Range outputPartitioning, then no Exchange should come
+      // in between HashAggregates
+      val planned = df.queryExecution.executedPlan

Review comment:
       Ah, one last nit: could you check that the query in this test actually uses `RangePartitioning`? (The same applies to the other tests: please verify that each one exercises the partitioning it is named after.)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org