You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2015/07/29 09:08:49 UTC
spark git commit: [SPARK-9251][SQL] do not order by expressions which
still need evaluation
Repository: spark
Updated Branches:
refs/heads/master 15667a0af -> 708794e8a
[SPARK-9251][SQL] do not order by expressions which still need evaluation
As per an offline discussion with rxin, it's weird to be computing stuff while doing sorting; we should only order by bound references during execution.
Author: Wenchen Fan <cl...@outlook.com>
Closes #7593 from cloud-fan/sort and squashes the following commits:
7b1bef7 [Wenchen Fan] add test
daf206d [Wenchen Fan] add more comments
289bee0 [Wenchen Fan] do not order by expressions which still need evaluation
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/708794e8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/708794e8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/708794e8
Branch: refs/heads/master
Commit: 708794e8aae2c66bd291bab4f12117c33b57840c
Parents: 15667a0
Author: Wenchen Fan <cl...@outlook.com>
Authored: Wed Jul 29 00:08:45 2015 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Wed Jul 29 00:08:45 2015 -0700
----------------------------------------------------------------------
.../spark/sql/catalyst/analysis/Analyzer.scala | 58 ++++++++++++++++++++
.../spark/sql/catalyst/expressions/random.scala | 4 +-
.../catalyst/plans/logical/basicOperators.scala | 13 +++--
.../sql/catalyst/analysis/AnalysisSuite.scala | 36 ++++++++++--
.../scala/org/apache/spark/sql/TestData.scala | 2 -
5 files changed, 101 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/708794e8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index a6ea0cc..265f3d1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -79,6 +79,7 @@ class Analyzer(
ExtractWindowExpressions ::
GlobalAggregates ::
UnresolvedHavingClauseAttributes ::
+ RemoveEvaluationFromSort ::
HiveTypeCoercion.typeCoercionRules ++
extendedResolutionRules : _*),
Batch("Nondeterministic", Once,
@@ -947,6 +948,63 @@ class Analyzer(
Project(p.output, newPlan.withNewChildren(newChild :: Nil))
}
}
+
+ /**
+ * Removes all still-need-evaluate ordering expressions from sort and uses an inner project to
+ * materialize them, finally using an outer project to project them away to keep the result the same.
+ * Then we can make sure we only sort by [[AttributeReference]]s.
+ *
+ * As an example,
+ * {{{
+ * Sort('a, 'b + 1,
+ * Relation('a, 'b))
+ * }}}
+ * will be turned into:
+ * {{{
+ * Project('a, 'b,
+ * Sort('a, '_sortCondition,
+ * Project('a, 'b, ('b + 1).as("_sortCondition"),
+ * Relation('a, 'b))))
+ * }}}
+ */
+ object RemoveEvaluationFromSort extends Rule[LogicalPlan] {
+ private def hasAlias(expr: Expression) = {
+ expr.find {
+ case a: Alias => true
+ case _ => false
+ }.isDefined
+ }
+
+ override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+ // The ordering expressions have no effect on the output schema of `Sort`,
+ // so `Alias`s in ordering expressions are unnecessary and we should remove them.
+ case s @ Sort(ordering, _, _) if ordering.exists(hasAlias) =>
+ val newOrdering = ordering.map(_.transformUp {
+ case Alias(child, _) => child
+ }.asInstanceOf[SortOrder])
+ s.copy(order = newOrdering)
+
+ case s @ Sort(ordering, global, child)
+ if s.expressions.forall(_.resolved) && s.childrenResolved && !s.hasNoEvaluation =>
+
+ val (ref, needEval) = ordering.partition(_.child.isInstanceOf[AttributeReference])
+
+ val namedExpr = needEval.map(_.child match {
+ case n: NamedExpression => n
+ case e => Alias(e, "_sortCondition")()
+ })
+
+ val newOrdering = ref ++ needEval.zip(namedExpr).map { case (order, ne) =>
+ order.copy(child = ne.toAttribute)
+ }
+
+ // Add still-need-evaluate ordering expressions into the inner project and then project
+ // them away after the sort.
+ Project(child.output,
+ Sort(newOrdering, global,
+ Project(child.output ++ namedExpr, child)))
+ }
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/708794e8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala
index 8f30519..62d3d20 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala
@@ -66,7 +66,7 @@ case class Rand(seed: Long) extends RDG {
val rngTerm = ctx.freshName("rng")
val className = classOf[XORShiftRandom].getName
ctx.addMutableState(className, rngTerm,
- s"$rngTerm = new $className($seed + org.apache.spark.TaskContext.getPartitionId());")
+ s"$rngTerm = new $className(${seed}L + org.apache.spark.TaskContext.getPartitionId());")
ev.isNull = "false"
s"""
final ${ctx.javaType(dataType)} ${ev.primitive} = $rngTerm.nextDouble();
@@ -89,7 +89,7 @@ case class Randn(seed: Long) extends RDG {
val rngTerm = ctx.freshName("rng")
val className = classOf[XORShiftRandom].getName
ctx.addMutableState(className, rngTerm,
- s"$rngTerm = new $className($seed + org.apache.spark.TaskContext.getPartitionId());")
+ s"$rngTerm = new $className(${seed}L + org.apache.spark.TaskContext.getPartitionId());")
ev.isNull = "false"
s"""
final ${ctx.javaType(dataType)} ${ev.primitive} = $rngTerm.nextGaussian();
http://git-wip-us.apache.org/repos/asf/spark/blob/708794e8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index af68358..ad5af19 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -33,7 +33,7 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extend
}.nonEmpty
)
- !expressions.exists(!_.resolved) && childrenResolved && !hasSpecialExpressions
+ expressions.forall(_.resolved) && childrenResolved && !hasSpecialExpressions
}
}
@@ -67,7 +67,7 @@ case class Generate(
generator.resolved &&
childrenResolved &&
generator.elementTypes.length == generatorOutput.length &&
- !generatorOutput.exists(!_.resolved)
+ generatorOutput.forall(_.resolved)
}
// we don't want the gOutput to be taken as part of the expressions
@@ -187,7 +187,7 @@ case class WithWindowDefinition(
}
/**
- * @param order The ordering expressions
+ * @param order The ordering expressions, should all be [[AttributeReference]]
* @param global True means global sorting apply for entire data set,
* False means sorting only apply within the partition.
* @param child Child logical plan
@@ -197,6 +197,11 @@ case class Sort(
global: Boolean,
child: LogicalPlan) extends UnaryNode {
override def output: Seq[Attribute] = child.output
+
+ def hasNoEvaluation: Boolean = order.forall(_.child.isInstanceOf[AttributeReference])
+
+ override lazy val resolved: Boolean =
+ expressions.forall(_.resolved) && childrenResolved && hasNoEvaluation
}
case class Aggregate(
@@ -211,7 +216,7 @@ case class Aggregate(
}.nonEmpty
)
- !expressions.exists(!_.resolved) && childrenResolved && !hasWindowExpressions
+ expressions.forall(_.resolved) && childrenResolved && !hasWindowExpressions
}
override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute)
http://git-wip-us.apache.org/repos/asf/spark/blob/708794e8/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 221b4e9..a86cefe 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -165,11 +165,39 @@ class AnalysisSuite extends AnalysisTest {
test("pull out nondeterministic expressions from Sort") {
val plan = Sort(Seq(SortOrder(Rand(33), Ascending)), false, testRelation)
- val projected = Alias(Rand(33), "_nondeterministic")()
+ val analyzed = caseSensitiveAnalyzer.execute(plan)
+ analyzed.transform {
+ case s: Sort if s.expressions.exists(!_.deterministic) =>
+ fail("nondeterministic expressions are not allowed in Sort")
+ }
+ }
+
+ test("remove still-need-evaluate ordering expressions from sort") {
+ val a = testRelation2.output(0)
+ val b = testRelation2.output(1)
+
+ def makeOrder(e: Expression): SortOrder = SortOrder(e, Ascending)
+
+ val noEvalOrdering = makeOrder(a)
+ val noEvalOrderingWithAlias = makeOrder(Alias(Alias(b, "name1")(), "name2")())
+
+ val needEvalExpr = Coalesce(Seq(a, Literal("1")))
+ val needEvalExpr2 = Coalesce(Seq(a, b))
+ val needEvalOrdering = makeOrder(needEvalExpr)
+ val needEvalOrdering2 = makeOrder(needEvalExpr2)
+
+ val plan = Sort(
+ Seq(noEvalOrdering, noEvalOrderingWithAlias, needEvalOrdering, needEvalOrdering2),
+ false, testRelation2)
+
+ val evaluatedOrdering = makeOrder(AttributeReference("_sortCondition", StringType)())
+ val materializedExprs = Seq(needEvalExpr, needEvalExpr2).map(e => Alias(e, "_sortCondition")())
+
val expected =
- Project(testRelation.output,
- Sort(Seq(SortOrder(projected.toAttribute, Ascending)), false,
- Project(testRelation.output :+ projected, testRelation)))
+ Project(testRelation2.output,
+ Sort(Seq(makeOrder(a), makeOrder(b), evaluatedOrdering, evaluatedOrdering), false,
+ Project(testRelation2.output ++ materializedExprs, testRelation2)))
+
checkAnalysis(plan, expected)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/708794e8/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
index 207d7a3..e340f54 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
@@ -17,8 +17,6 @@
package org.apache.spark.sql
-import java.sql.Timestamp
-
import org.apache.spark.sql.test.TestSQLContext.implicits._
import org.apache.spark.sql.test._
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org