You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2014/04/05 02:23:23 UTC
git commit: [SQL] Minor fixes.
Repository: spark
Updated Branches:
refs/heads/master 198892fe8 -> d956cc251
[SQL] Minor fixes.
Author: Michael Armbrust <mi...@databricks.com>
Closes #315 from marmbrus/minorFixes and squashes the following commits:
b23a15d [Michael Armbrust] fix scaladoc
11062ac [Michael Armbrust] Fix registering "SELECT *" queries as tables and caching them. Add some tests for this and self-joins.
3997dc9 [Michael Armbrust] Move Row extractor to catalyst.
208bf5e [Michael Armbrust] More idiomatic naming of DSL functions. * subquery => as * for join condition => on, i.e., `r.join(s, condition = 'a == 'b)` =>`r.join(s, on = 'a == 'b)`
87211ce [Michael Armbrust] Correctly handle self joins of in-memory cached tables.
69e195e [Michael Armbrust] Change != to !== in the DSL since != will always translate to != on Any.
01f2dd5 [Michael Armbrust] Correctly assign aliases to tables in SqlParser.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d956cc25
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d956cc25
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d956cc25
Branch: refs/heads/master
Commit: d956cc251676d67d87bd6dbfa82be864933d8136
Parents: 198892f
Author: Michael Armbrust <mi...@databricks.com>
Authored: Fri Apr 4 17:23:17 2014 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Fri Apr 4 17:23:17 2014 -0700
----------------------------------------------------------------------
.../org/apache/spark/sql/catalyst/SqlParser.scala | 2 +-
.../org/apache/spark/sql/catalyst/dsl/package.scala | 2 +-
.../apache/spark/sql/catalyst/expressions/Row.scala | 15 +++++++++++++++
.../sql/catalyst/plans/logical/basicOperators.scala | 1 +
.../main/scala/org/apache/spark/sql/package.scala | 15 +--------------
.../main/scala/org/apache/spark/sql/SchemaRDD.scala | 16 ++++++++--------
.../org/apache/spark/sql/execution/SparkPlan.scala | 3 +++
.../org/apache/spark/sql/CachedTableSuite.scala | 13 +++++++++++++
.../scala/org/apache/spark/sql/DslQuerySuite.scala | 16 ++++++++--------
.../spark/sql/parquet/ParquetQuerySuite.scala | 4 ++--
10 files changed, 53 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/d956cc25/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index 4ea80fe..5b6aea8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -219,7 +219,7 @@ class SqlParser extends StandardTokenParsers {
protected lazy val relationFactor: Parser[LogicalPlan] =
ident ~ (opt(AS) ~> opt(ident)) ^^ {
- case ident ~ alias => UnresolvedRelation(alias, ident)
+ case tableName ~ alias => UnresolvedRelation(None, tableName, alias)
} |
"(" ~> query ~ ")" ~ opt(AS) ~ ident ^^ { case s ~ _ ~ _ ~ a => Subquery(a, s) }
http://git-wip-us.apache.org/repos/asf/spark/blob/d956cc25/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 2c4bf17..2d62e4c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -70,7 +70,7 @@ package object dsl {
def > (other: Expression) = GreaterThan(expr, other)
def >= (other: Expression) = GreaterThanOrEqual(expr, other)
def === (other: Expression) = Equals(expr, other)
- def != (other: Expression) = Not(Equals(expr, other))
+ def !== (other: Expression) = Not(Equals(expr, other))
def like(other: Expression) = Like(expr, other)
def rlike(other: Expression) = RLike(expr, other)
http://git-wip-us.apache.org/repos/asf/spark/blob/d956cc25/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala
index 6f939e6..9f4d844 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala
@@ -19,6 +19,21 @@ package org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.types.NativeType
+object Row {
+ /**
+ * This method can be used to extract fields from a [[Row]] object in a pattern match. Example:
+ * {{{
+ * import org.apache.spark.sql._
+ *
+ * val pairs = sql("SELECT key, value FROM src").rdd.map {
+ * case Row(key: Int, value: String) =>
+ * key -> value
+ * }
+ * }}}
+ */
+ def unapplySeq(row: Row): Some[Seq[Any]] = Some(row)
+}
+
/**
* Represents one row of output from a relational operator. Allows both generic access by ordinal,
* which will incur boxing overhead for primitives, as well as native primitive access.
http://git-wip-us.apache.org/repos/asf/spark/blob/d956cc25/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index b39c2b3..cfc0b0c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -162,6 +162,7 @@ case class LowerCaseSchema(child: LogicalPlan) extends UnaryNode {
a.nullable)(
a.exprId,
a.qualifiers)
+ case other => other
}
def references = Set.empty
http://git-wip-us.apache.org/repos/asf/spark/blob/d956cc25/sql/catalyst/src/main/scala/org/apache/spark/sql/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/package.scala
index 9ec3168..4589129 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/package.scala
@@ -32,18 +32,5 @@ package object sql {
type Row = catalyst.expressions.Row
- object Row {
- /**
- * This method can be used to extract fields from a [[Row]] object in a pattern match. Example:
- * {{{
- * import org.apache.spark.sql._
- *
- * val pairs = sql("SELECT key, value FROM src").rdd.map {
- * case Row(key: Int, value: String) =>
- * key -> value
- * }
- * }}}
- */
- def unapplySeq(row: Row): Some[Seq[Any]] = Some(row)
- }
+ val Row = catalyst.expressions.Row
}
http://git-wip-us.apache.org/repos/asf/spark/blob/d956cc25/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index a62cb8a..fc95781 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -148,17 +148,17 @@ class SchemaRDD(
*
* @param otherPlan the [[SchemaRDD]] that should be joined with this one.
* @param joinType One of `Inner`, `LeftOuter`, `RightOuter`, or `FullOuter`. Defaults to `Inner.`
- * @param condition An optional condition for the join operation. This is equivilent to the `ON`
- * clause in standard SQL. In the case of `Inner` joins, specifying a
- * `condition` is equivilent to adding `where` clauses after the `join`.
+ * @param on An optional condition for the join operation. This is equivalent to the `ON`
+ * clause in standard SQL. In the case of `Inner` joins, specifying a
+ * `condition` is equivalent to adding `where` clauses after the `join`.
*
* @group Query
*/
def join(
otherPlan: SchemaRDD,
joinType: JoinType = Inner,
- condition: Option[Expression] = None): SchemaRDD =
- new SchemaRDD(sqlContext, Join(logicalPlan, otherPlan.logicalPlan, joinType, condition))
+ on: Option[Expression] = None): SchemaRDD =
+ new SchemaRDD(sqlContext, Join(logicalPlan, otherPlan.logicalPlan, joinType, on))
/**
* Sorts the results by the given expressions.
@@ -195,14 +195,14 @@ class SchemaRDD(
* with the same name, for example, when performing self-joins.
*
* {{{
- * val x = schemaRDD.where('a === 1).subquery('x)
- * val y = schemaRDD.where('a === 2).subquery('y)
+ * val x = schemaRDD.where('a === 1).as('x)
+ * val y = schemaRDD.where('a === 2).as('y)
* x.join(y).where("x.a".attr === "y.a".attr),
* }}}
*
* @group Query
*/
- def subquery(alias: Symbol) =
+ def as(alias: Symbol) =
new SchemaRDD(sqlContext, Subquery(alias.name, logicalPlan))
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/d956cc25/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index acb1ee8..daa423c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.apache.spark.sql.catalyst.plans.{QueryPlan, logical}
import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging {
self: Product =>
@@ -69,6 +70,8 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan)
SparkLogicalPlan(
alreadyPlanned match {
case ExistingRdd(output, rdd) => ExistingRdd(output.map(_.newInstance), rdd)
+ case InMemoryColumnarTableScan(output, child) =>
+ InMemoryColumnarTableScan(output.map(_.newInstance), child)
case _ => sys.error("Multiple instance of the same relation detected.")
}).asInstanceOf[this.type]
}
http://git-wip-us.apache.org/repos/asf/spark/blob/d956cc25/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index e5902c3..7c6a642 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -58,4 +58,17 @@ class CachedTableSuite extends QueryTest {
TestSQLContext.uncacheTable("testData")
}
}
+
+ test("SELECT Star Cached Table") {
+ TestSQLContext.sql("SELECT * FROM testData").registerAsTable("selectStar")
+ TestSQLContext.cacheTable("selectStar")
+ TestSQLContext.sql("SELECT * FROM selectStar")
+ TestSQLContext.uncacheTable("selectStar")
+ }
+
+ test("Self-join cached") {
+ TestSQLContext.cacheTable("testData")
+ TestSQLContext.sql("SELECT * FROM testData a JOIN testData b ON a.key = b.key")
+ TestSQLContext.uncacheTable("testData")
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/d956cc25/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala
index 2524a37..be0f4a4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala
@@ -119,8 +119,8 @@ class DslQuerySuite extends QueryTest {
}
test("inner join, where, multiple matches") {
- val x = testData2.where('a === 1).subquery('x)
- val y = testData2.where('a === 1).subquery('y)
+ val x = testData2.where('a === 1).as('x)
+ val y = testData2.where('a === 1).as('y)
checkAnswer(
x.join(y).where("x.a".attr === "y.a".attr),
(1,1,1,1) ::
@@ -131,8 +131,8 @@ class DslQuerySuite extends QueryTest {
}
test("inner join, no matches") {
- val x = testData2.where('a === 1).subquery('x)
- val y = testData2.where('a === 2).subquery('y)
+ val x = testData2.where('a === 1).as('x)
+ val y = testData2.where('a === 2).as('y)
checkAnswer(
x.join(y).where("x.a".attr === "y.a".attr),
Nil)
@@ -140,8 +140,8 @@ class DslQuerySuite extends QueryTest {
test("big inner join, 4 matches per row") {
val bigData = testData.unionAll(testData).unionAll(testData).unionAll(testData)
- val bigDataX = bigData.subquery('x)
- val bigDataY = bigData.subquery('y)
+ val bigDataX = bigData.as('x)
+ val bigDataY = bigData.as('y)
checkAnswer(
bigDataX.join(bigDataY).where("x.key".attr === "y.key".attr),
@@ -181,8 +181,8 @@ class DslQuerySuite extends QueryTest {
}
test("full outer join") {
- val left = upperCaseData.where('N <= 4).subquery('left)
- val right = upperCaseData.where('N >= 3).subquery('right)
+ val left = upperCaseData.where('N <= 4).as('left)
+ val right = upperCaseData.where('N >= 3).as('right)
checkAnswer(
left.join(right, FullOuter, Some("left.N".attr === "right.N".attr)),
http://git-wip-us.apache.org/repos/asf/spark/blob/d956cc25/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index a62a3c4..fc68d6c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -56,8 +56,8 @@ class ParquetQuerySuite extends FunSuite with BeforeAndAfterAll {
}
test("self-join parquet files") {
- val x = ParquetTestData.testData.subquery('x)
- val y = ParquetTestData.testData.subquery('y)
+ val x = ParquetTestData.testData.as('x)
+ val y = ParquetTestData.testData.as('y)
val query = x.join(y).where("x.myint".attr === "y.myint".attr)
// Check to make sure that the attributes from either side of the join have unique expression