You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2015/07/29 09:08:49 UTC

spark git commit: [SPARK-9251][SQL] do not order by expressions which still need evaluation

Repository: spark
Updated Branches:
  refs/heads/master 15667a0af -> 708794e8a


[SPARK-9251][SQL] do not order by expressions which still need evaluation

Following an offline discussion with rxin: it is odd to evaluate expressions while sorting, so we should only order by bound references during execution.

Author: Wenchen Fan <cl...@outlook.com>

Closes #7593 from cloud-fan/sort and squashes the following commits:

7b1bef7 [Wenchen Fan] add test
daf206d [Wenchen Fan] add more comments
289bee0 [Wenchen Fan] do not order by expressions which still need evaluation


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/708794e8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/708794e8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/708794e8

Branch: refs/heads/master
Commit: 708794e8aae2c66bd291bab4f12117c33b57840c
Parents: 15667a0
Author: Wenchen Fan <cl...@outlook.com>
Authored: Wed Jul 29 00:08:45 2015 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Wed Jul 29 00:08:45 2015 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/analysis/Analyzer.scala  | 58 ++++++++++++++++++++
 .../spark/sql/catalyst/expressions/random.scala |  4 +-
 .../catalyst/plans/logical/basicOperators.scala | 13 +++--
 .../sql/catalyst/analysis/AnalysisSuite.scala   | 36 ++++++++++--
 .../scala/org/apache/spark/sql/TestData.scala   |  2 -
 5 files changed, 101 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/708794e8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index a6ea0cc..265f3d1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -79,6 +79,7 @@ class Analyzer(
       ExtractWindowExpressions ::
       GlobalAggregates ::
       UnresolvedHavingClauseAttributes ::
+      RemoveEvaluationFromSort ::
       HiveTypeCoercion.typeCoercionRules ++
       extendedResolutionRules : _*),
     Batch("Nondeterministic", Once,
@@ -947,6 +948,63 @@ class Analyzer(
         Project(p.output, newPlan.withNewChildren(newChild :: Nil))
     }
   }
+
+  /**
+   * Removes every ordering expression that still needs evaluation from a `Sort`: an inner
+   * `Project` materializes them, and an outer `Project` drops them again so the output is
+   * unchanged. After this rule, a `Sort` orders only by [[AttributeReference]]s.
+   *
+   * As an example,
+   * {{{
+   *   Sort('a, 'b + 1,
+   *     Relation('a, 'b))
+   * }}}
+   * will be turned into:
+   * {{{
+   *   Project('a, 'b,
+   *     Sort('a, '_sortCondition,
+   *       Project('a, 'b, ('b + 1).as("_sortCondition"),
+   *         Relation('a, 'b))))
+   * }}}
+   */
+  object RemoveEvaluationFromSort extends Rule[LogicalPlan] {
+    private def hasAlias(expr: Expression) = {
+      expr.find {
+        case a: Alias => true
+        case _ => false
+      }.isDefined
+    }
+
+    override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+      // Ordering expressions have no effect on the output schema of `Sort`, so any `Alias` in
+      // them is unnecessary; strip the aliases and keep only the aliased children.
+      case s @ Sort(ordering, _, _) if ordering.exists(hasAlias) =>
+        val newOrdering = ordering.map(_.transformUp {
+          case Alias(child, _) => child
+        }.asInstanceOf[SortOrder])
+        s.copy(order = newOrdering)
+
+      case s @ Sort(ordering, global, child)
+        if s.expressions.forall(_.resolved) && s.childrenResolved && !s.hasNoEvaluation =>
+
+        val (ref, needEval) = ordering.partition(_.child.isInstanceOf[AttributeReference])
+
+        val namedExpr = needEval.map(_.child match {
+          case n: NamedExpression => n
+          case e => Alias(e, "_sortCondition")()
+        })
+
+        val newOrdering = ref ++ needEval.zip(namedExpr).map { case (order, ne) =>
+          order.copy(child = ne.toAttribute)
+        }
+
+        // Materialize the ordering expressions that still need evaluation in the inner project,
+        // then project them away after the sort so the output stays `child.output`.
+        Project(child.output,
+          Sort(newOrdering, global,
+            Project(child.output ++ namedExpr, child)))
+    }
+  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/708794e8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala
index 8f30519..62d3d20 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala
@@ -66,7 +66,7 @@ case class Rand(seed: Long) extends RDG {
     val rngTerm = ctx.freshName("rng")
     val className = classOf[XORShiftRandom].getName
     ctx.addMutableState(className, rngTerm,
-      s"$rngTerm = new $className($seed + org.apache.spark.TaskContext.getPartitionId());")
+      s"$rngTerm = new $className(${seed}L + org.apache.spark.TaskContext.getPartitionId());")
     ev.isNull = "false"
     s"""
       final ${ctx.javaType(dataType)} ${ev.primitive} = $rngTerm.nextDouble();
@@ -89,7 +89,7 @@ case class Randn(seed: Long) extends RDG {
     val rngTerm = ctx.freshName("rng")
     val className = classOf[XORShiftRandom].getName
     ctx.addMutableState(className, rngTerm,
-      s"$rngTerm = new $className($seed + org.apache.spark.TaskContext.getPartitionId());")
+      s"$rngTerm = new $className(${seed}L + org.apache.spark.TaskContext.getPartitionId());")
     ev.isNull = "false"
     s"""
       final ${ctx.javaType(dataType)} ${ev.primitive} = $rngTerm.nextGaussian();

