Posted to commits@spark.apache.org by rx...@apache.org on 2015/01/28 01:08:44 UTC

[3/5] spark git commit: [SPARK-5097][SQL] DataFrame

http://git-wip-us.apache.org/repos/asf/spark/blob/119f45d6/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
new file mode 100644
index 0000000..d0bb364
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -0,0 +1,596 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql
+
+import scala.language.implicitConversions
+import scala.reflect.ClassTag
+import scala.collection.JavaConversions._
+
+import java.util.{ArrayList, List => JList}
+
+import com.fasterxml.jackson.core.JsonFactory
+import net.razorvine.pickle.Pickler
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.rdd.RDD
+import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.api.python.SerDeUtil
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.{Literal => LiteralExpr}
+import org.apache.spark.sql.catalyst.plans.{JoinType, Inner}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.{LogicalRDD, EvaluatePython}
+import org.apache.spark.sql.json.JsonRDD
+import org.apache.spark.sql.types.{NumericType, StructType}
+import org.apache.spark.util.Utils
+
+
+/**
+ * A collection of rows that have the same columns.
+ *
+ * A [[DataFrame]] is equivalent to a relational table in Spark SQL, and can be created using
+ * various functions in [[SQLContext]].
+ * {{{
+ *   val people = sqlContext.parquetFile("...")
+ * }}}
+ *
+ * Once created, it can be manipulated using the various domain-specific-language (DSL) functions
+ * defined in [[DataFrame]] (this class), [[Column]], and [[dsl]] (for the Scala DSL).
+ *
+ * To select a column from the data frame, use the apply method:
+ * {{{
+ *   val ageCol = people("age")  // in Scala
+ *   Column ageCol = people.apply("age")  // in Java
+ * }}}
+ *
+ * Note that the [[Column]] type can also be manipulated through its various functions.
+ * {{{
+ *   // The following creates a new column that increases everybody's age by 10.
+ *   people("age") + 10  // in Scala
+ * }}}
+ *
+ * A more concrete example:
+ * {{{
+ *   // To create DataFrame using SQLContext
+ *   val people = sqlContext.parquetFile("...")
+ *   val department = sqlContext.parquetFile("...")
+ *
+ *   people.filter(people("age") > 30)
+ *     .join(department, people("deptId") === department("id"))
+ *     .groupBy(department("name"), people("gender"))
+ *     .agg(avg(people("salary")), max(people("age")))
+ * }}}
+ */
+// TODO: Improve documentation.
+class DataFrame protected[sql](
+    val sqlContext: SQLContext,
+    private val baseLogicalPlan: LogicalPlan,
+    operatorsEnabled: Boolean)
+  extends DataFrameSpecificApi with RDDApi[Row] {
+
+  protected[sql] def this(sqlContext: Option[SQLContext], plan: Option[LogicalPlan]) =
+    this(sqlContext.orNull, plan.orNull, sqlContext.isDefined && plan.isDefined)
+
+  protected[sql] def this(sqlContext: SQLContext, plan: LogicalPlan) = this(sqlContext, plan, true)
+
+  @transient protected[sql] lazy val queryExecution = sqlContext.executePlan(baseLogicalPlan)
+
+  @transient protected[sql] val logicalPlan: LogicalPlan = baseLogicalPlan match {
+    // For various commands (like DDL) and queries with side effects, we force query optimization to
+    // happen right away to let these side effects take place eagerly.
+    case _: Command | _: InsertIntoTable | _: CreateTableAsSelect[_] | _: WriteToFile =>
+      LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sqlContext)
+    case _ =>
+      baseLogicalPlan
+  }
+
+  /**
+   * An implicit conversion function internal to this class that lets us avoid writing
+   * "new DataFrame(...)" everywhere.
+   */
+  private[this] implicit def toDataFrame(logicalPlan: LogicalPlan): DataFrame = {
+    new DataFrame(sqlContext, logicalPlan, true)
+  }
+
+  /** Return the list of numeric columns, useful for doing aggregation. */
+  protected[sql] def numericColumns: Seq[Expression] = {
+    schema.fields.filter(_.dataType.isInstanceOf[NumericType]).map { n =>
+      logicalPlan.resolve(n.name, sqlContext.analyzer.resolver).get
+    }
+  }
+
+  /** Resolve a column name into a Catalyst [[NamedExpression]]. */
+  protected[sql] def resolve(colName: String): NamedExpression = {
+    logicalPlan.resolve(colName, sqlContext.analyzer.resolver).getOrElse(
+      throw new RuntimeException(s"""Cannot resolve column name "$colName""""))
+  }
+
+  /** Left here for compatibility reasons. */
+  @deprecated("use toDataFrame", "1.3.0")
+  def toSchemaRDD: DataFrame = this
+
+  /**
+   * Return the object itself. Used to force an implicit conversion from RDD to DataFrame in Scala.
+   */
+  def toDF: DataFrame = this
+
+  /** Return the schema of this [[DataFrame]]. */
+  override def schema: StructType = queryExecution.analyzed.schema
+
+  /** Return all column names and their data types as an array. */
+  override def dtypes: Array[(String, String)] = schema.fields.map { field =>
+    (field.name, field.dataType.toString)
+  }
+
+  /** Return all column names as an array. */
+  override def columns: Array[String] = schema.fields.map(_.name)
+
+  /** Print the schema to the console in a nice tree format. */
+  override def printSchema(): Unit = println(schema.treeString)
+
+  /**
+   * Cartesian join with another [[DataFrame]].
+   *
+   * Note that cartesian joins are very expensive without an extra filter that can be pushed down.
+   *
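+   * A minimal usage sketch (assuming `df1` and `df2` are existing DataFrames, with the dsl
+   * imported for the `$` column syntax):
+   * {{{
+   *   df1.join(df2)                                   // full cartesian product
+   *   df1.join(df2).where($"df1Key" === $"df2Key")    // filter that can be pushed into the join
+   * }}}
+   *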
+   * @param right Right side of the join operation.
+   */
+  override def join(right: DataFrame): DataFrame = {
+    Join(logicalPlan, right.logicalPlan, joinType = Inner, None)
+  }
+
+  /**
+   * Inner join with another [[DataFrame]], using the given join expression.
+   *
+   * {{{
+   *   // The following two are equivalent:
+   *   df1.join(df2, $"df1Key" === $"df2Key")
+   *   df1.join(df2).where($"df1Key" === $"df2Key")
+   * }}}
+   */
+  override def join(right: DataFrame, joinExprs: Column): DataFrame = {
+    Join(logicalPlan, right.logicalPlan, Inner, Some(joinExprs.expr))
+  }
+
+  /**
+   * Join with another [[DataFrame]], using the given join expression. The following performs
+   * a full outer join between `df1` and `df2`.
+   *
+   * {{{
+   *   df1.join(df2, "outer", $"df1Key" === $"df2Key")
+   * }}}
+   *
+   * @param right Right side of the join.
+   * @param joinExprs Join expression.
+   * @param joinType One of: `inner`, `outer`, `left_outer`, `right_outer`, `semijoin`.
+   */
+  override def join(right: DataFrame, joinExprs: Column, joinType: String): DataFrame = {
+    Join(logicalPlan, right.logicalPlan, JoinType(joinType), Some(joinExprs.expr))
+  }
+
+  /**
+   * Return a new [[DataFrame]] sorted by the specified column, in ascending order.
+   * {{{
+   *   // The following 3 are equivalent
+   *   df.sort("sortcol")
+   *   df.sort($"sortcol")
+   *   df.sort($"sortcol".asc)
+   * }}}
+   */
+  override def sort(colName: String): DataFrame = {
+    Sort(Seq(SortOrder(apply(colName).expr, Ascending)), global = true, logicalPlan)
+  }
+
+  /**
+   * Return a new [[DataFrame]] sorted by the given expressions. For example:
+   * {{{
+   *   df.sort($"col1", $"col2".desc)
+   * }}}
+   */
+  @scala.annotation.varargs
+  override def sort(sortExpr: Column, sortExprs: Column*): DataFrame = {
+    val sortOrder: Seq[SortOrder] = (sortExpr +: sortExprs).map { col =>
+      col.expr match {
+        case expr: SortOrder =>
+          expr
+        case expr: Expression =>
+          SortOrder(expr, Ascending)
+      }
+    }
+    Sort(sortOrder, global = true, logicalPlan)
+  }
+
+  /**
+   * Return a new [[DataFrame]] sorted by the given expressions.
+   * This is an alias of the `sort` function.
+   */
+  @scala.annotation.varargs
+  override def orderBy(sortExpr: Column, sortExprs: Column*): DataFrame = {
+    sort(sortExpr, sortExprs :_*)
+  }
+
+  /**
+   * Select a single column and return it as a [[Column]].
+   */
+  override def apply(colName: String): Column = {
+    val expr = resolve(colName)
+    new Column(Some(sqlContext), Some(Project(Seq(expr), logicalPlan)), expr)
+  }
+
+  /**
+   * Selecting a set of expressions, wrapped in a Product.
+   * {{{
+   *   // The following two are equivalent:
+   *   df.apply(($"colA", $"colB" + 1))
+   *   df.select($"colA", $"colB" + 1)
+   * }}}
+   */
+  override def apply(projection: Product): DataFrame = {
+    require(projection.productArity >= 1)
+    select(projection.productIterator.map {
+      case c: Column => c
+      case o: Any => new Column(Some(sqlContext), None, LiteralExpr(o))
+    }.toSeq :_*)
+  }
+
+  /**
+   * Alias the current [[DataFrame]].
+   */
+  override def as(name: String): DataFrame = Subquery(name, logicalPlan)
+
+  /**
+   * Selecting a set of expressions.
+   * {{{
+   *   df.select($"colA", $"colB" + 1)
+   * }}}
+   */
+  @scala.annotation.varargs
+  override def select(cols: Column*): DataFrame = {
+    val exprs = cols.zipWithIndex.map {
+      case (Column(expr: NamedExpression), _) =>
+        expr
+      case (Column(expr: Expression), _) =>
+        Alias(expr, expr.toString)()
+    }
+    Project(exprs.toSeq, logicalPlan)
+  }
+
+  /**
+   * Selecting a set of columns. This is a variant of `select` that can only select
+   * existing columns using column names (i.e. cannot construct expressions).
+   *
+   * {{{
+   *   // The following two are equivalent:
+   *   df.select("colA", "colB")
+   *   df.select($"colA", $"colB")
+   * }}}
+   */
+  @scala.annotation.varargs
+  override def select(col: String, cols: String*): DataFrame = {
+    select((col +: cols).map(new Column(_)) :_*)
+  }
+
+  /**
+   * Filtering rows using the given condition.
+   * {{{
+   *   // The following are equivalent:
+   *   peopleDf.filter($"age" > 15)
+   *   peopleDf.where($"age" > 15)
+   *   peopleDf($"age" > 15)
+   * }}}
+   */
+  override def filter(condition: Column): DataFrame = {
+    Filter(condition.expr, logicalPlan)
+  }
+
+  /**
+   * Filtering rows using the given condition. This is an alias for `filter`.
+   * {{{
+   *   // The following are equivalent:
+   *   peopleDf.filter($"age" > 15)
+   *   peopleDf.where($"age" > 15)
+   *   peopleDf($"age" > 15)
+   * }}}
+   */
+  override def where(condition: Column): DataFrame = filter(condition)
+
+  /**
+   * Filtering rows using the given condition. This is a shorthand meant for Scala.
+   * {{{
+   *   // The following are equivalent:
+   *   peopleDf.filter($"age" > 15)
+   *   peopleDf.where($"age" > 15)
+   *   peopleDf($"age" > 15)
+   * }}}
+   */
+  override def apply(condition: Column): DataFrame = filter(condition)
+
+  /**
+   * Group the [[DataFrame]] using the specified columns, so we can run aggregation on them.
+   * See [[GroupedDataFrame]] for all the available aggregate functions.
+   *
+   * {{{
+   *   // Compute the average for all numeric columns grouped by department.
+   *   df.groupBy($"department").avg()
+   *
+   *   // Compute the max age and average salary, grouped by department and gender.
+   *   df.groupBy($"department", $"gender").agg(Map(
+   *     "salary" -> "avg",
+   *     "age" -> "max"
+   *   ))
+   * }}}
+   */
+  @scala.annotation.varargs
+  override def groupBy(cols: Column*): GroupedDataFrame = {
+    new GroupedDataFrame(this, cols.map(_.expr))
+  }
+
+  /**
+   * Group the [[DataFrame]] using the specified columns, so we can run aggregation on them.
+   * See [[GroupedDataFrame]] for all the available aggregate functions.
+   *
+   * This is a variant of groupBy that can only group by existing columns using column names
+   * (i.e. cannot construct expressions).
+   *
+   * {{{
+   *   // Compute the average for all numeric columns grouped by department.
+   *   df.groupBy("department").avg()
+   *
+   *   // Compute the max age and average salary, grouped by department and gender.
+   *   df.groupBy($"department", $"gender").agg(Map(
+   *     "salary" -> "avg",
+   *     "age" -> "max"
+   *   ))
+   * }}}
+   */
+  @scala.annotation.varargs
+  override def groupBy(col1: String, cols: String*): GroupedDataFrame = {
+    val colNames: Seq[String] = col1 +: cols
+    new GroupedDataFrame(this, colNames.map(colName => resolve(colName)))
+  }
+
+  /**
+   * Aggregate on the entire [[DataFrame]] without groups.
+   * {{{
+   *   // df.agg(...) is a shorthand for df.groupBy().agg(...)
+   *   df.agg(Map("age" -> "max", "salary" -> "avg"))
+   *   df.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))
+   * }}}
+   */
+  override def agg(exprs: Map[String, String]): DataFrame = groupBy().agg(exprs)
+
+  /**
+   * Aggregate on the entire [[DataFrame]] without groups.
+   * {{{
+   *   // df.agg(...) is a shorthand for df.groupBy().agg(...)
+   *   df.agg(max($"age"), avg($"salary"))
+   *   df.groupBy().agg(max($"age"), avg($"salary"))
+   * }}}
+   */
+  @scala.annotation.varargs
+  override def agg(expr: Column, exprs: Column*): DataFrame = groupBy().agg(expr, exprs :_*)
+
+  /**
+   * Return a new [[DataFrame]] by taking the first `n` rows. The difference between this function
+   * and `head` is that `head` returns an array while `limit` returns a new [[DataFrame]].
+   */
+  override def limit(n: Int): DataFrame = Limit(LiteralExpr(n), logicalPlan)
+
+  /**
+   * Return a new [[DataFrame]] containing the union of rows in this frame and another frame.
+   * This is equivalent to `UNION ALL` in SQL.
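+   *
+   * For example, a minimal sketch assuming `df1` and `df2` share the same schema:
+   * {{{
+   *   df1.unionAll(df2)
+   * }}}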
+   */
+  override def unionAll(other: DataFrame): DataFrame = Union(logicalPlan, other.logicalPlan)
+
+  /**
+   * Return a new [[DataFrame]] containing rows only in both this frame and another frame.
+   * This is equivalent to `INTERSECT` in SQL.
+   */
+  override def intersect(other: DataFrame): DataFrame = Intersect(logicalPlan, other.logicalPlan)
+
+  /**
+   * Return a new [[DataFrame]] containing rows in this frame but not in another frame.
+   * This is equivalent to `EXCEPT` in SQL.
+   */
+  override def except(other: DataFrame): DataFrame = Except(logicalPlan, other.logicalPlan)
+
+  /**
+   * Return a new [[DataFrame]] by sampling a fraction of rows.
+   *
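+   * A sketch of a typical call (the argument values are illustrative):
+   * {{{
+   *   df.sample(withReplacement = false, fraction = 0.1, seed = 42)
+   * }}}
+   *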
+   * @param withReplacement Sample with replacement or not.
+   * @param fraction Fraction of rows to generate.
+   * @param seed Seed for sampling.
+   */
+  override def sample(withReplacement: Boolean, fraction: Double, seed: Long): DataFrame = {
+    Sample(fraction, withReplacement, seed, logicalPlan)
+  }
+
+  /**
+   * Return a new [[DataFrame]] by sampling a fraction of rows, using a random seed.
+   *
+   * @param withReplacement Sample with replacement or not.
+   * @param fraction Fraction of rows to generate.
+   */
+  override def sample(withReplacement: Boolean, fraction: Double): DataFrame = {
+    sample(withReplacement, fraction, Utils.random.nextLong)
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Return a new [[DataFrame]] by adding a column.
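+   *
+   * For example, a minimal sketch assuming `df` has a numeric `age` column:
+   * {{{
+   *   df.addColumn("agePlusTen", df("age") + 10)
+   * }}}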
+   */
+  override def addColumn(colName: String, col: Column): DataFrame = {
+    select(Column("*"), col.as(colName))
+  }
+
+  /**
+   * Return the first `n` rows.
+   */
+  override def head(n: Int): Array[Row] = limit(n).collect()
+
+  /**
+   * Return the first row.
+   */
+  override def head(): Row = head(1).head
+
+  /**
+   * Return the first row. Alias for head().
+   */
+  override def first(): Row = head()
+
+  override def map[R: ClassTag](f: Row => R): RDD[R] = {
+    rdd.map(f)
+  }
+
+  override def mapPartitions[R: ClassTag](f: Iterator[Row] => Iterator[R]): RDD[R] = {
+    rdd.mapPartitions(f)
+  }
+
+  /**
+   * Return the first `n` rows in the [[DataFrame]].
+   */
+  override def take(n: Int): Array[Row] = head(n)
+
+  /**
+   * Return an array that contains all [[Row]]s in this [[DataFrame]].
+   */
+  override def collect(): Array[Row] = rdd.collect()
+
+  /**
+   * Return a Java list that contains all [[Row]]s in this [[DataFrame]].
+   */
+  override def collectAsList(): java.util.List[Row] = java.util.Arrays.asList(rdd.collect() :_*)
+
+  /**
+   * Return the number of rows in the [[DataFrame]].
+   */
+  override def count(): Long = groupBy().count().rdd.collect().head.getLong(0)
+
+  /**
+   * Return a new [[DataFrame]] that has exactly `numPartitions` partitions.
+   */
+  override def repartition(numPartitions: Int): DataFrame = {
+    sqlContext.applySchema(rdd.repartition(numPartitions), schema)
+  }
+
+  override def persist(): this.type = {
+    sqlContext.cacheQuery(this)
+    this
+  }
+
+  override def persist(newLevel: StorageLevel): this.type = {
+    sqlContext.cacheQuery(this, None, newLevel)
+    this
+  }
+
+  override def unpersist(blocking: Boolean): this.type = {
+    sqlContext.tryUncacheQuery(this, blocking)
+    this
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+  // I/O
+  /////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Return the content of the [[DataFrame]] as an [[RDD]] of [[Row]]s.
+   */
+  override def rdd: RDD[Row] = {
+    val schema = this.schema
+    queryExecution.executedPlan.execute().map(ScalaReflection.convertRowToScala(_, schema))
+  }
+
+  /**
+   * Registers this [[DataFrame]] as a temporary table using the given name.  The lifetime of this temporary
+   * table is tied to the [[SQLContext]] that was used to create this DataFrame.
+   *
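+   * A sketch of the typical pattern (the table and column names are illustrative):
+   * {{{
+   *   df.registerTempTable("people")
+   *   sqlContext.sql("SELECT name FROM people WHERE age > 30")
+   * }}}
+   *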
+   * @group schema
+   */
+  override def registerTempTable(tableName: String): Unit = {
+    sqlContext.registerRDDAsTable(this, tableName)
+  }
+
+  /**
+   * Saves the contents of this [[DataFrame]] as a parquet file, preserving the schema.
+   * Files that are written out using this method can be read back in as a [[DataFrame]]
+   * using the `parquetFile` function in [[SQLContext]].
+   */
+  override def saveAsParquetFile(path: String): Unit = {
+    sqlContext.executePlan(WriteToFile(path, logicalPlan)).toRdd
+  }
+
+  /**
+   * :: Experimental ::
+   * Creates a table from the contents of this DataFrame.  This will fail if the table already
+   * exists.
+   *
+   * Note that this currently only works with DataFrames that are created from a HiveContext as
+   * there is no notion of a persisted catalog in a standard SQL context.  Instead you can write
+   * an RDD out to a parquet file, and then register that file as a table.  This "table" can then
+   * be the target of an `insertInto`.
+   */
+  @Experimental
+  override def saveAsTable(tableName: String): Unit = {
+    sqlContext.executePlan(
+      CreateTableAsSelect(None, tableName, logicalPlan, allowExisting = false)).toRdd
+  }
+
+  /**
+   * :: Experimental ::
+   * Adds the rows from this [[DataFrame]] to the specified table, optionally overwriting the existing data.
+   */
+  @Experimental
+  override def insertInto(tableName: String, overwrite: Boolean): Unit = {
+    sqlContext.executePlan(InsertIntoTable(UnresolvedRelation(Seq(tableName)),
+      Map.empty, logicalPlan, overwrite)).toRdd
+  }
+
+  /**
+   * Return the content of the [[DataFrame]] as an RDD of JSON strings.
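+   *
+   * For example, a minimal sketch that prints every row as a JSON string:
+   * {{{
+   *   df.toJSON.collect().foreach(println)
+   * }}}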
+   */
+  override def toJSON: RDD[String] = {
+    val rowSchema = this.schema
+    this.mapPartitions { iter =>
+      val jsonFactory = new JsonFactory()
+      iter.map(JsonRDD.rowToJSON(rowSchema, jsonFactory))
+    }
+  }
+
+  ////////////////////////////////////////////////////////////////////////////
+  // for Python API
+  ////////////////////////////////////////////////////////////////////////////
+  /**
+   * A helper function for Py4J that converts a list of Columns to an array.
+   */
+  protected[sql] def toColumnArray(cols: JList[Column]): Array[Column] = {
+    cols.toList.toArray
+  }
+
+  /**
+   * Converts a JavaRDD to a PythonRDD.
+   */
+  protected[sql] def javaToPython: JavaRDD[Array[Byte]] = {
+    val fieldTypes = schema.fields.map(_.dataType)
+    val jrdd = rdd.map(EvaluatePython.rowToArray(_, fieldTypes)).toJavaRDD()
+    SerDeUtil.javaToPython(jrdd)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/119f45d6/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataFrame.scala
new file mode 100644
index 0000000..1f1e9bd
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataFrame.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import scala.language.implicitConversions
+import scala.collection.JavaConversions._
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.{Literal => LiteralExpr}
+import org.apache.spark.sql.catalyst.plans.logical.Aggregate
+
+
+/**
+ * A set of methods for aggregations on a [[DataFrame]], created by [[DataFrame.groupBy]].
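+ *
+ * A minimal usage sketch (assuming the dsl functions are imported and the column names exist
+ * in `df`):
+ * {{{
+ *   import org.apache.spark.sql.dsl._
+ *   df.groupBy($"department").agg(max($"age"), sum($"expense"))
+ * }}}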
+ */
+class GroupedDataFrame protected[sql](df: DataFrame, groupingExprs: Seq[Expression])
+  extends GroupedDataFrameApi {
+
+  private[this] implicit def toDataFrame(aggExprs: Seq[NamedExpression]): DataFrame = {
+    val namedGroupingExprs = groupingExprs.map {
+      case expr: NamedExpression => expr
+      case expr: Expression => Alias(expr, expr.toString)()
+    }
+    new DataFrame(df.sqlContext,
+      Aggregate(groupingExprs, namedGroupingExprs ++ aggExprs, df.logicalPlan))
+  }
+
+  private[this] def aggregateNumericColumns(f: Expression => Expression): Seq[NamedExpression] = {
+    df.numericColumns.map { c =>
+      val a = f(c)
+      Alias(a, a.toString)()
+    }
+  }
+
+  private[this] def strToExpr(expr: String): (Expression => Expression) = {
+    expr.toLowerCase match {
+      case "avg" | "average" | "mean" => Average
+      case "max" => Max
+      case "min" => Min
+      case "sum" => Sum
+      case "count" | "size" => Count
+    }
+  }
+
+  /**
+   * Compute aggregates by specifying a map from column name to aggregate methods.
+   * The available aggregate methods are `avg`, `max`, `min`, `sum`, `count`.
+   * {{{
+   *   // Selects the age of the oldest employee and the aggregate expense for each department
+   *   df.groupBy("department").agg(Map(
+   *     "age" -> "max"
+   *     "sum" -> "expense"
+   *   ))
+   * }}}
+   */
+  override def agg(exprs: Map[String, String]): DataFrame = {
+    exprs.map { case (colName, expr) =>
+      val a = strToExpr(expr)(df(colName).expr)
+      Alias(a, a.toString)()
+    }.toSeq
+  }
+
+  /**
+   * Compute aggregates by specifying a map from column name to aggregate methods.
+   * The available aggregate methods are `avg`, `max`, `min`, `sum`, `count`.
+   * {{{
+   *   // Selects the age of the oldest employee and the aggregate expense for each department
+   *   df.groupBy("department").agg(Map(
+   *     "age" -> "max"
+   *     "sum" -> "expense"
+   *   ))
+   * }}}
+   */
+  def agg(exprs: java.util.Map[String, String]): DataFrame = {
+    agg(exprs.toMap)
+  }
+
+  /**
+   * Compute aggregates by specifying a series of aggregate columns.
+   * The available aggregate methods are defined in [[org.apache.spark.sql.dsl]].
+   * {{{
+   *   // Selects the age of the oldest employee and the aggregate expense for each department
+   *   import org.apache.spark.sql.dsl._
+   *   df.groupBy("department").agg(max($"age"), sum($"expense"))
+   * }}}
+   */
+  @scala.annotation.varargs
+  override def agg(expr: Column, exprs: Column*): DataFrame = {
+    val aggExprs = (expr +: exprs).map(_.expr).map {
+      case expr: NamedExpression => expr
+      case expr: Expression => Alias(expr, expr.toString)()
+    }
+
+    new DataFrame(df.sqlContext, Aggregate(groupingExprs, aggExprs, df.logicalPlan))
+  }
+
+  /** Count the number of rows for each group. */
+  override def count(): DataFrame = Seq(Alias(Count(LiteralExpr(1)), "count")())
+
+  /**
+   * Compute the average value for each numeric column for each group. This is an alias for `avg`.
+   */
+  override def mean(): DataFrame = aggregateNumericColumns(Average)
+
+  /**
+   * Compute the max value for each numeric column for each group.
+   */
+  override def max(): DataFrame = aggregateNumericColumns(Max)
+
+  /**
+   * Compute the mean value for each numeric column for each group.
+   */
+  override def avg(): DataFrame = aggregateNumericColumns(Average)
+
+  /**
+   * Compute the min value for each numeric column for each group.
+   */
+  override def min(): DataFrame = aggregateNumericColumns(Min)
+
+  /**
+   * Compute the sum for each numeric column for each group.
+   */
+  override def sum(): DataFrame = aggregateNumericColumns(Sum)
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/119f45d6/sql/core/src/main/scala/org/apache/spark/sql/Literal.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Literal.scala b/sql/core/src/main/scala/org/apache/spark/sql/Literal.scala
new file mode 100644
index 0000000..08cd4d0
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Literal.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.expressions.{Literal => LiteralExpr}
+import org.apache.spark.sql.types._
+
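+/**
+ * Factory methods for lifting Scala/Java values into [[Column]] literals. For example, a
+ * minimal sketch (`df` and its `name` column are illustrative):
+ * {{{
+ *   df.select(df("name"), Literal(1))
+ * }}}
+ */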
+object Literal {
+
+  /** Return a new boolean literal. */
+  def apply(literal: Boolean): Column = new Column(LiteralExpr(literal))
+
+  /** Return a new byte literal. */
+  def apply(literal: Byte): Column = new Column(LiteralExpr(literal))
+
+  /** Return a new short literal. */
+  def apply(literal: Short): Column = new Column(LiteralExpr(literal))
+
+  /** Return a new int literal. */
+  def apply(literal: Int): Column = new Column(LiteralExpr(literal))
+
+  /** Return a new long literal. */
+  def apply(literal: Long): Column = new Column(LiteralExpr(literal))
+
+  /** Return a new float literal. */
+  def apply(literal: Float): Column = new Column(LiteralExpr(literal))
+
+  /** Return a new double literal. */
+  def apply(literal: Double): Column = new Column(LiteralExpr(literal))
+
+  /** Return a new string literal. */
+  def apply(literal: String): Column = new Column(LiteralExpr(literal))
+
+  /** Return a new decimal literal. */
+  def apply(literal: BigDecimal): Column = new Column(LiteralExpr(literal))
+
+  /** Return a new decimal literal. */
+  def apply(literal: java.math.BigDecimal): Column = new Column(LiteralExpr(literal))
+
+  /** Return a new timestamp literal. */
+  def apply(literal: java.sql.Timestamp): Column = new Column(LiteralExpr(literal))
+
+  /** Return a new date literal. */
+  def apply(literal: java.sql.Date): Column = new Column(LiteralExpr(literal))
+
+  /** Return a new binary (byte array) literal. */
+  def apply(literal: Array[Byte]): Column = new Column(LiteralExpr(literal))
+
+  /** Return a new null literal. */
+  def apply(literal: Null): Column = new Column(LiteralExpr(null))
+
+  /**
+   * Return a Column expression representing the literal value. Throws an exception if the
+   * data type is not supported by Spark SQL.
+   */
+  protected[sql] def anyToLiteral(literal: Any): Column = {
+    // If the literal is a symbol, convert it into a Column.
+    if (literal.isInstanceOf[Symbol]) {
+      return dsl.symbolToColumn(literal.asInstanceOf[Symbol])
+    }
+
+    val literalExpr = literal match {
+      case v: Int => LiteralExpr(v, IntegerType)
+      case v: Long => LiteralExpr(v, LongType)
+      case v: Double => LiteralExpr(v, DoubleType)
+      case v: Float => LiteralExpr(v, FloatType)
+      case v: Byte => LiteralExpr(v, ByteType)
+      case v: Short => LiteralExpr(v, ShortType)
+      case v: String => LiteralExpr(v, StringType)
+      case v: Boolean => LiteralExpr(v, BooleanType)
+      case v: BigDecimal => LiteralExpr(Decimal(v), DecimalType.Unlimited)
+      case v: java.math.BigDecimal => LiteralExpr(Decimal(v), DecimalType.Unlimited)
+      case v: Decimal => LiteralExpr(v, DecimalType.Unlimited)
+      case v: java.sql.Timestamp => LiteralExpr(v, TimestampType)
+      case v: java.sql.Date => LiteralExpr(v, DateType)
+      case v: Array[Byte] => LiteralExpr(v, BinaryType)
+      case null => LiteralExpr(null, NullType)
+      case _ =>
+        throw new RuntimeException("Unsupported literal type " + literal.getClass + " " + literal)
+    }
+    new Column(literalExpr)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/119f45d6/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 0a22968..5030e68 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -30,7 +30,6 @@ import org.apache.spark.api.java.{JavaSparkContext, JavaRDD}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.dsl.ExpressionConversions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.{DefaultOptimizer, Optimizer}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -43,7 +42,7 @@ import org.apache.spark.util.Utils
 
 /**
  * :: AlphaComponent ::
- * The entry point for running relational queries using Spark.  Allows the creation of [[SchemaRDD]]
+ * The entry point for running relational queries using Spark.  Allows the creation of [[DataFrame]]
  * objects and the execution of SQL queries.
  *
  * @groupname userf Spark SQL Functions
@@ -53,7 +52,6 @@ import org.apache.spark.util.Utils
 class SQLContext(@transient val sparkContext: SparkContext)
   extends org.apache.spark.Logging
   with CacheManager
-  with ExpressionConversions
   with Serializable {
 
   self =>
@@ -111,8 +109,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
   }
 
   protected[sql] def executeSql(sql: String): this.QueryExecution = executePlan(parseSql(sql))
-  protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
-    new this.QueryExecution { val logical = plan }
+
+  protected[sql] def executePlan(plan: LogicalPlan) = new this.QueryExecution(plan)
 
   sparkContext.getConf.getAll.foreach {
     case (key, value) if key.startsWith("spark.sql") => setConf(key, value)
@@ -124,24 +122,24 @@ class SQLContext(@transient val sparkContext: SparkContext)
    *
    * @group userf
    */
-  implicit def createSchemaRDD[A <: Product: TypeTag](rdd: RDD[A]): SchemaRDD = {
+  implicit def createSchemaRDD[A <: Product: TypeTag](rdd: RDD[A]): DataFrame = {
     SparkPlan.currentContext.set(self)
     val attributeSeq = ScalaReflection.attributesFor[A]
     val schema = StructType.fromAttributes(attributeSeq)
     val rowRDD = RDDConversions.productToRowRdd(rdd, schema)
-    new SchemaRDD(this, LogicalRDD(attributeSeq, rowRDD)(self))
+    new DataFrame(this, LogicalRDD(attributeSeq, rowRDD)(self))
   }
 
   /**
-   * Convert a [[BaseRelation]] created for external data sources into a [[SchemaRDD]].
+   * Convert a [[BaseRelation]] created for external data sources into a [[DataFrame]].
    */
-  def baseRelationToSchemaRDD(baseRelation: BaseRelation): SchemaRDD = {
-    new SchemaRDD(this, LogicalRelation(baseRelation))
+  def baseRelationToSchemaRDD(baseRelation: BaseRelation): DataFrame = {
+    new DataFrame(this, LogicalRelation(baseRelation))
   }
 
   /**
    * :: DeveloperApi ::
-   * Creates a [[SchemaRDD]] from an [[RDD]] containing [[Row]]s by applying a schema to this RDD.
+   * Creates a [[DataFrame]] from an [[RDD]] containing [[Row]]s by applying a schema to this RDD.
    * It is important to make sure that the structure of every [[Row]] of the provided RDD matches
    * the provided schema. Otherwise, there will be runtime exception.
    * Example:
@@ -170,11 +168,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * @group userf
    */
   @DeveloperApi
-  def applySchema(rowRDD: RDD[Row], schema: StructType): SchemaRDD = {
+  def applySchema(rowRDD: RDD[Row], schema: StructType): DataFrame = {
     // TODO: use MutableProjection when rowRDD is another SchemaRDD and the applied
     // schema differs from the existing schema on any field data type.
     val logicalPlan = LogicalRDD(schema.toAttributes, rowRDD)(self)
-    new SchemaRDD(this, logicalPlan)
+    new DataFrame(this, logicalPlan)
   }
 
   /**
@@ -183,7 +181,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * WARNING: Since there is no guaranteed ordering for fields in a Java Bean,
    *          SELECT * queries will return the columns in an undefined order.
    */
-  def applySchema(rdd: RDD[_], beanClass: Class[_]): SchemaRDD = {
+  def applySchema(rdd: RDD[_], beanClass: Class[_]): DataFrame = {
     val attributeSeq = getSchema(beanClass)
     val className = beanClass.getName
     val rowRdd = rdd.mapPartitions { iter =>
@@ -201,7 +199,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
         ) : Row
       }
     }
-    new SchemaRDD(this, LogicalRDD(attributeSeq, rowRdd)(this))
+    new DataFrame(this, LogicalRDD(attributeSeq, rowRdd)(this))
   }
 
   /**
@@ -210,35 +208,35 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * WARNING: Since there is no guaranteed ordering for fields in a Java Bean,
    *          SELECT * queries will return the columns in an undefined order.
    */
-  def applySchema(rdd: JavaRDD[_], beanClass: Class[_]): SchemaRDD = {
+  def applySchema(rdd: JavaRDD[_], beanClass: Class[_]): DataFrame = {
     applySchema(rdd.rdd, beanClass)
   }
 
   /**
-   * Loads a Parquet file, returning the result as a [[SchemaRDD]].
+   * Loads a Parquet file, returning the result as a [[DataFrame]].
    *
    * @group userf
    */
-  def parquetFile(path: String): SchemaRDD =
-    new SchemaRDD(this, parquet.ParquetRelation(path, Some(sparkContext.hadoopConfiguration), this))
+  def parquetFile(path: String): DataFrame =
+    new DataFrame(this, parquet.ParquetRelation(path, Some(sparkContext.hadoopConfiguration), this))
 
   /**
-   * Loads a JSON file (one object per line), returning the result as a [[SchemaRDD]].
+   * Loads a JSON file (one object per line), returning the result as a [[DataFrame]].
    * It goes through the entire dataset once to determine the schema.
    *
    * @group userf
    */
-  def jsonFile(path: String): SchemaRDD = jsonFile(path, 1.0)
+  def jsonFile(path: String): DataFrame = jsonFile(path, 1.0)
 
   /**
    * :: Experimental ::
    * Loads a JSON file (one object per line) and applies the given schema,
-   * returning the result as a [[SchemaRDD]].
+   * returning the result as a [[DataFrame]].
    *
    * @group userf
    */
   @Experimental
-  def jsonFile(path: String, schema: StructType): SchemaRDD = {
+  def jsonFile(path: String, schema: StructType): DataFrame = {
     val json = sparkContext.textFile(path)
     jsonRDD(json, schema)
   }
@@ -247,29 +245,29 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * :: Experimental ::
    */
   @Experimental
-  def jsonFile(path: String, samplingRatio: Double): SchemaRDD = {
+  def jsonFile(path: String, samplingRatio: Double): DataFrame = {
     val json = sparkContext.textFile(path)
     jsonRDD(json, samplingRatio)
   }
 
   /**
    * Loads an RDD[String] storing JSON objects (one object per record), returning the result as a
-   * [[SchemaRDD]].
+   * [[DataFrame]].
    * It goes through the entire dataset once to determine the schema.
    *
    * @group userf
    */
-  def jsonRDD(json: RDD[String]): SchemaRDD = jsonRDD(json, 1.0)
+  def jsonRDD(json: RDD[String]): DataFrame = jsonRDD(json, 1.0)
 
   /**
    * :: Experimental ::
    * Loads an RDD[String] storing JSON objects (one object per record) and applies the given schema,
-   * returning the result as a [[SchemaRDD]].
+   * returning the result as a [[DataFrame]].
    *
    * @group userf
    */
   @Experimental
-  def jsonRDD(json: RDD[String], schema: StructType): SchemaRDD = {
+  def jsonRDD(json: RDD[String], schema: StructType): DataFrame = {
     val columnNameOfCorruptJsonRecord = conf.columnNameOfCorruptRecord
     val appliedSchema =
       Option(schema).getOrElse(
@@ -283,7 +281,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * :: Experimental ::
    */
   @Experimental
-  def jsonRDD(json: RDD[String], samplingRatio: Double): SchemaRDD = {
+  def jsonRDD(json: RDD[String], samplingRatio: Double): DataFrame = {
     val columnNameOfCorruptJsonRecord = conf.columnNameOfCorruptRecord
     val appliedSchema =
       JsonRDD.nullTypeToStringType(
@@ -298,8 +296,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
    *
    * @group userf
    */
-  def registerRDDAsTable(rdd: SchemaRDD, tableName: String): Unit = {
-    catalog.registerTable(Seq(tableName), rdd.queryExecution.logical)
+  def registerRDDAsTable(rdd: DataFrame, tableName: String): Unit = {
+    catalog.registerTable(Seq(tableName), rdd.logicalPlan)
   }
 
   /**
@@ -321,17 +319,17 @@ class SQLContext(@transient val sparkContext: SparkContext)
    *
    * @group userf
    */
-  def sql(sqlText: String): SchemaRDD = {
+  def sql(sqlText: String): DataFrame = {
     if (conf.dialect == "sql") {
-      new SchemaRDD(this, parseSql(sqlText))
+      new DataFrame(this, parseSql(sqlText))
     } else {
       sys.error(s"Unsupported SQL dialect: ${conf.dialect}")
     }
   }
 
   /** Returns the specified table as a SchemaRDD */
-  def table(tableName: String): SchemaRDD =
-    new SchemaRDD(this, catalog.lookupRelation(Seq(tableName)))
+  def table(tableName: String): DataFrame =
+    new DataFrame(this, catalog.lookupRelation(Seq(tableName)))
 
   /**
    * A collection of methods that are considered experimental, but can be used to hook into
@@ -454,15 +452,14 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * access to the intermediate phases of query execution for developers.
    */
   @DeveloperApi
-  protected abstract class QueryExecution {
-    def logical: LogicalPlan
+  protected class QueryExecution(val logical: LogicalPlan) {
 
-    lazy val analyzed = ExtractPythonUdfs(analyzer(logical))
-    lazy val withCachedData = useCachedData(analyzed)
-    lazy val optimizedPlan = optimizer(withCachedData)
+    lazy val analyzed: LogicalPlan = ExtractPythonUdfs(analyzer(logical))
+    lazy val withCachedData: LogicalPlan = useCachedData(analyzed)
+    lazy val optimizedPlan: LogicalPlan = optimizer(withCachedData)
 
     // TODO: Don't just pick the first one...
-    lazy val sparkPlan = {
+    lazy val sparkPlan: SparkPlan = {
       SparkPlan.currentContext.set(self)
       planner(optimizedPlan).next()
     }
@@ -512,7 +509,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    */
   protected[sql] def applySchemaToPythonRDD(
       rdd: RDD[Array[Any]],
-      schemaString: String): SchemaRDD = {
+      schemaString: String): DataFrame = {
     val schema = parseDataType(schemaString).asInstanceOf[StructType]
     applySchemaToPythonRDD(rdd, schema)
   }
@@ -522,7 +519,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    */
   protected[sql] def applySchemaToPythonRDD(
       rdd: RDD[Array[Any]],
-      schema: StructType): SchemaRDD = {
+      schema: StructType): DataFrame = {
 
     def needsConversion(dataType: DataType): Boolean = dataType match {
       case ByteType => true
@@ -549,7 +546,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
       iter.map { m => new GenericRow(m): Row}
     }
 
-    new SchemaRDD(this, LogicalRDD(schema.toAttributes, rowRdd)(self))
+    new DataFrame(this, LogicalRDD(schema.toAttributes, rowRdd)(self))
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/119f45d6/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
deleted file mode 100644
index d1e21df..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ /dev/null
@@ -1,511 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql
-
-import java.util.{List => JList}
-
-import scala.collection.JavaConversions._
-
-import com.fasterxml.jackson.core.JsonFactory
-
-import net.razorvine.pickle.Pickler
-
-import org.apache.spark.{Dependency, OneToOneDependency, Partition, Partitioner, TaskContext}
-import org.apache.spark.annotation.{AlphaComponent, Experimental}
-import org.apache.spark.api.java.JavaRDD
-import org.apache.spark.api.python.SerDeUtil
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.ScalaReflection
-import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
-import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.execution.{LogicalRDD, EvaluatePython}
-import org.apache.spark.sql.json.JsonRDD
-import org.apache.spark.sql.types.{BooleanType, StructType}
-import org.apache.spark.storage.StorageLevel
-
-/**
- * :: AlphaComponent ::
- * An RDD of [[Row]] objects that has an associated schema. In addition to standard RDD functions,
- * SchemaRDDs can be used in relational queries, as shown in the examples below.
- *
- * Importing a SQLContext brings an implicit into scope that automatically converts a standard RDD
- * whose elements are scala case classes into a SchemaRDD.  This conversion can also be done
- * explicitly using the `createSchemaRDD` function on a [[SQLContext]].
- *
- * A `SchemaRDD` can also be created by loading data in from external sources.
- * Examples are loading data from Parquet files by using the `parquetFile` method on [[SQLContext]]
- * and loading JSON datasets by using `jsonFile` and `jsonRDD` methods on [[SQLContext]].
- *
- * == SQL Queries ==
- * A SchemaRDD can be registered as a table in the [[SQLContext]] that was used to create it.  Once
- * an RDD has been registered as a table, it can be used in the FROM clause of SQL statements.
- *
- * {{{
- *  // One method for defining the schema of an RDD is to make a case class with the desired column
- *  // names and types.
- *  case class Record(key: Int, value: String)
- *
- *  val sc: SparkContext // An existing spark context.
- *  val sqlContext = new SQLContext(sc)
- *
- *  // Importing the SQL context gives access to all the SQL functions and implicit conversions.
- *  import sqlContext._
- *
- *  val rdd = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i")))
- *  // Any RDD containing case classes can be registered as a table.  The schema of the table is
- *  // automatically inferred using scala reflection.
- *  rdd.registerTempTable("records")
- *
- *  val results: SchemaRDD = sql("SELECT * FROM records")
- * }}}
- *
- * == Language Integrated Queries ==
- *
- * {{{
- *
- *  case class Record(key: Int, value: String)
- *
- *  val sc: SparkContext // An existing spark context.
- *  val sqlContext = new SQLContext(sc)
- *
- *  // Importing the SQL context gives access to all the SQL functions and implicit conversions.
- *  import sqlContext._
- *
- *  val rdd = sc.parallelize((1 to 100).map(i => Record(i, "val_" + i)))
- *
- *  // Example of language integrated queries.
- *  rdd.where('key === 1).orderBy('value.asc).select('key).collect()
- * }}}
- *
- *  @groupname Query Language Integrated Queries
- *  @groupdesc Query Functions that create new queries from SchemaRDDs.  The
- *             result of all query functions is also a SchemaRDD, allowing multiple operations to be
- *             chained using a builder pattern.
- *  @groupprio Query -2
- *  @groupname schema SchemaRDD Functions
- *  @groupprio schema -1
- *  @groupname Ungrouped Base RDD Functions
- */
-@AlphaComponent
-class SchemaRDD(
-    @transient val sqlContext: SQLContext,
-    @transient val baseLogicalPlan: LogicalPlan)
-  extends RDD[Row](sqlContext.sparkContext, Nil) with SchemaRDDLike {
-
-  def baseSchemaRDD = this
-
-  // =========================================================================================
-  // RDD functions: Copy the internal row representation so we present immutable data to users.
-  // =========================================================================================
-
-  override def compute(split: Partition, context: TaskContext): Iterator[Row] =
-    firstParent[Row].compute(split, context).map(ScalaReflection.convertRowToScala(_, this.schema))
-
-  override def getPartitions: Array[Partition] = firstParent[Row].partitions
-
-  override protected def getDependencies: Seq[Dependency[_]] = {
-    schema // Force reification of the schema so it is available on executors.
-
-    List(new OneToOneDependency(queryExecution.toRdd))
-  }
-
-  /**
-   * Returns the schema of this SchemaRDD (represented by a [[StructType]]).
-   *
-   * @group schema
-   */
-  lazy val schema: StructType = queryExecution.analyzed.schema
-
-  /**
-   * Returns a new RDD with each row transformed to a JSON string.
-   *
-   * @group schema
-   */
-  def toJSON: RDD[String] = {
-    val rowSchema = this.schema
-    this.mapPartitions { iter =>
-      val jsonFactory = new JsonFactory()
-      iter.map(JsonRDD.rowToJSON(rowSchema, jsonFactory))
-    }
-  }
-
-
-  // =======================================================================
-  // Query DSL
-  // =======================================================================
-
-  /**
-   * Changes the output of this relation to the given expressions, similar to the `SELECT` clause
-   * in SQL.
-   *
-   * {{{
-   *   schemaRDD.select('a, 'b + 'c, 'd as 'aliasedName)
-   * }}}
-   *
-   * @param exprs a set of logical expression that will be evaluated for each input row.
-   *
-   * @group Query
-   */
-  def select(exprs: Expression*): SchemaRDD = {
-    val aliases = exprs.zipWithIndex.map {
-      case (ne: NamedExpression, _) => ne
-      case (e, i) => Alias(e, s"c$i")()
-    }
-    new SchemaRDD(sqlContext, Project(aliases, logicalPlan))
-  }
-
-  /**
-   * Filters the output, only returning those rows where `condition` evaluates to true.
-   *
-   * {{{
-   *   schemaRDD.where('a === 'b)
-   *   schemaRDD.where('a === 1)
-   *   schemaRDD.where('a + 'b > 10)
-   * }}}
-   *
-   * @group Query
-   */
-  def where(condition: Expression): SchemaRDD =
-    new SchemaRDD(sqlContext, Filter(condition, logicalPlan))
-
-  /**
-   * Performs a relational join on two SchemaRDDs
-   *
-   * @param otherPlan the [[SchemaRDD]] that should be joined with this one.
-   * @param joinType One of `Inner`, `LeftOuter`, `RightOuter`, or `FullOuter`. Defaults to `Inner.`
-   * @param on       An optional condition for the join operation.  This is equivalent to the `ON`
-   *                 clause in standard SQL.  In the case of `Inner` joins, specifying a
-   *                 `condition` is equivalent to adding `where` clauses after the `join`.
-   *
-   * @group Query
-   */
-  def join(
-      otherPlan: SchemaRDD,
-      joinType: JoinType = Inner,
-      on: Option[Expression] = None): SchemaRDD =
-    new SchemaRDD(sqlContext, Join(logicalPlan, otherPlan.logicalPlan, joinType, on))
-
-  /**
-   * Sorts the results by the given expressions.
-   * {{{
-   *   schemaRDD.orderBy('a)
-   *   schemaRDD.orderBy('a, 'b)
-   *   schemaRDD.orderBy('a.asc, 'b.desc)
-   * }}}
-   *
-   * @group Query
-   */
-  def orderBy(sortExprs: SortOrder*): SchemaRDD =
-    new SchemaRDD(sqlContext, Sort(sortExprs, true, logicalPlan))
-
-  /**
-   * Sorts the results by the given expressions within partition.
-   * {{{
-   *   schemaRDD.sortBy('a)
-   *   schemaRDD.sortBy('a, 'b)
-   *   schemaRDD.sortBy('a.asc, 'b.desc)
-   * }}}
-   *
-   * @group Query
-   */
-  def sortBy(sortExprs: SortOrder*): SchemaRDD =
-    new SchemaRDD(sqlContext, Sort(sortExprs, false, logicalPlan))
-
-  @deprecated("use limit with integer argument", "1.1.0")
-  def limit(limitExpr: Expression): SchemaRDD =
-    new SchemaRDD(sqlContext, Limit(limitExpr, logicalPlan))
-
-  /**
-   * Limits the results by the given integer.
-   * {{{
-   *   schemaRDD.limit(10)
-   * }}}
-   * @group Query
-   */
-  def limit(limitNum: Int): SchemaRDD =
-    new SchemaRDD(sqlContext, Limit(Literal(limitNum), logicalPlan))
-
-  /**
-   * Performs a grouping followed by an aggregation.
-   *
-   * {{{
-   *   schemaRDD.groupBy('year)(Sum('sales) as 'totalSales)
-   * }}}
-   *
-   * @group Query
-   */
-  def groupBy(groupingExprs: Expression*)(aggregateExprs: Expression*): SchemaRDD = {
-    val aliasedExprs = aggregateExprs.map {
-      case ne: NamedExpression => ne
-      case e => Alias(e, e.toString)()
-    }
-    new SchemaRDD(sqlContext, Aggregate(groupingExprs, aliasedExprs, logicalPlan))
-  }
-
-  /**
-   * Performs an aggregation over all Rows in this RDD.
-   * This is equivalent to a groupBy with no grouping expressions.
-   *
-   * {{{
-   *   schemaRDD.aggregate(Sum('sales) as 'totalSales)
-   * }}}
-   *
-   * @group Query
-   */
-  def aggregate(aggregateExprs: Expression*): SchemaRDD = {
-    groupBy()(aggregateExprs: _*)
-  }
-
-  /**
-   * Applies a qualifier to the attributes of this relation.  Can be used to disambiguate attributes
-   * with the same name, for example, when performing self-joins.
-   *
-   * {{{
-   *   val x = schemaRDD.where('a === 1).as('x)
-   *   val y = schemaRDD.where('a === 2).as('y)
-   *   x.join(y).where("x.a".attr === "y.a".attr),
-   * }}}
-   *
-   * @group Query
-   */
-  def as(alias: Symbol) =
-    new SchemaRDD(sqlContext, Subquery(alias.name, logicalPlan))
-
-  /**
-   * Combines the tuples of two RDDs with the same schema, keeping duplicates.
-   *
-   * @group Query
-   */
-  def unionAll(otherPlan: SchemaRDD) =
-    new SchemaRDD(sqlContext, Union(logicalPlan, otherPlan.logicalPlan))
-
-  /**
-   * Performs a relational except on two SchemaRDDs
-   *
-   * @param otherPlan the [[SchemaRDD]] that should be excepted from this one.
-   *
-   * @group Query
-   */
-  def except(otherPlan: SchemaRDD): SchemaRDD =
-    new SchemaRDD(sqlContext, Except(logicalPlan, otherPlan.logicalPlan))
-
-  /**
-   * Performs a relational intersect on two SchemaRDDs
-   *
-   * @param otherPlan the [[SchemaRDD]] that should be intersected with this one.
-   *
-   * @group Query
-   */
-  def intersect(otherPlan: SchemaRDD): SchemaRDD =
-    new SchemaRDD(sqlContext, Intersect(logicalPlan, otherPlan.logicalPlan))
-
-  /**
-   * Filters tuples using a function over the value of the specified column.
-   *
-   * {{{
-   *   schemaRDD.where('a)((a: Int) => ...)
-   * }}}
-   *
-   * @group Query
-   */
-  def where[T1](arg1: Symbol)(udf: (T1) => Boolean) =
-    new SchemaRDD(
-      sqlContext,
-      Filter(ScalaUdf(udf, BooleanType, Seq(UnresolvedAttribute(arg1.name))), logicalPlan))
-
-  /**
-   * :: Experimental ::
-   * Returns a sampled version of the underlying dataset.
-   *
-   * @group Query
-   */
-  @Experimental
-  override
-  def sample(
-      withReplacement: Boolean = true,
-      fraction: Double,
-      seed: Long) =
-    new SchemaRDD(sqlContext, Sample(fraction, withReplacement, seed, logicalPlan))
-
-  /**
-   * :: Experimental ::
-   * Return the number of elements in the RDD. Unlike the base RDD implementation of count, this
-   * implementation leverages the query optimizer to compute the count on the SchemaRDD, which
-   * supports features such as filter pushdown.
-   * 
-   * @group Query
-   */
-  @Experimental
-  override def count(): Long = aggregate(Count(Literal(1))).collect().head.getLong(0)
-
-  /**
-   * :: Experimental ::
-   * Applies the given Generator, or table generating function, to this relation.
-   *
-   * @param generator A table generating function.  The API for such functions is likely to change
-   *                  in future releases
-   * @param join when set to true, each output row of the generator is joined with the input row
-   *             that produced it.
-   * @param outer when set to true, at least one row will be produced for each input row, similar to
-   *              an `OUTER JOIN` in SQL.  When no output rows are produced by the generator for a
-   *              given row, a single row will be output, with `NULL` values for each of the
-   *              generated columns.
-   * @param alias an optional alias that can be used as qualifier for the attributes that are
-   *              produced by this generate operation.
-   *
-   * @group Query
-   */
-  @Experimental
-  def generate(
-      generator: Generator,
-      join: Boolean = false,
-      outer: Boolean = false,
-      alias: Option[String] = None) =
-    new SchemaRDD(sqlContext, Generate(generator, join, outer, alias, logicalPlan))
-
-  /**
-   * Returns this RDD as a SchemaRDD.  Intended primarily to force the invocation of the implicit
-   * conversion from a standard RDD to a SchemaRDD.
-   *
-   * @group schema
-   */
-  def toSchemaRDD = this
-
-  /**
-   * Converts this SchemaRDD into a JavaRDD of serialized rows. It is used by pyspark.
-   */
-  private[sql] def javaToPython: JavaRDD[Array[Byte]] = {
-    val fieldTypes = schema.fields.map(_.dataType)
-    val jrdd = this.map(EvaluatePython.rowToArray(_, fieldTypes)).toJavaRDD()
-    SerDeUtil.javaToPython(jrdd)
-  }
-
-  /**
-   * Serializes the Array[Row] returned by SchemaRDD's optimized collect(), using the same
-   * format as javaToPython. It is used by pyspark.
-   */
-  private[sql] def collectToPython: JList[Array[Byte]] = {
-    val fieldTypes = schema.fields.map(_.dataType)
-    val pickle = new Pickler
-    new java.util.ArrayList(collect().map { row =>
-      EvaluatePython.rowToArray(row, fieldTypes)
-    }.grouped(100).map(batched => pickle.dumps(batched.toArray)).toIterable)
-  }
-
-  /**
-   * Serializes the Array[Row] returned by SchemaRDD's takeSample(), using the same
-   * format as javaToPython and collectToPython. It is used by pyspark.
-   */
-  private[sql] def takeSampleToPython(
-      withReplacement: Boolean,
-      num: Int,
-      seed: Long): JList[Array[Byte]] = {
-    val fieldTypes = schema.fields.map(_.dataType)
-    val pickle = new Pickler
-    new java.util.ArrayList(this.takeSample(withReplacement, num, seed).map { row =>
-      EvaluatePython.rowToArray(row, fieldTypes)
-    }.grouped(100).map(batched => pickle.dumps(batched.toArray)).toIterable)
-  }
-
-  /**
-   * Creates a SchemaRDD by applying this RDD's schema to a derived RDD. Typically used to wrap
-   * the return value of base RDD functions that do not change the schema.
-   *
-   * @param rdd an RDD derived from this one that has the same schema
-   *
-   * @group schema
-   */
-  private def applySchema(rdd: RDD[Row]): SchemaRDD = {
-    new SchemaRDD(sqlContext,
-      LogicalRDD(queryExecution.analyzed.output.map(_.newInstance()), rdd)(sqlContext))
-  }
-
-  // =======================================================================
-  // Overridden RDD actions
-  // =======================================================================
-
-  override def collect(): Array[Row] = queryExecution.executedPlan.executeCollect()
-
-  def collectAsList(): java.util.List[Row] = java.util.Arrays.asList(collect() : _*)
-
-  override def take(num: Int): Array[Row] = limit(num).collect()
-
-  // =======================================================================
-  // Base RDD functions that do NOT change schema
-  // =======================================================================
-
-  // Transformations (return a new RDD)
-
-  override def coalesce(numPartitions: Int, shuffle: Boolean = false)
-                       (implicit ord: Ordering[Row] = null): SchemaRDD =
-    applySchema(super.coalesce(numPartitions, shuffle)(ord))
-
-  override def distinct(): SchemaRDD = applySchema(super.distinct())
-
-  override def distinct(numPartitions: Int)
-                       (implicit ord: Ordering[Row] = null): SchemaRDD =
-    applySchema(super.distinct(numPartitions)(ord))
-
-  def distinct(numPartitions: Int): SchemaRDD =
-    applySchema(super.distinct(numPartitions)(null))
-
-  override def filter(f: Row => Boolean): SchemaRDD =
-    applySchema(super.filter(f))
-
-  override def intersection(other: RDD[Row]): SchemaRDD =
-    applySchema(super.intersection(other))
-
-  override def intersection(other: RDD[Row], partitioner: Partitioner)
-                           (implicit ord: Ordering[Row] = null): SchemaRDD =
-    applySchema(super.intersection(other, partitioner)(ord))
-
-  override def intersection(other: RDD[Row], numPartitions: Int): SchemaRDD =
-    applySchema(super.intersection(other, numPartitions))
-
-  override def repartition(numPartitions: Int)
-                          (implicit ord: Ordering[Row] = null): SchemaRDD =
-    applySchema(super.repartition(numPartitions)(ord))
-
-  override def subtract(other: RDD[Row]): SchemaRDD =
-    applySchema(super.subtract(other))
-
-  override def subtract(other: RDD[Row], numPartitions: Int): SchemaRDD =
-    applySchema(super.subtract(other, numPartitions))
-
-  override def subtract(other: RDD[Row], p: Partitioner)
-                       (implicit ord: Ordering[Row] = null): SchemaRDD =
-    applySchema(super.subtract(other, p)(ord))
-
-  /** Overridden cache function will always use the in-memory columnar caching. */
-  override def cache(): this.type = {
-    sqlContext.cacheQuery(this)
-    this
-  }
-
-  override def persist(newLevel: StorageLevel): this.type = {
-    sqlContext.cacheQuery(this, None, newLevel)
-    this
-  }
-
-  override def unpersist(blocking: Boolean): this.type = {
-    sqlContext.tryUncacheQuery(this, blocking)
-    this
-  }
-}

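For readers tracking the migration: the set-style operators removed from SchemaRDD above keep their names on the new DataFrame API declared later in this patch (see DataFrameSpecificApi in api.scala below). A minimal sketch of how they compose, purely illustrative and not part of the commit; `df1` and `df2` stand for any two DataFrames with identical schemas:

    import org.apache.spark.sql.DataFrame

    // Set-style operators on the new DataFrame API; df1 and df2 are placeholders
    // for two frames with the same schema.
    def setOps(df1: DataFrame, df2: DataFrame): Unit = {
      val union    = df1.unionAll(df2)    // keeps duplicates, like SQL UNION ALL
      val onlyLeft = df1.except(df2)      // rows of df1 that do not appear in df2
      val common   = df1.intersect(df2)   // rows that appear in both frames
      val sampled  = df1.sample(withReplacement = false, fraction = 0.1, seed = 42L)
      sampled.limit(10).collect().foreach(println)
    }
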
http://git-wip-us.apache.org/repos/asf/spark/blob/119f45d6/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
deleted file mode 100644
index 3cf9209..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql
-
-import org.apache.spark.annotation.{DeveloperApi, Experimental}
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.execution.LogicalRDD
-
-/**
- * Contains functions that are shared between all SchemaRDD types (i.e., Scala, Java).
- */
-private[sql] trait SchemaRDDLike {
-  @transient def sqlContext: SQLContext
-  @transient val baseLogicalPlan: LogicalPlan
-
-  private[sql] def baseSchemaRDD: SchemaRDD
-
-  /**
-   * :: DeveloperApi ::
-   * A lazily computed query execution workflow.  All other RDD operations are passed
-   * through to the RDD that is produced by this workflow. This workflow is produced lazily because
-   * invoking the whole query optimization pipeline can be expensive.
-   *
-   * The query execution is considered a Developer API as phases may be added or removed in future
-   * releases.  This execution is only exposed to provide an interface for inspecting the various
-   * phases for debugging purposes.  Applications should not depend on particular phases existing
-   * or producing any specific output, even for exactly the same query.
-   *
-   * Additionally, the RDD exposed by this execution is not designed for consumption by end users.
-   * In particular, it does not contain any schema information, and it reuses Row objects
-   * internally.  This object reuse improves performance, but can make programming against the RDD
-   * more difficult.  Instead end users should perform RDD operations on a SchemaRDD directly.
-   */
-  @transient
-  @DeveloperApi
-  lazy val queryExecution = sqlContext.executePlan(baseLogicalPlan)
-
-  @transient protected[spark] val logicalPlan: LogicalPlan = baseLogicalPlan match {
-    // For various commands (like DDL) and queries with side effects, we force query optimization to
-    // happen right away to let these side effects take place eagerly.
-    case _: Command | _: InsertIntoTable | _: CreateTableAsSelect[_] | _: WriteToFile =>
-      LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sqlContext)
-    case _ =>
-      baseLogicalPlan
-  }
-
-  override def toString =
-    s"""${super.toString}
-       |== Query Plan ==
-       |${queryExecution.simpleString}""".stripMargin.trim
-
-  /**
-   * Saves the contents of this `SchemaRDD` as a parquet file, preserving the schema.  Files that
-   * are written out using this method can be read back in as a SchemaRDD using the `parquetFile`
-   * function.
-   *
-   * @group schema
-   */
-  def saveAsParquetFile(path: String): Unit = {
-    sqlContext.executePlan(WriteToFile(path, logicalPlan)).toRdd
-  }
-
-  /**
-   * Registers this RDD as a temporary table using the given name.  The lifetime of this temporary
-   * table is tied to the [[SQLContext]] that was used to create this SchemaRDD.
-   *
-   * @group schema
-   */
-  def registerTempTable(tableName: String): Unit = {
-    sqlContext.registerRDDAsTable(baseSchemaRDD, tableName)
-  }
-
-  @deprecated("Use registerTempTable instead of registerAsTable.", "1.1")
-  def registerAsTable(tableName: String): Unit = registerTempTable(tableName)
-
-  /**
-   * :: Experimental ::
-   * Adds the rows from this RDD to the specified table, optionally overwriting the existing data.
-   *
-   * @group schema
-   */
-  @Experimental
-  def insertInto(tableName: String, overwrite: Boolean): Unit =
-    sqlContext.executePlan(InsertIntoTable(UnresolvedRelation(Seq(tableName)),
-      Map.empty, logicalPlan, overwrite)).toRdd
-
-  /**
-   * :: Experimental ::
-   * Appends the rows from this RDD to the specified table.
-   *
-   * @group schema
-   */
-  @Experimental
-  def insertInto(tableName: String): Unit = insertInto(tableName, overwrite = false)
-
-  /**
-   * :: Experimental ::
-   * Creates a table from the contents of this SchemaRDD.  This will fail if the table already
-   * exists.
-   *
-   * Note that this currently only works with SchemaRDDs that are created from a HiveContext as
-   * there is no notion of a persisted catalog in a standard SQL context.  Instead you can write
-   * an RDD out to a parquet file, and then register that file as a table.  This "table" can then
-   * be the target of an `insertInto`.
-   *
-   * @group schema
-   */
-  @Experimental
-  def saveAsTable(tableName: String): Unit =
-    sqlContext.executePlan(CreateTableAsSelect(None, tableName, logicalPlan, false)).toRdd
-
-  /** Returns the schema as a string in the tree format.
-   *
-   * @group schema
-   */
-  def schemaString: String = baseSchemaRDD.schema.treeString
-
-  /** Prints out the schema.
-   *
-   * @group schema
-   */
-  def printSchema(): Unit = println(schemaString)
-}

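The persistence helpers documented in the removed SchemaRDDLike trait carry over to DataFrame under the same names (see DataFrameSpecificApi below). A sketch of typical usage, with placeholder path and table names that are not part of this patch:

    import org.apache.spark.sql.DataFrame

    // Illustrative only; "/tmp/people.parquet", "people" and "people_archive" are placeholders.
    def persistExamples(people: DataFrame): Unit = {
      // Written files can be read back with SQLContext.parquetFile.
      people.saveAsParquetFile("/tmp/people.parquet")
      // The temporary table lives as long as the owning SQLContext.
      people.registerTempTable("people")
      // Appends rows; pass overwrite = true to replace the existing data.
      people.insertInto("people_archive")
    }
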
http://git-wip-us.apache.org/repos/asf/spark/blob/119f45d6/sql/core/src/main/scala/org/apache/spark/sql/api.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api.scala b/sql/core/src/main/scala/org/apache/spark/sql/api.scala
new file mode 100644
index 0000000..073d41e
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api.scala
@@ -0,0 +1,289 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.storage.StorageLevel
+
+
+/**
+ * An internal interface defining the RDD-like methods for [[DataFrame]].
+ * Please use [[DataFrame]] directly, and do NOT use this.
+ */
+trait RDDApi[T] {
+
+  def cache(): this.type = persist()
+
+  def persist(): this.type
+
+  def persist(newLevel: StorageLevel): this.type
+
+  def unpersist(): this.type = unpersist(blocking = false)
+
+  def unpersist(blocking: Boolean): this.type
+
+  def map[R: ClassTag](f: T => R): RDD[R]
+
+  def mapPartitions[R: ClassTag](f: Iterator[T] => Iterator[R]): RDD[R]
+
+  def take(n: Int): Array[T]
+
+  def collect(): Array[T]
+
+  def collectAsList(): java.util.List[T]
+
+  def count(): Long
+
+  def first(): T
+
+  def repartition(numPartitions: Int): DataFrame
+}
+
+
+/**
+ * An internal interface defining data frame related methods in [[DataFrame]].
+ * Please use [[DataFrame]] directly, and do NOT use this.
+ */
+trait DataFrameSpecificApi {
+
+  def schema: StructType
+
+  def printSchema(): Unit
+
+  def dtypes: Array[(String, String)]
+
+  def columns: Array[String]
+
+  def head(): Row
+
+  def head(n: Int): Array[Row]
+
+  /////////////////////////////////////////////////////////////////////////////
+  // Relational operators
+  /////////////////////////////////////////////////////////////////////////////
+  def apply(colName: String): Column
+
+  def apply(projection: Product): DataFrame
+
+  @scala.annotation.varargs
+  def select(cols: Column*): DataFrame
+
+  @scala.annotation.varargs
+  def select(col: String, cols: String*): DataFrame
+
+  def apply(condition: Column): DataFrame
+
+  def as(name: String): DataFrame
+
+  def filter(condition: Column): DataFrame
+
+  def where(condition: Column): DataFrame
+
+  @scala.annotation.varargs
+  def groupBy(cols: Column*): GroupedDataFrame
+
+  @scala.annotation.varargs
+  def groupBy(col1: String, cols: String*): GroupedDataFrame
+
+  def agg(exprs: Map[String, String]): DataFrame
+
+  @scala.annotation.varargs
+  def agg(expr: Column, exprs: Column*): DataFrame
+
+  def sort(colName: String): DataFrame
+
+  @scala.annotation.varargs
+  def orderBy(sortExpr: Column, sortExprs: Column*): DataFrame
+
+  @scala.annotation.varargs
+  def sort(sortExpr: Column, sortExprs: Column*): DataFrame
+
+  def join(right: DataFrame): DataFrame
+
+  def join(right: DataFrame, joinExprs: Column): DataFrame
+
+  def join(right: DataFrame, joinExprs: Column, joinType: String): DataFrame
+
+  def limit(n: Int): DataFrame
+
+  def unionAll(other: DataFrame): DataFrame
+
+  def intersect(other: DataFrame): DataFrame
+
+  def except(other: DataFrame): DataFrame
+
+  def sample(withReplacement: Boolean, fraction: Double, seed: Long): DataFrame
+
+  def sample(withReplacement: Boolean, fraction: Double): DataFrame
+
+  /////////////////////////////////////////////////////////////////////////////
+  // Column mutation
+  /////////////////////////////////////////////////////////////////////////////
+  def addColumn(colName: String, col: Column): DataFrame
+
+  /////////////////////////////////////////////////////////////////////////////
+  // I/O and interaction with other frameworks
+  /////////////////////////////////////////////////////////////////////////////
+
+  def rdd: RDD[Row]
+
+  def toJavaRDD: JavaRDD[Row] = rdd.toJavaRDD()
+
+  def toJSON: RDD[String]
+
+  def registerTempTable(tableName: String): Unit
+
+  def saveAsParquetFile(path: String): Unit
+
+  @Experimental
+  def saveAsTable(tableName: String): Unit
+
+  @Experimental
+  def insertInto(tableName: String, overwrite: Boolean): Unit
+
+  @Experimental
+  def insertInto(tableName: String): Unit = insertInto(tableName, overwrite = false)
+
+  /////////////////////////////////////////////////////////////////////////////
+  // Stat functions
+  /////////////////////////////////////////////////////////////////////////////
+//  def describe(): Unit
+//
+//  def mean(): Unit
+//
+//  def max(): Unit
+//
+//  def min(): Unit
+}
+
+
+/**
+ * An internal interface defining expression APIs for [[DataFrame]].
+ * Please use [[DataFrame]] and [[Column]] directly, and do NOT use this.
+ */
+trait ExpressionApi {
+
+  def isComputable: Boolean
+
+  def unary_- : Column
+  def unary_! : Column
+  def unary_~ : Column
+
+  def + (other: Column): Column
+  def + (other: Any): Column
+  def - (other: Column): Column
+  def - (other: Any): Column
+  def * (other: Column): Column
+  def * (other: Any): Column
+  def / (other: Column): Column
+  def / (other: Any): Column
+  def % (other: Column): Column
+  def % (other: Any): Column
+  def & (other: Column): Column
+  def & (other: Any): Column
+  def | (other: Column): Column
+  def | (other: Any): Column
+  def ^ (other: Column): Column
+  def ^ (other: Any): Column
+
+  def && (other: Column): Column
+  def && (other: Boolean): Column
+  def || (other: Column): Column
+  def || (other: Boolean): Column
+
+  def < (other: Column): Column
+  def < (other: Any): Column
+  def <= (other: Column): Column
+  def <= (other: Any): Column
+  def > (other: Column): Column
+  def > (other: Any): Column
+  def >= (other: Column): Column
+  def >= (other: Any): Column
+  def === (other: Column): Column
+  def === (other: Any): Column
+  def equalTo(other: Column): Column
+  def equalTo(other: Any): Column
+  def <=> (other: Column): Column
+  def <=> (other: Any): Column
+  def !== (other: Column): Column
+  def !== (other: Any): Column
+
+  @scala.annotation.varargs
+  def in(list: Column*): Column
+
+  def like(other: Column): Column
+  def like(other: String): Column
+  def rlike(other: Column): Column
+  def rlike(other: String): Column
+
+  def contains(other: Column): Column
+  def contains(other: Any): Column
+  def startsWith(other: Column): Column
+  def startsWith(other: String): Column
+  def endsWith(other: Column): Column
+  def endsWith(other: String): Column
+
+  def substr(startPos: Column, len: Column): Column
+  def substr(startPos: Int, len: Int): Column
+
+  def isNull: Column
+  def isNotNull: Column
+
+  def getItem(ordinal: Column): Column
+  def getItem(ordinal: Int): Column
+  def getField(fieldName: String): Column
+
+  def cast(to: DataType): Column
+
+  def asc: Column
+  def desc: Column
+
+  def as(alias: String): Column
+}
+
+
+/**
+ * An internal interface defining aggregation APIs for [[DataFrame]].
+ * Please use [[DataFrame]] and [[GroupedDataFrame]] directly, and do NOT use this.
+ */
+trait GroupedDataFrameApi {
+
+  def agg(exprs: Map[String, String]): DataFrame
+
+  @scala.annotation.varargs
+  def agg(expr: Column, exprs: Column*): DataFrame
+
+  def avg(): DataFrame
+
+  def mean(): DataFrame
+
+  def min(): DataFrame
+
+  def max(): DataFrame
+
+  def sum(): DataFrame
+
+  def count(): DataFrame
+
+  // TODO: Add var, std
+}

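As a usage illustration of the expression surface declared in ExpressionApi and the relational operators in DataFrameSpecificApi, here is a small sketch; the column names (age, name, deptId, id) are placeholders and not part of this commit:

    import org.apache.spark.sql.DataFrame

    // Column expressions combine with comparison and boolean operators,
    // and the resulting Column drives filter and join.
    def adultsWithDept(people: DataFrame, dept: DataFrame): DataFrame = {
      val adults = people.filter(people("age") >= 18 && people("name").isNotNull)
      adults.join(dept, adults("deptId") === dept("id"), "left_outer")
    }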

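And a matching sketch for GroupedDataFrameApi: groupBy returns a GroupedDataFrame whose agg accepts either Column expressions or a column-to-aggregate-name map. Column names here are again placeholders:

    import org.apache.spark.sql.DataFrame

    // Group by a column name and aggregate via the Map-based agg variant,
    // which maps column names to aggregate function names such as "avg" and "max".
    def summary(joined: DataFrame): DataFrame =
      joined.groupBy("name")
            .agg(Map("age" -> "avg", "salary" -> "max"))
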
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org