You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2014/12/30 21:11:49 UTC

spark git commit: [Spark-4512] [SQL] Unresolved Attribute Exception in Sort By

Repository: spark
Updated Branches:
  refs/heads/master daac22130 -> 53f0a00b6


[Spark-4512] [SQL] Unresolved Attribute Exception in Sort By

An unresolved attribute exception is thrown when running a query like:
SELECT key+key FROM src sort by value;

Author: Cheng Hao <ha...@intel.com>

Closes #3386 from chenghao-intel/sort and squashes the following commits:

38c78cc [Cheng Hao] revert the SortPartition in SparkStrategies
7e9dd15 [Cheng Hao] update the typo
fcd1d64 [Cheng Hao] rebase the latest master and update the SortBy unit test


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/53f0a00b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/53f0a00b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/53f0a00b

Branch: refs/heads/master
Commit: 53f0a00b6051fb6cb52a90f91ae01bcd77e332c5
Parents: daac221
Author: Cheng Hao <ha...@intel.com>
Authored: Tue Dec 30 12:11:44 2014 -0800
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Tue Dec 30 12:11:44 2014 -0800

----------------------------------------------------------------------
 .../apache/spark/sql/catalyst/SqlParser.scala    |  4 ++--
 .../spark/sql/catalyst/analysis/Analyzer.scala   | 13 +++++++------
 .../apache/spark/sql/catalyst/dsl/package.scala  |  4 ++--
 .../catalyst/plans/logical/basicOperators.scala  | 11 ++++++++++-
 .../scala/org/apache/spark/sql/SchemaRDD.scala   |  5 ++---
 .../spark/sql/execution/SparkStrategies.scala    | 11 +++++------
 .../org/apache/spark/sql/DslQuerySuite.scala     | 19 ++++++++++++++-----
 .../scala/org/apache/spark/sql/TestData.scala    |  2 +-
 .../scala/org/apache/spark/sql/hive/HiveQl.scala |  8 ++++----
 .../sql/hive/execution/HiveComparisonTest.scala  |  2 +-
 .../spark/sql/hive/execution/SQLQuerySuite.scala |  7 +++++++
 11 files changed, 55 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/53f0a00b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index 66860a4..f79d4ff 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -204,8 +204,8 @@ class SqlParser extends AbstractSparkSQLParser {
     )
 
   protected lazy val sortType: Parser[LogicalPlan => LogicalPlan] =
