You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2015/01/30 04:09:10 UTC
spark git commit: [SQL] Support df("*") to select all columns in a data frame.
Repository: spark
Updated Branches:
refs/heads/master 22271f969 -> 80def9deb
[SQL] Support df("*") to select all columns in a data frame.
This PR makes Star a trait, and provides two implementations: UnresolvedStar (used for *, tblName.*) and ResolvedStar (used for df("*")).
Author: Reynold Xin <rx...@databricks.com>
Closes #4283 from rxin/df-star and squashes the following commits:
c9cba3e [Reynold Xin] Removed mapFunction in UnresolvedStar.
1a3a1d7 [Reynold Xin] [SQL] Support df("*") to select all columns in a data frame.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/80def9de
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/80def9de
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/80def9de
Branch: refs/heads/master
Commit: 80def9deb3bfc30d5b622b32aecb0322341a7f62
Parents: 22271f9
Author: Reynold Xin <rx...@databricks.com>
Authored: Thu Jan 29 19:09:08 2015 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Thu Jan 29 19:09:08 2015 -0800
----------------------------------------------------------------------
.../apache/spark/sql/catalyst/SqlParser.scala | 2 +-
.../sql/catalyst/analysis/unresolved.scala | 53 +++++++++++++-------
.../sql/catalyst/analysis/AnalysisSuite.scala | 4 +-
.../scala/org/apache/spark/sql/Column.scala | 6 +--
.../scala/org/apache/spark/sql/DataFrame.scala | 4 +-
.../spark/sql/ColumnExpressionSuite.scala | 8 ++-
.../org/apache/spark/sql/hive/HiveQl.scala | 6 +--
7 files changed, 54 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/80def9de/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index eaadbe9..24a65f8 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -348,7 +348,7 @@ class SqlParser extends AbstractSparkSQLParser {
)
protected lazy val baseExpression: Parser[Expression] =
- ( "*" ^^^ Star(None)
+ ( "*" ^^^ UnresolvedStar(None)
| primary
)
http://git-wip-us.apache.org/repos/asf/spark/blob/80def9de/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index 71a738a..6606028 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -50,7 +50,7 @@ case class UnresolvedAttribute(name: String) extends Attribute with trees.LeafNo
override def qualifiers = throw new UnresolvedException(this, "qualifiers")
override lazy val resolved = false
- override def newInstance = this
+ override def newInstance() = this
override def withNullability(newNullability: Boolean) = this
override def withQualifiers(newQualifiers: Seq[String]) = this
override def withName(newName: String) = UnresolvedAttribute(name)
@@ -77,15 +77,10 @@ case class UnresolvedFunction(name: String, children: Seq[Expression]) extends E
/**
* Represents all of the input attributes to a given relational operator, for example in
- * "SELECT * FROM ...".
- *
- * @param table an optional table that should be the target of the expansion. If omitted all
- * tables' columns are produced.
+ * "SELECT * FROM ...". A [[Star]] gets automatically expanded during analysis.
*/
-case class Star(
- table: Option[String],
- mapFunction: Attribute => Expression = identity[Attribute])
- extends Attribute with trees.LeafNode[Expression] {
+trait Star extends Attribute with trees.LeafNode[Expression] {
+ self: Product =>
override def name = throw new UnresolvedException(this, "name")
override def exprId = throw new UnresolvedException(this, "exprId")
@@ -94,29 +89,53 @@ case class Star(
override def qualifiers = throw new UnresolvedException(this, "qualifiers")
override lazy val resolved = false
- override def newInstance = this
+ override def newInstance() = this
override def withNullability(newNullability: Boolean) = this
override def withQualifiers(newQualifiers: Seq[String]) = this
override def withName(newName: String) = this
- def expand(input: Seq[Attribute], resolver: Resolver): Seq[NamedExpression] = {
+ // Star gets expanded at runtime so we never evaluate a Star.
+ override def eval(input: Row = null): EvaluatedType =
+ throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
+
+ def expand(input: Seq[Attribute], resolver: Resolver): Seq[NamedExpression]
+}
+
+
+/**
+ * Represents all of the input attributes to a given relational operator, for example in
+ * "SELECT * FROM ...".
+ *
+ * @param table an optional table that should be the target of the expansion. If omitted all
+ * tables' columns are produced.
+ */
+case class UnresolvedStar(table: Option[String]) extends Star {
+
+ override def expand(input: Seq[Attribute], resolver: Resolver): Seq[NamedExpression] = {
val expandedAttributes: Seq[Attribute] = table match {
// If there is no table specified, use all input attributes.
case None => input
// If there is a table, pick out attributes that are part of this table.
case Some(t) => input.filter(_.qualifiers.filter(resolver(_, t)).nonEmpty)
}
- val mappedAttributes = expandedAttributes.map(mapFunction).zip(input).map {
+ expandedAttributes.zip(input).map {
case (n: NamedExpression, _) => n
case (e, originalAttribute) =>
Alias(e, originalAttribute.name)(qualifiers = originalAttribute.qualifiers)
}
- mappedAttributes
}
- // Star gets expanded at runtime so we never evaluate a Star.
- override def eval(input: Row = null): EvaluatedType =
- throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
-
override def toString = table.map(_ + ".").getOrElse("") + "*"
}
+
+
+/**
+ * Represents all the resolved input attributes to a given relational operator. This is used
+ * in the data frame DSL.
+ *
+ * @param expressions Expressions to expand.
+ */
+case class ResolvedStar(expressions: Seq[NamedExpression]) extends Star {
+ override def expand(input: Seq[Attribute], resolver: Resolver): Seq[NamedExpression] = expressions
+ override def toString = expressions.mkString("ResolvedStar(", ", ", ")")
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/80def9de/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 3aea337..60060bf 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -51,7 +51,9 @@ class AnalysisSuite extends FunSuite with BeforeAndAfter {
test("union project *") {
val plan = (1 to 100)
.map(_ => testRelation)
- .fold[LogicalPlan](testRelation)((a,b) => a.select(Star(None)).select('a).unionAll(b.select(Star(None))))
+ .fold[LogicalPlan](testRelation) { (a, b) =>
+ a.select(UnresolvedStar(None)).select('a).unionAll(b.select(UnresolvedStar(None)))
+ }
assert(caseInsensitiveAnalyze(plan).resolved)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/80def9de/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
index 68c9cb0..174c403 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql
import scala.language.implicitConversions
import org.apache.spark.sql.Dsl.lit
-import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, Star}
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedStar, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{Project, LogicalPlan}
import org.apache.spark.sql.types._
@@ -71,8 +71,8 @@ class Column(
* - "df.*" becomes an expression selecting all columns in data frame "df".
*/
def this(name: String) = this(name match {
- case "*" => Star(None)
- case _ if name.endsWith(".*") => Star(Some(name.substring(0, name.length - 2)))
+ case "*" => UnresolvedStar(None)
+ case _ if name.endsWith(".*") => UnresolvedStar(Some(name.substring(0, name.length - 2)))
case _ => UnresolvedAttribute(name)
})
http://git-wip-us.apache.org/repos/asf/spark/blob/80def9de/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 2694e81..1096e39 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -31,7 +31,7 @@ import org.apache.spark.api.python.SerDeUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.catalyst.ScalaReflection
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.analysis.{ResolvedStar, UnresolvedRelation}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.{JoinType, Inner}
import org.apache.spark.sql.catalyst.plans.logical._
@@ -265,7 +265,7 @@ class DataFrame protected[sql](
*/
override def apply(colName: String): Column = colName match {
case "*" =>
- Column("*")
+ new Column(ResolvedStar(schema.fieldNames.map(resolve)))
case _ =>
val expr = resolve(colName)
new Column(Some(sqlContext), Some(Project(Seq(expr), logicalPlan)), expr)
http://git-wip-us.apache.org/repos/asf/spark/blob/80def9de/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
index 6428554..2d464c2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
@@ -31,10 +31,14 @@ class ColumnExpressionSuite extends QueryTest {
checkAnswer(testData.select($"*"), testData.collect().toSeq)
}
- ignore("star qualified by data frame object") {
+ test("star qualified by data frame object") {
// This is not yet supported.
val df = testData.toDataFrame
- checkAnswer(df.select(df("*")), df.collect().toSeq)
+ val goldAnswer = df.collect().toSeq
+ checkAnswer(df.select(df("*")), goldAnswer)
+
+ val df1 = df.select(df("*"), lit("abcd").as("litCol"))
+ checkAnswer(df1.select(df("*")), goldAnswer)
}
test("star qualified by table name") {
http://git-wip-us.apache.org/repos/asf/spark/blob/80def9de/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 5e29e57..399e58b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -1002,11 +1002,11 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
}
/* Stars (*) */
- case Token("TOK_ALLCOLREF", Nil) => Star(None)
+ case Token("TOK_ALLCOLREF", Nil) => UnresolvedStar(None)
// The format of dbName.tableName.* cannot be parsed by HiveParser. TOK_TABNAME will only
// has a single child which is tableName.
case Token("TOK_ALLCOLREF", Token("TOK_TABNAME", Token(name, Nil) :: Nil) :: Nil) =>
- Star(Some(name))
+ UnresolvedStar(Some(name))
/* Aggregate Functions */
case Token("TOK_FUNCTION", Token(AVG(), Nil) :: arg :: Nil) => Average(nodeToExpr(arg))
@@ -1145,7 +1145,7 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
case Token("TOK_FUNCTION", Token(name, Nil) :: args) =>
UnresolvedFunction(name, args.map(nodeToExpr))
case Token("TOK_FUNCTIONSTAR", Token(name, Nil) :: args) =>
- UnresolvedFunction(name, Star(None) :: Nil)
+ UnresolvedFunction(name, UnresolvedStar(None) :: Nil)
/* Literals */
case Token("TOK_NULL", Nil) => Literal(null, NullType)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org