You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by da...@apache.org on 2016/02/21 03:05:28 UTC
spark git commit: [SPARK-13310] [SQL] Resolve Missing Sorting Columns
in Generate
Repository: spark
Updated Branches:
refs/heads/master a4a081d1d -> f88c641bc
[SPARK-13310] [SQL] Resolve Missing Sorting Columns in Generate
```scala
// case 1: missing sort columns are resolvable if join is true
sql("SELECT explode(a) AS val, b FROM data WHERE b < 2 order by val, c")
// case 2: missing sort columns are not resolvable if join is false. Thus, issue an error message in this case
sql("SELECT explode(a) AS val FROM data order by val, c")
```
When sort columns are missing from the output of `Generate`, we can resolve them when `join` is `true`. Still trying to add more test cases for the other `UnaryNode` types.
Could you review the changes? davies cloud-fan Thanks!
Author: gatorsmile <ga...@gmail.com>
Closes #11198 from gatorsmile/missingInSort.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f88c641b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f88c641b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f88c641b
Branch: refs/heads/master
Commit: f88c641bc8afa836ae597801ae7520da90e8472a
Parents: a4a081d
Author: gatorsmile <ga...@gmail.com>
Authored: Sat Feb 20 13:53:23 2016 -0800
Committer: Davies Liu <da...@gmail.com>
Committed: Sat Feb 20 13:53:23 2016 -0800
----------------------------------------------------------------------
.../spark/sql/catalyst/analysis/Analyzer.scala | 34 +++++++++++++-------
.../catalyst/analysis/AnalysisErrorSuite.scala | 5 +++
.../sql/hive/execution/SQLQuerySuite.scala | 32 ++++++++++++++++--
3 files changed, 57 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/f88c641b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 8368657..54b91ad 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -609,17 +609,24 @@ class Analyzer(
case sa @ Sort(_, _, child: Aggregate) => sa
case s @ Sort(order, _, child) if !s.resolved && child.resolved =>
- val newOrder = order.map(resolveExpressionRecursively(_, child).asInstanceOf[SortOrder])
- val requiredAttrs = AttributeSet(newOrder).filter(_.resolved)
- val missingAttrs = requiredAttrs -- child.outputSet
- if (missingAttrs.nonEmpty) {
- // Add missing attributes and then project them away after the sort.
- Project(child.output,
- Sort(newOrder, s.global, addMissingAttr(child, missingAttrs)))
- } else if (newOrder != order) {
- s.copy(order = newOrder)
- } else {
- s
+ try {
+ val newOrder = order.map(resolveExpressionRecursively(_, child).asInstanceOf[SortOrder])
+ val requiredAttrs = AttributeSet(newOrder).filter(_.resolved)
+ val missingAttrs = requiredAttrs -- child.outputSet
+ if (missingAttrs.nonEmpty) {
+ // Add missing attributes and then project them away after the sort.
+ Project(child.output,
+ Sort(newOrder, s.global, addMissingAttr(child, missingAttrs)))
+ } else if (newOrder != order) {
+ s.copy(order = newOrder)
+ } else {
+ s
+ }
+ } catch {
+ // Attempting to resolve it might fail. When this happens, return the original plan.
+ // Users will see an AnalysisException for resolution failure of missing attributes
+ // in Sort
+ case ae: AnalysisException => s
}
}
@@ -649,6 +656,11 @@ class Analyzer(
}
val newAggregateExpressions = a.aggregateExpressions ++ missingAttrs
a.copy(aggregateExpressions = newAggregateExpressions)
+ case g: Generate =>
+ // If join is false, we will convert it to true for getting from the child the missing
+ // attributes that its child might have or could have.
+ val missing = missingAttrs -- g.child.outputSet
+ g.copy(join = true, child = addMissingAttr(g.child, missing))
case u: UnaryNode =>
u.withNewChildren(addMissingAttr(u.child, missingAttrs) :: Nil)
case other =>
http://git-wip-us.apache.org/repos/asf/spark/blob/f88c641b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
index e0cec09..a46006a 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
@@ -194,6 +194,11 @@ class AnalysisErrorSuite extends AnalysisTest {
"sort" :: "type" :: "map<int,int>" :: Nil)
errorTest(
+ "sorting by attributes are not from grouping expressions",
+ testRelation2.groupBy('a, 'c)('a, 'c, count('a).as("a3")).orderBy('b.asc),
+ "cannot resolve" :: "'b'" :: "given input columns" :: "[a, c, a3]" :: Nil)
+
+ errorTest(
"non-boolean filters",
testRelation.where(Literal(1)),
"filter" :: "'1'" :: "not a boolean" :: Literal(1).dataType.simpleString :: Nil)
http://git-wip-us.apache.org/repos/asf/spark/blob/f88c641b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index c2b0daa..75fa09d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -24,11 +24,10 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{EliminateSubQueries, FunctionRegistry}
-import org.apache.spark.sql.catalyst.parser.ParserConf
-import org.apache.spark.sql.execution.SparkQl
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
-import org.apache.spark.sql.hive.{HiveContext, HiveQl, MetastoreRelation}
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation}
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types._
@@ -927,6 +926,33 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
).map(i => Row(i._1, i._2, i._3, i._4)))
}
+ test("Sorting columns are not in Generate") {
+ withTempTable("data") {
+ sqlContext.range(1, 5)
+ .select(array($"id", $"id" + 1).as("a"), $"id".as("b"), (lit(10) - $"id").as("c"))
+ .registerTempTable("data")
+
+ // case 1: missing sort columns are resolvable if join is true
+ checkAnswer(
+ sql("SELECT explode(a) AS val, b FROM data WHERE b < 2 order by val, c"),
+ Row(1, 1) :: Row(2, 1) :: Nil)
+
+ // case 2: missing sort columns are resolvable if join is false
+ checkAnswer(
+ sql("SELECT explode(a) AS val FROM data order by val, c"),
+ Seq(1, 2, 2, 3, 3, 4, 4, 5).map(i => Row(i)))
+
+ // case 3: missing sort columns are resolvable if join is true and outer is true
+ checkAnswer(
+ sql(
+ """
+ |SELECT C.val, b FROM data LATERAL VIEW OUTER explode(a) C as val
+ |where b < 2 order by c, val, b
+ """.stripMargin),
+ Row(1, 1) :: Row(2, 1) :: Nil)
+ }
+ }
+
test("window function: Sorting columns are not in Project") {
val data = Seq(
WindowData(1, "d", 10),
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org