-    ( ORDER ~ BY  ~> ordering ^^ { case o => l: LogicalPlan => Sort(o, l) }
-    | SORT ~ BY  ~> ordering ^^ { case o => l: LogicalPlan => SortPartitions(o, l) }
+    ( ORDER ~ BY  ~> ordering ^^ { case o => l: LogicalPlan => Sort(o, true, l) }
+    | SORT ~ BY  ~> ordering ^^ { case o => l: LogicalPlan => Sort(o, false, l) }
     )
 
   protected lazy val ordering: Parser[Seq[SortOrder]] =

http://git-wip-us.apache.org/repos/asf/spark/blob/53f0a00b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 1c4088b..72680f3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -246,7 +246,7 @@ class Analyzer(catalog: Catalog,
       case p: LogicalPlan if !p.childrenResolved => p
 
       // If the projection list contains Stars, expand it.
-      case p@Project(projectList, child) if containsStar(projectList) =>
+      case p @ Project(projectList, child) if containsStar(projectList) =>
         Project(
           projectList.flatMap {
             case s: Star => s.expand(child.output, resolver)
@@ -310,7 +310,8 @@ class Analyzer(catalog: Catalog,
    */
   object ResolveSortReferences extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
-      case s @ Sort(ordering, p @ Project(projectList, child)) if !s.resolved && p.resolved =>
+      case s @ Sort(ordering, global, p @ Project(projectList, child))
+          if !s.resolved && p.resolved =>
         val unresolved = ordering.flatMap(_.collect { case UnresolvedAttribute(name) => name })
         val resolved = unresolved.flatMap(child.resolve(_, resolver))
         val requiredAttributes = AttributeSet(resolved.collect { case a: Attribute => a })
@@ -319,13 +320,14 @@ class Analyzer(catalog: Catalog,
         if (missingInProject.nonEmpty) {
           // Add missing attributes and then project them away after the sort.
           Project(projectList.map(_.toAttribute),
-            Sort(ordering,
+            Sort(ordering, global,
               Project(projectList ++ missingInProject, child)))
         } else {
           logDebug(s"Failed to find $missingInProject in ${p.output.mkString(", ")}")
           s // Nothing we can do here. Return original plan.
         }
-      case s @ Sort(ordering, a @ Aggregate(grouping, aggs, child)) if !s.resolved && a.resolved =>
+      case s @ Sort(ordering, global, a @ Aggregate(grouping, aggs, child))
+          if !s.resolved && a.resolved =>
         val unresolved = ordering.flatMap(_.collect { case UnresolvedAttribute(name) => name })
         // A small hack to create an object that will allow us to resolve any references that
         // refer to named expressions that are present in the grouping expressions.
@@ -340,8 +342,7 @@ class Analyzer(catalog: Catalog,
         if (missingInAggs.nonEmpty) {
           // Add missing grouping exprs and then project them away after the sort.
           Project(a.output,
-            Sort(ordering,
-              Aggregate(grouping, aggs ++ missingInAggs, child)))
+            Sort(ordering, global, Aggregate(grouping, aggs ++ missingInAggs, child)))
         } else {
           s // Nothing we can do here. Return original plan.
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/53f0a00b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index fb252cd..a14e5b9 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -244,9 +244,9 @@ package object dsl {
         condition: Option[Expression] = None) =
       Join(logicalPlan, otherPlan, joinType, condition)
 
-    def orderBy(sortExprs: SortOrder*) = Sort(sortExprs, logicalPlan)
+    def orderBy(sortExprs: SortOrder*) = Sort(sortExprs, true, logicalPlan)
 
-    def sortBy(sortExprs: SortOrder*) = SortPartitions(sortExprs, logicalPlan)
+    def sortBy(sortExprs: SortOrder*) = Sort(sortExprs, false, logicalPlan)
 
     def groupBy(groupingExprs: Expression*)(aggregateExprs: Expression*) = {
       val aliasedExprs = aggregateExprs.map {

http://git-wip-us.apache.org/repos/asf/spark/blob/53f0a00b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index a9282b9..0b9f01c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -130,7 +130,16 @@ case class WriteToFile(
   override def output = child.output
 }
 
-case class Sort(order: Seq[SortOrder], child: LogicalPlan) extends UnaryNode {
+/**
+ * @param order  The ordering expressions 
+ * @param global True means global sorting apply for entire data set, 
+ *               False means sorting only apply within the partition.
+ * @param child  Child logical plan              
+ */
+case class Sort(
+    order: Seq[SortOrder],
+    global: Boolean,
+    child: LogicalPlan) extends UnaryNode {
   override def output = child.output
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/53f0a00b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index 856b10f..80787b6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -214,7 +214,7 @@ class SchemaRDD(
    * @group Query
    */
   def orderBy(sortExprs: SortOrder*): SchemaRDD =
-    new SchemaRDD(sqlContext, Sort(sortExprs, logicalPlan))
+    new SchemaRDD(sqlContext, Sort(sortExprs, true, logicalPlan))
 
   /**
    * Sorts the results by the given expressions within partition.
@@ -227,7 +227,7 @@ class SchemaRDD(
    * @group Query
    */
   def sortBy(sortExprs: SortOrder*): SchemaRDD =
-    new SchemaRDD(sqlContext, SortPartitions(sortExprs, logicalPlan))
+    new SchemaRDD(sqlContext, Sort(sortExprs, false, logicalPlan))
 
   @deprecated("use limit with integer argument", "1.1.0")
   def limit(limitExpr: Expression): SchemaRDD =
@@ -238,7 +238,6 @@ class SchemaRDD(
    * {{{
    *   schemaRDD.limit(10)
    * }}}
-   * 
    * @group Query
    */
   def limit(limitNum: Int): SchemaRDD =

http://git-wip-us.apache.org/repos/asf/spark/blob/53f0a00b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 2954d4c..9151da6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -190,7 +190,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
 
   object TakeOrdered extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case logical.Limit(IntegerLiteral(limit), logical.Sort(order, child)) =>
+      case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child)) =>
         execution.TakeOrdered(limit, order, planLater(child)) :: Nil
       case _ => Nil
     }
@@ -257,15 +257,14 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         execution.Distinct(partial = false,
           execution.Distinct(partial = true, planLater(child))) :: Nil
 
-      case logical.Sort(sortExprs, child) if sqlContext.externalSortEnabled =>
-        execution.ExternalSort(sortExprs, global = true, planLater(child)):: Nil
-      case logical.Sort(sortExprs, child) =>
-        execution.Sort(sortExprs, global = true, planLater(child)):: Nil
-
       case logical.SortPartitions(sortExprs, child) =>
         // This sort only sorts tuples within a partition. Its requiredDistribution will be
         // an UnspecifiedDistribution.
         execution.Sort(sortExprs, global = false, planLater(child)) :: Nil
+      case logical.Sort(sortExprs, global, child) if sqlContext.externalSortEnabled =>
+        execution.ExternalSort(sortExprs, global, planLater(child)):: Nil
+      case logical.Sort(sortExprs, global, child) =>
+        execution.Sort(sortExprs, global, planLater(child)):: Nil
       case logical.Project(projectList, child) =>
         execution.Project(projectList, planLater(child)) :: Nil
       case logical.Filter(condition, child) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/53f0a00b/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala
index 691c4b3..c0b9cf5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala
@@ -88,7 +88,7 @@ class DslQuerySuite extends QueryTest {
       Seq(Seq(6)))
   }
 
-  test("sorting") {
+  test("global sorting") {
     checkAnswer(
       testData2.orderBy('a.asc, 'b.asc),
       Seq((1,1), (1,2), (2,1), (2,2), (3,1), (3,2)))
@@ -122,22 +122,31 @@ class DslQuerySuite extends QueryTest {
       mapData.collect().sortBy(_.data(1)).reverse.toSeq)
   }
 
-  test("sorting #2") {
+  test("partition wide sorting") {
+    // 2 partitions totally, and
+    // Partition #1 with values:
+    //    (1, 1)
+    //    (1, 2)
+    //    (2, 1)
+    // Partition #2 with values:
+    //    (2, 2)
+    //    (3, 1)
+    //    (3, 2)
     checkAnswer(
       testData2.sortBy('a.asc, 'b.asc),
       Seq((1,1), (1,2), (2,1), (2,2), (3,1), (3,2)))
 
     checkAnswer(
       testData2.sortBy('a.asc, 'b.desc),
-      Seq((1,2), (1,1), (2,2), (2,1), (3,2), (3,1)))
+      Seq((1,2), (1,1), (2,1), (2,2), (3,2), (3,1)))
 
     checkAnswer(
       testData2.sortBy('a.desc, 'b.desc),
-      Seq((3,2), (3,1), (2,2), (2,1), (1,2), (1,1)))
+      Seq((2,1), (1,2), (1,1), (3,2), (3,1), (2,2)))
 
     checkAnswer(
       testData2.sortBy('a.desc, 'b.asc),
-      Seq((3,1), (3,2), (2,1), (2,2), (1,1), (1,2)))
+      Seq((2,1), (1,1), (1,2), (3,1), (3,2), (2,2)))
   }
 
   test("limit") {

http://git-wip-us.apache.org/repos/asf/spark/blob/53f0a00b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
index bb553a0..497897c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
@@ -55,7 +55,7 @@ object TestData {
       TestData2(2, 1) ::
       TestData2(2, 2) ::
       TestData2(3, 1) ::
-      TestData2(3, 2) :: Nil).toSchemaRDD
+      TestData2(3, 2) :: Nil, 2).toSchemaRDD
   testData2.registerTempTable("testData2")
 
   case class DecimalData(a: BigDecimal, b: BigDecimal)

http://git-wip-us.apache.org/repos/asf/spark/blob/53f0a00b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 3f3d9e7..8a9613c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -680,16 +680,16 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
         val withSort =
           (orderByClause, sortByClause, distributeByClause, clusterByClause) match {
             case (Some(totalOrdering), None, None, None) =>
-              Sort(totalOrdering.getChildren.map(nodeToSortOrder), withHaving)
+              Sort(totalOrdering.getChildren.map(nodeToSortOrder), true, withHaving)
             case (None, Some(perPartitionOrdering), None, None) =>
-              SortPartitions(perPartitionOrdering.getChildren.map(nodeToSortOrder), withHaving)
+              Sort(perPartitionOrdering.getChildren.map(nodeToSortOrder), false, withHaving)
             case (None, None, Some(partitionExprs), None) =>
               Repartition(partitionExprs.getChildren.map(nodeToExpr), withHaving)
             case (None, Some(perPartitionOrdering), Some(partitionExprs), None) =>
-              SortPartitions(perPartitionOrdering.getChildren.map(nodeToSortOrder),
+              Sort(perPartitionOrdering.getChildren.map(nodeToSortOrder), false,
                 Repartition(partitionExprs.getChildren.map(nodeToExpr), withHaving))
             case (None, None, None, Some(clusterExprs)) =>
-              SortPartitions(clusterExprs.getChildren.map(nodeToExpr).map(SortOrder(_, Ascending)),
+              Sort(clusterExprs.getChildren.map(nodeToExpr).map(SortOrder(_, Ascending)), false,
                 Repartition(clusterExprs.getChildren.map(nodeToExpr), withHaving))
             case (None, None, None, None) => withHaving
             case _ => sys.error("Unsupported set of ordering / distribution clauses.")

http://git-wip-us.apache.org/repos/asf/spark/blob/53f0a00b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index 8011f9b..4104df8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -132,7 +132,7 @@ abstract class HiveComparisonTest
 
     def isSorted(plan: LogicalPlan): Boolean = plan match {
       case _: Join | _: Aggregate | _: Generate | _: Sample | _: Distinct => false
-      case PhysicalOperation(_, _, Sort(_, _)) => true
+      case PhysicalOperation(_, _, Sort(_, true, _)) => true
       case _ => plan.children.iterator.exists(isSorted)
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/53f0a00b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index f57f31a..5d0fb72 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -32,6 +32,13 @@ case class Nested3(f3: Int)
  * valid, but Hive currently cannot execute it.
  */
 class SQLQuerySuite extends QueryTest {
+  test("SPARK-4512 Fix attribute reference resolution error when using SORT BY") {
+    checkAnswer(
+      sql("SELECT * FROM (SELECT key + key AS a FROM src SORT BY value) t ORDER BY t.a"),
+      sql("SELECT key + key as a FROM src ORDER BY a").collect().toSeq
+    )
+  }
+
   test("CTAS with serde") {
     sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value").collect
     sql(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org