You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by da...@apache.org on 2016/02/21 03:05:28 UTC

spark git commit: [SPARK-13310] [SQL] Resolve Missing Sorting Columns in Generate

Repository: spark
Updated Branches:
  refs/heads/master a4a081d1d -> f88c641bc


[SPARK-13310] [SQL] Resolve Missing Sorting Columns in Generate

```scala
// case 1: missing sort columns are resolvable if join is true
sql("SELECT explode(a) AS val, b FROM data WHERE b < 2 order by val, c")
// case 2: missing sort columns are also resolvable if join is false, because join is switched to true so they can be taken from the child
sql("SELECT explode(a) AS val FROM data order by val, c")
```

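When sort columns are not in the output of `Generate`, we can resolve them from its child when `join` is equal to `true`; if `join` is `false`, it is converted to `true` so that the missing attributes can be obtained from the child. Still trying to add more test cases for the other `UnaryNode` types.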

Could you review the changes? davies cloud-fan Thanks!

Author: gatorsmile <ga...@gmail.com>

Closes #11198 from gatorsmile/missingInSort.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f88c641b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f88c641b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f88c641b

Branch: refs/heads/master
Commit: f88c641bc8afa836ae597801ae7520da90e8472a
Parents: a4a081d
Author: gatorsmile <ga...@gmail.com>
Authored: Sat Feb 20 13:53:23 2016 -0800
Committer: Davies Liu <da...@gmail.com>
Committed: Sat Feb 20 13:53:23 2016 -0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/analysis/Analyzer.scala  | 34 +++++++++++++-------
 .../catalyst/analysis/AnalysisErrorSuite.scala  |  5 +++
 .../sql/hive/execution/SQLQuerySuite.scala      | 32 ++++++++++++++++--
 3 files changed, 57 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f88c641b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 8368657..54b91ad 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -609,17 +609,24 @@ class Analyzer(
       case sa @ Sort(_, _, child: Aggregate) => sa
 
       case s @ Sort(order, _, child) if !s.resolved && child.resolved =>
-        val newOrder = order.map(resolveExpressionRecursively(_, child).asInstanceOf[SortOrder])
-        val requiredAttrs = AttributeSet(newOrder).filter(_.resolved)
-        val missingAttrs = requiredAttrs -- child.outputSet
-        if (missingAttrs.nonEmpty) {
-          // Add missing attributes and then project them away after the sort.
-          Project(child.output,
-            Sort(newOrder, s.global, addMissingAttr(child, missingAttrs)))
-        } else if (newOrder != order) {
-          s.copy(order = newOrder)
-        } else {
-          s
+        try {
+          val newOrder = order.map(resolveExpressionRecursively(_, child).asInstanceOf[SortOrder])
+          val requiredAttrs = AttributeSet(newOrder).filter(_.resolved)
+          val missingAttrs = requiredAttrs -- child.outputSet
+          if (missingAttrs.nonEmpty) {
+            // Add missing attributes and then project them away after the sort.
+            Project(child.output,
+              Sort(newOrder, s.global, addMissingAttr(child, missingAttrs)))
+          } else if (newOrder != order) {
+            s.copy(order = newOrder)
+          } else {
+            s
+          }
+        } catch {
+          // Attempting to resolve it might fail. When this happens, return the original plan.
+          // Users will see an AnalysisException for resolution failure of missing attributes
+          // in Sort
+          case ae: AnalysisException => s
         }
     }
 
@@ -649,6 +656,11 @@ class Analyzer(
           }
           val newAggregateExpressions = a.aggregateExpressions ++ missingAttrs
           a.copy(aggregateExpressions = newAggregateExpressions)
+        case g: Generate =>
+          // If join is false, we will convert it to true for getting from the child the missing
+          // attributes that its child might have or could have.
+          val missing = missingAttrs -- g.child.outputSet
+          g.copy(join = true, child = addMissingAttr(g.child, missing))
         case u: UnaryNode =>
           u.withNewChildren(addMissingAttr(u.child, missingAttrs) :: Nil)
         case other =>

http://git-wip-us.apache.org/repos/asf/spark/blob/f88c641b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
index e0cec09..a46006a 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
@@ -194,6 +194,11 @@ class AnalysisErrorSuite extends AnalysisTest {
     "sort" :: "type" :: "map<int,int>" :: Nil)
 
   errorTest(
+    "sorting by attributes are not from grouping expressions",
+    testRelation2.groupBy('a, 'c)('a, 'c, count('a).as("a3")).orderBy('b.asc),
+    "cannot resolve" :: "'b'" :: "given input columns" :: "[a, c, a3]" :: Nil)
+
+  errorTest(
     "non-boolean filters",
     testRelation.where(Literal(1)),
     "filter" :: "'1'" :: "not a boolean" :: Literal(1).dataType.simpleString :: Nil)

http://git-wip-us.apache.org/repos/asf/spark/blob/f88c641b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index c2b0daa..75fa09d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -24,11 +24,10 @@ import scala.collection.JavaConverters._
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubQueries, FunctionRegistry}
-import org.apache.spark.sql.catalyst.parser.ParserConf
-import org.apache.spark.sql.execution.SparkQl
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
-import org.apache.spark.sql.hive.{HiveContext, HiveQl, MetastoreRelation}
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation}
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types._
@@ -927,6 +926,33 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       ).map(i => Row(i._1, i._2, i._3, i._4)))
   }
 
+  test("Sorting columns are not in Generate") {
+    withTempTable("data") {
+      sqlContext.range(1, 5)
+        .select(array($"id", $"id" + 1).as("a"), $"id".as("b"), (lit(10) - $"id").as("c"))
+        .registerTempTable("data")
+
+      // case 1: missing sort columns are resolvable if join is true
+      checkAnswer(
+        sql("SELECT explode(a) AS val, b FROM data WHERE b < 2 order by val, c"),
+        Row(1, 1) :: Row(2, 1) :: Nil)
+
+      // case 2: missing sort columns are resolvable if join is false
+      checkAnswer(
+        sql("SELECT explode(a) AS val FROM data order by val, c"),
+        Seq(1, 2, 2, 3, 3, 4, 4, 5).map(i => Row(i)))
+
+      // case 3: missing sort columns are resolvable if join is true and outer is true
+      checkAnswer(
+        sql(
+          """
+            |SELECT C.val, b FROM data LATERAL VIEW OUTER explode(a) C as val
+            |where b < 2 order by c, val, b
+          """.stripMargin),
+        Row(1, 1) :: Row(2, 1) :: Nil)
+    }
+  }
+
   test("window function: Sorting columns are not in Project") {
     val data = Seq(
       WindowData(1, "d", 10),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org