You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2015/02/10 01:02:59 UTC
spark git commit: [SQL] Add some missing DataFrame functions.
Repository: spark
Updated Branches:
refs/heads/master b884daa58 -> 68b25cf69
[SQL] Add some missing DataFrame functions.
- as with a `Symbol`
- distinct
- sqlContext.emptyDataFrame
- move add/remove col out of RDDApi section
Author: Michael Armbrust <mi...@databricks.com>
Closes #4437 from marmbrus/dfMissingFuncs and squashes the following commits:
2004023 [Michael Armbrust] Add missing functions
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/68b25cf6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/68b25cf6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/68b25cf6
Branch: refs/heads/master
Commit: 68b25cf695e0fce9e465288d5a053e540a3fccb4
Parents: b884daa
Author: Michael Armbrust <mi...@databricks.com>
Authored: Mon Feb 9 16:02:56 2015 -0800
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Mon Feb 9 16:02:56 2015 -0800
----------------------------------------------------------------------
.../scala/org/apache/spark/sql/Column.scala | 9 ++++++
.../scala/org/apache/spark/sql/DataFrame.scala | 12 +++++--
.../org/apache/spark/sql/DataFrameImpl.scala | 34 ++++++++++++--------
.../apache/spark/sql/IncomputableColumn.scala | 10 +++---
.../scala/org/apache/spark/sql/RDDApi.scala | 2 ++
.../scala/org/apache/spark/sql/SQLContext.scala | 5 ++-
6 files changed, 51 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/68b25cf6/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
index 878b2b0..1011bf0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
@@ -550,6 +550,15 @@ trait Column extends DataFrame {
override def as(alias: String): Column = exprToColumn(Alias(expr, alias)())
/**
+ * Gives the column an alias.
+ * {{{
+ * // Renames colA to colB in select output.
+ * df.select($"colA".as('colB))
+ * }}}
+ */
+ override def as(alias: Symbol): Column = exprToColumn(Alias(expr, alias.name)())
+
+ /**
* Casts the column to a different data type.
* {{{
* // Casts colA to IntegerType.
http://git-wip-us.apache.org/repos/asf/spark/blob/68b25cf6/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 17ea3cd..6abfb78 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -156,7 +156,7 @@ trait DataFrame extends RDDApi[Row] {
def join(right: DataFrame, joinExprs: Column): DataFrame
/**
- * Join with another [[DataFrame]], usin g the given join expression. The following performs
+ * Join with another [[DataFrame]], using the given join expression. The following performs
* a full outer join between `df1` and `df2`.
*
* {{{
@@ -233,7 +233,12 @@ trait DataFrame extends RDDApi[Row] {
/**
* Returns a new [[DataFrame]] with an alias set.
*/
- def as(name: String): DataFrame
+ def as(alias: String): DataFrame
+
+ /**
+ * (Scala-specific) Returns a new [[DataFrame]] with an alias set.
+ */
+ def as(alias: Symbol): DataFrame
/**
* Selects a set of expressions.
@@ -516,6 +521,9 @@ trait DataFrame extends RDDApi[Row] {
*/
override def repartition(numPartitions: Int): DataFrame
+ /** Returns a new [[DataFrame]] that contains only the unique rows from this [[DataFrame]]. */
+ override def distinct: DataFrame
+
override def persist(): this.type
override def persist(newLevel: StorageLevel): this.type
http://git-wip-us.apache.org/repos/asf/spark/blob/68b25cf6/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
index fa05a5d..7339329 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
@@ -196,7 +196,9 @@ private[sql] class DataFrameImpl protected[sql](
}.toSeq :_*)
}
- override def as(name: String): DataFrame = Subquery(name, logicalPlan)
+ override def as(alias: String): DataFrame = Subquery(alias, logicalPlan)
+
+ override def as(alias: Symbol): DataFrame = Subquery(alias.name, logicalPlan)
override def select(cols: Column*): DataFrame = {
val exprs = cols.zipWithIndex.map {
@@ -215,7 +217,19 @@ private[sql] class DataFrameImpl protected[sql](
override def selectExpr(exprs: String*): DataFrame = {
select(exprs.map { expr =>
Column(new SqlParser().parseExpression(expr))
- } :_*)
+ }: _*)
+ }
+
+ override def addColumn(colName: String, col: Column): DataFrame = {
+ select(Column("*"), col.as(colName))
+ }
+
+ override def renameColumn(existingName: String, newName: String): DataFrame = {
+ val colNames = schema.map { field =>
+ val name = field.name
+ if (name == existingName) Column(name).as(newName) else Column(name)
+ }
+ select(colNames :_*)
}
override def filter(condition: Column): DataFrame = {
@@ -264,18 +278,8 @@ private[sql] class DataFrameImpl protected[sql](
}
/////////////////////////////////////////////////////////////////////////////
-
- override def addColumn(colName: String, col: Column): DataFrame = {
- select(Column("*"), col.as(colName))
- }
-
- override def renameColumn(existingName: String, newName: String): DataFrame = {
- val colNames = schema.map { field =>
- val name = field.name
- if (name == existingName) Column(name).as(newName) else Column(name)
- }
- select(colNames :_*)
- }
+ // RDD API
+ /////////////////////////////////////////////////////////////////////////////
override def head(n: Int): Array[Row] = limit(n).collect()
@@ -307,6 +311,8 @@ private[sql] class DataFrameImpl protected[sql](
sqlContext.applySchema(rdd.repartition(numPartitions), schema)
}
+ override def distinct: DataFrame = Distinct(logicalPlan)
+
override def persist(): this.type = {
sqlContext.cacheManager.cacheQuery(this)
this
http://git-wip-us.apache.org/repos/asf/spark/blob/68b25cf6/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala b/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
index 782f6e2..0600dcc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
@@ -86,6 +86,10 @@ private[sql] class IncomputableColumn(protected[sql] val expr: Expression) exten
override def selectExpr(exprs: String*): DataFrame = err()
+ override def addColumn(colName: String, col: Column): DataFrame = err()
+
+ override def renameColumn(existingName: String, newName: String): DataFrame = err()
+
override def filter(condition: Column): DataFrame = err()
override def filter(conditionExpr: String): DataFrame = err()
@@ -110,10 +114,6 @@ private[sql] class IncomputableColumn(protected[sql] val expr: Expression) exten
/////////////////////////////////////////////////////////////////////////////
- override def addColumn(colName: String, col: Column): DataFrame = err()
-
- override def renameColumn(existingName: String, newName: String): DataFrame = err()
-
override def head(n: Int): Array[Row] = err()
override def head(): Row = err()
@@ -140,6 +140,8 @@ private[sql] class IncomputableColumn(protected[sql] val expr: Expression) exten
override def repartition(numPartitions: Int): DataFrame = err()
+ override def distinct: DataFrame = err()
+
override def persist(): this.type = err()
override def persist(newLevel: StorageLevel): this.type = err()
http://git-wip-us.apache.org/repos/asf/spark/blob/68b25cf6/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala b/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala
index 38e6382..df866fd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala
@@ -60,4 +60,6 @@ private[sql] trait RDDApi[T] {
def first(): T
def repartition(numPartitions: Int): DataFrame
+
+ def distinct: DataFrame
}
http://git-wip-us.apache.org/repos/asf/spark/blob/68b25cf6/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index bf39906..97e3777 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.optimizer.{DefaultOptimizer, Optimizer}
-import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, NoRelation}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.execution._
import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
@@ -130,6 +130,9 @@ class SQLContext(@transient val sparkContext: SparkContext)
*/
val experimental: ExperimentalMethods = new ExperimentalMethods(this)
+ /** Returns a [[DataFrame]] with no rows or columns. */
+ lazy val emptyDataFrame = DataFrame(this, NoRelation)
+
/**
* A collection of methods for registering user-defined functions (UDF).
*
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org