Posted to commits@spark.apache.org by gu...@apache.org on 2018/05/21 07:39:42 UTC

spark git commit: [SPARK-24242][SQL] RangeExec should have correct outputOrdering and outputPartitioning

Repository: spark
Updated Branches:
  refs/heads/master f32b7faf7 -> 6d7d45a1a


[SPARK-24242][SQL] RangeExec should have correct outputOrdering and outputPartitioning

## What changes were proposed in this pull request?

The logical `Range` node was recently given an `outputOrdering`, which is used to eliminate redundant `Sort`s during optimization. However, this `outputOrdering` does not propagate to the physical `RangeExec` node; this patch propagates it.

We also add the correct `outputPartitioning` to the `RangeExec` node.
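
As an illustrative sketch of the result (not part of the patch; it assumes a `SparkSession` named `spark` and mirrors the `PlannerSuite` test added below), the physical node now advertises both properties:

```scala
import org.apache.spark.sql.execution.RangeExec

// Hedged sketch: inspect what the physical Range node now reports.
val plan = spark.range(0, 100, 1, 4).queryExecution.executedPlan
val rangeExec = plan.collect { case r: RangeExec => r }.head

rangeExec.outputOrdering     // Seq(id ASC), mirroring the logical Range
rangeExec.outputPartitioning // RangePartitioning(id ASC, 4)
```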

## How was this patch tested?

Added a test to `PlannerSuite`; existing tests that relied on the previous (unknown) partitioning of `Range` were adjusted.

Author: Liang-Chi Hsieh <vi...@gmail.com>

Closes #21291 from viirya/SPARK-24242.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6d7d45a1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6d7d45a1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6d7d45a1

Branch: refs/heads/master
Commit: 6d7d45a1af078edd9e4ed027e735d6096482179c
Parents: f32b7fa
Author: Liang-Chi Hsieh <vi...@gmail.com>
Authored: Mon May 21 15:39:35 2018 +0800
Committer: hyukjinkwon <gu...@apache.org>
Committed: Mon May 21 15:39:35 2018 +0800

----------------------------------------------------------------------
 python/pyspark/sql/tests.py                     |  4 +--
 .../sql/execution/basicPhysicalOperators.scala  | 14 ++++++++++
 .../apache/spark/sql/ConfigBehaviorSuite.scala  |  4 ++-
 .../spark/sql/execution/PlannerSuite.scala      | 27 +++++++++++++++++++-
 .../sql/execution/WholeStageCodegenSuite.scala  |  4 +--
 .../sql/execution/debug/DebuggingSuite.scala    |  7 +++--
 6 files changed, 52 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6d7d45a1/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index a1b6db7..c7bd8f0 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -5239,8 +5239,8 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase):
         expected2 = df.groupby().agg(sum(df.v))
 
         # groupby one column and one sql expression
-        result3 = df.groupby(df.id, df.v % 2).agg(sum_udf(df.v))
-        expected3 = df.groupby(df.id, df.v % 2).agg(sum(df.v))
+        result3 = df.groupby(df.id, df.v % 2).agg(sum_udf(df.v)).orderBy(df.id, df.v % 2)
+        expected3 = df.groupby(df.id, df.v % 2).agg(sum(df.v)).orderBy(df.id, df.v % 2)
 
         # groupby one python UDF
         result4 = df.groupby(plus_one(df.id)).agg(sum_udf(df.v))

http://git-wip-us.apache.org/repos/asf/spark/blob/6d7d45a1/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 1edfdc8..2df81d0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -345,6 +345,20 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
 
   override val output: Seq[Attribute] = range.output
 
+  override def outputOrdering: Seq[SortOrder] = range.outputOrdering
+
+  override def outputPartitioning: Partitioning = {
+    if (numElements > 0) {
+      if (numSlices == 1) {
+        SinglePartition
+      } else {
+        RangePartitioning(outputOrdering, numSlices)
+      }
+    } else {
+      UnknownPartitioning(0)
+    }
+  }
+
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 

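The three branches above correspond to the cases exercised in `PlannerSuite` further down; as a hedged summary (argument order: start, end, step, numSlices):

```scala
// Illustrative only; mirrors the PlannerSuite cases below.
spark.range(0, 10, 1, 4)     // numElements > 0, numSlices = 4 -> RangePartitioning(id ASC, 4)
spark.range(1, 10, 1, 1)     // numElements > 0, numSlices = 1 -> SinglePartition
spark.range(-10, -9, -20, 1) // numElements = 0                -> UnknownPartitioning(0)
```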
http://git-wip-us.apache.org/repos/asf/spark/blob/6d7d45a1/sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala
index 949505e..276496b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala
@@ -39,7 +39,9 @@ class ConfigBehaviorSuite extends QueryTest with SharedSQLContext {
     def computeChiSquareTest(): Double = {
       val n = 10000
       // Trigger a sort
-      val data = spark.range(0, n, 1, 1).sort('id.desc)
+      // Range has range partitioning in its output now. To have a range shuffle, we
+      // need to run a repartition first.
+      val data = spark.range(0, n, 1, 1).repartition(10).sort('id.desc)
         .selectExpr("SPARK_PARTITION_ID() pid", "id").as[(Int, Long)].collect()
 
       // Compute histogram for the number of records per partition post sort
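
The `repartition` is needed because a one-slice range now reports `SinglePartition`, which satisfies any required distribution, so `.sort` alone no longer inserts the range-shuffle `Exchange` whose per-partition record counts this chi-square test measures. A hedged sketch of the two plans (assuming 2.3-era `EnsureRequirements` behavior):

```scala
import org.apache.spark.sql.functions.col

// Sketch, not part of the patch:
spark.range(0, 10000, 1, 1).sort(col("id").desc).explain()
// -> SinglePartition satisfies the sort's required distribution: no Exchange.
spark.range(0, 10000, 1, 1).repartition(10).sort(col("id").desc).explain()
// -> RoundRobinPartitioning(10) does not, so the range-shuffle Exchange
//    reappears and the test again has a real shuffle to measure.
```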

http://git-wip-us.apache.org/repos/asf/spark/blob/6d7d45a1/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index a375f88..b2aba8e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.{execution, Row}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.{Cross, FullOuter, Inner, LeftOuter, RightOuter}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Repartition, Sort}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Range, Repartition, Sort}
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
 import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReusedExchangeExec, ReuseExchange, ShuffleExchangeExec}
@@ -633,6 +633,31 @@ class PlannerSuite extends SharedSQLContext {
       requiredOrdering = Seq(orderingA, orderingB),
       shouldHaveSort = true)
   }
+
+  test("SPARK-24242: RangeExec should have correct output ordering and partitioning") {
+    val df = spark.range(10)
+    val rangeExec = df.queryExecution.executedPlan.collect {
+      case r: RangeExec => r
+    }
+    val range = df.queryExecution.optimizedPlan.collect {
+      case r: Range => r
+    }
+    assert(rangeExec.head.outputOrdering == range.head.outputOrdering)
+    assert(rangeExec.head.outputPartitioning ==
+      RangePartitioning(rangeExec.head.outputOrdering, df.rdd.getNumPartitions))
+
+    val rangeInOnePartition = spark.range(1, 10, 1, 1)
+    val rangeExecInOnePartition = rangeInOnePartition.queryExecution.executedPlan.collect {
+      case r: RangeExec => r
+    }
+    assert(rangeExecInOnePartition.head.outputPartitioning == SinglePartition)
+
+    val rangeInZeroPartition = spark.range(-10, -9, -20, 1)
+    val rangeExecInZeroPartition = rangeInZeroPartition.queryExecution.executedPlan.collect {
+      case r: RangeExec => r
+    }
+    assert(rangeExecInZeroPartition.head.outputPartitioning == UnknownPartitioning(0))
+  }
 }
 
 // Used for unit-testing EnsureRequirements

http://git-wip-us.apache.org/repos/asf/spark/blob/6d7d45a1/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
index 9180a22..b714dcd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -51,12 +51,12 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext {
   }
 
   test("Aggregate with grouping keys should be included in WholeStageCodegen") {
-    val df = spark.range(3).groupBy("id").count().orderBy("id")
+    val df = spark.range(3).groupBy(col("id") * 2).count().orderBy(col("id") * 2)
     val plan = df.queryExecution.executedPlan
     assert(plan.find(p =>
       p.isInstanceOf[WholeStageCodegenExec] &&
         p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[HashAggregateExec]).isDefined)
-    assert(df.collect() === Array(Row(0, 1), Row(1, 1), Row(2, 1)))
+    assert(df.collect() === Array(Row(0, 1), Row(2, 1), Row(4, 1)))
   }
 
   test("BroadcastHashJoin should be included in WholeStageCodegen") {
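
The switch from `groupBy("id")` to `groupBy(col("id") * 2)` here (and in `DebuggingSuite` below) is needed because grouping a range by its own `id` now matches the reported `RangePartitioning`, letting the planner drop the `Exchange` between the partial and final aggregates, which should collapse whole-stage codegen into a single subtree. Grouping on a derived expression keeps the `Exchange`, preserving the two codegen subtrees these tests assert on. A hedged sketch:

```scala
import org.apache.spark.sql.functions.col

// Matches Range's partitioning on `id`: no Exchange expected.
spark.range(10).groupBy("id").count().explain()
// Does not match: the Exchange (and second codegen subtree) remains.
spark.range(10).groupBy(col("id") * 2).count().explain()
```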

http://git-wip-us.apache.org/repos/asf/spark/blob/6d7d45a1/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
index adcaf2d..8251ff1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.debug
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.test.SQLTestData.TestData
 
@@ -33,14 +34,16 @@ class DebuggingSuite extends SparkFunSuite with SharedSQLContext {
   }
 
   test("debugCodegen") {
-    val res = codegenString(spark.range(10).groupBy("id").count().queryExecution.executedPlan)
+    val res = codegenString(spark.range(10).groupBy(col("id") * 2).count()
+      .queryExecution.executedPlan)
     assert(res.contains("Subtree 1 / 2"))
     assert(res.contains("Subtree 2 / 2"))
     assert(res.contains("Object[]"))
   }
 
   test("debugCodegenStringSeq") {
-    val res = codegenStringSeq(spark.range(10).groupBy("id").count().queryExecution.executedPlan)
+    val res = codegenStringSeq(spark.range(10).groupBy(col("id") * 2).count()
+      .queryExecution.executedPlan)
     assert(res.length == 2)
     assert(res.forall{ case (subtree, code) =>
       subtree.contains("Range") && code.contains("Object[]")})

