You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2018/05/21 07:39:42 UTC
spark git commit: [SPARK-24242][SQL] RangeExec should have correct
outputOrdering and outputPartitioning
Repository: spark
Updated Branches:
refs/heads/master f32b7faf7 -> 6d7d45a1a
[SPARK-24242][SQL] RangeExec should have correct outputOrdering and outputPartitioning
## What changes were proposed in this pull request?
The logical `Range` node was recently given an `outputOrdering`, which is used to eliminate redundant `Sort` operators during optimization. However, this `outputOrdering` does not propagate to the physical `RangeExec` node.
This patch also adds the correct `outputPartitioning` to the `RangeExec` node.
## How was this patch tested?
Added test.
Author: Liang-Chi Hsieh <vi...@gmail.com>
Closes #21291 from viirya/SPARK-24242.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6d7d45a1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6d7d45a1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6d7d45a1
Branch: refs/heads/master
Commit: 6d7d45a1af078edd9e4ed027e735d6096482179c
Parents: f32b7fa
Author: Liang-Chi Hsieh <vi...@gmail.com>
Authored: Mon May 21 15:39:35 2018 +0800
Committer: hyukjinkwon <gu...@apache.org>
Committed: Mon May 21 15:39:35 2018 +0800
----------------------------------------------------------------------
python/pyspark/sql/tests.py | 4 +--
.../sql/execution/basicPhysicalOperators.scala | 14 ++++++++++
.../apache/spark/sql/ConfigBehaviorSuite.scala | 4 ++-
.../spark/sql/execution/PlannerSuite.scala | 27 +++++++++++++++++++-
.../sql/execution/WholeStageCodegenSuite.scala | 4 +--
.../sql/execution/debug/DebuggingSuite.scala | 7 +++--
6 files changed, 52 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/6d7d45a1/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index a1b6db7..c7bd8f0 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -5239,8 +5239,8 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase):
expected2 = df.groupby().agg(sum(df.v))
# groupby one column and one sql expression
- result3 = df.groupby(df.id, df.v % 2).agg(sum_udf(df.v))
- expected3 = df.groupby(df.id, df.v % 2).agg(sum(df.v))
+ result3 = df.groupby(df.id, df.v % 2).agg(sum_udf(df.v)).orderBy(df.id, df.v % 2)
+ expected3 = df.groupby(df.id, df.v % 2).agg(sum(df.v)).orderBy(df.id, df.v % 2)
# groupby one python UDF
result4 = df.groupby(plus_one(df.id)).agg(sum_udf(df.v))
http://git-wip-us.apache.org/repos/asf/spark/blob/6d7d45a1/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 1edfdc8..2df81d0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -345,6 +345,20 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
override val output: Seq[Attribute] = range.output
+ override def outputOrdering: Seq[SortOrder] = range.outputOrdering
+
+ override def outputPartitioning: Partitioning = {
+ if (numElements > 0) {
+ if (numSlices == 1) {
+ SinglePartition
+ } else {
+ RangePartitioning(outputOrdering, numSlices)
+ }
+ } else {
+ UnknownPartitioning(0)
+ }
+ }
+
override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
http://git-wip-us.apache.org/repos/asf/spark/blob/6d7d45a1/sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala
index 949505e..276496b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala
@@ -39,7 +39,9 @@ class ConfigBehaviorSuite extends QueryTest with SharedSQLContext {
def computeChiSquareTest(): Double = {
val n = 10000
// Trigger a sort
- val data = spark.range(0, n, 1, 1).sort('id.desc)
+ // Range has range partitioning in its output now. To have a range shuffle, we
+ // need to run a repartition first.
+ val data = spark.range(0, n, 1, 1).repartition(10).sort('id.desc)
.selectExpr("SPARK_PARTITION_ID() pid", "id").as[(Int, Long)].collect()
// Compute histogram for the number of records per partition post sort
http://git-wip-us.apache.org/repos/asf/spark/blob/6d7d45a1/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index a375f88..b2aba8e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.{execution, Row}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.{Cross, FullOuter, Inner, LeftOuter, RightOuter}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Repartition, Sort}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Range, Repartition, Sort}
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution.columnar.InMemoryRelation
import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReusedExchangeExec, ReuseExchange, ShuffleExchangeExec}
@@ -633,6 +633,31 @@ class PlannerSuite extends SharedSQLContext {
requiredOrdering = Seq(orderingA, orderingB),
shouldHaveSort = true)
}
+
+ test("SPARK-24242: RangeExec should have correct output ordering and partitioning") {
+ val df = spark.range(10)
+ val rangeExec = df.queryExecution.executedPlan.collect {
+ case r: RangeExec => r
+ }
+ val range = df.queryExecution.optimizedPlan.collect {
+ case r: Range => r
+ }
+ assert(rangeExec.head.outputOrdering == range.head.outputOrdering)
+ assert(rangeExec.head.outputPartitioning ==
+ RangePartitioning(rangeExec.head.outputOrdering, df.rdd.getNumPartitions))
+
+ val rangeInOnePartition = spark.range(1, 10, 1, 1)
+ val rangeExecInOnePartition = rangeInOnePartition.queryExecution.executedPlan.collect {
+ case r: RangeExec => r
+ }
+ assert(rangeExecInOnePartition.head.outputPartitioning == SinglePartition)
+
+ val rangeInZeroPartition = spark.range(-10, -9, -20, 1)
+ val rangeExecInZeroPartition = rangeInZeroPartition.queryExecution.executedPlan.collect {
+ case r: RangeExec => r
+ }
+ assert(rangeExecInZeroPartition.head.outputPartitioning == UnknownPartitioning(0))
+ }
}
// Used for unit-testing EnsureRequirements
http://git-wip-us.apache.org/repos/asf/spark/blob/6d7d45a1/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
index 9180a22..b714dcd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -51,12 +51,12 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext {
}
test("Aggregate with grouping keys should be included in WholeStageCodegen") {
- val df = spark.range(3).groupBy("id").count().orderBy("id")
+ val df = spark.range(3).groupBy(col("id") * 2).count().orderBy(col("id") * 2)
val plan = df.queryExecution.executedPlan
assert(plan.find(p =>
p.isInstanceOf[WholeStageCodegenExec] &&
p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[HashAggregateExec]).isDefined)
- assert(df.collect() === Array(Row(0, 1), Row(1, 1), Row(2, 1)))
+ assert(df.collect() === Array(Row(0, 1), Row(2, 1), Row(4, 1)))
}
test("BroadcastHashJoin should be included in WholeStageCodegen") {
http://git-wip-us.apache.org/repos/asf/spark/blob/6d7d45a1/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
index adcaf2d..8251ff1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.debug
import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.test.SQLTestData.TestData
@@ -33,14 +34,16 @@ class DebuggingSuite extends SparkFunSuite with SharedSQLContext {
}
test("debugCodegen") {
- val res = codegenString(spark.range(10).groupBy("id").count().queryExecution.executedPlan)
+ val res = codegenString(spark.range(10).groupBy(col("id") * 2).count()
+ .queryExecution.executedPlan)
assert(res.contains("Subtree 1 / 2"))
assert(res.contains("Subtree 2 / 2"))
assert(res.contains("Object[]"))
}
test("debugCodegenStringSeq") {
- val res = codegenStringSeq(spark.range(10).groupBy("id").count().queryExecution.executedPlan)
+ val res = codegenStringSeq(spark.range(10).groupBy(col("id") * 2).count()
+ .queryExecution.executedPlan)
assert(res.length == 2)
assert(res.forall{ case (subtree, code) =>
subtree.contains("Range") && code.contains("Object[]")})
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org