Posted to commits@spark.apache.org by ma...@apache.org on 2015/02/10 01:02:59 UTC

spark git commit: [SQL] Add some missing DataFrame functions.

Repository: spark
Updated Branches:
  refs/heads/master b884daa58 -> 68b25cf69


[SQL] Add some missing DataFrame functions.

- `as` with a `Symbol`
- `distinct`
- `sqlContext.emptyDataFrame`
- move add/remove col out of the RDDApi section
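
A rough usage sketch of the functions listed above (hedged: `df` is assumed to be an existing DataFrame, `sqlContext` an existing SQLContext, and the column names are hypothetical):

    // alias a whole DataFrame with a Symbol (Scala-specific overload)
    val t = df.as('t)

    // alias a single column with a Symbol, mirroring the String overload
    val renamed = df.select(df("colA").as('colB))

    // keep only the unique rows
    val deduped = df.distinct

    // a DataFrame with no rows and no columns
    val empty = sqlContext.emptyDataFrame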

Author: Michael Armbrust <mi...@databricks.com>

Closes #4437 from marmbrus/dfMissingFuncs and squashes the following commits:

2004023 [Michael Armbrust] Add missing functions


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/68b25cf6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/68b25cf6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/68b25cf6

Branch: refs/heads/master
Commit: 68b25cf695e0fce9e465288d5a053e540a3fccb4
Parents: b884daa
Author: Michael Armbrust <mi...@databricks.com>
Authored: Mon Feb 9 16:02:56 2015 -0800
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Mon Feb 9 16:02:56 2015 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/Column.scala     |  9 ++++++
 .../scala/org/apache/spark/sql/DataFrame.scala  | 12 +++++--
 .../org/apache/spark/sql/DataFrameImpl.scala    | 34 ++++++++++++--------
 .../apache/spark/sql/IncomputableColumn.scala   | 10 +++---
 .../scala/org/apache/spark/sql/RDDApi.scala     |  2 ++
 .../scala/org/apache/spark/sql/SQLContext.scala |  5 ++-
 6 files changed, 51 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/68b25cf6/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
index 878b2b0..1011bf0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
@@ -550,6 +550,15 @@ trait Column extends DataFrame {
   override def as(alias: String): Column = exprToColumn(Alias(expr, alias)())
 
   /**
+   * Gives the column an alias.
+   * {{{
+   *   // Renames colA to colB in select output.
+   *   df.select($"colA".as('colB))
+   * }}}
+   */
+  override def as(alias: Symbol): Column = exprToColumn(Alias(expr, alias.name)())
+
+  /**
    * Casts the column to a different data type.
    * {{{
    *   // Casts colA to IntegerType.
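
A minimal sketch contrasting the new Symbol overload with the existing String overload (assuming `df` is a DataFrame with a column `colA`; both calls should produce the same aliased column):

    df.select(df("colA").as("colB"))  // existing String overload
    df.select(df("colA").as('colB))   // new Symbol overload added here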

http://git-wip-us.apache.org/repos/asf/spark/blob/68b25cf6/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 17ea3cd..6abfb78 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -156,7 +156,7 @@ trait DataFrame extends RDDApi[Row] {
   def join(right: DataFrame, joinExprs: Column): DataFrame
 
   /**
-   * Join with another [[DataFrame]], usin  g the given join expression. The following performs
+   * Join with another [[DataFrame]], using the given join expression. The following performs
    * a full outer join between `df1` and `df2`.
    *
    * {{{
@@ -233,7 +233,12 @@ trait DataFrame extends RDDApi[Row] {
   /**
    * Returns a new [[DataFrame]] with an alias set.
    */
-  def as(name: String): DataFrame
+  def as(alias: String): DataFrame
+
+  /**
+   * (Scala-specific) Returns a new [[DataFrame]] with an alias set.
+   */
+  def as(alias: Symbol): DataFrame
 
   /**
    * Selects a set of expressions.
@@ -516,6 +521,9 @@ trait DataFrame extends RDDApi[Row] {
    */
   override def repartition(numPartitions: Int): DataFrame
 
+  /** Returns a new [[DataFrame]] that contains only the unique rows from this [[DataFrame]]. */
+  override def distinct: DataFrame
+
   override def persist(): this.type
 
   override def persist(newLevel: StorageLevel): this.type
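
A short, hedged sketch of the new `distinct` contract on the trait (the table name is hypothetical):

    val people = sqlContext.table("people")
    val uniquePeople = people.distinct   // new DataFrame, duplicate rows removed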

http://git-wip-us.apache.org/repos/asf/spark/blob/68b25cf6/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
index fa05a5d..7339329 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
@@ -196,7 +196,9 @@ private[sql] class DataFrameImpl protected[sql](
     }.toSeq :_*)
   }
 
-  override def as(name: String): DataFrame = Subquery(name, logicalPlan)
+  override def as(alias: String): DataFrame = Subquery(alias, logicalPlan)
+
+  override def as(alias: Symbol): DataFrame = Subquery(alias.name, logicalPlan)
 
   override def select(cols: Column*): DataFrame = {
     val exprs = cols.zipWithIndex.map {
@@ -215,7 +217,19 @@ private[sql] class DataFrameImpl protected[sql](
   override def selectExpr(exprs: String*): DataFrame = {
     select(exprs.map { expr =>
       Column(new SqlParser().parseExpression(expr))
-    } :_*)
+    }: _*)
+  }
+
+  override def addColumn(colName: String, col: Column): DataFrame = {
+    select(Column("*"), col.as(colName))
+  }
+
+  override def renameColumn(existingName: String, newName: String): DataFrame = {
+    val colNames = schema.map { field =>
+      val name = field.name
+      if (name == existingName) Column(name).as(newName) else Column(name)
+    }
+    select(colNames :_*)
   }
 
   override def filter(condition: Column): DataFrame = {
@@ -264,18 +278,8 @@ private[sql] class DataFrameImpl protected[sql](
   }
 
   /////////////////////////////////////////////////////////////////////////////
-
-  override def addColumn(colName: String, col: Column): DataFrame = {
-    select(Column("*"), col.as(colName))
-  }
-
-  override def renameColumn(existingName: String, newName: String): DataFrame = {
-    val colNames = schema.map { field =>
-      val name = field.name
-      if (name == existingName) Column(name).as(newName) else Column(name)
-    }
-    select(colNames :_*)
-  }
+  // RDD API
+  /////////////////////////////////////////////////////////////////////////////
 
   override def head(n: Int): Array[Row] = limit(n).collect()
 
@@ -307,6 +311,8 @@ private[sql] class DataFrameImpl protected[sql](
     sqlContext.applySchema(rdd.repartition(numPartitions), schema)
   }
 
+  override def distinct: DataFrame = Distinct(logicalPlan)
+
   override def persist(): this.type = {
     sqlContext.cacheManager.cacheQuery(this)
     this
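
As implemented above, `addColumn` and `renameColumn` are thin wrappers around `select`. A hedged sketch (the column names are hypothetical):

    // addColumn selects every existing column plus the new one
    val withTotal = df.addColumn("total", df("price") + df("tax"))

    // renameColumn re-selects all columns, aliasing only the one that matches
    val renamed = withTotal.renameColumn("total", "grandTotal")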

http://git-wip-us.apache.org/repos/asf/spark/blob/68b25cf6/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala b/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
index 782f6e2..0600dcc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
@@ -86,6 +86,10 @@ private[sql] class IncomputableColumn(protected[sql] val expr: Expression) exten
 
   override def selectExpr(exprs: String*): DataFrame = err()
 
+  override def addColumn(colName: String, col: Column): DataFrame = err()
+
+  override def renameColumn(existingName: String, newName: String): DataFrame = err()
+
   override def filter(condition: Column): DataFrame = err()
 
   override def filter(conditionExpr: String): DataFrame = err()
@@ -110,10 +114,6 @@ private[sql] class IncomputableColumn(protected[sql] val expr: Expression) exten
 
   /////////////////////////////////////////////////////////////////////////////
 
-  override def addColumn(colName: String, col: Column): DataFrame = err()
-
-  override def renameColumn(existingName: String, newName: String): DataFrame = err()
-
   override def head(n: Int): Array[Row] = err()
 
   override def head(): Row = err()
@@ -140,6 +140,8 @@ private[sql] class IncomputableColumn(protected[sql] val expr: Expression) exten
 
   override def repartition(numPartitions: Int): DataFrame = err()
 
+  override def distinct: DataFrame = err()
+
   override def persist(): this.type = err()
 
   override def persist(newLevel: StorageLevel): this.type = err()

http://git-wip-us.apache.org/repos/asf/spark/blob/68b25cf6/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala b/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala
index 38e6382..df866fd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala
@@ -60,4 +60,6 @@ private[sql] trait RDDApi[T] {
   def first(): T
 
   def repartition(numPartitions: Int): DataFrame
+
+  def distinct: DataFrame
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/68b25cf6/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index bf39906..97e3777 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.{DefaultOptimizer, Optimizer}
-import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, NoRelation}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
@@ -130,6 +130,9 @@ class SQLContext(@transient val sparkContext: SparkContext)
    */
   val experimental: ExperimentalMethods = new ExperimentalMethods(this)
 
+  /** Returns a [[DataFrame]] with no rows or columns. */
+  lazy val emptyDataFrame = DataFrame(this, NoRelation)
+
   /**
    * A collection of methods for registering user-defined functions (UDF).
    *
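
A brief, hedged sketch of the new `emptyDataFrame`, which is lazily built from the `NoRelation` plan imported above:

    val empty = sqlContext.emptyDataFrame
    empty.count()          // expected: 0
    empty.schema.fields    // expected: an empty Array of StructField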


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org