http://git-wip-us.apache.org/repos/asf/spark/blob/708794e8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index af68358..ad5af19 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -33,7 +33,7 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extend
       }.nonEmpty
     )
 
-    !expressions.exists(!_.resolved) && childrenResolved && !hasSpecialExpressions
+    expressions.forall(_.resolved) && childrenResolved && !hasSpecialExpressions
   }
 }
 
@@ -67,7 +67,7 @@ case class Generate(
     generator.resolved &&
       childrenResolved &&
       generator.elementTypes.length == generatorOutput.length &&
-      !generatorOutput.exists(!_.resolved)
+      generatorOutput.forall(_.resolved)
   }
 
   // we don't want the gOutput to be taken as part of the expressions
@@ -187,7 +187,7 @@ case class WithWindowDefinition(
 }
 
 /**
- * @param order  The ordering expressions
+ * @param order  The ordering expressions; each one should be an [[AttributeReference]]
  * @param global True means global sorting apply for entire data set,
  *               False means sorting only apply within the partition.
  * @param child  Child logical plan
@@ -197,6 +197,11 @@ case class Sort(
     global: Boolean,
     child: LogicalPlan) extends UnaryNode {
   override def output: Seq[Attribute] = child.output
+
+  def hasNoEvaluation: Boolean = order.forall(_.child.isInstanceOf[AttributeReference])
+
+  override lazy val resolved: Boolean =
+    expressions.forall(_.resolved) && childrenResolved && hasNoEvaluation
 }
 
 case class Aggregate(
@@ -211,7 +216,7 @@ case class Aggregate(
       }.nonEmpty
     )
 
-    !expressions.exists(!_.resolved) && childrenResolved && !hasWindowExpressions
+    expressions.forall(_.resolved) && childrenResolved && !hasWindowExpressions
   }
 
   override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute)

http://git-wip-us.apache.org/repos/asf/spark/blob/708794e8/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 221b4e9..a86cefe 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -165,11 +165,39 @@ class AnalysisSuite extends AnalysisTest {
 
   test("pull out nondeterministic expressions from Sort") {
     val plan = Sort(Seq(SortOrder(Rand(33), Ascending)), false, testRelation)
-    val projected = Alias(Rand(33), "_nondeterministic")()
+    val analyzed = caseSensitiveAnalyzer.execute(plan)
+    analyzed.transform {
+      case s: Sort if s.expressions.exists(!_.deterministic) =>
+        fail("nondeterministic expressions are not allowed in Sort")
+    }
+  }
+
+  test("remove still-need-evaluate ordering expressions from sort") {
+    val a = testRelation2.output(0)
+    val b = testRelation2.output(1)
+
+    def makeOrder(e: Expression): SortOrder = SortOrder(e, Ascending)
+
+    val noEvalOrdering = makeOrder(a)
+    val noEvalOrderingWithAlias = makeOrder(Alias(Alias(b, "name1")(), "name2")())
+
+    val needEvalExpr = Coalesce(Seq(a, Literal("1")))
+    val needEvalExpr2 = Coalesce(Seq(a, b))
+    val needEvalOrdering = makeOrder(needEvalExpr)
+    val needEvalOrdering2 = makeOrder(needEvalExpr2)
+
+    val plan = Sort(
+      Seq(noEvalOrdering, noEvalOrderingWithAlias, needEvalOrdering, needEvalOrdering2),
+      false, testRelation2)
+
+    val evaluatedOrdering = makeOrder(AttributeReference("_sortCondition", StringType)())
+    val materializedExprs = Seq(needEvalExpr, needEvalExpr2).map(e => Alias(e, "_sortCondition")())
+
     val expected =
-      Project(testRelation.output,
-        Sort(Seq(SortOrder(projected.toAttribute, Ascending)), false,
-          Project(testRelation.output :+ projected, testRelation)))
+      Project(testRelation2.output,
+        Sort(Seq(makeOrder(a), makeOrder(b), evaluatedOrdering, evaluatedOrdering), false,
+          Project(testRelation2.output ++ materializedExprs, testRelation2)))
+
     checkAnalysis(plan, expected)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/708794e8/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
index 207d7a3..e340f54 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql
 
-import java.sql.Timestamp
-
 import org.apache.spark.sql.test.TestSQLContext.implicits._
 import org.apache.spark.sql.test._
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org