diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
new file mode 100644
index 0000000..1ea7db0
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -0,0 +1,2124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+import scala.collection.JavaConverters._
+import scala.language.implicitConversions
+import scala.reflect.runtime.universe.TypeTag
+import com.fasterxml.jackson.core.JsonFactory
+import org.apache.spark.annotation.{DeveloperApi, Experimental}
+import org.apache.spark.api.python.PythonRDD
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst._
+import org.apache.spark.sql.catalyst.analysis._
+import org.apache.spark.sql.catalyst.encoders._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.optimizer.CombineUnions
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.util.usePrettyExpression
+import org.apache.spark.sql.execution.{FileRelation, LogicalRDD, Queryable, QueryExecution, SQLExecution}
+import org.apache.spark.sql.execution.command.ExplainCommand
+import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
+import org.apache.spark.sql.execution.python.EvaluatePython
+import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
+private[sql] object Dataset {
+  /**
+   * Builds a [[Dataset]] of `T` over `logicalPlan`, using the [[Encoder]] for `T` found in
+   * implicit scope.
+   */
+  def apply[T: Encoder](sqlContext: SQLContext, logicalPlan: LogicalPlan): Dataset[T] = {
+    new Dataset(sqlContext, logicalPlan, implicitly[Encoder[T]])
+  }
+
+  /**
+   * Builds a [[DataFrame]] over `logicalPlan`. The plan is analyzed first so that the row
+   * encoder can be derived from the analyzed schema.
+   */
+  def newDataFrame(sqlContext: SQLContext, logicalPlan: LogicalPlan): DataFrame = {
+    val qe = sqlContext.executePlan(logicalPlan)
+    qe.assertAnalyzed()
+    // Reuse the QueryExecution that was just analyzed instead of handing the raw plan to the
+    // auxiliary constructor, which would call `executePlan` on it a second time.
+    new Dataset[Row](sqlContext, qe, RowEncoder(qe.analyzed.schema))
+  }
+}
+/**
+ * :: Experimental ::
+ * A distributed collection of data organized into named columns.
+ *
+ * A [[DataFrame]] is equivalent to a relational table in Spark SQL. The following example creates
+ * a [[DataFrame]] by pointing Spark SQL to a Parquet data set.
+ * {{{
+ *   val people = sqlContext.read.parquet("...")  // in Scala
+ *   DataFrame people = sqlContext.read().parquet("...")  // in Java
+ * }}}
+ *
+ * Once created, it can be manipulated using the various domain-specific-language (DSL) functions
+ * defined in: [[DataFrame]] (this class), [[Column]], and [[functions]].
+ *
+ * To select a column from the data frame, use `apply` method in Scala and `col` in Java.
+ * {{{
+ *   val ageCol = people("age")  // in Scala
+ *   Column ageCol = people.col("age")  // in Java
+ * }}}
+ *
+ * Note that the [[Column]] type can also be manipulated through its various functions.
+ * {{{
+ *   // The following creates a new column that increases everybody's age by 10.
+ *   people("age") + 10  // in Scala
+ *   people.col("age").plus(10);  // in Java
+ * }}}
+ *
+ * A more concrete example in Scala:
+ * {{{
+ *   // To create DataFrame using SQLContext
+ *   val people = sqlContext.read.parquet("...")
+ *   val department = sqlContext.read.parquet("...")
+ *
+ *   people.filter("age > 30")
+ *     .join(department, people("deptId") === department("id"))
+ *     .groupBy(department("name"), "gender")
+ *     .agg(avg(people("salary")), max(people("age")))
+ * }}}
+ *
+ * and in Java:
+ * {{{
+ *   // To create DataFrame using SQLContext
+ *   DataFrame people = sqlContext.read().parquet("...");
+ *   DataFrame department = sqlContext.read().parquet("...");
+ *
+ *   people.filter(people.col("age").gt(30))
+ *     .join(department, people.col("deptId").equalTo(department.col("id")))
+ *     .groupBy(department.col("name"), "gender")
+ *     .agg(avg(people.col("salary")), max(people.col("age")));
+ * }}}
+ *
+ * @groupname basic Basic DataFrame functions
+ * @groupname dfops Language Integrated Queries
+ * @groupname rdd RDD Operations
+ * @groupname output Output Operations
+ * @groupname action Actions
+ * @since 1.3.0
+ */
+class Dataset[T] private[sql](
+    @transient override val sqlContext: SQLContext,
+    @DeveloperApi @transient override val queryExecution: QueryExecution,
+    encoder: Encoder[T])
+  extends Queryable with Serializable {
+  queryExecution.assertAnalyzed()
+  // Note for Spark contributors: if adding or updating any action in `DataFrame`, please make sure
+  // you wrap it with `withNewExecutionId` if the action doesn't call another action.
+  /** Auxiliary constructor: plans `logicalPlan` through `sqlContext` and delegates. */
+  def this(sqlContext: SQLContext, logicalPlan: LogicalPlan, encoder: Encoder[T]) =
+    this(sqlContext, sqlContext.executePlan(logicalPlan), encoder)
+  // Commands (like DDL) and queries with side effects are run right away so that their side
+  // effects take place eagerly; the result is then exposed as a LogicalRDD. Every other query
+  // simply uses its analyzed plan.
+  @transient protected[sql] val logicalPlan: LogicalPlan = {
+    val hasSideEffects = queryExecution.logical match {
+      case _: Command | _: InsertIntoTable | _: CreateTableUsingAsSelect => true
+      case _ => false
+    }
+    if (hasSideEffects) {
+      LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sqlContext)
+    } else {
+      queryExecution.analyzed
+    }
+  }
+  /**
+   * An unresolved version of the internal encoder for the type of this [[Dataset]].  This one is
+   * marked implicit so that we can use it when constructing new [[Dataset]] objects that have the
+   * same object type (that will be possibly resolved to a different schema).
+   */
+  private[sql] implicit val unresolvedTEncoder: ExpressionEncoder[T] = encoderFor(encoder)
+  // Runs at construction time: checks the encoder against this plan's output attributes before
+  // the resolved/bound encoders are derived from it.
+  unresolvedTEncoder.validate(logicalPlan.output)
+  /**
+   * The encoder for this [[Dataset]] that has been resolved to its output schema, i.e.
+   * `unresolvedTEncoder` resolved against `logicalPlan.output` with the registered outer scopes.
+   */
+  private[sql] val resolvedTEncoder: ExpressionEncoder[T] =
+    unresolvedTEncoder.resolve(logicalPlan.output, OuterScopes.outerScopes)
+  /**
+   * The encoder where the expressions used to construct an object from an input row have been
+   * bound to the ordinals of this [[Dataset]]'s output schema. Derived from `resolvedTEncoder`,
+   * so it must be initialized after it.
+   */
+  private[sql] val boundTEncoder = resolvedTEncoder.bind(logicalPlan.output)
+  // Implicit class tag for `T`, taken from the unresolved encoder.
+  // NOTE(review): presumably consumed by array/RDD-producing members elsewhere in this class.
+  private implicit def classTag = unresolvedTEncoder.clsTag
+  /**
+   * Resolves `colName` (quoted-name syntax) against the analyzed plan using the analyzer's
+   * resolver. Throws an [[AnalysisException]] listing the available field names when the name
+   * cannot be resolved.
+   */
+  protected[sql] def resolve(colName: String): NamedExpression = {
+    val resolved = queryExecution.analyzed.resolveQuoted(colName, sqlContext.analyzer.resolver)
+    resolved match {
+      case Some(expression) => expression
+      case None =>
+        throw new AnalysisException(
+          s"""Cannot resolve column name "$colName" among (${schema.fieldNames.mkString(", ")})""")
+    }
+  }
+  /**
+   * Returns the resolved expressions for every column of this schema whose data type is a
+   * [[NumericType]].
+   */
+  protected[sql] def numericColumns: Seq[Expression] = {
+    schema.fields.filter(_.dataType.isInstanceOf[NumericType]).map { n =>
+      // The name comes straight from this Dataset's own schema, so resolution is expected to
+      // succeed; `.get` is kept from the original.
+      queryExecution.analyzed.resolveQuoted(n.name, sqlContext.analyzer.resolver).get
+    }
+  }
+  /**
+   * Compose the string representing rows for output
+   * @param _numRows Number of rows to show; negative values are treated as 0
+   * @param truncate Whether truncate long strings and align cells right
+   */
+  override private[sql] def showString(_numRows: Int, truncate: Boolean = true): String = {
+    // Clamp so that `numRows + 1` below cannot overflow to a negative value.
+    val numRows = _numRows.max(0).min(Int.MaxValue - 1)
+    // Fetch one extra row purely to learn whether more data exists beyond `numRows`.
+    val takeResult = take(numRows + 1)
+    val hasMoreData = takeResult.length > numRows
+    val data = takeResult.take(numRows)
+
+    // For array values, replace Seq and Array with square brackets
+    // For cells that are beyond 20 characters, replace it with the first 17 and "..."
+    val rows: Seq[Seq[String]] = schema.fieldNames.toSeq +: data.map {
+      case r: Row => r
+      case tuple: Product => Row.fromTuple(tuple)
+      case o => Row(o)
+    }.map { row =>
+      row.toSeq.map { cell =>
+        val str = cell match {
+          case null => "null"
+          case binary: Array[Byte] => binary.map("%02X".format(_)).mkString("[", " ", "]")
+          case array: Array[_] => array.mkString("[", ", ", "]")
+          case seq: Seq[_] => seq.mkString("[", ", ", "]")
+          case _ => cell.toString
+        }
+        if (truncate && str.length > 20) str.substring(0, 17) + "..." else str
+      }: Seq[String]
+    }
+
+    formatString(rows, numRows, hasMoreData, truncate)
+  }
+  /**
+   * Returns this data as a [[DataFrame]]: same query execution, rows encoded with a
+   * [[RowEncoder]] built from the current schema.
+   * @group basic
+   * @since 1.3.0
+   */
+  // This is declared with parentheses to prevent the Scala compiler from treating
+  // `rdd.toDF("1")` as invoking this toDF and then apply on the returned DataFrame.
+  def toDF(): DataFrame = {
+    val rowEncoder = RowEncoder(schema)
+    new Dataset[Row](sqlContext, queryExecution, rowEncoder)
+  }
+  /**
+   * :: Experimental ::
+   * Re-types this [[DataFrame]] as a strongly-typed [[Dataset]] of `U`, using the implicit
+   * [[Encoder]] for `U` over the same logical plan.
+   * @group basic
+   * @since 1.6.0
+   */
+  @Experimental
+  def as[U : Encoder]: Dataset[U] = {
+    new Dataset[U](sqlContext, logicalPlan, implicitly[Encoder[U]])
+  }
+  /**
+   * Returns a new [[DataFrame]] with columns renamed. This can be quite convenient in conversion
+   * from a RDD of tuples into a [[DataFrame]] with meaningful names. For example:
+   * {{{
+   *   val rdd: RDD[(Int, String)] = ...
+   *   rdd.toDF()  // this implicit conversion creates a DataFrame with column name _1 and _2
+   *   rdd.toDF("id", "name")  // this creates a DataFrame with column name "id" and "name"
+   * }}}
+   *
+   * Fails with an `IllegalArgumentException` (via `require`) when the number of new names
+   * differs from the number of existing columns.
+   * @group basic
+   * @since 1.3.0
+   */
+  @scala.annotation.varargs
+  def toDF(colNames: String*): DataFrame = {
+    require(schema.size == colNames.size,
+      "The number of columns doesn't match.\n" +
+        s"Old column names (${schema.size}): " + schema.fieldNames.mkString(", ") + "\n" +
+        s"New column names (${colNames.size}): " + colNames.mkString(", "))
+
+    // Pair each existing output attribute with its new name, positionally.
+    val newCols = logicalPlan.output.zip(colNames).map { case (oldAttribute, newName) =>
+      Column(oldAttribute).as(newName)
+    }
+    select(newCols : _*)
+  }
+  /**
+   * The schema of this [[DataFrame]], as reported by the analyzed logical plan.
+   * @group basic
+   * @since 1.3.0
+   */
+  def schema: StructType = {
+    queryExecution.analyzed.schema
+  }
+  /**
+   * Writes the schema, rendered as a tree, to standard output.
+   * @group basic
+   * @since 1.3.0
+   */
+  // scalastyle:off println
+  override def printSchema(): Unit = {
+    val tree = schema.treeString
+    println(tree)
+  }
+  // scalastyle:on println
+  /**
+   * Runs an [[ExplainCommand]] over this query's logical plan and prints each resulting line
+   * to the console, for debugging purposes.
+   * @param extended passed through to the explain command as its `extended` flag
+   * @group basic
+   * @since 1.3.0
+   */
+  override def explain(extended: Boolean): Unit = {
+    val command = ExplainCommand(queryExecution.logical, extended = extended)
+    val planLines = sqlContext.executePlan(command).executedPlan.executeCollect()
+    // scalastyle:off println
+    planLines.foreach(line => println(line.getString(0)))
+    // scalastyle:on println
+  }
+  /**
+   * Prints the plan to the console for debugging purposes; shorthand for
+   * `explain(extended = false)`.
+   * @since 1.3.0
+   */
+  override def explain(): Unit = {
+    explain(extended = false)
+  }
+  /**
+   * Returns all column names and their data types as an array of `(name, dataType.toString)`
+   * pairs, in schema order.
+   * @group basic
+   * @since 1.3.0
+   */
+  def dtypes: Array[(String, String)] = schema.fields.map { field =>
+    (field.name, field.dataType.toString)
+  }
+  /**
+   * Returns all column names as an array, in schema order.
+   * @group basic
+   * @since 1.3.0
+   */
+  def columns: Array[String] = schema.fields.map(_.name)
+  /**
+   * Returns true if the `collect` and `take` methods can be run locally
+   * (without any Spark executors), which is the case when the logical plan is a
+   * [[LocalRelation]].
+   * @group basic
+   * @since 1.3.0
+   */
+  def isLocal: Boolean = logicalPlan match {
+    case _: LocalRelation => true
+    case _ => false
+  }
+  /**
+   * Prints the first `numRows` rows of the [[DataFrame]] as a table, with truncation enabled:
+   * strings longer than 20 characters are shortened and all cells are aligned right.
+   * For example:
+   * {{{
+   *   year  month AVG('Adj Close) MAX('Adj Close)
+   *   1980  12    0.503218        0.595103
+   *   1981  01    0.523289        0.570307
+   *   1982  02    0.436504        0.475256
+   *   1983  03    0.410516        0.442194
+   *   1984  04    0.450090        0.483521
+   * }}}
+   * @param numRows Number of rows to show
+   *
+   * @group action
+   * @since 1.3.0
+   */
+  def show(numRows: Int): Unit = {
+    show(numRows, truncate = true)
+  }
+  /**
+   * Prints the top 20 rows of the [[DataFrame]] as a table. Strings longer than 20 characters
+   * are truncated, and all cells are aligned right.
+   * @group action
+   * @since 1.3.0
+   */
+  def show(): Unit = {
+    show(numRows = 20)
+  }
+  /**
+   * Prints the top 20 rows of the [[DataFrame]] as a table.
+   *
+   * @param truncate Whether to truncate long strings. If true, strings more than 20 characters
+   *              will be truncated and all cells will be aligned right
+   *
+   * @group action
+   * @since 1.5.0
+   */
+  def show(truncate: Boolean): Unit = {
+    show(numRows = 20, truncate = truncate)
+  }
+  /**
+   * Prints the [[DataFrame]] as a table, using the text produced by `showString`. For example:
+   * {{{
+   *   year  month AVG('Adj Close) MAX('Adj Close)
+   *   1980  12    0.503218        0.595103
+   *   1981  01    0.523289        0.570307
+   *   1982  02    0.436504        0.475256
+   *   1983  03    0.410516        0.442194
+   *   1984  04    0.450090        0.483521
+   * }}}
+   * @param numRows Number of rows to show
+   * @param truncate Whether to truncate long strings. If true, strings more than 20 characters
+   *              will be truncated and all cells will be aligned right
+   *
+   * @group action
+   * @since 1.5.0
+   */
+  // scalastyle:off println
+  def show(numRows: Int, truncate: Boolean): Unit = {
+    val rendered = showString(numRows, truncate)
+    println(rendered)
+  }
+  // scalastyle:on println
+  /**
+   * Returns a [[DataFrameNaFunctions]] over this data (converted with `toDF()`) for working
+   * with missing data.
+   * {{{
+   *   // Dropping rows containing any null values.
+   *   df.na.drop()
+   * }}}
+   *
+   * @group dfops
+   * @since 1.3.1
+   */
+  def na: DataFrameNaFunctions = {
+    new DataFrameNaFunctions(toDF())
+  }
+  /**
+   * Returns a [[DataFrameStatFunctions]] over this data (converted with `toDF()`), giving
+   * access to the statistic functions.
+   * {{{
+   *   // Finding frequent items in column with name 'a'.
+   *   df.stat.freqItems(Seq("a"))
+   * }}}
+   *
+   * @group dfops
+   * @since 1.4.0
+   */
+  def stat: DataFrameStatFunctions = {
+    new DataFrameStatFunctions(toDF())
+  }
+  /**
+   * Cartesian join with another [[DataFrame]]: an inner join with no join condition.
+   *
+   * Note that cartesian joins are very expensive without an extra filter that can be pushed down.
+   *
+   * @param right Right side of the join operation.
+   * @group dfops
+   * @since 1.3.0
+   */
+  def join(right: DataFrame): DataFrame = {
+    withPlan(Join(logicalPlan, right.logicalPlan, joinType = Inner, condition = None))
+  }
+  /**
+   * Inner equi-join with another [[DataFrame]] on a single column name; delegates to the
+   * multi-column variant.
+   *
+   * Unlike the other join functions, the join column appears only once in the output,
+   * i.e. similar to SQL's `JOIN USING` syntax.
+   *
+   * {{{
+   *   // Joining df1 and df2 using the column "user_id"
+   *   df1.join(df2, "user_id")
+   * }}}
+   *
+   * Note that if you perform a self-join using this function without aliasing the input
+   * [[DataFrame]]s, you will NOT be able to reference any columns after the join, since
+   * there is no way to disambiguate which side of the join you would like to reference.
+   *
+   * @param right Right side of the join operation.
+   * @param usingColumn Name of the column to join on. This column must exist on both sides.
+   * @group dfops
+   * @since 1.4.0
+   */
+  def join(right: DataFrame, usingColumn: String): DataFrame =
+    join(right, usingColumn :: Nil)
+  /**
+   * Inner equi-join with another [[DataFrame]] on the given column names; delegates to the
+   * three-argument variant with join type `"inner"`.
+   *
+   * Unlike the other join functions, the join columns appear only once in the output,
+   * i.e. similar to SQL's `JOIN USING` syntax.
+   *
+   * {{{
+   *   // Joining df1 and df2 using the columns "user_id" and "user_name"
+   *   df1.join(df2, Seq("user_id", "user_name"))
+   * }}}
+   *
+   * Note that if you perform a self-join using this function without aliasing the input
+   * [[DataFrame]]s, you will NOT be able to reference any columns after the join, since
+   * there is no way to disambiguate which side of the join you would like to reference.
+   *
+   * @param right Right side of the join operation.
+   * @param usingColumns Names of the columns to join on. These columns must exist on both sides.
+   * @group dfops
+   * @since 1.4.0
+   */
+  def join(right: DataFrame, usingColumns: Seq[String]): DataFrame =
+    join(right, usingColumns, joinType = "inner")
+  /**
+   * Equi-join with another [[DataFrame]] using the given columns.
+   *
+   * Different from other join functions, the join columns will only appear once in the output,
+   * i.e. similar to SQL's `JOIN USING` syntax.
+   *
+   * Note that if you perform a self-join using this function without aliasing the input
+   * [[DataFrame]]s, you will NOT be able to reference any columns after the join, since
+   * there is no way to disambiguate which side of the join you would like to reference.
+   *
+   * @param right Right side of the join operation.
+   * @param usingColumns Names of the columns to join on. These columns must exist on both sides.
+   * @param joinType One of: `inner`, `outer`, `left_outer`, `right_outer`, `leftsemi`.
+   * @group dfops
+   * @since 1.6.0
+   */
+  def join(right: DataFrame, usingColumns: Seq[String], joinType: String): DataFrame = {
+    // Parse the join type once; it is needed for analysis, projection and the final plan.
+    val parsedJoinType = JoinType(joinType)
+
+    // Analyze the self join. The assumption is that the analyzer will disambiguate left vs right
+    // by creating a new instance for one of the branch.
+    val joined = sqlContext.executePlan(
+      Join(logicalPlan, right.logicalPlan, joinType = parsedJoinType, None))
+      .analyzed.asInstanceOf[Join]
+
+    // Equality on every using-column, AND-ed together; None when no columns are given.
+    val condition = usingColumns.map { col =>
+      catalyst.expressions.EqualTo(
+        withPlan(joined.left).resolve(col),
+        withPlan(joined.right).resolve(col))
+    }.reduceLeftOption[catalyst.expressions.BinaryExpression] { (cond, eqTo) =>
+      catalyst.expressions.And(cond, eqTo)
+    }
+
+    // Project only one of the join columns.
+    val joinedCols = parsedJoinType match {
+      case Inner | LeftOuter | LeftSemi =>
+        usingColumns.map(col => withPlan(joined.left).resolve(col))
+      case RightOuter =>
+        usingColumns.map(col => withPlan(joined.right).resolve(col))
+      case FullOuter =>
+        // Either side may be null in a full outer join, so take the first non-null of the two.
+        usingColumns.map { col =>
+          val leftCol = withPlan(joined.left).resolve(col).toAttribute.withNullability(true)
+          val rightCol = withPlan(joined.right).resolve(col).toAttribute.withNullability(true)
+          Alias(Coalesce(Seq(leftCol, rightCol)), col)()
+        }
+      case NaturalJoin(_) => sys.error("NaturalJoin with using clause is not supported.")
+    }
+
+    // The nullability of output of joined could be different than original column,
+    // so we can only compare them by exprId
+    val joinRefs = AttributeSet(condition.toSeq.flatMap(_.references))
+    val resultCols = joinedCols ++ joined.output.filterNot(joinRefs.contains(_))
+    withPlan {
+      Project(
+        resultCols,
+        Join(
+          joined.left,
+          joined.right,
+          joinType = parsedJoinType,
+          condition)
+      )
+    }
+  }
+  /**
+   * Inner join with another [[DataFrame]] on the given join expression; delegates to the
+   * three-argument variant with join type `"inner"`.
+   *
+   * {{{
+   *   // The following two are equivalent:
+   *   df1.join(df2, $"df1Key" === $"df2Key")
+   *   df1.join(df2).where($"df1Key" === $"df2Key")
+   * }}}
+   * @group dfops
+   * @since 1.3.0
+   */
+  def join(right: DataFrame, joinExprs: Column): DataFrame = {
+    join(right, joinExprs, joinType = "inner")
+  }
+  /**
+   * Join with another [[DataFrame]], using the given join expression. The following performs
+   * a full outer join between `df1` and `df2`.
+   *
+   * {{{
+   *   // Scala:
+   *   import org.apache.spark.sql.functions._
+   *   df1.join(df2, $"df1Key" === $"df2Key", "outer")
+   *
+   *   // Java:
+   *   import static org.apache.spark.sql.functions.*;
+   *   df1.join(df2, col("df1Key").equalTo(col("df2Key")), "outer");
+   * }}}
+   *
+   * @param right Right side of the join.
+   * @param joinExprs Join expression.
+   * @param joinType One of: `inner`, `outer`, `left_outer`, `right_outer`, `leftsemi`.
+   * @group dfops
+   * @since 1.3.0
+   */
+  def join(right: DataFrame, joinExprs: Column, joinType: String): DataFrame = {
+    // Note that in this function, we introduce a hack in the case of self-join to automatically
+    // resolve ambiguous join conditions into ones that might make sense [SPARK-6231].
+    // Consider this case: df.join(df, df("key") === df("key"))
+    // Since df("key") === df("key") is a trivially true condition, this actually becomes a
+    // cartesian join. However, most likely users expect to perform a self join using "key".
+    // With that assumption, this hack turns the trivially true condition into equality on join
+    // keys that are resolved to both sides.
+
+    // Trigger analysis so in the case of self-join, the analyzer will clone the plan.
+    // After the cloning, left and right side will have distinct expression ids.
+    val plan = withPlan(
+      Join(logicalPlan, right.logicalPlan, JoinType(joinType), Some(joinExprs.expr)))
+      .queryExecution.analyzed.asInstanceOf[Join]
+
+    // True when the two inputs have attributes in common. Written as a `def` so that it is
+    // only evaluated when auto-resolution is enabled, as in the original control flow.
+    def sidesShareOutput: Boolean = {
+      val lanalyzed = withPlan(this.logicalPlan).queryExecution.analyzed
+      val ranalyzed = withPlan(right.logicalPlan).queryExecution.analyzed
+      !lanalyzed.outputSet.intersect(ranalyzed.outputSet).isEmpty
+    }
+
+    if (!sqlContext.conf.dataFrameSelfJoinAutoResolveAmbiguity || !sidesShareOutput) {
+      // Auto self join alias is disabled, or left/right have no output set intersection:
+      // return the analyzed plan as is.
+      withPlan(plan)
+    } else {
+      // Otherwise, find the trivially true predicates and automatically resolve them to both
+      // sides. By the time we get here, since we have already run analysis, all attributes
+      // should've been resolved and become AttributeReference.
+      val cond = plan.condition.map { _.transform {
+        case catalyst.expressions.EqualTo(a: AttributeReference, b: AttributeReference)
+            if a.sameRef(b) =>
+          catalyst.expressions.EqualTo(
+            withPlan(plan.left).resolve(a.name),
+            withPlan(plan.right).resolve(b.name))
+      }}
+
+      withPlan {
+        plan.copy(condition = cond)
+      }
+    }
+  }
+  /**
+   * Joins this [[Dataset]] with `other`, returning a [[Tuple2]] for each pair of elements for
+   * which `condition` evaluates to true.
+   *
+   * This resembles the relational `join` function, but the result schema differs: because
+   * `joinWith` preserves the objects from both sides, the output is nested into a tuple under
+   * the column names `_1` and `_2`.
+   *
+   * This kind of join is useful both for keeping type-safety with the original object types
+   * and for working with relational data where the two sides have column names in common.
+   *
+   * @param other Right side of the join.
+   * @param condition Join expression.
+   * @param joinType One of: `inner`, `outer`, `left_outer`, `right_outer`, `leftsemi`.
+   * @since 1.6.0
+   */
+  def joinWith[U](other: Dataset[U], condition: Column, joinType: String): Dataset[(T, U)] = {
+    val leftPlan = this.logicalPlan
+    val rightPlan = other.logicalPlan
+
+    val joinedExecution = sqlContext.executePlan(
+      Join(leftPlan, rightPlan, joinType = JoinType(joinType), Some(condition.expr)))
+    // Split the analyzed output back into the attributes coming from each side.
+    val leftOutput = joinedExecution.analyzed.output.take(leftPlan.output.length)
+    val rightOutput = joinedExecution.analyzed.output.takeRight(rightPlan.output.length)
+
+    // A flat encoder contributes its single column directly; otherwise the side's columns are
+    // packed into a struct.
+    val leftData =
+      if (this.unresolvedTEncoder.flat) Alias(leftOutput.head, "_1")()
+      else Alias(CreateStruct(leftOutput), "_1")()
+    val rightData =
+      if (other.unresolvedTEncoder.flat) Alias(rightOutput.head, "_2")()
+      else Alias(CreateStruct(rightOutput), "_2")()
+
+    implicit val pairEncoder: Encoder[(T, U)] =
+      ExpressionEncoder.tuple(this.unresolvedTEncoder, other.unresolvedTEncoder)
+    withTypedPlan[(T, U)](other, encoderFor[(T, U)]) { (_, _) =>
+      Project(leftData :: rightData :: Nil, joinedExecution.analyzed)
+    }
+  }
+  /**
+   * Inner-joins this [[Dataset]] with `other`, returning a [[Tuple2]] for each pair for which
+   * `condition` evaluates to true. Delegates to the three-argument variant with `"inner"`.
+   *
+   * @param other Right side of the join.
+   * @param condition Join expression.
+   * @since 1.6.0
+   */
+  def joinWith[U](other: Dataset[U], condition: Column): Dataset[(T, U)] =
+    joinWith(other, condition, joinType = "inner")
+  /**
+   * Returns a new [[DataFrame]] with each partition sorted by the named columns.
+   *
+   * This is the same operation as "SORT BY" in SQL (Hive QL).
+   *
+   * @group dfops
+   * @since 1.6.0
+   */
+  @scala.annotation.varargs
+  def sortWithinPartitions(sortCol: String, sortCols: String*): Dataset[T] = {
+    val sortExprs = (sortCol +: sortCols).map(name => Column(name))
+    sortWithinPartitions(sortExprs : _*)
+  }
+  /**
+   * Returns a new [[DataFrame]] with each partition sorted by the given expressions
+   * (a non-global sort).
+   *
+   * This is the same operation as "SORT BY" in SQL (Hive QL).
+   *
+   * @group dfops
+   * @since 1.6.0
+   */
+  @scala.annotation.varargs
+  def sortWithinPartitions(sortExprs: Column*): Dataset[T] =
+    sortInternal(global = false, sortExprs)
+  /**
+   * Returns a new [[DataFrame]] sorted by the named columns, all in ascending order.
+   * {{{
+   *   // The following 3 are equivalent
+   *   df.sort("sortcol")
+   *   df.sort($"sortcol")
+   *   df.sort($"sortcol".asc)
+   * }}}
+   * @group dfops
+   * @since 1.3.0
+   */
+  @scala.annotation.varargs
+  def sort(sortCol: String, sortCols: String*): Dataset[T] = {
+    // Each name is looked up on this Dataset through `apply`.
+    val sortExprs = (sortCol +: sortCols).map(name => apply(name))
+    sort(sortExprs : _*)
+  }
+  /**
+   * Returns a new [[DataFrame]] globally sorted by the given expressions. For example:
+   * {{{
+   *   df.sort($"col1", $"col2".desc)
+   * }}}
+   * @group dfops
+   * @since 1.3.0
+   */
+  @scala.annotation.varargs
+  def sort(sortExprs: Column*): Dataset[T] =
+    sortInternal(global = true, sortExprs)
+  /**
+   * Returns a new [[DataFrame]] sorted by the named columns.
+   * This is an alias of the `sort` function.
+   * @group dfops
+   * @since 1.3.0
+   */
+  @scala.annotation.varargs
+  def orderBy(sortCol: String, sortCols: String*): Dataset[T] = {
+    sort(sortCol, sortCols : _*)
+  }
+  /**
+   * Returns a new [[DataFrame]] sorted by the given expressions.
+   * This is an alias of the `sort` function.
+   * @group dfops
+   * @since 1.3.0
+   */
+  @scala.annotation.varargs
+  def orderBy(sortExprs: Column*): Dataset[T] = {
+    sort(sortExprs : _*)
+  }
+  /**
+   * Selects a column by name and returns it as a [[Column]]; shorthand for `col`.
+   * Note that the column name can also reference to a nested column like `a.b`.
+   * @group dfops
+   * @since 1.3.0
+   */
+  def apply(colName: String): Column = {
+    col(colName)
+  }
+  /**
+   * Selects a column by name and returns it as a [[Column]]. The name `"*"` yields a star
+   * over all output attributes of the analyzed plan; any other name is resolved with `resolve`.
+   * Note that the column name can also reference to a nested column like `a.b`.
+   * @group dfops
+   * @since 1.3.0
+   */
+  def col(colName: String): Column = {
+    if (colName == "*") {
+      Column(ResolvedStar(queryExecution.analyzed.output))
+    } else {
+      Column(resolve(colName))
+    }
+  }
+  /**
+   * Returns a new [[DataFrame]] whose plan is wrapped in a [[SubqueryAlias]] named `alias`.
+   * @group dfops
+   * @since 1.3.0
+   */
+  def as(alias: String): Dataset[T] =
+    withTypedPlan(SubqueryAlias(alias, logicalPlan))
+  /**
+   * (Scala-specific) Returns a new [[DataFrame]] with an alias set, using the symbol's name
+   * as the alias.
+   * @group dfops
+   * @since 1.3.0
+   */
+  def as(alias: Symbol): Dataset[T] = as(alias.name)
+  /**
+   * Returns a new [[DataFrame]] with an alias set. Same as `as`.
+   * @group dfops
+   * @since 1.6.0
+   */
+  def alias(alias: String): Dataset[T] = {
+    as(alias)
+  }
+  /**
+   * (Scala-specific) Returns a new [[DataFrame]] with an alias set. Same as `as`.
+   * @group dfops
+   * @since 1.6.0
+   */
+  def alias(alias: Symbol): Dataset[T] = {
+    as(alias)
+  }
+  /**
+   * Selects a set of column based expressions.
+   * {{{
+   *   df.select($"colA", $"colB" + 1)
+   * }}}
+   * @group dfops
+   * @since 1.3.0
+   */
+  @scala.annotation.varargs
+  def select(cols: Column*): DataFrame = withPlan {
+    // Project needs named expressions, hence `.named` on every column.
+    Project(cols.map(_.named), logicalPlan)
+  }
+  /**
+   * Selects a set of columns by name. This variant of `select` can only pick existing
+   * columns (i.e. it cannot construct expressions).
+   *
+   * {{{
+   *   // The following two are equivalent:
+   *   df.select("colA", "colB")
+   *   df.select($"colA", $"colB")
+   * }}}
+   * @group dfops
+   * @since 1.3.0
+   */
+  @scala.annotation.varargs
+  def select(col: String, cols: String*): DataFrame = {
+    val selected = (col +: cols).map(name => Column(name))
+    select(selected : _*)
+  }
+  /**
+   * Selects a set of SQL expressions. This is a variant of `select` that accepts
+   * SQL expressions; each string is parsed with the SQL parser of this `sqlContext`.
+   *
+   * {{{
+   *   // The following are equivalent:
+   *   df.selectExpr("colA", "colB as newName", "abs(colC)")
+   *   df.select(expr("colA"), expr("colB as newName"), expr("abs(colC)"))
+   * }}}
+   * @group dfops
+   * @since 1.3.0
+   */
+  @scala.annotation.varargs
+  def selectExpr(exprs: String*): DataFrame = {
+    select(exprs.map { expr =>
+      Column(sqlContext.sqlParser.parseExpression(expr))
+    }: _*)
+  }
+  /**
+   * Returns a new [[Dataset]] obtained by evaluating the given typed [[Column]] expression
+   * for each element.
+   *
+   * {{{
+   *   val ds = Seq(1, 2, 3).toDS()
+   *   val newDS = ds.select(expr("value + 1").as[Int])
+   * }}}
+   * @since 1.6.0
+   */
+  def select[U1: Encoder](c1: TypedColumn[T, U1]): Dataset[U1] = {
+    // Give the column this Dataset's bound encoder and output attributes as its input type.
+    val projected = c1.withInputType(boundTEncoder, logicalPlan.output).named
+    val projection = Project(projected :: Nil, logicalPlan)
+    new Dataset[U1](sqlContext, projection, implicitly[Encoder[U1]])
+  }
+  /**
+   * Internal helper function for building typed selects that return tuples.  For simplicity and
+   * code reuse, we do this without the help of the type system and then use helper functions
+   * that cast appropriately for the user facing interface.
+   */
+  protected def selectUntyped(columns: TypedColumn[_, _]*): Dataset[_] = {
+    val encoders = columns.map(_.encoder)
+    // NOTE(review): the receiver and encoder argument of this call were lost in the source;
+    // `resolvedTEncoder` is a reconstruction. Confirm it should not be `boundTEncoder`, as
+    // used by the single-column typed `select`.
+    val namedColumns =
+      columns.map(_.withInputType(resolvedTEncoder, logicalPlan.output).named)
+    val execution = new QueryExecution(sqlContext, Project(namedColumns, logicalPlan))
+
+    // The result encoder is the tuple of the individual column encoders, in argument order.
+    new Dataset(sqlContext, execution, ExpressionEncoder.tuple(encoders))
+  }
+  /**
+   * Returns a new [[Dataset]] of pairs by computing the two given typed [[Column]] expressions
+   * for each element.
+   * @since 1.6.0
+   */
+  def select[U1, U2](c1: TypedColumn[T, U1], c2: TypedColumn[T, U2]): Dataset[(U1, U2)] = {
+    val untyped = selectUntyped(c1, c2)
+    untyped.asInstanceOf[Dataset[(U1, U2)]]
+  }
+  /**
+   * Returns a new [[Dataset]] of triples by computing the three given typed [[Column]]
+   * expressions for each element.
+   * @since 1.6.0
+   */
+  def select[U1, U2, U3](
+      c1: TypedColumn[T, U1],
+      c2: TypedColumn[T, U2],
+      c3: TypedColumn[T, U3]): Dataset[(U1, U2, U3)] = {
+    val untyped = selectUntyped(c1, c2, c3)
+    untyped.asInstanceOf[Dataset[(U1, U2, U3)]]
+  }
+  /**
+   * Returns a new [[Dataset]] of 4-tuples by computing the four given typed [[Column]]
+   * expressions for each element.
+   * @since 1.6.0
+   */
+  def select[U1, U2, U3, U4](
+      c1: TypedColumn[T, U1],
+      c2: TypedColumn[T, U2],
+      c3: TypedColumn[T, U3],
+      c4: TypedColumn[T, U4]): Dataset[(U1, U2, U3, U4)] = {
+    val untyped = selectUntyped(c1, c2, c3, c4)
+    untyped.asInstanceOf[Dataset[(U1, U2, U3, U4)]]
+  }
+  /**
+   * Returns a new [[Dataset]] of 5-tuples by computing the five given typed [[Column]]
+   * expressions for each element.
+   * @since 1.6.0
+   */
+  def select[U1, U2, U3, U4, U5](
+      c1: TypedColumn[T, U1],
+      c2: TypedColumn[T, U2],
+      c3: TypedColumn[T, U3],
+      c4: TypedColumn[T, U4],
+      c5: TypedColumn[T, U5]): Dataset[(U1, U2, U3, U4, U5)] = {
+    val untyped = selectUntyped(c1, c2, c3, c4, c5)
+    untyped.asInstanceOf[Dataset[(U1, U2, U3, U4, U5)]]
+  }
+  /**
+   * Filters rows using the given condition.
+   * {{{
+   *   // The following are equivalent:
+   *   peopleDf.filter($"age" > 15)
+   *   peopleDf.where($"age" > 15)
+   * }}}
+   * @group dfops
+   * @since 1.3.0
+   */
+  def filter(condition: Column): Dataset[T] = withTypedPlan {
+    // The column's underlying expression becomes the predicate of a Filter node.
+    val predicate = condition.expr
+    Filter(predicate, logicalPlan)
+  }
+  /**
+   * Filters rows using the given SQL expression.
+   * {{{
+   *   peopleDf.filter("age > 15")
+   * }}}
+   * @group dfops
+   * @since 1.3.0
+   */
+  def filter(conditionExpr: String): Dataset[T] = {
+    // Parse the SQL text into an expression, then reuse the Column-based overload.
+    val parsed = sqlContext.sqlParser.parseExpression(conditionExpr)
+    filter(Column(parsed))
+  }
+  /**
+   * Filters rows using the given condition. This is an alias for `filter`.
+   * {{{
+   *   // The following are equivalent:
+   *   peopleDf.filter($"age" > 15)
+   *   peopleDf.where($"age" > 15)
+   * }}}
+   * @group dfops
+   * @since 1.3.0
+   */
+  def where(condition: Column): Dataset[T] = {
+    // Pure alias for the Column-based filter.
+    filter(condition)
+  }
+  /**
+   * Filters rows using the given SQL expression.
+   * {{{
+   *   peopleDf.where("age > 15")
+   * }}}
+   * @group dfops
+   * @since 1.5.0
+   */
+  def where(conditionExpr: String): Dataset[T] = {
+    // Parse the SQL text first, then hand the resulting Column to the Column-based filter.
+    val parsedCondition = Column(sqlContext.sqlParser.parseExpression(conditionExpr))
+    filter(parsedCondition)
+  }
+  /**
+   * Groups the [[DataFrame]] using the specified columns, so we can run aggregation on them.
+   * See [[GroupedData]] for all the available aggregate functions.
+   *
+   * {{{
+   *   // Compute the average for all numeric columns grouped by department.
+   *   df.groupBy($"department").avg()
+   *
+   *   // Compute the max age and average salary, grouped by department and gender.
+   *   df.groupBy($"department", $"gender").agg(Map(
+   *     "salary" -> "avg",
+   *     "age" -> "max"
+   *   ))
+   * }}}
+   * @group dfops
+   * @since 1.3.0
+   */
+  @scala.annotation.varargs
+  def groupBy(cols: Column*): GroupedData = {
+    // Grouping keys are the expressions behind each Column (the argument was missing before).
+    GroupedData(toDF(), cols.map(_.expr), GroupedData.GroupByType)
+  }
+  /**
+   * Create a multi-dimensional rollup for the current [[DataFrame]] using the specified columns,
+   * so we can run aggregation on them.
+   * See [[GroupedData]] for all the available aggregate functions.
+   *
+   * {{{
+   *   // Compute the average for all numeric columns rolled up by department and group.
+   *   df.rollup($"department", $"group").avg()
+   *
+   *   // Compute the max age and average salary, rolled up by department and gender.
+   *   df.rollup($"department", $"gender").agg(Map(
+   *     "salary" -> "avg",
+   *     "age" -> "max"
+   *   ))
+   * }}}
+   * @group dfops
+   * @since 1.4.0
+   */
+  @scala.annotation.varargs
+  def rollup(cols: Column*): GroupedData = {
+    // Rollup keys are the expressions behind each Column (the argument was missing before).
+    GroupedData(toDF(), cols.map(_.expr), GroupedData.RollupType)
+  }
+  /**
+   * Create a multi-dimensional cube for the current [[DataFrame]] using the specified columns,
+   * so we can run aggregation on them.
+   * See [[GroupedData]] for all the available aggregate functions.
+   *
+   * {{{
+   *   // Compute the average for all numeric columns cubed by department and group.
+   *   df.cube($"department", $"group").avg()
+   *
+   *   // Compute the max age and average salary, cubed by department and gender.
+   *   df.cube($"department", $"gender").agg(Map(
+   *     "salary" -> "avg",
+   *     "age" -> "max"
+   *   ))
+   * }}}
+   * @group dfops
+   * @since 1.4.0
+   */
+  @scala.annotation.varargs
+  def cube(cols: Column*): GroupedData = {
+    // Cube keys are the expressions behind each Column (the argument was missing before).
+    GroupedData(toDF(), cols.map(_.expr), GroupedData.CubeType)
+  }
+  /**
+   * Groups the [[DataFrame]] using the specified columns, so we can run aggregation on them.
+   * See [[GroupedData]] for all the available aggregate functions.
+   *
+   * This is a variant of groupBy that can only group by existing columns using column names
+   * (i.e. cannot construct expressions).
+   *
+   * {{{
+   *   // Compute the average for all numeric columns grouped by department.
+   *   df.groupBy("department").avg()
+   *
+   *   // Compute the max age and average salary, grouped by department and gender.
+   *   df.groupBy("department", "gender").agg(Map(
+   *     "salary" -> "avg",
+   *     "age" -> "max"
+   *   ))
+   * }}}
+   * @group dfops
+   * @since 1.3.0
+   */
+  @scala.annotation.varargs
+  def groupBy(col1: String, cols: String*): GroupedData = {
+    val colNames: Seq[String] = col1 +: cols
+    // Each name is resolved against this dataset before being used as a grouping key.
+    GroupedData(toDF(), colNames.map(colName => resolve(colName)), GroupedData.GroupByType)
+  }
+  /**
+   * (Scala-specific)
+   * Reduces the elements of this [[Dataset]] using the specified binary function. The given `func`
+   * must be commutative and associative or the result may be non-deterministic.
+   * @since 1.6.0
+   */
+  def reduce(func: (T, T) => T): T = {
+    // Delegates to the reduce of the underlying RDD.
+    rdd.reduce(func)
+  }
+  /**
+   * (Java-specific)
+   * Reduces the elements of this Dataset using the specified binary function.  The given `func`
+   * must be commutative and associative or the result may be non-deterministic.
+   * @since 1.6.0
+   */
+  def reduce(func: ReduceFunction[T]): T = {
+    // Adapt the Java functional interface to a Scala function and reuse the Scala overload.
+    reduce(func.call(_, _))
+  }
+  /**
+   * (Scala-specific)
+   * Returns a [[GroupedDataset]] where the data is grouped by the given key `func`.
+   * @since 1.6.0
+   */
+  def groupByKey[K: Encoder](func: T => K): GroupedDataset[K, T] = {
+    val basePlan = logicalPlan
+    // Append the result of `func` as extra column(s) on top of the original plan.
+    val keyedPlan = AppendColumns(func, basePlan)
+    val keyedExecution = sqlContext.executePlan(keyedPlan)
+    // Data attributes come from the original plan, key attributes from the appended columns.
+    new GroupedDataset(
+      encoderFor[K], encoderFor[T], keyedExecution, basePlan.output, keyedPlan.newColumns)
+  }
+  /**
+   * Returns a [[GroupedDataset]] where the data is grouped by the given [[Column]] expressions.
+   * @since 1.6.0
+   */
+  @scala.annotation.varargs
+  def groupByKey(cols: Column*): GroupedDataset[Row, T] = {
+    // Project the original output followed by the key expressions. The right operand of `++`
+    // was missing; key expressions are wrapped as unresolved aliases so Project accepts them.
+    val withKeyColumns = logicalPlan.output ++ cols.map(_.expr).map(UnresolvedAlias(_))
+    val withKey = Project(withKeyColumns, logicalPlan)
+    val executed = sqlContext.executePlan(withKey)
+    // The trailing `cols.size` attributes are the keys; everything before them is the data.
+    val dataAttributes = executed.analyzed.output.dropRight(cols.size)
+    val keyAttributes = executed.analyzed.output.takeRight(cols.size)
+    new GroupedDataset(
+      RowEncoder(keyAttributes.toStructType),
+      encoderFor[T],
+      executed,
+      dataAttributes,
+      keyAttributes)
+  }
+  /**
+   * (Java-specific)
+   * Returns a [[GroupedDataset]] where the data is grouped by the given key `func`.
+   * @since 1.6.0
+   */
+  def groupByKey[K](func: MapFunction[T, K], encoder: Encoder[K]): GroupedDataset[K, T] =
+    // Adapt the Java function and pass the key encoder explicitly to the Scala overload.
+    groupByKey(func.call(_))(encoder)
+  /**
+   * Create a multi-dimensional rollup for the current [[DataFrame]] using the specified columns,
+   * so we can run aggregation on them.
+   * See [[GroupedData]] for all the available aggregate functions.
+   *
+   * This is a variant of rollup that can only group by existing columns using column names
+   * (i.e. cannot construct expressions).
+   *
+   * {{{
+   *   // Compute the average for all numeric columns rolled up by department and group.
+   *   df.rollup("department", "group").avg()
+   *
+   *   // Compute the max age and average salary, rolled up by department and gender.
+   *   df.rollup("department", "gender").agg(Map(
+   *     "salary" -> "avg",
+   *     "age" -> "max"
+   *   ))
+   * }}}
+   * @group dfops
+   * @since 1.4.0
+   */
+  @scala.annotation.varargs
+  def rollup(col1: String, cols: String*): GroupedData = {
+    val colNames: Seq[String] = col1 +: cols
+    // Each name is resolved against this dataset before being used as a rollup key.
+    GroupedData(toDF(), colNames.map(colName => resolve(colName)), GroupedData.RollupType)
+  }
+  /**
+   * Create a multi-dimensional cube for the current [[DataFrame]] using the specified columns,
+   * so we can run aggregation on them.
+   * See [[GroupedData]] for all the available aggregate functions.
+   *
+   * This is a variant of cube that can only group by existing columns using column names
+   * (i.e. cannot construct expressions).
+   *
+   * {{{
+   *   // Compute the average for all numeric columns cubed by department and group.
+   *   df.cube("department", "group").avg()
+   *
+   *   // Compute the max age and average salary, cubed by department and gender.
+   *   df.cube("department", "gender").agg(Map(
+   *     "salary" -> "avg",
+   *     "age" -> "max"
+   *   ))
+   * }}}
+   * @group dfops
+   * @since 1.4.0
+   */
+  @scala.annotation.varargs
+  def cube(col1: String, cols: String*): GroupedData = {
+    val colNames: Seq[String] = col1 +: cols
+    // Each name is resolved against this dataset before being used as a cube key.
+    GroupedData(toDF(), colNames.map(colName => resolve(colName)), GroupedData.CubeType)
+  }
+  /**
+   * (Scala-specific) Aggregates on the entire [[DataFrame]] without groups.
+   * {{{
+   *   // df.agg(...) is a shorthand for df.groupBy().agg(...)
+   *   df.agg("age" -> "max", "salary" -> "avg")
+   *   df.groupBy().agg("age" -> "max", "salary" -> "avg")
+   * }}}
+   * @group dfops
+   * @since 1.3.0
+   */
+  def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame = {
+    // groupBy() with no columns yields the ungrouped handle the aggregation runs on.
+    val ungrouped = groupBy()
+    ungrouped.agg(aggExpr, aggExprs : _*)
+  }
+  /**
+   * (Scala-specific) Aggregates on the entire [[DataFrame]] without groups.
+   * {{{
+   *   // df.agg(...) is a shorthand for df.groupBy().agg(...)
+   *   df.agg(Map("age" -> "max", "salary" -> "avg"))
+   *   df.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))
+   * }}}
+   * @group dfops
+   * @since 1.3.0
+   */
+  def agg(exprs: Map[String, String]): DataFrame = {
+    // Shorthand for an aggregation over an empty grouping.
+    val ungrouped = groupBy()
+    ungrouped.agg(exprs)
+  }
+  /**
+   * (Java-specific) Aggregates on the entire [[DataFrame]] without groups.
+   * {{{
+   *   // df.agg(...) is a shorthand for df.groupBy().agg(...)
+   *   df.agg(Map("age" -> "max", "salary" -> "avg"))
+   *   df.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))
+   * }}}
+   * @group dfops
+   * @since 1.3.0
+   */
+  def agg(exprs: java.util.Map[String, String]): DataFrame = {
+    // Shorthand for an aggregation over an empty grouping (Java map variant).
+    val ungrouped = groupBy()
+    ungrouped.agg(exprs)
+  }
+  /**
+   * Aggregates on the entire [[DataFrame]] without groups.
+   * {{{
+   *   // df.agg(...) is a shorthand for df.groupBy().agg(...)
+   *   df.agg(max($"age"), avg($"salary"))
+   *   df.groupBy().agg(max($"age"), avg($"salary"))
+   * }}}
+   * @group dfops
+   * @since 1.3.0
+   */
+  @scala.annotation.varargs
+  def agg(expr: Column, exprs: Column*): DataFrame = {
+    // Shorthand for an aggregation over an empty grouping.
+    val ungrouped = groupBy()
+    ungrouped.agg(expr, exprs : _*)
+  }
+  /**
+   * Returns a new [[DataFrame]] by taking the first `n` rows. The difference between this function
+   * and `head` is that `head` returns an array while `limit` returns a new [[DataFrame]].
+   * @group dfops
+   * @since 1.3.0
+   */
+  def limit(n: Int): Dataset[T] = withTypedPlan {
+    // The row cap is passed to the Limit node as a literal expression.
+    val rowCap = Literal(n)
+    Limit(rowCap, logicalPlan)
+  }
+  /**
+   * Returns a new [[DataFrame]] containing union of rows in this frame and another frame.
+   * This is equivalent to `UNION ALL` in SQL.
+   * @group dfops
+   * @since 1.3.0
+   */
+  def unionAll(other: Dataset[T]): Dataset[T] = withTypedPlan {
+    val rawUnion = Union(logicalPlan, other.logicalPlan)
+    // Applying CombineUnions here breaks caching; that is usually acceptable because the
+    // targeted use case is very specific: unioning many files or partitions.
+    CombineUnions(rawUnion)
+  }
+  def union(other: Dataset[T]): Dataset[T] = {
+    // Alias for unionAll.
+    unionAll(other)
+  }
+  /**
+   * Returns a new [[DataFrame]] containing rows only in both this frame and another frame.
+   * This is equivalent to `INTERSECT` in SQL.
+   * @group dfops
+   * @since 1.3.0
+   */
+  def intersect(other: Dataset[T]): Dataset[T] = withTypedPlan {
+    // Left side is this dataset's plan, right side is `other`'s.
+    val rightPlan = other.logicalPlan
+    Intersect(logicalPlan, rightPlan)
+  }
+  /**
+   * Returns a new [[DataFrame]] containing rows in this frame but not in another frame.
+   * This is equivalent to `EXCEPT` in SQL.
+   * @group dfops
+   * @since 1.3.0
+   */
+  def except(other: Dataset[T]): Dataset[T] = withTypedPlan {
+    // Rows of this plan minus the rows of `other`'s plan.
+    val rightPlan = other.logicalPlan
+    Except(logicalPlan, rightPlan)
+  }
+  def subtract(other: Dataset[T]): Dataset[T] = {
+    // Alias for except.
+    except(other)
+  }
+  /**
+   * Returns a new [[DataFrame]] by sampling a fraction of rows.
+   *
+   * @param withReplacement Sample with replacement or not.
+   * @param fraction Fraction of rows to generate.
+   * @param seed Seed for sampling.
+   * @group dfops
+   * @since 1.3.0
+   */
+  def sample(withReplacement: Boolean, fraction: Double, seed: Long): Dataset[T] = withTypedPlan {
+    // The first Sample argument is fixed at 0.0; `fraction` is passed as the second.
+    val lowerBound = 0.0
+    Sample(lowerBound, fraction, withReplacement, seed, logicalPlan)()
+  }
+  /**
+   * Returns a new [[DataFrame]] by sampling a fraction of rows, using a random seed.
+   *
+   * @param withReplacement Sample with replacement or not.
+   * @param fraction Fraction of rows to generate.
+   * @group dfops
+   * @since 1.3.0
+   */
+  def sample(withReplacement: Boolean, fraction: Double): Dataset[T] = {
+    // Draw a fresh seed and defer to the seeded overload.
+    val randomSeed = Utils.random.nextLong
+    sample(withReplacement, fraction, randomSeed)
+  }
+  /**
+   * Randomly splits this [[DataFrame]] with the provided weights.
+   *
+   * @param weights weights for splits, will be normalized if they don't sum to 1.
+   * @param seed Seed for sampling.
+   * @group dfops
+   * @since 1.4.0
+   */
+  def randomSplit(weights: Array[Double], seed: Long): Array[Dataset[T]] = {
+    // Fail fast with a clear message: empty or zero-sum weights would otherwise surface as an
+    // index error or NaN sampling bounds, and negative weights give non-monotonic bounds.
+    require(weights.forall(_ >= 0),
+      s"Weights must be nonnegative, but got ${weights.mkString("[", ",", "]")}")
+    require(weights.sum > 0,
+      s"Sum of weights must be positive, but got ${weights.mkString("[", ",", "]")}")
+    // It is possible that the underlying dataframe doesn't guarantee the ordering of rows in its
+    // constituent partitions each time a split is materialized which could result in
+    // overlapping splits. To prevent this, we explicitly sort each input partition to make the
+    // ordering deterministic.
+    val sorted = Sort(logicalPlan.output.map(SortOrder(_, Ascending)), global = false, logicalPlan)
+    val sum = weights.sum
+    // Cumulative normalized weights: consecutive pairs are the sampling bounds of each split.
+    val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
+    normalizedCumWeights.sliding(2).map { x =>
+      new Dataset[T](
+        sqlContext, Sample(x(0), x(1), withReplacement = false, seed, sorted)(), encoder)
+    }.toArray
+  }
+  /**
+   * Randomly splits this [[DataFrame]] with the provided weights.
+   *
+   * @param weights weights for splits, will be normalized if they don't sum to 1.
+   * @group dfops
+   * @since 1.4.0
+   */
+  def randomSplit(weights: Array[Double]): Array[Dataset[T]] = {
+    // Draw a fresh seed and defer to the seeded overload.
+    val randomSeed = Utils.random.nextLong
+    randomSplit(weights, randomSeed)
+  }
+  /**
+   * Randomly splits this [[DataFrame]] with the provided weights. Provided for the Python Api.
+   *
+   * @param weights weights for splits, will be normalized if they don't sum to 1.
+   * @param seed Seed for sampling.
+   * @group dfops
+   */
+  private[spark] def randomSplit(weights: List[Double], seed: Long): Array[Dataset[T]] = {
+    // Convert the list to an array and reuse the Array-based overload.
+    val weightArray = weights.toArray
+    randomSplit(weightArray, seed)
+  }
+  /**
+   * (Scala-specific) Returns a new [[DataFrame]] where each row has been expanded to zero or more
+   * rows by the provided function.  This is similar to a `LATERAL VIEW` in HiveQL. The columns of
+   * the input row are implicitly joined with each row that is output by the function.
+   *
+   * The following example uses this function to count the number of books which contain
+   * a given word:
+   *
+   * {{{
+   *   case class Book(title: String, words: String)
+   *   val df: RDD[Book]
+   *
+   *   case class Word(word: String)
+   *   val allWords = df.explode('words) {
+   *     case Row(words: String) => words.split(" ").map(Word(_))
+   *   }
+   *
+   *   val bookCountPerWord = allWords.groupBy("word").agg(countDistinct("title"))
+   * }}}
+   * @group dfops
+   * @since 1.3.0
+   */
+  def explode[A <: Product : TypeTag](input: Column*)(f: Row => TraversableOnce[A]): DataFrame = {
+    val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType] // struct schema derived from A by reflection
+    val elementTypes = { // NOTE(review): the collection being mapped is missing before `{` in this copy
+      attr => (attr.dataType, attr.nullable, } // NOTE(review): third tuple element is missing
+    val names = // NOTE(review): right-hand side is missing
+    val convert = CatalystTypeConverters.createToCatalystConverter(schema) // converter for values of `schema`
+    val rowFunction =
+      f.andThen([InternalRow])) // NOTE(review): the function composed after `f` is truncated
+    val generator = UserDefinedGenerator(elementTypes, rowFunction, // NOTE(review): last argument and closing paren are missing
+    withPlan {
+      Generate(generator, join = true, outer = false,
+        qualifier = None, generatorOutput = Nil, logicalPlan)
+    }
+  }
+  /**
+   * (Scala-specific) Returns a new [[DataFrame]] where a single column has been expanded to zero
+   * or more rows by the provided function.  This is similar to a `LATERAL VIEW` in HiveQL. All
+   * columns of the input row are implicitly joined with each value that is output by the function.
+   *
+   * {{{
+   *   df.explode("words", "word") {words: String => words.split(" ")}
+   * }}}
+   * @group dfops
+   * @since 1.3.0
+   */
+  def explode[A, B : TypeTag](inputColumn: String, outputColumn: String)(f: A => TraversableOnce[B])
+    : DataFrame = {
+    val dataType = ScalaReflection.schemaFor[B].dataType
+    val attributes = AttributeReference(outputColumn, dataType)() :: Nil
+    // TODO handle the metadata?
+    // One (type, nullable, name) triple per output attribute (receiver and name were missing).
+    val elementTypes = attributes.map { attr => (attr.dataType, attr.nullable, attr.name) }
+    // Build the converter once: it depends only on `dataType`, not on the row being processed.
+    val convert = CatalystTypeConverters.createToCatalystConverter(dataType)
+    def rowFunction(row: Row): TraversableOnce[InternalRow] = {
+      // The generator receives a single-column row holding the input value.
+      f(row(0).asInstanceOf[A]).map(o => InternalRow(convert(o)))
+    }
+    val generator = UserDefinedGenerator(elementTypes, rowFunction, apply(inputColumn).expr :: Nil)
+    withPlan {
+      Generate(generator, join = true, outer = false,
+        qualifier = None, generatorOutput = Nil, logicalPlan)
+    }
+  }
+  /////////////////////////////////////////////////////////////////////////////
+  /**
+   * Returns a new [[DataFrame]] by adding a column or replacing the existing column that has
+   * the same name.
+   * @group dfops
+   * @since 1.3.0
+   */
+  def withColumn(colName: String, col: Column): DataFrame = {
+    val resolver = sqlContext.analyzer.resolver
+    val output = queryExecution.analyzed.output
+    // Names are compared through the analyzer's resolver rather than with plain equality.
+    val shouldReplace = output.exists(f => resolver(f.name, colName))
+    if (shouldReplace) {
+      // Replace in place so the column keeps its position; other columns pass through.
+      val columns = output.map { field =>
+        if (resolver(field.name, colName)) {
+          col.as(colName)
+        } else {
+          Column(field)
+        }
+      }
+      select(columns : _*)
+    } else {
+      // No existing column matches: append the new one after all current columns.
+      select(Column("*"), col.as(colName))
+    }
+  }
+  /**
+   * Returns a new [[DataFrame]] by adding a column with metadata.
+   */
+  private[spark] def withColumn(colName: String, col: Column, metadata: Metadata): DataFrame = {
+    val resolver = sqlContext.analyzer.resolver
+    val output = queryExecution.analyzed.output
+    // Names are compared through the analyzer's resolver rather than with plain equality.
+    val shouldReplace = output.exists(f => resolver(f.name, colName))
+    if (shouldReplace) {
+      // Replace in place, attaching the metadata to the new alias.
+      val columns = output.map { field =>
+        if (resolver(field.name, colName)) {
+          col.as(colName, metadata)
+        } else {
+          Column(field)
+        }
+      }
+      select(columns : _*)
+    } else {
+      // No existing column matches: append the new one with its metadata.
+      select(Column("*"), col.as(colName, metadata))
+    }
+  }
+  /**
+   * Returns a new [[DataFrame]] with a column renamed.
+   * This is a no-op if schema doesn't contain existingName.
+   * @group dfops
+   * @since 1.3.0
+   */
+  def withColumnRenamed(existingName: String, newName: String): DataFrame = {
+    val resolver = sqlContext.analyzer.resolver
+    val output = queryExecution.analyzed.output
+    val shouldRename = output.exists(f => resolver(f.name, existingName))
+    if (shouldRename) {
+      // Alias only the matching column(s); every other column is passed through unchanged.
+      val columns = output.map { col =>
+        if (resolver(col.name, existingName)) {
+          Column(col).as(newName)
+        } else {
+          Column(col)
+        }
+      }
+      select(columns : _*)
+    } else {
+      // Nothing to rename: return an unchanged DataFrame view.
+      toDF()
+    }
+  }
+  /**
+   * Returns a new [[DataFrame]] with a column dropped.
+   * This is a no-op if schema doesn't contain column name.
+   * @group dfops
+   * @since 1.4.0
+   */
+  def drop(colName: String): DataFrame = {
+    // Wrap the single name and expand it as varargs to reach the multi-name overload.
+    val single = Seq(colName)
+    drop(single : _*)
+  }
+  /**
+   * Returns a new [[DataFrame]] with columns dropped.
+   * This is a no-op if schema doesn't contain column name(s).
+   * @group dfops
+   * @since 1.6.0
+   */
+  @scala.annotation.varargs
+  def drop(colNames: String*): DataFrame = {
+    val resolver = sqlContext.analyzer.resolver
+    // Keep a field only if none of the given names resolves to it.
+    val remainingCols =
+      schema.filter(f => colNames.forall(n => !resolver(f.name, n))).map(f => Column(f.name))
+    if (remainingCols.size == this.schema.size) {
+      // Nothing matched, so there is nothing to drop.
+      toDF()
+    } else {
+      this.select(remainingCols: _*)
+    }
+  }
+  /**
+   * Returns a new [[DataFrame]] with a column dropped.
+   * This version of drop accepts a Column rather than a name.
+   * This is a no-op if the DataFrame doesn't have a column
+   * with an equivalent expression.
+   * @group dfops
+   * @since 1.4.1
+   */
+  def drop(col: Column): DataFrame = {
+    val expression = col match {
+      case Column(u: UnresolvedAttribute) =>
+        // Resolve the name against the analyzed plan; fall back to the unresolved attribute.
+        queryExecution.analyzed.resolveQuoted(u.name, sqlContext.analyzer.resolver).getOrElse(u)
+      case Column(expr: Expression) => expr
+    }
+    val attrs = this.logicalPlan.output
+    // Keep every output attribute that is not equal to the expression being dropped.
+    val colsAfterDrop = attrs.filter { attr =>
+      attr != expression
+    }.map(attr => Column(attr))
+    select(colsAfterDrop : _*)
+  }
+  /**
+   * Returns a new [[DataFrame]] that contains only the unique rows from this [[DataFrame]].
+   * This is an alias for `distinct`.
+   * @group dfops
+   * @since 1.4.0
+   */
+  def dropDuplicates(): Dataset[T] = {
+    // De-duplicate on every column of this dataset.
+    val allColumns = this.columns
+    dropDuplicates(allColumns)
+  }
+  /**
+   * (Scala-specific) Returns a new [[DataFrame]] with duplicate rows removed, considering only
+   * the subset of columns.
+   *
+   * @group dfops
+   * @since 1.4.0
+   */
+  def dropDuplicates(colNames: Seq[String]): Dataset[T] = withTypedPlan {
+    // Group on the requested columns (the right-hand sides below were missing).
+    val groupCols = colNames.map(resolve)
+    val groupColExprIds = groupCols.map(_.exprId)
+    // Grouping columns pass through; every other column takes First(...) under its own name.
+    val aggCols = logicalPlan.output.map { attr =>
+      if (groupColExprIds.contains(attr.exprId)) {
+        attr
+      } else {
+        Alias(new First(attr).toAggregateExpression(), attr.name)()
+      }
+    }
+    Aggregate(groupCols, aggCols, logicalPlan)
+  }
+  /**
+   * Returns a new [[DataFrame]] with duplicate rows removed, considering only
+   * the subset of columns.
+   *
+   * @group dfops
+   * @since 1.4.0
+   */
+  def dropDuplicates(colNames: Array[String]): Dataset[T] = {
+    // Array variant: convert to a Seq and reuse the Seq-based overload.
+    val names = colNames.toSeq
+    dropDuplicates(names)
+  }
+  /**
+   * Computes statistics for numeric columns, including count, mean, stddev, min, and max.
+   * If no columns are given, this function computes statistics for all numerical columns.
+   *
+   * This function is meant for exploratory data analysis, as we make no guarantee about the
+   * backward compatibility of the schema of the resulting [[DataFrame]]. If you want to
+   * programmatically compute summary statistics, use the `agg` function instead.
+   *
+   * {{{
+   *   df.describe("age", "height").show()
+   *
+   *   // output:
+   *   // summary age   height
+   *   // count   10.0  10.0
+   *   // mean    53.3  178.05
+   *   // stddev  11.6  15.7
+   *   // min     18.0  163.0
+   *   // max     92.0  192.0
+   * }}}
+   *
+   * @group action
+   * @since 1.3.1
+   */
+  @scala.annotation.varargs
+  def describe(cols: String*): DataFrame = withPlan {
+    // The list of summary statistics to compute, in the form of expressions.
+    // Each entry pairs the row label with a function building the aggregate for one column.
+    val statistics = List[(String, Expression => Expression)](
+      "count" -> ((child: Expression) => Count(child).toAggregateExpression()),
+      "mean" -> ((child: Expression) => Average(child).toAggregateExpression()),
+      "stddev" -> ((child: Expression) => StddevSamp(child).toAggregateExpression()),
+      "min" -> ((child: Expression) => Min(child).toAggregateExpression()),
+      "max" -> ((child: Expression) => Max(child).toAggregateExpression()))
+    val outputCols =
+      (if (cols.isEmpty) else cols).toList // NOTE(review): the default column list for the empty case is missing in this copy
+    val ret: Seq[Row] = if (outputCols.nonEmpty) {
+      val aggExprs = statistics.flatMap { case (_, colToAgg) =>
+ => Column(Cast(colToAgg(Column(c).expr), StringType)).as(c)) // NOTE(review): start of this line is missing; each aggregate is cast to string and aliased to its column name
+      }
+      val row = agg(aggExprs.head, aggExprs.tail: _*).head().toSeq // single result row holding every statistic for every column
+      // Pivot the data so each summary is one row
+      row.grouped(outputCols.size) { case (aggregation, (statistic, _)) => // NOTE(review): the step pairing each group with `statistics` is missing
+        Row(statistic :: aggregation.toList: _*)
+      }
+    } else {
+      // If there are no output columns, just output a single column that contains the stats.
+ { case (name, _) => Row(name) } // NOTE(review): receiver of this block is missing
+    }
+    // All columns are string type
+    val schema = StructType(
+      StructField("summary", StringType) ::, StringType))).toAttributes // NOTE(review): the per-column StructField list after `::` is truncated
+    LocalRelation.fromExternalRows(schema, ret)
+  }
+  /**
+   * Returns the first `n` rows.
+   *
+   * @note this method should only be used if the resulting array is expected to be small, as
+   * all the data is loaded into the driver's memory.
+   *
+   * @group action
+   * @since 1.3.0
+   */
+  def head(n: Int): Array[T] = withTypedCallback("head", limit(n)) { limited =>
+    // The callback wrapper is already active, so collect without a second callback.
+    val firstRows = limited.collect(needCallback = false)
+    firstRows
+  }
+  /**
+   * Returns the first row.
+   * @group action
+   * @since 1.3.0
+   */
+  def head(): T = {
+    // Fetch at most one element and return it.
+    val firstOnly = head(1)
+    firstOnly.head
+  }
+  /**
+   * Returns the first row. Alias for head().
+   * @group action
+   * @since 1.3.0
+   */
+  def first(): T = {
+    // Alias for head().
+    head()
+  }
+  /**
+   * Concise syntax for chaining custom transformations.
+   * {{{
+   *   def featurize(ds: DataFrame) = ...
+   *
+   *   df
+   *     .transform(featurize)
+   *     .transform(...)
+   * }}}
+   * @since 1.6.0
+   */
+  def transform[U](t: Dataset[T] => Dataset[U]): Dataset[U] = {
+    // Apply the supplied transformation to this dataset.
+    t.apply(this)
+  }
+  /**
+   * (Scala-specific)
+   * Returns a new [[Dataset]] that only contains elements where `func` returns `true`.
+   * @since 1.6.0
+   */
+  def filter(func: T => Boolean): Dataset[T] = {
+    // Filtering is applied to each partition's iterator.
+    mapPartitions(partition => partition.filter(func))
+  }
+  /**
+   * (Java-specific)
+   * Returns a new [[Dataset]] that only contains elements where `func` returns `true`.
+   * @since 1.6.0
+   */
+  def filter(func: FilterFunction[T]): Dataset[T] = {
+    // Adapt the Java predicate to a Scala function (the lambda body was missing).
+    filter(t => func.call(t))
+  }
+  /**
+   * (Scala-specific)
+   * Returns a new [[Dataset]] that contains the result of applying `func` to each element.
+   * @since 1.6.0
+   */
+  def map[U : Encoder](func: T => U): Dataset[U] = {
+    // Map each partition's iterator element by element (the argument was missing).
+    mapPartitions(_.map(func))
+  }
+  /**
+   * (Java-specific)
+   * Returns a new [[Dataset]] that contains the result of applying `func` to each element.
+   * @since 1.6.0
+   */
+  def map[U](func: MapFunction[T, U], encoder: Encoder[U]): Dataset[U] =
+    // Adapt the Java function and pass the result encoder explicitly to the Scala overload.
+    map(t => func.call(t))(encoder)
+  /**
+   * (Scala-specific)
+   * Returns a new [[Dataset]] that contains the result of applying `func` to each partition.
+   * @since 1.6.0
+   */
+  def mapPartitions[U : Encoder](func: Iterator[T] => Iterator[U]): Dataset[U] = {
+    // The context bound supplies the encoder of the result element type.
+    val resultEncoder = implicitly[Encoder[U]]
+    val mappedPlan = MapPartitions[T, U](func, logicalPlan)
+    new Dataset[U](sqlContext, mappedPlan, resultEncoder)
+  }
+  /**
+   * (Java-specific)
+   * Returns a new [[Dataset]] that contains the result of applying `func` to each partition.
+   * @since 1.6.0
+   */
+  def mapPartitions[U](f: MapPartitionsFunction[T, U], encoder: Encoder[U]): Dataset[U] = {
+    // Bridge Scala and Java iterators around the Java callback (the lambda body was missing).
+    // NOTE(review): assumes `f.call` returns a java.util.Iterator; confirm against the interface.
+    val func: (Iterator[T]) => Iterator[U] = x => f.call(x.asJava).asScala
+    mapPartitions(func)(encoder)
+  }
+  /**
+   * (Scala-specific)
+   * Returns a new [[Dataset]] by first applying a function to all elements of this [[Dataset]],
+   * and then flattening the results.
+   * @since 1.6.0
+   */
+  def flatMap[U : Encoder](func: T => TraversableOnce[U]): Dataset[U] = {
+    // Flatten the results element by element within each partition.
+    mapPartitions(partition => partition.flatMap(func))
+  }
+  /**
+   * (Java-specific)
+   * Returns a new [[Dataset]] by first applying a function to all elements of this [[Dataset]],
+   * and then flattening the results.
+   * @since 1.6.0
+   */
+  def flatMap[U](f: FlatMapFunction[T, U], encoder: Encoder[U]): Dataset[U] = {
+    // Convert the Java callback's result to a Scala iterator (the lambda body was missing).
+    // NOTE(review): assumes `f.call` returns a java.util.Iterator; confirm against the interface.
+    val func: (T) => Iterator[U] = x => f.call(x).asScala
+    flatMap(func)(encoder)
+  }
+  /**
+   * Applies a function `f` to all rows.
+   * @group rdd
+   * @since 1.3.0
+   */
+  def foreach(f: T => Unit): Unit = {
+    // Run the action on the underlying RDD inside a new execution id.
+    withNewExecutionId(rdd.foreach(f))
+  }
+  /**
+   * (Java-specific)
+   * Runs `func` on each element of this [[Dataset]].
+   * @since 1.6.0
+   */
+  def foreach(func: ForeachFunction[T]): Unit = {
+    // Adapt the Java callback to a Scala function (the argument was missing).
+    foreach(func.call(_))
+  }
+  /**
+   * Applies a function f to each partition of this [[DataFrame]].
+   * @group rdd
+   * @since 1.3.0
+   */
+  def foreachPartition(f: Iterator[T] => Unit): Unit = {
+    // Run the per-partition action on the underlying RDD inside a new execution id.
+    withNewExecutionId(rdd.foreachPartition(f))
+  }
+  /**
+   * (Java-specific)
+   * Runs `func` on each partition of this [[Dataset]].
+   * @since 1.6.0
+   */
+  def foreachPartition(func: ForeachPartitionFunction[T]): Unit =
+    // Hand the partition to the Java callback as a java.util.Iterator (body was missing).
+    foreachPartition(it => func.call(it.asJava))
+  /**
+   * Returns the first `n` rows in the [[DataFrame]].
+   *
+   * Running take requires moving data into the application's driver process, and doing so with
+   * a very large `n` can crash the driver process with OutOfMemoryError.
+   *
+   * @group action
+   * @since 1.3.0
+   */
+  def take(n: Int): Array[T] = {
+    // Same result as head(n).
+    head(n)
+  }
+  /**
+   * Returns the first `n` rows in the [[DataFrame]] as a list.
+   *
+   * Running take requires moving data into the application's driver process, and doing so with
+   * a very large `n` can crash the driver process with OutOfMemoryError.
+   *
+   * @group action
+   * @since 1.6.0
+   */
+  def takeAsList(n: Int): java.util.List[T] = {
+    // Expand the array as varargs so asList sees the elements, not the array itself.
+    val firstN = take(n)
+    java.util.Arrays.asList(firstN : _*)
+  }
+  /**
+   * Returns an array that contains all of [[Row]]s in this [[DataFrame]].
+   *
+   * Running collect requires moving all the data into the application's driver process, and
+   * doing so on a very large dataset can crash the driver process with OutOfMemoryError.
+   *
+   * For Java API, use [[collectAsList]].
+   *
+   * @group action
+   * @since 1.3.0
+   */
+  def collect(): Array[T] = {
+    // Public entry point: always goes through the callback-wrapped path.
+    collect(needCallback = true)
+  }
+  /**
+   * Returns a Java list that contains all of [[Row]]s in this [[DataFrame]].
+   *
+   * Running collect requires moving all the data into the application's driver process, and
+   * doing so on a very large dataset can crash the driver process with OutOfMemoryError.
+   *
+   * @group action
+   * @since 1.3.0
+   */
+  def collectAsList(): java.util.List[T] = withCallback("collectAsList", toDF()) { _ =>
+    withNewExecutionId {
+      val values = // NOTE(review): right-hand side (the collected elements) is missing in this copy; restore from upstream
+      java.util.Arrays.asList(values : _*) // varargs expansion, so the list holds the elements rather than the array
+    }
+  }
+  private def collect(needCallback: Boolean): Array[T] = {
+    def execute(): Array[T] = withNewExecutionId { // NOTE(review): the body that collects the rows is missing in this copy
+    }
+    if (needCallback) { // wrap the collection in the "collect" callback
+      withCallback("collect", toDF())(_ => execute())
+    } else { // caller has already set up a callback (e.g. head, count)
+      execute()
+    }
+  }
+  /**
+   * Returns the number of rows in the [[DataFrame]].
+   * @group action
+   * @since 1.3.0
+   */
+  def count(): Long = withCallback("count", groupBy().count()) { counted =>
+    // The count is read from column 0 of the first collected row.
+    val firstRow = counted.collect(needCallback = false).head
+    firstRow.getLong(0)
+  }
+  /**
+   * Returns a new [[DataFrame]] that has exactly `numPartitions` partitions.
+   * @group dfops
+   * @since 1.3.0
+   */
+  def repartition(numPartitions: Int): Dataset[T] = {
+    // Unlike coalesce, this variant requests a shuffle.
+    withTypedPlan(Repartition(numPartitions, shuffle = true, logicalPlan))
+  }
+  /**
+   * Returns a new [[DataFrame]] partitioned by the given partitioning expressions into
+   * `numPartitions`. The resulting DataFrame is hash partitioned.
+   *
+   * This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL).
+   *
+   * @group dfops
+   * @since 1.6.0
+   */
+  @scala.annotation.varargs
+  def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T] = withTypedPlan {
+    // Partition by the expressions behind each Column (the first argument was missing).
+    RepartitionByExpression(partitionExprs.map(_.expr), logicalPlan, Some(numPartitions))
+  }
+  /**
+   * Returns a new [[DataFrame]] partitioned by the given partitioning expressions preserving
+   * the existing number of partitions. The resulting DataFrame is hash partitioned.
+   *
+   * This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL).
+   *
+   * @group dfops
+   * @since 1.6.0
+   */
+  @scala.annotation.varargs
+  def repartition(partitionExprs: Column*): Dataset[T] = withTypedPlan {
+    // Partition by the expressions behind each Column; no explicit partition count is given.
+    RepartitionByExpression(partitionExprs.map(_.expr), logicalPlan, numPartitions = None)
+  }
+  /**
+   * Returns a new [[DataFrame]] that has exactly `numPartitions` partitions.
+   * Similar to coalesce defined on an [[RDD]], this operation results in a narrow dependency, e.g.
+   * if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of
+   * the 100 new partitions will claim 10 of the current partitions.
+   * @group rdd
+   * @since 1.4.0
+   */
+  def coalesce(numPartitions: Int): Dataset[T] = {
+    // Same plan node as repartition(numPartitions), but without a shuffle.
+    withTypedPlan(Repartition(numPartitions, shuffle = false, logicalPlan))
+  }
+  /**
+   * Returns a new [[DataFrame]] that contains only the unique rows from this [[DataFrame]].
+   * This is an alias for `dropDuplicates`.
+   * @group dfops
+   * @since 1.3.0
+   */
+  def distinct(): Dataset[T] = {
+    // Alias for dropDuplicates() over all columns.
+    dropDuplicates()
+  }
+  /**
+   * Persist this [[DataFrame]] with the default storage level (`MEMORY_AND_DISK`).
+   * @group basic
+   * @since 1.3.0
+   */
+  def persist(): this.type = {
+    // Register this dataset with the cache manager, then return it for chaining.
+    val manager = sqlContext.cacheManager
+    manager.cacheQuery(this)
+    this
+  }
+  /**
+   * Persist this [[DataFrame]] with the default storage level (`MEMORY_AND_DISK`).
+   * @group basic
+   * @since 1.3.0
+   */
+  def cache(): this.type = {
+    // Alias for persist() with no arguments.
+    persist()
+  }
+  /**
+   * Persist this [[DataFrame]] with the given storage level.
+   * @param newLevel One of: `MEMORY_ONLY`, `MEMORY_AND_DISK`, `MEMORY_ONLY_SER`,
+   *                 `MEMORY_AND_DISK_SER`, `DISK_ONLY`, `MEMORY_ONLY_2`,
+   *                 `MEMORY_AND_DISK_2`, etc.
+   * @group basic
+   * @since 1.3.0
+   */
+  def persist(newLevel: StorageLevel): this.type = {
+    sqlContext.cacheManager.cacheQuery(this, None, newLevel)
+    this
+  }
+  /**
+   * Mark the [[DataFrame]] as non-persistent, and remove all blocks for it from memory and disk.
+   * @param blocking Whether to block until all blocks are deleted.
+   * @group basic
+   * @since 1.3.0
+   */
+  def unpersist(blocking: Boolean): this.type = {
+    sqlContext.cacheManager.tryUncacheQuery(this, blocking)
+    this
+  }
+  /**
+   * Mark the [[DataFrame]] as non-persistent, and remove all blocks for it from memory and disk.
+   * @group basic
+   * @since 1.3.0
+   */
+  def unpersist(): this.type = unpersist(blocking = false)
+  /////////////////////////////////////////////////////////////////////////////
+  // I/O
+  /////////////////////////////////////////////////////////////////////////////
+  /**
+   * Represents the content of the [[DataFrame]] as an [[RDD]] of [[Row]]s. Note that the RDD is
+   * memoized. Once called, it won't change even if you change any query planning related Spark SQL
+   * configurations (e.g. `spark.sql.shuffle.partitions`).
+   * @group rdd
+   * @since 1.3.0
+   */
+  lazy val rdd: RDD[T] = {
+    // use a local variable to make sure the map closure doesn't capture the whole DataFrame
+    val schema = this.schema
+    queryExecution.toRdd.mapPartitions { rows =>
+      rows.map(boundTEncoder.fromRow)
+    }
+  }
+  /**
+   * Returns the content of the [[DataFrame]] as a [[JavaRDD]] of [[Row]]s.
+   * @group rdd
+   * @since 1.3.0
+   */
+  def toJavaRDD: JavaRDD[T] = rdd.toJavaRDD()
+  /**
+   * Returns the content of the [[DataFrame]] as a [[JavaRDD]] of [[Row]]s.
+   * @group rdd
+   * @since 1.3.0
+   */
+  def javaRDD: JavaRDD[T] = toJavaRDD
+  /**
+   * Registers this [[DataFrame]] as a temporary table using the given name.  The lifetime of this
+   * temporary table is tied to the [[SQLContext]] that was used to create this DataFrame.
+   *
+   * @group basic
+   * @since 1.3.0
+   */
+  def registerTempTable(tableName: String): Unit = {
+    sqlContext.registerDataFrameAsTable(toDF(), tableName)
+  }
+  /**
+   * :: Experimental ::
+   * Interface for saving the content of the [[DataFrame]] out into external storage or streams.
+   *
+   * @group output
+   * @since 1.4.0
+   */
+  @Experimental
+  def write: DataFrameWriter = new DataFrameWriter(toDF())
+  /**
+   * Returns the content of the [[DataFrame]] as a RDD of JSON strings.
+   * @group rdd
+   * @since 1.3.0
+   */
+  def toJSON: RDD[String] = {
+    val rowSchema = this.schema
+    queryExecution.toRdd.mapPartitions { iter =>
+      val writer = new CharArrayWriter()
+      // create the Generator without separator inserted between 2 records
+      val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)
+      new Iterator[String] {
+        override def hasNext: Boolean = iter.hasNext
+        override def next(): String = {
+          JacksonGenerator(rowSchema, gen)(iter.next())
+          gen.flush()
+          val json = writer.toString
+          if (hasNext) {
+            writer.reset()
+          } else {
+            gen.close()
+          }
+          json
+        }
+      }
+    }
+  }
+  /**
+   * Returns a best-effort snapshot of the files that compose this DataFrame. This method simply
+   * asks each constituent BaseRelation for its respective files and takes the union of all results.
+   * Depending on the source relations, this may not find all input files. Duplicates are removed.
+   */
+  def inputFiles: Array[String] = {
+    val files: Seq[String] = logicalPlan.collect {
+      case LogicalRelation(fsBasedRelation: FileRelation, _, _) =>
+        fsBasedRelation.inputFiles
+      case fr: FileRelation =>
+        fr.inputFiles
+    }.flatten
+    files.toSet.toArray
+  }
+  ////////////////////////////////////////////////////////////////////////////
+  // for Python API
+  ////////////////////////////////////////////////////////////////////////////
+  /**
+   * Converts a JavaRDD to a PythonRDD.
+   */
+  protected[sql] def javaToPython: JavaRDD[Array[Byte]] = {
+    val structType = schema  // capture it for closure
+    val rdd = queryExecution.toRdd.map(EvaluatePython.toJava(_, structType))
+    EvaluatePython.javaToPython(rdd)
+  }
+  protected[sql] def collectToPython(): Int = {
+    withNewExecutionId {
+      PythonRDD.collectAndServe(javaToPython.rdd)
+    }
+  }
+  /**
+   * Wrap a DataFrame action to track all Spark jobs in the body so that we can connect them with
+   * an execution.
+   */
+  private[sql] def withNewExecutionId[U](body: => U): U = {
+    SQLExecution.withNewExecutionId(sqlContext, queryExecution)(body)
+  }
+  /**
+   * Wrap a DataFrame action to track the QueryExecution and time cost, then report to the
+   * user-registered callback functions.
+   */
+  private def withCallback[U](name: String, df: DataFrame)(action: DataFrame => U) = {
+    try {
+      df.queryExecution.executedPlan.foreach { plan =>
+        plan.resetMetrics()
+      }
+      val start = System.nanoTime()
+      val result = action(df)
+      val end = System.nanoTime()
+      sqlContext.listenerManager.onSuccess(name, df.queryExecution, end - start)
+      result
+    } catch {
+      case e: Exception =>
+        sqlContext.listenerManager.onFailure(name, df.queryExecution, e)
+        throw e
+    }
+  }
+  private def withTypedCallback[A, B](name: String, ds: Dataset[A])(action: Dataset[A] => B) = {
+    try {
+      ds.queryExecution.executedPlan.foreach { plan =>
+        plan.resetMetrics()
+      }
+      val start = System.nanoTime()
+      val result = action(ds)
+      val end = System.nanoTime()
+      sqlContext.listenerManager.onSuccess(name, ds.queryExecution, end - start)
+      result
+    } catch {
+      case e: Exception =>
+        sqlContext.listenerManager.onFailure(name, ds.queryExecution, e)
+        throw e
+    }
+  }
+  private def sortInternal(global: Boolean, sortExprs: Seq[Column]): Dataset[T] = {
+    val sortOrder: Seq[SortOrder] = sortExprs.map { col =>
+      col.expr match {
+        case expr: SortOrder =>
+          expr
+        case expr: Expression =>
+          SortOrder(expr, Ascending)
+      }
+    }
+    withTypedPlan {
+      Sort(sortOrder, global = global, logicalPlan)
+    }
+  }
+  /** A convenient function to wrap a logical plan and produce a DataFrame. */
+  @inline private def withPlan(logicalPlan: => LogicalPlan): DataFrame = {
+    Dataset.newDataFrame(sqlContext, logicalPlan)
+  }
+  /** A convenient function to wrap a logical plan and produce a Dataset[T] using this encoder. */
+  @inline private def withTypedPlan(logicalPlan: => LogicalPlan): Dataset[T] = {
+    new Dataset[T](sqlContext, logicalPlan, encoder)
+  }
+  private[sql] def withTypedPlan[R](
+      other: Dataset[_], encoder: Encoder[R])(
+      f: (LogicalPlan, LogicalPlan) => LogicalPlan): Dataset[R] =
+    new Dataset[R](sqlContext, f(logicalPlan, other.logicalPlan), encoder)
@@ -287,7 +268,7 @@ object functions extends LegacyFunctions {
    * @since 1.3.0
   def count(columnName: String): TypedColumn[Any, Long] =
-    count(Column(columnName)).as(ExpressionEncoder[Long])
+    count(Column(columnName)).as(ExpressionEncoder[Long]())
    * Aggregate function: returns the number of distinct items in a group.

Posted by
[SPARK-13880][SPARK-13881][SQL] Rename DataFrame.scala Dataset.scala, and remove LegacyFunctions

## What changes were proposed in this pull request?
1. Rename DataFrame.scala Dataset.scala, since the class is now named Dataset.
2. Remove LegacyFunctions. It was introduced in Spark 1.6 for backward compatibility, and can be removed in Spark 2.0.

## How was this patch tested?
Should be covered by existing unit/integration tests.

Author: Reynold Xin <>

Closes #11704 from rxin/SPARK-13880.


Branch: refs/heads/master
Commit: e76679a814f5a0903c5f93d9a482f5ddc56fe0d2
Parents: b5e3bd8
Author: Reynold Xin <>
Authored: Tue Mar 15 10:39:07 2016 +0800
Committer: Wenchen Fan <>
Committed: Tue Mar 15 10:39:07 2016 +0800

 project/MimaExcludes.scala                      |    1 +
 .../scala/org/apache/spark/sql/DataFrame.scala  | 2124 ------------------
 .../scala/org/apache/spark/sql/Dataset.scala    | 2124 ++++++++++++++++++
 .../scala/org/apache/spark/sql/functions.scala  |   23 +-
 4 files changed, 2127 insertions(+), 2145 deletions(-)
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index a9973bc..2a4a874 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -314,6 +314,7 @@ object MimaExcludes {
+        ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.LegacyFunctions"),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
deleted file mode 100644
index 1ea7db0..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ /dev/null
@@ -1,2124 +0,0 @@
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* See the License for the specific language governing permissions and
-* limitations under the License.
-package org.apache.spark.sql
-import scala.collection.JavaConverters._
-import scala.language.implicitConversions
-import scala.reflect.runtime.universe.TypeTag
-import com.fasterxml.jackson.core.JsonFactory
-import org.apache.spark.annotation.{DeveloperApi, Experimental}
-import org.apache.spark.api.python.PythonRDD
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst._
-import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.encoders._
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.aggregate._
-import org.apache.spark.sql.catalyst.optimizer.CombineUnions
-import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.util.usePrettyExpression
-import org.apache.spark.sql.execution.{FileRelation, LogicalRDD, Queryable, QueryExecution, SQLExecution}
-import org.apache.spark.sql.execution.command.ExplainCommand
-import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation}
-import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
-import org.apache.spark.sql.execution.python.EvaluatePython
-import org.apache.spark.sql.types._
-import org.apache.spark.util.Utils
-private[sql] object Dataset {
-  def apply[T: Encoder](sqlContext: SQLContext, logicalPlan: LogicalPlan): Dataset[T] = {
-    new Dataset(sqlContext, logicalPlan, implicitly[Encoder[T]])
-  }
-  def newDataFrame(sqlContext: SQLContext, logicalPlan: LogicalPlan): DataFrame = {
-    val qe = sqlContext.executePlan(logicalPlan)
-    qe.assertAnalyzed()
-    new Dataset[Row](sqlContext, logicalPlan, RowEncoder(qe.analyzed.schema))
-  }
-}
-/**
- * :: Experimental ::
- * A distributed collection of data organized into named columns.
- *
- * A [[DataFrame]] is equivalent to a relational table in Spark SQL. The following example creates
- * a [[DataFrame]] by pointing Spark SQL to a Parquet data set.
- * {{{
- *   val people = sqlContext.read.parquet("...")  // in Scala
- *   DataFrame people = sqlContext.read().parquet("...")  // in Java
- * }}}
- *
- * Once created, it can be manipulated using the various domain-specific-language (DSL) functions
- * defined in: [[DataFrame]] (this class), [[Column]], and [[functions]].
- *
- * To select a column from the data frame, use `apply` method in Scala and `col` in Java.
- * {{{
- *   val ageCol = people("age")  // in Scala
- *   Column ageCol = people.col("age")  // in Java
- * }}}
- *
- * Note that the [[Column]] type can also be manipulated through its various functions.
- * {{{
- *   // The following creates a new column that increases everybody's age by 10.
- *   people("age") + 10  // in Scala
- *   people.col("age").plus(10);  // in Java
- * }}}
- *
- * A more concrete example in Scala:
- * {{{
- *   // To create DataFrame using SQLContext
- *   val people = sqlContext.read.parquet("...")
- *   val department = sqlContext.read.parquet("...")
- *
- *   people.filter("age > 30")
- *     .join(department, people("deptId") === department("id"))
- *     .groupBy(department("name"), "gender")
- *     .agg(avg(people("salary")), max(people("age")))
- * }}}
- *
- * and in Java:
- * {{{
- *   // To create DataFrame using SQLContext
- *   DataFrame people = sqlContext.read().parquet("...");
- *   DataFrame department = sqlContext.read().parquet("...");
- *
- *   people.filter("age".gt(30))
- *     .join(department, people.col("deptId").equalTo(department("id")))
- *     .groupBy(department.col("name"), "gender")
- *     .agg(avg(people.col("salary")), max(people.col("age")));
- * }}}
- *
- * @groupname basic Basic DataFrame functions
- * @groupname dfops Language Integrated Queries
- * @groupname rdd RDD Operations
- * @groupname output Output Operations
- * @groupname action Actions
- * @since 1.3.0
- */
-class Dataset[T] private[sql](
-    @transient override val sqlContext: SQLContext,
-    @DeveloperApi @transient override val queryExecution: QueryExecution,
-    encoder: Encoder[T])
-  extends Queryable with Serializable {
-  queryExecution.assertAnalyzed()
-  // Note for Spark contributors: if adding or updating any action in `DataFrame`, please make sure
-  // you wrap it with `withNewExecutionId` if this actions doesn't call other action.
-  def this(sqlContext: SQLContext, logicalPlan: LogicalPlan, encoder: Encoder[T]) = {
-    this(sqlContext, sqlContext.executePlan(logicalPlan), encoder)
-  }
-  @transient protected[sql] val logicalPlan: LogicalPlan = queryExecution.logical match {
-    // For various commands (like DDL) and queries with side effects, we force query optimization to
-    // happen right away to let these side effects take place eagerly.
-    case _: Command |
-         _: InsertIntoTable |
-         _: CreateTableUsingAsSelect =>
-      LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sqlContext)
-    case _ =>
-      queryExecution.analyzed
-  }
-  /**
-   * An unresolved version of the internal encoder for the type of this [[Dataset]].  This one is
-   * marked implicit so that we can use it when constructing new [[Dataset]] objects that have the
-   * same object type (that will be possibly resolved to a different schema).
-   */
-  private[sql] implicit val unresolvedTEncoder: ExpressionEncoder[T] = encoderFor(encoder)
-  unresolvedTEncoder.validate(logicalPlan.output)
-  /** The encoder for this [[Dataset]] that has been resolved to its output schema. */
-  private[sql] val resolvedTEncoder: ExpressionEncoder[T] =
-    unresolvedTEncoder.resolve(logicalPlan.output, OuterScopes.outerScopes)
-  /**
-   * The encoder where the expressions used to construct an object from an input row have been
-   * bound to the ordinals of this [[Dataset]]'s output schema.
-   */
-  private[sql] val boundTEncoder = resolvedTEncoder.bind(logicalPlan.output)
-  private implicit def classTag = unresolvedTEncoder.clsTag
-  protected[sql] def resolve(colName: String): NamedExpression = {
-    queryExecution.analyzed.resolveQuoted(colName, sqlContext.analyzer.resolver).getOrElse {
-      throw new AnalysisException(
-        s"""Cannot resolve column name "$colName" among (${schema.fieldNames.mkString(", ")})""")
-    }
-  }
-  protected[sql] def numericColumns: Seq[Expression] = {
-    schema.fields.filter(_.dataType.isInstanceOf[NumericType]).map { n =>
-      queryExecution.analyzed.resolveQuoted(n.name, sqlContext.analyzer.resolver).get
-    }
-  }
-  /**
-   * Compose the string representing rows for output
-   * @param _numRows Number of rows to show
-   * @param truncate Whether truncate long strings and align cells right
-   */
-  override private[sql] def showString(_numRows: Int, truncate: Boolean = true): String = {
-    val numRows = _numRows.max(0)
-    val takeResult = take(numRows + 1)
-    val hasMoreData = takeResult.length > numRows
-    val data = takeResult.take(numRows)
-    // For array values, replace Seq and Array with square brackets
-    // For cells that are beyond 20 characters, replace it with the first 17 and "..."
-    val rows: Seq[Seq[String]] = schema.fieldNames.toSeq +: data.map {
-      case r: Row => r
-      case tuple: Product => Row.fromTuple(tuple)
-      case o => Row(o)
-    }.map { row =>
-      row.toSeq.map { cell =>
-        val str = cell match {
-          case null => "null"
-          case binary: Array[Byte] => binary.map("%02X".format(_)).mkString("[", " ", "]")
-          case array: Array[_] => array.mkString("[", ", ", "]")
-          case seq: Seq[_] => seq.mkString("[", ", ", "]")
-          case _ => cell.toString
-        }
-        if (truncate && str.length > 20) str.substring(0, 17) + "..." else str
-      }: Seq[String]
-    }
-    formatString ( rows, numRows, hasMoreData, truncate )
-  }
-  /**
-   * Returns the object itself.
-   * @group basic
-   * @since 1.3.0
-   */
-  // This is declared with parentheses to prevent the Scala compiler from treating
-  // `rdd.toDF("1")` as invoking this toDF and then apply on the returned DataFrame.
-  def toDF(): DataFrame = new Dataset[Row](sqlContext, queryExecution, RowEncoder(schema))
-  /**
-   * :: Experimental ::
-   * Converts this [[DataFrame]] to a strongly-typed [[Dataset]] containing objects of the
-   * specified type, `U`.
-   * @group basic
-   * @since 1.6.0
-   */
-  @Experimental
-  def as[U : Encoder]: Dataset[U] = Dataset[U](sqlContext, logicalPlan)
-  /**
-   * Returns a new [[DataFrame]] with columns renamed. This can be quite convenient in conversion
-   * from a RDD of tuples into a [[DataFrame]] with meaningful names. For example:
-   * {{{
-   *   val rdd: RDD[(Int, String)] = ...
-   *   rdd.toDF()  // this implicit conversion creates a DataFrame with column name _1 and _2
-   *   rdd.toDF("id", "name")  // this creates a DataFrame with column name "id" and "name"
-   * }}}
-   * @group basic
-   * @since 1.3.0
-   */
-  @scala.annotation.varargs
-  def toDF(colNames: String*): DataFrame = {
-    require(schema.size == colNames.size,
-      "The number of columns doesn't match.\n" +
-        s"Old column names (${schema.size}): " + schema.fields.map(_.name).mkString(", ") + "\n" +
-        s"New column names (${colNames.size}): " + colNames.mkString(", "))
-    val newCols = logicalPlan.output.zip(colNames).map { case (oldAttribute, newName) =>
-      Column(oldAttribute).as(newName)
-    }
-    select(newCols : _*)
-  }
-  /**
-   * Returns the schema of this [[DataFrame]].
-   * @group basic
-   * @since 1.3.0
-   */
-  def schema: StructType = queryExecution.analyzed.schema
-  /**
-   * Prints the schema to the console in a nice tree format.
-   * @group basic
-   * @since 1.3.0
-   */
-  // scalastyle:off println
-  override def printSchema(): Unit = println(schema.treeString)
-  // scalastyle:on println
-  /**
-   * Prints the plans (logical and physical) to the console for debugging purposes.
-   * @group basic
-   * @since 1.3.0
-   */
-  override def explain(extended: Boolean): Unit = {
-    val explain = ExplainCommand(queryExecution.logical, extended = extended)
-    sqlContext.executePlan(explain).executedPlan.executeCollect().foreach {
-      // scalastyle:off println
-      r => println(r.getString(0))
-      // scalastyle:on println
-    }
-  }
-  /**
-   * Prints the physical plan to the console for debugging purposes.
-   * @since 1.3.0
-   */
-  override def explain(): Unit = explain(extended = false)
-  /**
-   * Returns all column names and their data types as an array.
-   * @group basic
-   * @since 1.3.0
-   */
-  def dtypes: Array[(String, String)] = schema.fields.map { field =>
-    (field.name, field.dataType.toString)
-  }
-  /**
-   * Returns all column names as an array.
-   * @group basic
-   * @since 1.3.0
-   */
-  def columns: Array[String] = schema.fields.map(_.name)
-  /**
-   * Returns true if the `collect` and `take` methods can be run locally
-   * (without any Spark executors).
-   * @group basic
-   * @since 1.3.0
-   */
-  def isLocal: Boolean = logicalPlan.isInstanceOf[LocalRelation]
-  /**
-   * Displays the [[DataFrame]] in a tabular form. Strings more than 20 characters will be
-   * truncated, and all cells will be aligned right. For example:
-   * {{{
-   *   year  month AVG('Adj Close) MAX('Adj Close)
-   *   1980  12    0.503218        0.595103
-   *   1981  01    0.523289        0.570307
-   *   1982  02    0.436504        0.475256
-   *   1983  03    0.410516        0.442194
-   *   1984  04    0.450090        0.483521
-   * }}}
-   * @param numRows Number of rows to show
-   *
-   * @group action
-   * @since 1.3.0
-   */
-  def show(numRows: Int): Unit = show(numRows, truncate = true)
-  /**
-   * Displays the top 20 rows of [[DataFrame]] in a tabular form. Strings more than 20 characters
-   * will be truncated, and all cells will be aligned right.
-   * @group action
-   * @since 1.3.0
-   */
-  def show(): Unit = show(20)
-  /**
-   * Displays the top 20 rows of [[DataFrame]] in a tabular form.
-   *
-   * @param truncate Whether truncate long strings. If true, strings more than 20 characters will
-   *              be truncated and all cells will be aligned right
-   *
-   * @group action
-   * @since 1.5.0
-   */
-  def show(truncate: Boolean): Unit = show(20, truncate)
-  /**
-   * Displays the [[DataFrame]] in a tabular form. For example:
-   * {{{
-   *   year  month AVG('Adj Close) MAX('Adj Close)
-   *   1980  12    0.503218        0.595103
-   *   1981  01    0.523289        0.570307
-   *   1982  02    0.436504        0.475256
-   *   1983  03    0.410516        0.442194
-   *   1984  04    0.450090        0.483521
-   * }}}
-   * @param numRows Number of rows to show
-   * @param truncate Whether truncate long strings. If true, strings more than 20 characters will
-   *              be truncated and all cells will be aligned right
-   *
-   * @group action
-   * @since 1.5.0
-   */
-  // scalastyle:off println
-  def show(numRows: Int, truncate: Boolean): Unit = println(showString(numRows, truncate))
-  // scalastyle:on println
-  /**
-   * Returns a [[DataFrameNaFunctions]] for working with missing data.
-   * {{{
-   *   // Dropping rows containing any null values.
-   *   df.na.drop()
-   * }}}
-   *
-   * @group dfops
-   * @since 1.3.1
-   */
-  def na: DataFrameNaFunctions = new DataFrameNaFunctions(toDF())
-  /**
-   * Returns a [[DataFrameStatFunctions]] for working statistic functions support.
-   * {{{
-   *   // Finding frequent items in column with name 'a'.
-   *   df.stat.freqItems(Seq("a"))
-   * }}}
-   *
-   * @group dfops
-   * @since 1.4.0
-   */
-  def stat: DataFrameStatFunctions = new DataFrameStatFunctions(toDF())
-  /**
-   * Cartesian join with another [[DataFrame]].
-   *
-   * Note that cartesian joins are very expensive without an extra filter that can be pushed down.
-   *
-   * @param right Right side of the join operation.
-   * @group dfops
-   * @since 1.3.0
-   */
-  def join(right: DataFrame): DataFrame = withPlan {
-    Join(logicalPlan, right.logicalPlan, joinType = Inner, None)
-  }
-  /**
-   * Inner equi-join with another [[DataFrame]] using the given column.
-   *
-   * Different from other join functions, the join column will only appear once in the output,
-   * i.e. similar to SQL's `JOIN USING` syntax.
-   *
-   * {{{
-   *   // Joining df1 and df2 using the column "user_id"
-   *   df1.join(df2, "user_id")
-   * }}}
-   *
-   * Note that if you perform a self-join using this function without aliasing the input
-   * [[DataFrame]]s, you will NOT be able to reference any columns after the join, since
-   * there is no way to disambiguate which side of the join you would like to reference.
-   *
-   * @param right Right side of the join operation.
-   * @param usingColumn Name of the column to join on. This column must exist on both sides.
-   * @group dfops
-   * @since 1.4.0
-   */
-  def join(right: DataFrame, usingColumn: String): DataFrame = {
-    join(right, Seq(usingColumn))
-  }
-  /**
-   * Inner equi-join with another [[DataFrame]] using the given columns.
-   *
-   * Different from other join functions, the join columns will only appear once in the output,
-   * i.e. similar to SQL's `JOIN USING` syntax.
-   *
-   * {{{
-   *   // Joining df1 and df2 using the columns "user_id" and "user_name"
-   *   df1.join(df2, Seq("user_id", "user_name"))
-   * }}}
-   *
-   * Note that if you perform a self-join using this function without aliasing the input
-   * [[DataFrame]]s, you will NOT be able to reference any columns after the join, since
-   * there is no way to disambiguate which side of the join you would like to reference.
-   *
-   * @param right Right side of the join operation.
-   * @param usingColumns Names of the columns to join on. This columns must exist on both sides.
-   * @group dfops
-   * @since 1.4.0
-   */
-  def join(right: DataFrame, usingColumns: Seq[String]): DataFrame = {
-    join(right, usingColumns, "inner")
-  }
-  /**
-   * Equi-join with another [[DataFrame]] using the given columns.
-   *
-   * Different from other join functions, the join columns will only appear once in the output,
-   * i.e. similar to SQL's `JOIN USING` syntax.
-   *
-   * Note that if you perform a self-join using this function without aliasing the input
-   * [[DataFrame]]s, you will NOT be able to reference any columns after the join, since
-   * there is no way to disambiguate which side of the join you would like to reference.
-   *
-   * @param right Right side of the join operation.
-   * @param usingColumns Names of the columns to join on. This columns must exist on both sides.
-   * @param joinType One of: `inner`, `outer`, `left_outer`, `right_outer`, `leftsemi`.
-   * @group dfops
-   * @since 1.6.0
-   */
-  def join(right: DataFrame, usingColumns: Seq[String], joinType: String): DataFrame = {
-    // Analyze the self join. The assumption is that the analyzer will disambiguate left vs right
-    // by creating a new instance for one of the branch.
-    val joined = sqlContext.executePlan(
-      Join(logicalPlan, right.logicalPlan, joinType = JoinType(joinType), None))
-      .analyzed.asInstanceOf[Join]
-    val condition = usingColumns.map { col =>
-      catalyst.expressions.EqualTo(
-        withPlan(joined.left).resolve(col),
-        withPlan(joined.right).resolve(col))
-    }.reduceLeftOption[catalyst.expressions.BinaryExpression] { (cond, eqTo) =>
-      catalyst.expressions.And(cond, eqTo)
-    }
-    // Project only one of the join columns.
-    val joinedCols = JoinType(joinType) match {
-      case Inner | LeftOuter | LeftSemi =>
-        usingColumns.map(col => withPlan(joined.left).resolve(col))
-      case RightOuter =>
-        usingColumns.map(col => withPlan(joined.right).resolve(col))
-      case FullOuter =>
-        usingColumns.map { col =>
-          val leftCol = withPlan(joined.left).resolve(col).toAttribute.withNullability(true)
-          val rightCol = withPlan(joined.right).resolve(col).toAttribute.withNullability(true)
-          Alias(Coalesce(Seq(leftCol, rightCol)), col)()
-        }
-      case NaturalJoin(_) => sys.error("NaturalJoin with using clause is not supported.")
-    }
-    // The nullability of output of joined could be different than original column,
-    // so we can only compare them by exprId
-    val joinRefs = AttributeSet(condition.toSeq.flatMap(_.references))
-    val resultCols = joinedCols ++ joined.output.filterNot(joinRefs.contains(_))
-    withPlan {
-      Project(
-        resultCols,
-        Join(
-          joined.left,
-          joined.right,
-          joinType = JoinType(joinType),
-          condition)
-      )
-    }
-  }
-  /**
-   * Inner join with another [[DataFrame]], using the given join expression.
-   *
-   * {{{
-   *   // The following two are equivalent:
-   *   df1.join(df2, $"df1Key" === $"df2Key")
-   *   df1.join(df2).where($"df1Key" === $"df2Key")
-   * }}}
-   * @group dfops
-   * @since 1.3.0
-   */
-  def join(right: DataFrame, joinExprs: Column): DataFrame = join(right, joinExprs, "inner")
-  /**
-   * Join with another [[DataFrame]], using the given join expression. The following performs
-   * a full outer join between `df1` and `df2`.
-   *
-   * {{{
-   *   // Scala:
-   *   import org.apache.spark.sql.functions._
-   *   df1.join(df2, $"df1Key" === $"df2Key", "outer")
-   *
-   *   // Java:
-   *   import static org.apache.spark.sql.functions.*;
-   *   df1.join(df2, col("df1Key").equalTo(col("df2Key")), "outer");
-   * }}}
-   *
-   * @param right Right side of the join.
-   * @param joinExprs Join expression.
-   * @param joinType One of: `inner`, `outer`, `left_outer`, `right_outer`, `leftsemi`.
-   * @group dfops
-   * @since 1.3.0
-   */
-  def join(right: DataFrame, joinExprs: Column, joinType: String): DataFrame = {
-    // Note that in this function, we introduce a hack in the case of self-join to automatically
-    // resolve ambiguous join conditions into ones that might make sense [SPARK-6231].
-    // Consider this case: df.join(df, df("key") === df("key"))
-    // Since df("key") === df("key") is a trivially true condition, this actually becomes a
-    // cartesian join. However, most likely users expect to perform a self join using "key".
-    // With that assumption, this hack turns the trivially true condition into equality on join
-    // keys that are resolved to both sides.
-    // Trigger analysis so in the case of self-join, the analyzer will clone the plan.
-    // After the cloning, left and right side will have distinct expression ids.
-    val plan = withPlan(
-      Join(logicalPlan, right.logicalPlan, JoinType(joinType), Some(joinExprs.expr)))
-      .queryExecution.analyzed.asInstanceOf[Join]
-    // If auto self join alias is disabled, return the plan.
-    if (!sqlContext.conf.dataFrameSelfJoinAutoResolveAmbiguity) {
-      return withPlan(plan)
-    }
-    // If left/right have no output set intersection, return the plan.
-    val lanalyzed = withPlan(this.logicalPlan).queryExecution.analyzed
-    val ranalyzed = withPlan(right.logicalPlan).queryExecution.analyzed
-    if (lanalyzed.outputSet.intersect(ranalyzed.outputSet).isEmpty) {
-      return withPlan(plan)
-    }
-    // Otherwise, find the trivially true predicates and automatically resolves them to both sides.
-    // By the time we get here, since we have already run analysis, all attributes should've been
-    // resolved and become AttributeReference.
-    val cond = plan.condition.map { _.transform {
-      case catalyst.expressions.EqualTo(a: AttributeReference, b: AttributeReference)
-          if a.sameRef(b) =>
-        catalyst.expressions.EqualTo(
-          withPlan(plan.left).resolve(a.name),
-          withPlan(plan.right).resolve(b.name))
-    }}
-    withPlan {
-      plan.copy(condition = cond)
-    }
-  }
-  /**
-   * Joins this [[Dataset]] returning a [[Tuple2]] for each pair where `condition` evaluates to
-   * true.
-   *
-   * This is similar to the relation `join` function with one important difference in the
-   * result schema. Since `joinWith` preserves objects present on either side of the join, the
-   * result schema is similarly nested into a tuple under the column names `_1` and `_2`.
-   *
-   * This type of join can be useful both for preserving type-safety with the original object
-   * types as well as working with relational data where either side of the join has column
-   * names in common.
-   *
-   * @param other Right side of the join.
-   * @param condition Join expression.
-   * @param joinType One of: `inner`, `outer`, `left_outer`, `right_outer`, `leftsemi`.
-   * @since 1.6.0
-   */
-  def joinWith[U](other: Dataset[U], condition: Column, joinType: String): Dataset[(T, U)] = {
-    val leftPlan = this.logicalPlan
-    val rightPlan = other.logicalPlan
-    // Plan the join once; its analyzed output is split by position into the two sides below.
-    val joinExecution = sqlContext.executePlan(Join(leftPlan, rightPlan, joinType =
-      JoinType(joinType), Some(condition.expr)))
-    val joinedOutput = joinExecution.analyzed.output
-    // NOTE(review): the split assumes the joined output is the left columns followed by the
-    // right columns. Presumably a `leftsemi` join does not emit right-side columns, in which
-    // case `takeRight` would pick up left columns -- confirm.
-    val leftOutput = joinedOutput.take(leftPlan.output.length)
-    val rightOutput = joinedOutput.takeRight(rightPlan.output.length)
-    // A flat encoder keeps its first column directly; otherwise the side's columns are wrapped
-    // in a struct. Either way the result is aliased to `_1` / `_2`.
-    val leftData =
-      if (this.unresolvedTEncoder.flat) {
-        Alias(leftOutput.head, "_1")()
-      } else {
-        Alias(CreateStruct(leftOutput), "_1")()
-      }
-    val rightData =
-      if (other.unresolvedTEncoder.flat) {
-        Alias(rightOutput.head, "_2")()
-      } else {
-        Alias(CreateStruct(rightOutput), "_2")()
-      }
-    // Brought into implicit scope so that `encoderFor[(T, U)]` below can find it.
-    implicit val tuple2Encoder: Encoder[(T, U)] =
-      ExpressionEncoder.tuple(this.unresolvedTEncoder, other.unresolvedTEncoder)
-    withTypedPlan[(T, U)](other, encoderFor[(T, U)]) { (lhs, rhs) =>
-      // The callback arguments are not used; the projection sits on the join planned above.
-      Project(leftData :: rightData :: Nil, joinExecution.analyzed)
-    }
-  }
-  /**
-   * Using inner equi-join to join this [[Dataset]] returning a [[Tuple2]] for each pair
-   * where `condition` evaluates to true.
-   *
-   * @param other Right side of the join.
-   * @param condition Join expression.
-   * @since 1.6.0
-   */
-  def joinWith[U](other: Dataset[U], condition: Column): Dataset[(T, U)] =
-    // Same as the three-argument overload with the join type fixed to "inner".
-    joinWith(other, condition, "inner")
-  /**
-   * Returns a new [[DataFrame]] with each partition sorted by the given expressions.
-   *
-   * This is the same operation as "SORT BY" in SQL (Hive QL).
-   *
-   * @group dfops
-   * @since 1.6.0
-   */
-  @scala.annotation.varargs
-  def sortWithinPartitions(sortCol: String, sortCols: String*): Dataset[T] = {
-    // Wrap every name in a Column and defer to the Column-based overload.
-    val sortColumns = (sortCol +: sortCols).map(name => Column(name))
-    sortWithinPartitions(sortColumns: _*)
-  }
-  /**
-   * Returns a new [[DataFrame]] with each partition sorted by the given expressions.
-   *
-   * This is the same operation as "SORT BY" in SQL (Hive QL).
-   *
-   * @group dfops
-   * @since 1.6.0
-   */
-  @scala.annotation.varargs
-  def sortWithinPartitions(sortExprs: Column*): Dataset[T] =
-    // Shares `sortInternal` with `sort`; the only difference is `global = false`.
-    sortInternal(global = false, sortExprs)
-  /**
-   * Returns a new [[DataFrame]] sorted by the specified column, all in ascending order.
-   * {{{
-   *   // The following 3 are equivalent
-   *   df.sort("sortcol")
-   *   df.sort($"sortcol")
-   *   df.sort($"sortcol".asc)
-   * }}}
-   * @group dfops
-   * @since 1.3.0
-   */
-  @scala.annotation.varargs
-  def sort(sortCol: String, sortCols: String*): Dataset[T] = {
-    // Look each name up through `apply` and hand the columns to the Column-based overload.
-    val sortColumns = (sortCol +: sortCols).map(name => apply(name))
-    sort(sortColumns: _*)
-  }
-  /**
-   * Returns a new [[DataFrame]] sorted by the given expressions. For example:
-   * {{{
-   *   df.sort($"col1", $"col2".desc)
-   * }}}
-   * @group dfops
-   * @since 1.3.0
-   */
-  @scala.annotation.varargs
-  def sort(sortExprs: Column*): Dataset[T] =
-    // Shares `sortInternal` with `sortWithinPartitions`; here `global = true`.
-    sortInternal(global = true, sortExprs)
-  /**
-   * Returns a new [[DataFrame]] sorted by the given expressions.
-   * This is an alias of the `sort` function.
-   * @group dfops
-   * @since 1.3.0
-   */
-  @scala.annotation.varargs
-  def orderBy(sortCol: String, sortCols: String*): Dataset[T] = {
-    // Pure alias: forwards unchanged to the name-based `sort`.
-    sort(sortCol, sortCols: _*)
-  }
-  /**
-   * Returns a new [[DataFrame]] sorted by the given expressions.
-   * This is an alias of the `sort` function.
-   * @group dfops
-   * @since 1.3.0
-   */
-  @scala.annotation.varargs
-  def orderBy(sortExprs: Column*): Dataset[T] = {
-    // Pure alias: forwards unchanged to the Column-based `sort`.
-    sort(sortExprs: _*)
-  }
-  /**
-   * Selects column based on the column name and returns it as a [[Column]].
-   * Note that the column name can also refer to a nested column like `a.b`.
-   * @group dfops
-   * @since 1.3.0
-   */
-  def apply(colName: String): Column = {
-    // Enables `ds("name")` syntax; the lookup itself is done by `col`.
-    col(colName)
-  }
-  /**
-   * Selects column based on the column name and returns it as a [[Column]].
-   * Note that the column name can also refer to a nested column like `a.b`.
-   * @group dfops
-   * @since 1.3.0
-   */
-  def col(colName: String): Column = {
-    if (colName == "*") {
-      // The literal star is built from the analyzed plan's full output.
-      Column(ResolvedStar(queryExecution.analyzed.output))
-    } else {
-      // Any other name goes through `resolve`.
-      Column(resolve(colName))
-    }
-  }
-  /**
-   * Returns a new [[DataFrame]] with an alias set.
-   * @group dfops
-   * @since 1.3.0
-   */
-  def as(alias: String): Dataset[T] = withTypedPlan {
-    // Wrap the current plan in a SubqueryAlias carrying the requested name.
-    val aliasedPlan = SubqueryAlias(alias, logicalPlan)
-    aliasedPlan
-  }
-  /**
-   * (Scala-specific) Returns a new [[DataFrame]] with an alias set.
-   * @group dfops
-   * @since 1.3.0
-   */
-  def as(alias: Symbol): Dataset[T] = {
-    // The call was truncated (`as(`); use the symbol's name with the String overload.
-    as(alias.name)
-  }
-  /**
-   * Returns a new [[DataFrame]] with an alias set. Same as `as`.
-   * @group dfops
-   * @since 1.6.0
-   */
-  def alias(alias: String): Dataset[T] = {
-    // Synonym for `as(String)`.
-    as(alias)
-  }
-  /**
-   * (Scala-specific) Returns a new [[DataFrame]] with an alias set. Same as `as`.
-   * @group dfops
-   * @since 1.6.0
-   */
-  def alias(alias: Symbol): Dataset[T] = {
-    // Synonym for `as(Symbol)`.
-    as(alias)
-  }
-  /**
-   * Selects a set of column based expressions.
-   * {{{
-   *   df.select($"colA", $"colB" + 1)
-   * }}}
-   * @group dfops
-   * @since 1.3.0
-   */
-  @scala.annotation.varargs
-  def select(cols: Column*): DataFrame = withPlan {
-    // The projection list was truncated (`Project(, logicalPlan)`); project the named
-    // expression of every requested column over the current plan.
-    Project(cols.map(_.named), logicalPlan)
-  }
-  /**
-   * Selects a set of columns. This is a variant of `select` that can only select
-   * existing columns using column names (i.e. cannot construct expressions).
-   *
-   * {{{
-   *   // The following two are equivalent:
-   *   df.select("colA", "colB")
-   *   df.select($"colA", $"colB")
-   * }}}
-   * @group dfops
-   * @since 1.3.0
-   */
-  @scala.annotation.varargs
-  def select(col: String, cols: String*): DataFrame = {
-    // Wrap every name in a Column and defer to the Column-based overload.
-    val selected = (col +: cols).map(name => Column(name))
-    select(selected: _*)
-  }
-  /**
-   * Selects a set of SQL expressions. This is a variant of `select` that accepts
-   * SQL expressions.
-   *
-   * {{{
-   *   // The following are equivalent:
-   *   df.selectExpr("colA", "colB as newName", "abs(colC)")
-   *   df.select(expr("colA"), expr("colB as newName"), expr("abs(colC)"))
-   * }}}
-   * @group dfops
-   * @since 1.3.0
-   */
-  @scala.annotation.varargs
-  def selectExpr(exprs: String*): DataFrame = {
-    // The receiver of the lambda was truncated; each SQL string is parsed into an expression
-    // and wrapped in a Column before delegating to `select`.
-    val parsedColumns = exprs.map { expr =>
-      Column(sqlContext.sqlParser.parseExpression(expr))
-    }
-    select(parsedColumns: _*)
-  }
-  /**
-   * Returns a new [[Dataset]] by computing the given [[Column]] expression for each element.
-   *
-   * {{{
-   *   val ds = Seq(1, 2, 3).toDS()
-   *   val newDS = ds.select(expr("value + 1").as[Int])
-   * }}}
-   * @since 1.6.0
-   */
-  def select[U1: Encoder](c1: TypedColumn[T, U1]): Dataset[U1] = {
-    // Give the typed column this Dataset's bound encoder and output attributes, then use
-    // its named form as the single projected column.
-    val namedColumn = c1.withInputType(boundTEncoder, logicalPlan.output).named
-    val projection = Project(namedColumn :: Nil, logicalPlan)
-    new Dataset[U1](sqlContext, projection, implicitly[Encoder[U1]])
-  }
-  /**
-   * Internal helper function for building typed selects that return tuples.  For simplicity and
-   * code reuse, we do this without the help of the type system and then use helper functions
-   * that cast appropriately for the user facing interface.
-   */
-  protected def selectUntyped(columns: TypedColumn[_, _]*): Dataset[_] = {
-    // Both definitions below were truncated in the source and are reconstructed here.
-    // One encoder per column; they are combined into a tuple encoder for the result.
-    val encoders = columns.map(_.encoder)
-    // NOTE(review): the encoder argument was lost; `boundTEncoder` is used to match the
-    // single-column `select` in this class -- confirm against the intended encoder.
-    val namedColumns =
-      columns.map(_.withInputType(boundTEncoder, logicalPlan.output).named)
-    val execution = new QueryExecution(sqlContext, Project(namedColumns, logicalPlan))
-    new Dataset(sqlContext, execution, ExpressionEncoder.tuple(encoders))
-  }
-  /**
-   * Returns a new [[Dataset]] by computing the given [[Column]] expressions for each element.
-   * @since 1.6.0
-   */
-  def select[U1, U2](c1: TypedColumn[T, U1], c2: TypedColumn[T, U2]): Dataset[(U1, U2)] = {
-    // The untyped helper builds the Dataset; the cast restores the static tuple type.
-    val untyped = selectUntyped(c1, c2)
-    untyped.asInstanceOf[Dataset[(U1, U2)]]
-  }
-  /**
-   * Returns a new [[Dataset]] by computing the given [[Column]] expressions for each element.
-   * @since 1.6.0
-   */
-  def select[U1, U2, U3](
-      c1: TypedColumn[T, U1],
-      c2: TypedColumn[T, U2],
-      c3: TypedColumn[T, U3]): Dataset[(U1, U2, U3)] = {
-    // The untyped helper builds the Dataset; the cast restores the static tuple type.
-    val untyped = selectUntyped(c1, c2, c3)
-    untyped.asInstanceOf[Dataset[(U1, U2, U3)]]
-  }
-  /**
-   * Returns a new [[Dataset]] by computing the given [[Column]] expressions for each element.
-   * @since 1.6.0
-   */
-  def select[U1, U2, U3, U4](
-      c1: TypedColumn[T, U1],
-      c2: TypedColumn[T, U2],
-      c3: TypedColumn[T, U3],
-      c4: TypedColumn[T, U4]): Dataset[(U1, U2, U3, U4)] = {
-    // The untyped helper builds the Dataset; the cast restores the static tuple type.
-    val untyped = selectUntyped(c1, c2, c3, c4)
-    untyped.asInstanceOf[Dataset[(U1, U2, U3, U4)]]
-  }
-  /**
-   * Returns a new [[Dataset]] by computing the given [[Column]] expressions for each element.
-   * @since 1.6.0
-   */
-  def select[U1, U2, U3, U4, U5](
-      c1: TypedColumn[T, U1],
-      c2: TypedColumn[T, U2],
-      c3: TypedColumn[T, U3],
-      c4: TypedColumn[T, U4],
-      c5: TypedColumn[T, U5]): Dataset[(U1, U2, U3, U4, U5)] = {
-    // The untyped helper builds the Dataset; the cast restores the static tuple type.
-    val untyped = selectUntyped(c1, c2, c3, c4, c5)
-    untyped.asInstanceOf[Dataset[(U1, U2, U3, U4, U5)]]
-  }
-  /**
-   * Filters rows using the given condition.
-   * {{{
-   *   // The following are equivalent:
-   *   peopleDf.filter($"age" > 15)
-   *   peopleDf.where($"age" > 15)
-   * }}}
-   * @group dfops
-   * @since 1.3.0
-   */
-  def filter(condition: Column): Dataset[T] = withTypedPlan {
-    // Place a Filter node with the column's expression on top of the current plan.
-    val predicate = condition.expr
-    Filter(predicate, logicalPlan)
-  }
-  /**
-   * Filters rows using the given SQL expression.
-   * {{{
-   *   peopleDf.filter("age > 15")
-   * }}}
-   * @group dfops
-   * @since 1.3.0
-   */
-  def filter(conditionExpr: String): Dataset[T] = {
-    // Parse the SQL text into an expression, then reuse the Column-based overload.
-    val parsed = sqlContext.sqlParser.parseExpression(conditionExpr)
-    filter(Column(parsed))
-  }
-  /**
-   * Filters rows using the given condition. This is an alias for `filter`.
-   * {{{
-   *   // The following are equivalent:
-   *   peopleDf.filter($"age" > 15)
-   *   peopleDf.where($"age" > 15)
-   * }}}
-   * @group dfops
-   * @since 1.3.0
-   */
-  def where(condition: Column): Dataset[T] = {
-    // Pure alias for the Column-based `filter`.
-    filter(condition)
-  }
-  /**
-   * Filters rows using the given SQL expression.
-   * {{{
-   *   peopleDf.where("age > 15")
-   * }}}
-   * @group dfops
-   * @since 1.5.0
-   */
-  def where(conditionExpr: String): Dataset[T] = {
-    // Parse the SQL text into an expression, then apply it through the Column-based `filter`.
-    val parsed = sqlContext.sqlParser.parseExpression(conditionExpr)
-    filter(Column(parsed))
-  }
-  /**
-   * Groups the [[DataFrame]] using the specified columns, so we can run aggregation on them.
-   * See [[GroupedData]] for all the available aggregate functions.
-   *
-   * {{{
-   *   // Compute the average for all numeric columns grouped by department.
-   *   df.groupBy($"department").avg()
-   *
-   *   // Compute the max age and average salary, grouped by department and gender.
-   *   df.groupBy($"department", $"gender").agg(Map(
-   *     "salary" -> "avg",
-   *     "age" -> "max"
-   *   ))
-   * }}}
-   * @group dfops
-   * @since 1.3.0
-   */
-  @scala.annotation.varargs
-  def groupBy(cols: Column*): GroupedData = {
-    // The grouping argument was truncated (`toDF(),,`); group on the expression behind
-    // each Column.
-    GroupedData(toDF(), cols.map(_.expr), GroupedData.GroupByType)
-  }
-  /**
-   * Create a multi-dimensional rollup for the current [[DataFrame]] using the specified columns,
-   * so we can run aggregation on them.
-   * See [[GroupedData]] for all the available aggregate functions.
-   *
-   * {{{
-   *   // Compute the average for all numeric columns rolluped by department and group.
-   *   df.rollup($"department", $"group").avg()
-   *
-   *   // Compute the max age and average salary, rolluped by department and gender.
-   *   df.rollup($"department", $"gender").agg(Map(
-   *     "salary" -> "avg",
-   *     "age" -> "max"
-   *   ))
-   * }}}
-   * @group dfops
-   * @since 1.4.0
-   */
-  @scala.annotation.varargs
-  def rollup(cols: Column*): GroupedData = {
-    // The grouping argument was truncated (`toDF(),,`); roll up on the expression behind
-    // each Column.
-    GroupedData(toDF(), cols.map(_.expr), GroupedData.RollupType)
-  }
-  /**
-   * Create a multi-dimensional cube for the current [[DataFrame]] using the specified columns,
-   * so we can run aggregation on them.
-   * See [[GroupedData]] for all the available aggregate functions.
-   *
-   * {{{
-   *   // Compute the average for all numeric columns cubed by department and group.
-   *   df.cube($"department", $"group").avg()
-   *
-   *   // Compute the max age and average salary, cubed by department and gender.
-   *   df.cube($"department", $"gender").agg(Map(
-   *     "salary" -> "avg",
-   *     "age" -> "max"
-   *   ))
-   * }}}
-   * @group dfops
-   * @since 1.4.0
-   */
-  @scala.annotation.varargs
-  def cube(cols: Column*): GroupedData = {
-    // The grouping argument was truncated (`toDF(),,`); cube on the expression behind
-    // each Column.
-    GroupedData(toDF(), cols.map(_.expr), GroupedData.CubeType)
-  }
-  /**
-   * Groups the [[DataFrame]] using the specified columns, so we can run aggregation on them.
-   * See [[GroupedData]] for all the available aggregate functions.
-   *
-   * This is a variant of groupBy that can only group by existing columns using column names
-   * (i.e. cannot construct expressions).
-   *
-   * {{{
-   *   // Compute the average for all numeric columns grouped by department.
-   *   df.groupBy("department").avg()
-   *
-   *   // Compute the max age and average salary, grouped by department and gender.
-   *   df.groupBy($"department", $"gender").agg(Map(
-   *     "salary" -> "avg",
-   *     "age" -> "max"
-   *   ))
-   * }}}
-   * @group dfops
-   * @since 1.3.0
-   */
-  @scala.annotation.varargs
-  def groupBy(col1: String, cols: String*): GroupedData = {
-    val colNames: Seq[String] = col1 +: cols
-    // The mapping over `colNames` was truncated; each name is resolved against this
-    // Dataset before grouping.
-    GroupedData(toDF(), colNames.map(colName => resolve(colName)), GroupedData.GroupByType)
-  }
-  /**
-   * (Scala-specific)
-   * Reduces the elements of this [[Dataset]] using the specified binary function. The given `func`
-   * must be commutative and associative or the result may be non-deterministic.
-   * @since 1.6.0
-   */
-  def reduce(func: (T, T) => T): T = {
-    // The reduction is carried out by the `reduce` of this Dataset's `rdd`.
-    rdd.reduce(func)
-  }
-  /**
-   * (Java-specific)
-   * Reduces the elements of this Dataset using the specified binary function.  The given `func`
-   * must be commutative and associative or the result may be non-deterministic.
-   * @since 1.6.0
-   */
-  def reduce(func: ReduceFunction[T]): T = {
-    // The lambda was truncated (`reduce(, _))`); adapt the Java functional interface to a
-    // Scala function and reuse the Scala-specific overload.
-    reduce(func.call(_, _))
-  }
-  /**
-   * (Scala-specific)
-   * Returns a [[GroupedDataset]] where the data is grouped by the given key `func`.
-   * @since 1.6.0
-   */
-  def groupByKey[K: Encoder](func: T => K): GroupedDataset[K, T] = {
-    val source = logicalPlan
-    // AppendColumns is built from the key function and the input plan; its `newColumns`
-    // are handed to the GroupedDataset as the grouping attributes.
-    val keyedPlan = AppendColumns(func, source)
-    val keyedExecution = sqlContext.executePlan(keyedPlan)
-    new GroupedDataset(
-      encoderFor[K],
-      encoderFor[T],
-      keyedExecution,
-      source.output,
-      keyedPlan.newColumns)
-  }
-  /**
-   * Returns a [[GroupedDataset]] where the data is grouped by the given [[Column]] expressions.
-   * @since 1.6.0
-   */
-  @scala.annotation.varargs
-  def groupByKey(cols: Column*): GroupedDataset[Row, T] = {
-    // The right-hand side of `++` was truncated; the key columns are appended after the
-    // existing output so they can be split off by position below.
-    val withKeyColumns = logicalPlan.output ++ cols.map(_.named)
-    val withKey = Project(withKeyColumns, logicalPlan)
-    val executed = sqlContext.executePlan(withKey)
-    val analyzedOutput = executed.analyzed.output
-    // The last `cols.size` attributes are the keys; everything before them is the data.
-    val dataAttributes = analyzedOutput.dropRight(cols.size)
-    val keyAttributes = analyzedOutput.takeRight(cols.size)
-    new GroupedDataset(
-      RowEncoder(keyAttributes.toStructType),
-      encoderFor[T],
-      executed,
-      dataAttributes,
-      keyAttributes)
-  }
-  /**
-   * (Java-specific)
-   * Returns a [[GroupedDataset]] where the data is grouped by the given key `func`.
-   * @since 1.6.0
-   */
-  def groupByKey[K](func: MapFunction[T, K], encoder: Encoder[K]): GroupedDataset[K, T] =
-    // The call was truncated (`groupByKey(`); adapt the Java function to a Scala function
-    // and pass the encoder explicitly to the Scala-specific overload.
-    groupByKey(func.call(_))(encoder)
-  /**
-   * Create a multi-dimensional rollup for the current [[DataFrame]] using the specified columns,
-   * so we can run aggregation on them.
-   * See [[GroupedData]] for all the available aggregate functions.
-   *
-   * This is a variant of rollup that can only group by existing columns using column names
-   * (i.e. cannot construct expressions).
-   *
-   * {{{
-   *   // Compute the average for all numeric columns rolluped by department and group.
-   *   df.rollup("department", "group").avg()
-   *
-   *   // Compute the max age and average salary, rolluped by department and gender.
-   *   df.rollup($"department", $"gender").agg(Map(
-   *     "salary" -> "avg",
-   *     "age" -> "max"
-   *   ))
-   * }}}
-   * @group dfops
-   * @since 1.4.0
-   */
-  @scala.annotation.varargs
-  def rollup(col1: String, cols: String*): GroupedData = {
-    val colNames: Seq[String] = col1 +: cols
-    // The mapping over `colNames` was truncated; each name is resolved against this
-    // Dataset before the rollup.
-    GroupedData(toDF(), colNames.map(colName => resolve(colName)), GroupedData.RollupType)
-  }
-  /**
-   * Create a multi-dimensional cube for the current [[DataFrame]] using the specified columns,
-   * so we can run aggregation on them.
-   * See [[GroupedData]] for all the available aggregate functions.
-   *
-   * This is a variant of cube that can only group by existing columns using column names
-   * (i.e. cannot construct expressions).
-   *
-   * {{{
-   *   // Compute the average for all numeric columns cubed by department and group.
-   *   df.cube("department", "group").avg()
-   *
-   *   // Compute the max age and average salary, cubed by department and gender.
-   *   df.cube($"department", $"gender").agg(Map(
-   *     "salary" -> "avg",
-   *     "age" -> "max"
-   *   ))
-   * }}}
-   * @group dfops
-   * @since 1.4.0
-   */
-  @scala.annotation.varargs
-  def cube(col1: String, cols: String*): GroupedData = {
-    val colNames: Seq[String] = col1 +: cols
-    // The mapping over `colNames` was truncated; each name is resolved against this
-    // Dataset before the cube.
-    GroupedData(toDF(), colNames.map(colName => resolve(colName)), GroupedData.CubeType)
-  }
-  /**
-   * (Scala-specific) Aggregates on the entire [[DataFrame]] without groups.
-   * {{{
-   *   // df.agg(...) is a shorthand for df.groupBy().agg(...)
-   *   df.agg("age" -> "max", "salary" -> "avg")
-   *   df.groupBy().agg("age" -> "max", "salary" -> "avg")
-   * }}}
-   * @group dfops
-   * @since 1.3.0
-   */
-  def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame = {
-    // An empty `groupBy()` gives the grouped view that the aggregation is run on.
-    val ungrouped = groupBy()
-    ungrouped.agg(aggExpr, aggExprs: _*)
-  }
-  /**
-   * (Scala-specific) Aggregates on the entire [[DataFrame]] without groups.
-   * {{{
-   *   // df.agg(...) is a shorthand for df.groupBy().agg(...)
-   *   df.agg(Map("age" -> "max", "salary" -> "avg"))
-   *   df.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))
-   * }}}
-   * @group dfops
-   * @since 1.3.0
-   */
-  def agg(exprs: Map[String, String]): DataFrame = {
-    // An empty `groupBy()` gives the grouped view that the aggregation is run on.
-    val ungrouped = groupBy()
-    ungrouped.agg(exprs)
-  }
-  /**
-   * (Java-specific) Aggregates on the entire [[DataFrame]] without groups.
-   * {{{
-   *   // df.agg(...) is a shorthand for df.groupBy().agg(...)
-   *   df.agg(Map("age" -> "max", "salary" -> "avg"))
-   *   df.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))
-   * }}}
-   * @group dfops
-   * @since 1.3.0
-   */
-  def agg(exprs: java.util.Map[String, String]): DataFrame = {
-    // An empty `groupBy()` gives the grouped view that the aggregation is run on.
-    val ungrouped = groupBy()
-    ungrouped.agg(exprs)
-  }
-  /**
-   * Aggregates on the entire [[DataFrame]] without groups.
-   * {{{
-   *   // df.agg(...) is a shorthand for df.groupBy().agg(...)
-   *   df.agg(max($"age"), avg($"salary"))
-   *   df.groupBy().agg(max($"age"), avg($"salary"))
-   * }}}
-   * @group dfops
-   * @since 1.3.0
-   */
-  @scala.annotation.varargs
-  def agg(expr: Column, exprs: Column*): DataFrame = {
-    // An empty `groupBy()` gives the grouped view that the aggregation is run on.
-    val ungrouped = groupBy()
-    ungrouped.agg(expr, exprs: _*)
-  }
-  /**
-   * Returns a new [[DataFrame]] by taking the first `n` rows. The difference between this function
-   * and `head` is that `head` returns an array while `limit` returns a new [[DataFrame]].
-   * @group dfops
-   * @since 1.3.0
-   */
-  def limit(n: Int): Dataset[T] = withTypedPlan {
-    // The row count is passed to the Limit node as a literal expression.
-    val rowCount = Literal(n)
-    Limit(rowCount, logicalPlan)
-  }
-  /**
-   * Returns a new [[DataFrame]] containing union of rows in this frame and another frame.
-   * This is equivalent to `UNION ALL` in SQL.
-   * @group dfops
-   * @since 1.3.0
-   */
-  def unionAll(other: Dataset[T]): Dataset[T] = withTypedPlan {
-    val rawUnion = Union(logicalPlan, other.logicalPlan)
-    // Applying CombineUnions here breaks caching, but that is usually acceptable because it
-    // addresses a very specific use case: using union to union many files or partitions.
-    CombineUnions(rawUnion)
-  }
-  def union(other: Dataset[T]): Dataset[T] = {
-    // Synonym for `unionAll`.
-    unionAll(other)
-  }
-  /**
-   * Returns a new [[DataFrame]] containing rows only in both this frame and another frame.
-   * This is equivalent to `INTERSECT` in SQL.
-   * @group dfops
-   * @since 1.3.0
-   */
-  def intersect(other: Dataset[T]): Dataset[T] = withTypedPlan {
-    // This Dataset's plan is the left input, `other`'s plan the right.
-    val rightPlan = other.logicalPlan
-    Intersect(logicalPlan, rightPlan)
-  }
-  /**
-   * Returns a new [[DataFrame]] containing rows in this frame but not in another frame.
-   * This is equivalent to `EXCEPT` in SQL.
-   * @group dfops
-   * @since 1.3.0
-   */
-  def except(other: Dataset[T]): Dataset[T] = withTypedPlan {
-    // This Dataset's plan is the left input, `other`'s plan the right.
-    val rightPlan = other.logicalPlan
-    Except(logicalPlan, rightPlan)
-  }
-  def subtract(other: Dataset[T]): Dataset[T] = {
-    // Synonym for `except`.
-    except(other)
-  }
-  /**
-   * Returns a new [[DataFrame]] by sampling a fraction of rows.
-   *
-   * @param withReplacement Sample with replacement or not.
-   * @param fraction Fraction of rows to generate.
-   * @param seed Seed for sampling.
-   * @group dfops
-   * @since 1.3.0
-   */
-  def sample(withReplacement: Boolean, fraction: Double, seed: Long): Dataset[T] = withTypedPlan {
-    // Sample receives 0.0 and `fraction` as its first two arguments.
-    val sampledPlan = Sample(0.0, fraction, withReplacement, seed, logicalPlan)()
-    sampledPlan
-  }
-  /**
-   * Returns a new [[DataFrame]] by sampling a fraction of rows, using a random seed.
-   *
-   * @param withReplacement Sample with replacement or not.
-   * @param fraction Fraction of rows to generate.
-   * @group dfops
-   * @since 1.3.0
-   */
-  def sample(withReplacement: Boolean, fraction: Double): Dataset[T] = {
-    // Draw a fresh seed and defer to the seeded overload.
-    val randomSeed = Utils.random.nextLong
-    sample(withReplacement, fraction, randomSeed)
-  }
-  /**
-   * Randomly splits this [[DataFrame]] with the provided weights.
-   *
-   * @param weights weights for splits, will be normalized if they don't sum to 1.
-   * @param seed Seed for sampling.
-   * @group dfops
-   * @since 1.4.0
-   */
-  def randomSplit(weights: Array[Double], seed: Long): Array[Dataset[T]] = {
-    // Reject weights that cannot be normalized into increasing sampling bounds; without
-    // these checks a zero sum yields NaN bounds and an empty array fails on `x(1)` below.
-    require(weights.forall(_ >= 0),
-      s"Weights must be nonnegative, but got ${weights.mkString("[", ",", "]")}")
-    require(weights.sum > 0,
-      s"Sum of weights must be positive, but got ${weights.mkString("[", ",", "]")}")
-    // It is possible that the underlying dataframe doesn't guarantee the ordering of rows in its
-    // constituent partitions each time a split is materialized which could result in
-    // overlapping splits. To prevent this, we explicitly sort each input partition to make the
-    // ordering deterministic.
-    // (The sort-order list was truncated in the source; every output column is sorted
-    // ascending, within partitions only.)
-    val sorted = Sort(logicalPlan.output.map(SortOrder(_, Ascending)), global = false, logicalPlan)
-    val sum = weights.sum
-    // Cumulative normalized weights; each adjacent pair delimits one split.
-    val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
-    normalizedCumWeights.sliding(2).map { x =>
-      new Dataset[T](
-        sqlContext, Sample(x(0), x(1), withReplacement = false, seed, sorted)(), encoder)
-    }.toArray
-  }
-  /**
-   * Randomly splits this [[DataFrame]] with the provided weights.
-   *
-   * @param weights weights for splits, will be normalized if they don't sum to 1.
-   * @group dfops
-   * @since 1.4.0
-   */
-  def randomSplit(weights: Array[Double]): Array[Dataset[T]] = {
-    // Draw a fresh seed and defer to the seeded overload.
-    val randomSeed = Utils.random.nextLong
-    randomSplit(weights, randomSeed)
-  }
-  /**
-   * Randomly splits this [[DataFrame]] with the provided weights. Provided for the Python Api.
-   *
-   * @param weights weights for splits, will be normalized if they don't sum to 1.
-   * @param seed Seed for sampling.
-   * @group dfops
-   */
-  private[spark] def randomSplit(weights: List[Double], seed: Long): Array[Dataset[T]] = {
-    // Convert the list to an array and reuse the Array-based overload.
-    val weightArray = weights.toArray
-    randomSplit(weightArray, seed)
-  }
-  /**
-   * (Scala-specific) Returns a new [[DataFrame]] where each row has been expanded to zero or more
-   * rows by the provided function.  This is similar to a `LATERAL VIEW` in HiveQL. The columns of
-   * the input row are implicitly joined with each row that is output by the function.
-   *
-   * The following example uses this function to count the number of books which contain
-   * a given word:
-   *
-   * {{{
-   *   case class Book(title: String, words: String)
-   *   val df: RDD[Book]
-   *
-   *   case class Word(word: String)
-   *   val allWords = df.explode('words) {
-   *     case Row(words: String) => words.split(" ").map(Word(_))
-   *   }
-   *
-   *   val bookCountPerWord = allWords.groupBy("word").agg(countDistinct("title"))
-   * }}}
-   * @group dfops
-   * @since 1.3.0
-   */
-  def explode[A <: Product : TypeTag](input: Column*)(f: Row => TraversableOnce[A]): DataFrame = {
-    // Several expressions in this method were truncated in the source and are
-    // reconstructed here; the unused `names` local was dropped.
-    val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType]
-    // (dataType, nullable, name) for every field of the generated rows.
-    val elementTypes = schema.toAttributes.map {
-      attr => (attr.dataType, attr.nullable, attr.name) }
-    val convert = CatalystTypeConverters.createToCatalystConverter(schema)
-    // Run the user function, then convert each produced value into an InternalRow.
-    val rowFunction =
-      f.andThen(_.map(convert(_).asInstanceOf[InternalRow]))
-    val generator = UserDefinedGenerator(elementTypes, rowFunction, input.map(_.expr))
-    withPlan {
-      Generate(generator, join = true, outer = false,
-        qualifier = None, generatorOutput = Nil, logicalPlan)
-    }
-  }
-  /**
-   * (Scala-specific) Returns a new [[DataFrame]] where a single column has been expanded to zero
-   * or more rows by the provided function.  This is similar to a `LATERAL VIEW` in HiveQL. All
-   * columns of the input row are implicitly joined with each value that is output by the function.
-   *
-   * {{{
-   *   df.explode("words", "word") {words: String => words.split(" ")}
-   * }}}
-   * @group dfops
-   * @since 1.3.0
-   */
-  def explode[A, B : TypeTag](inputColumn: String, outputColumn: String)(f: A => TraversableOnce[B])
-    : DataFrame = {
-    val dataType = ScalaReflection.schemaFor[B].dataType
-    val attributes = AttributeReference(outputColumn, dataType)() :: Nil
-    // TODO handle the metadata?
-    // The receiver and the last tuple element were truncated in the source; the tuples are
-    // (dataType, nullable, name) for the single output attribute.
-    val elementTypes = attributes.map { attr => (attr.dataType, attr.nullable, attr.name) }
-    // Applies `f` to the first field of the input row and wraps every result in a
-    // one-column InternalRow.
-    def rowFunction(row: Row): TraversableOnce[InternalRow] = {
-      val convert = CatalystTypeConverters.createToCatalystConverter(dataType)
-      f(row(0).asInstanceOf[A]).map(o => InternalRow(convert(o)))
-    }
-    val generator = UserDefinedGenerator(elementTypes, rowFunction, apply(inputColumn).expr :: Nil)
-    withPlan {
-      Generate(generator, join = true, outer = false,
-        qualifier = None, generatorOutput = Nil, logicalPlan)
-    }
-  }
-  /////////////////////////////////////////////////////////////////////////////
-  /**
-   * Returns a new [[DataFrame]] by adding a column or replacing the existing column that has
-   * the same name.
-   * @group dfops
-   * @since 1.3.0
-   */
-  def withColumn(colName: String, col: Column): DataFrame = {
-    // The resolver arguments and the replacement column were truncated in the source and
-    // are reconstructed here.
-    val resolver = sqlContext.analyzer.resolver
-    val output = queryExecution.analyzed.output
-    val shouldReplace = output.exists(f => resolver(f.name, colName))
-    if (shouldReplace) {
-      // Keep the column order, swapping the new column in wherever the name matches.
-      val columns = output.map { field =>
-        if (resolver(field.name, colName)) {
-          col.as(colName)
-        } else {
-          Column(field)
-        }
-      }
-      select(columns : _*)
-    } else {
-      // No existing column matches: append the new column after all current ones.
-      select(Column("*"), col.as(colName))
-    }
-  }
-  /**
-   * Returns a new [[DataFrame]] by adding a column with metadata.
-   */
-  private[spark] def withColumn(colName: String, col: Column, metadata: Metadata): DataFrame = {
-    // The resolver arguments and the aliased column were truncated in the source and are
-    // reconstructed here.
-    val resolver = sqlContext.analyzer.resolver
-    val output = queryExecution.analyzed.output
-    val shouldReplace = output.exists(f => resolver(f.name, colName))
-    if (shouldReplace) {
-      // Keep the column order, swapping the new column (with metadata) in on a name match.
-      val columns = output.map { field =>
-        if (resolver(field.name, colName)) {
-          col.as(colName, metadata)
-        } else {
-          Column(field)
-        }
-      }
-      select(columns : _*)
-    } else {
-      // No existing column matches: append the new column after all current ones.
-      select(Column("*"), col.as(colName, metadata))
-    }
-  }
-  /**
-   * Returns a new [[DataFrame]] with a column renamed.
-   * This is a no-op if schema doesn't contain existingName.
-   * @group dfops
-   * @since 1.3.0
-   */
-  def withColumnRenamed(existingName: String, newName: String): DataFrame = {
-    // The resolver arguments and the receiver of the mapping were truncated in the source
-    // and are reconstructed here.
-    val resolver = sqlContext.analyzer.resolver
-    val output = queryExecution.analyzed.output
-    val shouldRename = output.exists(f => resolver(f.name, existingName))
-    if (shouldRename) {
-      // Keep the column order, re-aliasing only the columns whose name matches.
-      val columns = output.map { col =>
-        if (resolver(col.name, existingName)) {
-          Column(col).as(newName)
-        } else {
-          Column(col)
-        }
-      }
-      select(columns : _*)
-    } else {
-      // Nothing to rename.
-      toDF()
-    }
-  }
-  /**
-   * Returns a new [[DataFrame]] with a column dropped.
-   * This is a no-op if schema doesn't contain column name.
-   * @group dfops
-   * @since 1.4.0
-   */
-  def drop(colName: String): DataFrame = {
-    // Wrap the single name in a Seq and expand it as varargs for the multi-name overload.
-    val names = Seq(colName)
-    drop(names: _*)
-  }
-  /**
-   * Returns a new [[DataFrame]] with columns dropped.
-   * This is a no-op if schema doesn't contain column name(s).
-   * @group dfops
-   * @since 1.6.0
-   */
-  @scala.annotation.varargs
-  def drop(colNames: String*): DataFrame = {
-    // The resolver argument, the column construction and the final select were truncated
-    // in the source and are reconstructed here.
-    val resolver = sqlContext.analyzer.resolver
-    // Keep only the schema fields that match none of the names to drop.
-    val remainingCols =
-      schema.filter(f => colNames.forall(n => !resolver(f.name, n))).map(f => Column(f.name))
-    if (remainingCols.size == this.schema.size) {
-      // Nothing matched, so there is nothing to drop.
-      toDF()
-    } else {
-      this.select(remainingCols: _*)
-    }
-  }
-  /**
-   * Returns a new [[DataFrame]] with a column dropped.
-   * This version of drop accepts a Column rather than a name.
-   * This is a no-op if the DataFrame doesn't have a column
-   * with an equivalent expression.
-   * @group dfops
-   * @since 1.4.1
-   */
-  def drop(col: Column): DataFrame = {
-    val expression = col match {
-      case Column(u: UnresolvedAttribute) =>
-        // The first argument was truncated in the source; the attribute's name is resolved
-        // against the analyzed plan, falling back to the unresolved attribute itself.
-        queryExecution.analyzed.resolveQuoted(u.name, sqlContext.analyzer.resolver).getOrElse(u)
-      case Column(expr: Expression) => expr
-    }
-    val attrs = this.logicalPlan.output
-    // Keep every output attribute that is not equal to the expression being dropped.
-    val colsAfterDrop = attrs.filter { attr =>
-      attr != expression
-    }.map(attr => Column(attr))
-    select(colsAfterDrop : _*)
-  }
-  /**
-   * Returns a new [[DataFrame]] that contains only the unique rows from this [[DataFrame]].
-   * This is an alias for `distinct`.
-   * @group dfops
-   * @since 1.4.0
-   */
-  def dropDuplicates(): Dataset[T] = {
-    // Using every column as the subset makes whole rows the unit of deduplication.
-    dropDuplicates(this.columns)
-  }
-  /**
-   * (Scala-specific) Returns a new [[DataFrame]] with duplicate rows removed, considering only
-   * the subset of columns.
-   *
-   * @group dfops
-   * @since 1.4.0
-   */
-  def dropDuplicates(colNames: Seq[String]): Dataset[T] = withTypedPlan {
-    // The three definitions below and the alias name were truncated in the source and are
-    // reconstructed here.
-    val groupCols = colNames.map(resolve)
-    val groupColExprIds = groupCols.map(_.exprId)
-    // Grouping columns pass through unchanged; every other column is replaced by a
-    // `First` aggregate aliased back to its own name.
-    val aggCols = logicalPlan.output.map { attr =>
-      if (groupColExprIds.contains(attr.exprId)) {
-        attr
-      } else {
-        Alias(new First(attr).toAggregateExpression(), attr.name)()
-      }
-    }
-    Aggregate(groupCols, aggCols, logicalPlan)
-  }
-  /**
-   * Returns a new [[DataFrame]] with duplicate rows removed, considering only
-   * the subset of columns.
-   *
-   * @group dfops
-   * @since 1.4.0
-   */
-  def dropDuplicates(colNames: Array[String]): Dataset[T] = {
-    // Convert to a Seq and reuse the Seq-based overload.
-    val names = colNames.toSeq
-    dropDuplicates(names)
-  }
-  /**
-   * Computes statistics for numeric columns, including count, mean, stddev, min, and max.
-   * If no columns are given, this function computes statistics for all numerical columns.
-   *
-   * This function is meant for exploratory data analysis, as we make no guarantee about the
-   * backward compatibility of the schema of the resulting [[DataFrame]]. If you want to
-   * programmatically compute summary statistics, use the `agg` function instead.
-   *
-   * {{{
-   *   df.describe("age", "height").show()
-   *
-   *   // output:
-   *   // summary age   height
-   *   // count   10.0  10.0
-   *   // mean    53.3  178.05
-   *   // stddev  11.6  15.7
-   *   // min     18.0  163.0
-   *   // max     92.0  192.0
-   * }}}
-   *
-   * @group action
-   * @since 1.3.1
-   */
-  @scala.annotation.varargs
-  def describe(cols: String*): DataFrame = withPlan {
-    // Several expressions in this method were truncated in the source and are
-    // reconstructed here.
-    // The list of summary statistics to compute, in the form of expressions.
-    val statistics = List[(String, Expression => Expression)](
-      "count" -> ((child: Expression) => Count(child).toAggregateExpression()),
-      "mean" -> ((child: Expression) => Average(child).toAggregateExpression()),
-      "stddev" -> ((child: Expression) => StddevSamp(child).toAggregateExpression()),
-      "min" -> ((child: Expression) => Min(child).toAggregateExpression()),
-      "max" -> ((child: Expression) => Max(child).toAggregateExpression()))
-    // NOTE(review): the default-column expression was lost. `numericColumns` is assumed to
-    // be a helper defined elsewhere in this class (the Scaladoc promises "all numerical
-    // columns") -- confirm the name.
-    val outputCols =
-      (if (cols.isEmpty) numericColumns.map(usePrettyExpression(_).sql) else cols).toList
-    val ret: Seq[Row] = if (outputCols.nonEmpty) {
-      // One aggregate per (statistic, column) pair, statistic-major, each cast to string.
-      val aggExprs = statistics.flatMap { case (_, colToAgg) =>
-        outputCols.map(c => Column(Cast(colToAgg(Column(c).expr), StringType)).as(c))
-      }
-      val row = agg(aggExprs.head, aggExprs.tail: _*).head().toSeq
-      // Pivot the data so each summary is one row
-      row.grouped(outputCols.size).toSeq.zip(statistics).map { case (aggregation, (statistic, _)) =>
-        Row(statistic :: aggregation.toList: _*)
-      }
-    } else {
-      // If there are no output columns, just output a single column that contains the stats.
-      statistics.map { case (name, _) => Row(name) }
-    }
-    // All columns are string type
-    val schema = StructType(
-      StructField("summary", StringType) :: outputCols.map(StructField(_, StringType))).toAttributes
-    LocalRelation.fromExternalRows(schema, ret)
-  }
-  /**
-   * Returns the first `n` rows.
-   *
-   * @note this method should only be used if the resulting array is expected to be small, as
-   * all the data is loaded into the driver's memory.
-   *
-   * @group action
-   * @since 1.3.0
-   */
-  def head(n: Int): Array[T] = {
-    // Limit the plan to `n` rows, then collect under the "head" callback name. The inner
-    // collect skips its own callback so listeners are notified only once.
-    val limited = limit(n)
-    withTypedCallback("head", limited)(ds => ds.collect(needCallback = false))
-  }
-  /**
-   * Returns the first row.
-   * @group action
-   * @since 1.3.0
-   */
-  def head(): T = {
-    // Fetch at most one row and unwrap the first element of the resulting array.
-    val firstRows = head(1)
-    firstRows.head
-  }
-  /**
-   * Returns the first row. Alias for head().
-   * @group action
-   * @since 1.3.0
-   */
-  def first(): T = {
-    // Pure alias: delegates to the zero-argument head().
-    head()
-  }
-  /**
-   * Concise syntax for chaining custom transformations.
-   * {{{
-   *   def featurize(ds: DataFrame) = ...
-   *
-   *   df
-   *     .transform(featurize)
-   *     .transform(...)
-   * }}}
-   * @since 1.6.0
-   */
-  def transform[U](t: Dataset[T] => Dataset[U]): Dataset[U] = {
-    // Hand this Dataset to the caller-supplied transformation and return its result.
-    t.apply(this)
-  }
-  /**
-   * (Scala-specific)
-   * Returns a new [[Dataset]] that only contains elements where `func` returns `true`.
-   * @since 1.6.0
-   */
-  def filter(func: T => Boolean): Dataset[T] = {
-    // Apply the predicate to each partition's iterator, keeping only matching elements.
-    mapPartitions(partition => partition.filter(func))
-  }
-  /**
-   * (Java-specific)
-   * Returns a new [[Dataset]] that only contains elements where `func` returns `true`.
-   * @since 1.6.0
-   */
-  def filter(func: FilterFunction[T]): Dataset[T] = {
-    // Adapt the Java functional interface to a Scala predicate and reuse the Scala overload.
-    filter(t => func.call(t))
-  }
-  /**
-   * (Scala-specific)
-   * Returns a new [[Dataset]] that contains the result of applying `func` to each element.
-   * @since 1.6.0
-   */
-  def map[U : Encoder](func: T => U): Dataset[U] = {
-    // Element-wise map expressed as a per-partition iterator transformation, mirroring how
-    // `filter` and `flatMap` are built on `mapPartitions`.
-    mapPartitions(_.map(func))
-  }
-  /**
-   * (Java-specific)
-   * Returns a new [[Dataset]] that contains the result of applying `func` to each element.
-   * @since 1.6.0
-   */
-  def map[U](func: MapFunction[T, U], encoder: Encoder[U]): Dataset[U] =
-    // Adapt the Java functional interface to a Scala function and pass the encoder explicitly,
-    // since Java callers cannot supply it implicitly.
-    map(t => func.call(t))(encoder)
-  /**
-   * (Scala-specific)
-   * Returns a new [[Dataset]] that contains the result of applying `func` to each partition.
-   * @since 1.6.0
-   */
-  def mapPartitions[U : Encoder](func: Iterator[T] => Iterator[U]): Dataset[U] = {
-    // Wrap the current plan in a MapPartitions node and pair it with the encoder for U that
-    // the context bound brings into scope.
-    val mappedPlan = MapPartitions[T, U](func, logicalPlan)
-    val resultEncoder = implicitly[Encoder[U]]
-    new Dataset[U](sqlContext, mappedPlan, resultEncoder)
-  }
-  /**
-   * (Java-specific)
-   * Returns a new [[Dataset]] that contains the result of applying `func` to each partition.
-   * @since 1.6.0
-   */
-  def mapPartitions[U](f: MapPartitionsFunction[T, U], encoder: Encoder[U]): Dataset[U] = {
-    // Bridge between Scala and Java iterators on both sides of the Java callback.
-    // Assumes MapPartitionsFunction.call returns a java.util.Iterator (Spark 2.x signature).
-    val func: (Iterator[T]) => Iterator[U] = x => f.call(x.asJava).asScala
-    mapPartitions(func)(encoder)
-  }
-  /**
-   * (Scala-specific)
-   * Returns a new [[Dataset]] by first applying a function to all elements of this [[Dataset]],
-   * and then flattening the results.
-   * @since 1.6.0
-   */
-  def flatMap[U : Encoder](func: T => TraversableOnce[U]): Dataset[U] = {
-    // Flatten each element's results lazily within its partition iterator.
-    mapPartitions(partition => partition.flatMap(func))
-  }
-  /**
-   * (Java-specific)
-   * Returns a new [[Dataset]] by first applying a function to all elements of this [[Dataset]],
-   * and then flattening the results.
-   * @since 1.6.0
-   */
-  def flatMap[U](f: FlatMapFunction[T, U], encoder: Encoder[U]): Dataset[U] = {
-    // Convert the Java callback's result into a Scala iterator so the Scala overload can
-    // flatten it. Assumes FlatMapFunction.call returns a java.util.Iterator (Spark 2.x
-    // signature).
-    val func: (T) => Iterator[U] = x => f.call(x).asScala
-    flatMap(func)(encoder)
-  }
-  /**
-   * Applies a function `f` to all rows.
-   * @group rdd
-   * @since 1.3.0
-   */
-  def foreach(f: T => Unit): Unit = withNewExecutionId {
-    // Run `f` over the underlying RDD inside a tracked SQL execution.
-    val elements = rdd
-    elements.foreach(f)
-  }
-  /**
-   * (Java-specific)
-   * Runs `func` on each element of this [[Dataset]].
-   * @since 1.6.0
-   */
-  def foreach(func: ForeachFunction[T]): Unit = {
-    // Adapt the Java functional interface to a Scala function and reuse the Scala overload.
-    foreach(func.call(_))
-  }
-  /**
-   * Applies a function f to each partition of this [[DataFrame]].
-   * @group rdd
-   * @since 1.3.0
-   */
-  def foreachPartition(f: Iterator[T] => Unit): Unit = withNewExecutionId {
-    // Run `f` once per partition of the underlying RDD inside a tracked SQL execution.
-    val elements = rdd
-    elements.foreachPartition(f)
-  }
-  /**
-   * (Java-specific)
-   * Runs `func` on each partition of this [[Dataset]].
-   * @since 1.6.0
-   */
-  def foreachPartition(func: ForeachPartitionFunction[T]): Unit =
-    // Expose each partition to the Java callback as a java.util.Iterator.
-    foreachPartition(it => func.call(it.asJava))
-  /**
-   * Returns the first `n` rows in the [[DataFrame]].
-   *
-   * Running take requires moving data into the application's driver process, and doing so with
-   * a very large `n` can crash the driver process with OutOfMemoryError.
-   *
-   * @group action
-   * @since 1.3.0
-   */
-  def take(n: Int): Array[T] = {
-    // Pure alias for head(n).
-    head(n)
-  }
-  /**
-   * Returns the first `n` rows in the [[DataFrame]] as a list.
-   *
-   * Running take requires moving data into the application's driver process, and doing so with
-   * a very large `n` can crash the driver process with OutOfMemoryError.
-   *
-   * @group action
-   * @since 1.6.0
-   */
-  def takeAsList(n: Int): java.util.List[T] = {
-    // Take the rows as an array, then expose them through a java.util.List view.
-    val rows = take(n)
-    java.util.Arrays.asList(rows : _*)
-  }
-  /**
-   * Returns an array that contains all of the [[Row]]s in this [[DataFrame]].
-   *
-   * Running collect requires moving all the data into the application's driver process, and
-   * doing so on a very large dataset can crash the driver process with OutOfMemoryError.
-   *
-   * For Java API, use [[collectAsList]].
-   *
-   * @group action
-   * @since 1.3.0
-   */
-  def collect(): Array[T] = {
-    // Public entry point: always report to the registered callbacks.
-    val reportToListeners = true
-    collect(needCallback = reportToListeners)
-  }
-  /**
-   * Returns a Java list that contains all of the [[Row]]s in this [[DataFrame]].
-   *
-   * Running collect requires moving all the data into the application's driver process, and
-   * doing so on a very large dataset can crash the driver process with OutOfMemoryError.
-   *
-   * @group action
-   * @since 1.3.0
-   */
-  def collectAsList(): java.util.List[T] = withCallback("collectAsList", toDF()) { _ =>
-    // The DataFrame handed to the action is ignored; the work runs against this Dataset inside
-    // a tracked execution.
-    withNewExecutionId {
-      // NOTE(review): the right-hand side of `values` is missing (the line looks truncated).
-      // Presumably it collects the rows and decodes them to T -- confirm and restore.
-      val values =
-      java.util.Arrays.asList(values : _*)
-    }
-  }
-  // Collects this Dataset to the driver, optionally wrapping the work in `withCallback` so
-  // registered listeners are notified under the name "collect".
-  private def collect(needCallback: Boolean): Array[T] = {
-    // NOTE(review): the body of `execute` is empty here, which cannot produce an Array[T];
-    // the collecting expression appears to have been truncated -- confirm and restore.
-    def execute(): Array[T] = withNewExecutionId {
-    }
-    if (needCallback) {
-      withCallback("collect", toDF())(_ => execute())
-    } else {
-      // Callers that already run inside a callback (e.g. head, count) skip the second one.
-      execute()
-    }
-  }
-  /**
-   * Returns the number of rows in the [[DataFrame]].
-   * @group action
-   * @since 1.3.0
-   */
-  def count(): Long = {
-    // A global (no grouping keys) count aggregate yields one row whose first field is the
-    // total. The inner collect skips its callback so only "count" is reported.
-    val counted = groupBy().count()
-    withCallback("count", counted) { aggregated =>
-      val rows = aggregated.collect(needCallback = false)
-      rows.head.getLong(0)
-    }
-  }
-  /**
-   * Returns a new [[DataFrame]] that has exactly `numPartitions` partitions.
-   * @group dfops
-   * @since 1.3.0
-   */
-  def repartition(numPartitions: Int): Dataset[T] = {
-    // Declared as a def so the plan is still passed by name, as in the block-argument form.
-    def shuffledPlan: LogicalPlan = Repartition(numPartitions, shuffle = true, logicalPlan)
-    withTypedPlan(shuffledPlan)
-  }
-  /**
-   * Returns a new [[DataFrame]] partitioned by the given partitioning expressions into
-   * `numPartitions`. The resulting DataFrame is hash partitioned.
-   *
-   * This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL).
-   *
-   * @group dfops
-   * @since 1.6.0
-   */
-  @scala.annotation.varargs
-  def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T] = withTypedPlan {
-    // Unwrap each Column to its underlying expression; these become the partitioning keys.
-    RepartitionByExpression(partitionExprs.map(_.expr), logicalPlan, Some(numPartitions))
-  }
-  /**
-   * Returns a new [[DataFrame]] partitioned by the given partitioning expressions preserving
-   * the existing number of partitions. The resulting DataFrame is hash partitioned.
-   *
-   * This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL).
-   *
-   * @group dfops
-   * @since 1.6.0
-   */
-  @scala.annotation.varargs
-  def repartition(partitionExprs: Column*): Dataset[T] = withTypedPlan {
-    // Unwrap each Column to its underlying expression; `numPartitions = None` leaves the
-    // partition count unspecified.
-    RepartitionByExpression(partitionExprs.map(_.expr), logicalPlan, numPartitions = None)
-  }
-  /**
-   * Returns a new [[DataFrame]] that has exactly `numPartitions` partitions.
-   * Similar to coalesce defined on an [[RDD]], this operation results in a narrow dependency, e.g.
-   * if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of
-   * the 100 new partitions will claim 10 of the current partitions.
-   * @group rdd
-   * @since 1.4.0
-   */
-  def coalesce(numPartitions: Int): Dataset[T] = {
-    // Same plan node as repartition(n), but with shuffling disabled. Declared as a def so the
-    // plan is still passed by name.
-    def narrowedPlan: LogicalPlan = Repartition(numPartitions, shuffle = false, logicalPlan)
-    withTypedPlan(narrowedPlan)
-  }
-  /**
-   * Returns a new [[DataFrame]] that contains only the unique rows from this [[DataFrame]].
-   * This is an alias for `dropDuplicates`.
-   * @group dfops
-   * @since 1.3.0
-   */
-  def distinct(): Dataset[T] = {
-    // Pure alias for the zero-argument dropDuplicates().
-    dropDuplicates()
-  }
-  /**
-   * Persist this [[DataFrame]] with the default storage level (`MEMORY_AND_DISK`).
-   * @group basic
-   * @since 1.3.0
-   */
-  def persist(): this.type = {
-    // Register this query with the cache manager, then return `this` for call chaining.
-    val manager = sqlContext.cacheManager
-    manager.cacheQuery(this)
-    this
-  }
-  /**
-   * Persist this [[DataFrame]] with the default storage level (`MEMORY_AND_DISK`).
-   * @group basic
-   * @since 1.3.0
-   */
-  def cache(): this.type = {
-    // Pure alias for the zero-argument persist().
-    persist()
-  }
-  /**
-   * Persist this [[DataFrame]] with the given storage level.
-   * @param newLevel One of: `MEMORY_ONLY`, `MEMORY_AND_DISK`, `MEMORY_ONLY_SER`,
-   *                 `MEMORY_AND_DISK_SER`, `DISK_ONLY`, `MEMORY_ONLY_2`,
-   *                 `MEMORY_AND_DISK_2`, etc.
-   * @group basic
-   * @since 1.3.0
-   */
-  def persist(newLevel: StorageLevel): this.type = {
-    // Register this query with the cache manager at the requested storage level, then return
-    // `this` for call chaining.
-    val manager = sqlContext.cacheManager
-    manager.cacheQuery(this, None, newLevel)
-    this
-  }
-  /**
-   * Mark the [[DataFrame]] as non-persistent, and remove all blocks for it from memory and disk.
-   * @param blocking Whether to block until all blocks are deleted.
-   * @group basic
-   * @since 1.3.0
-   */
-  def unpersist(blocking: Boolean): this.type = {
-    // Ask the cache manager to drop this query if it is cached, then return `this`.
-    val manager = sqlContext.cacheManager
-    manager.tryUncacheQuery(this, blocking)
-    this
-  }
-  /**
-   * Mark the [[DataFrame]] as non-persistent, and remove all blocks for it from memory and disk.
-   * @group basic
-   * @since 1.3.0
-   */
-  def unpersist(): this.type = {
-    // Non-blocking variant: delegates with blocking = false.
-    val waitForRemoval = false
-    unpersist(blocking = waitForRemoval)
-  }
-  /////////////////////////////////////////////////////////////////////////////
-  // I/O
-  /////////////////////////////////////////////////////////////////////////////
-  /**
-   * Represents the content of the [[DataFrame]] as an [[RDD]] of [[Row]]s. Note that the RDD is
-   * memoized. Once called, it won't change even if you change any query planning related Spark SQL
-   * configurations (e.g. `spark.sql.shuffle.partitions`).
-   * @group rdd
-   * @since 1.3.0
-   */
-  lazy val rdd: RDD[T] = {
-    // use a local variable to make sure the map closure doesn't capture the whole DataFrame
-    val schema = this.schema
-    // NOTE(review): the closure body below is empty, so it cannot yield an iterator of T and
-    // `schema` is unused; the per-row conversion appears to have been truncated -- confirm and
-    // restore.
-    queryExecution.toRdd.mapPartitions { rows =>
-    }
-  }
-  /**
-   * Returns the content of the [[DataFrame]] as a [[JavaRDD]] of [[Row]]s.
-   * @group rdd
-   * @since 1.3.0
-   */
-  def toJavaRDD: JavaRDD[T] = {
-    // Wrap the memoized Scala RDD in its Java-friendly counterpart.
-    val scalaRdd = rdd
-    scalaRdd.toJavaRDD()
-  }
-  /**
-   * Returns the content of the [[DataFrame]] as a [[JavaRDD]] of [[Row]]s.
-   * @group rdd
-   * @since 1.3.0
-   */
-  def javaRDD: JavaRDD[T] = {
-    // Pure alias for toJavaRDD.
-    toJavaRDD
-  }
-  /**
-   * Registers this [[DataFrame]] as a temporary table using the given name.  The lifetime of this
-   * temporary table is tied to the [[SQLContext]] that was used to create this DataFrame.
-   *
-   * @group basic
-   * @since 1.3.0
-   */
-  def registerTempTable(tableName: String): Unit = {
-    // Registration goes through the owning SQLContext using the untyped DataFrame view.
-    val context = sqlContext
-    val asDataFrame = toDF()
-    context.registerDataFrameAsTable(asDataFrame, tableName)
-  }
-  /**
-   * :: Experimental ::
-   * Interface for saving the content of the [[DataFrame]] out into external storage or streams.
-   *
-   * @group output
-   * @since 1.4.0
-   */
-  @Experimental
-  def write: DataFrameWriter = {
-    // Each call builds a fresh writer over the untyped DataFrame view of this Dataset.
-    val asDataFrame = toDF()
-    new DataFrameWriter(asDataFrame)
-  }
-  /**
-   * Returns the content of the [[DataFrame]] as a RDD of JSON strings.
-   * @group rdd
-   * @since 1.3.0
-   */
-  def toJSON: RDD[String] = {
-    // Capture only the schema so the closure does not reference the whole Dataset.
-    val rowSchema = this.schema
-    queryExecution.toRdd.mapPartitions { iter =>
-      // One writer/generator pair is shared by all records of a partition.
-      val writer = new CharArrayWriter()
-      // create the Generator without separator inserted between 2 records
-      val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)
-      new Iterator[String] {
-        override def hasNext: Boolean = iter.hasNext
-        override def next(): String = {
-          // Consume exactly one input row per call; without this the wrapped iterator never
-          // advances.
-          JacksonGenerator(rowSchema, gen)(iter.next())
-          gen.flush()
-          val json = writer.toString
-          if (hasNext) {
-            // More rows follow: clear the buffer so the next record starts empty.
-            writer.reset()
-          } else {
-            // Last row of the partition: release the generator.
-            gen.close()
-          }
-          json
-        }
-      }
-    }
-  }
-  /**
-   * Returns a best-effort snapshot of the files that compose this DataFrame. This method simply
-   * asks each constituent BaseRelation for its respective files and takes the union of all results.
-   * Depending on the source relations, this may not find all input files. Duplicates are removed.
-   */
-  def inputFiles: Array[String] = {
-    // Gather the file lists of every file-backed relation found in the plan, whether it is
-    // wrapped in a LogicalRelation or appears directly as a plan node.
-    val perRelation = logicalPlan.collect {
-      case LogicalRelation(wrapped: FileRelation, _, _) => wrapped.inputFiles
-      case direct: FileRelation => direct.inputFiles
-    }
-    val allFiles: Seq[String] = perRelation.flatten
-    // Passing through a Set removes duplicates.
-    allFiles.toSet.toArray
-  }
-  ////////////////////////////////////////////////////////////////////////////
-  // for Python API
-  ////////////////////////////////////////////////////////////////////////////
-  /**
-   * Converts a JavaRDD to a PythonRDD.
-   */
-  protected[sql] def javaToPython: JavaRDD[Array[Byte]] = {
-    val structType = schema  // capture it for closure
-    // NOTE(review): the right-hand side of `rdd` is truncated (only the trailing
-    // `, structType))` survives). Presumably it converts each row to a Java object using
-    // `structType` -- confirm and restore.
-    val rdd =, structType))
-    EvaluatePython.javaToPython(rdd)
-  }
-  protected[sql] def collectToPython(): Int = withNewExecutionId {
-    // Serve the Python-converted rows from inside a tracked execution; the Int returned by
-    // collectAndServe is passed through unchanged.
-    val convertedRows = javaToPython.rdd
-    PythonRDD.collectAndServe(convertedRows)
-  }
-  /**
-   * Wrap a DataFrame action to track all Spark jobs in the body so that we can connect them with
-   * an execution.
-   */
-  private[sql] def withNewExecutionId[U](body: => U): U = {
-    // `body` stays by-name so it is evaluated inside SQLExecution's scope, not before it.
-    val context = sqlContext
-    val execution = queryExecution
-    SQLExecution.withNewExecutionId(context, execution)(body)
-  }
-  /**
-   * Wrap a DataFrame action to track the QueryExecution and time cost, then report to the
-   * user-registered callback functions.
-   */
-  private def withCallback[U](name: String, df: DataFrame)(action: DataFrame => U) = {
-    try {
-      // Clear metrics left over from earlier runs of the same physical plan.
-      df.queryExecution.executedPlan.foreach(_.resetMetrics())
-      // Time only the action itself.
-      val startedAt = System.nanoTime()
-      val outcome = action(df)
-      val elapsedNanos = System.nanoTime() - startedAt
-      sqlContext.listenerManager.onSuccess(name, df.queryExecution, elapsedNanos)
-      outcome
-    } catch {
-      // Report the failure to listeners, then rethrow so the caller still sees it.
-      case failure: Exception =>
-        sqlContext.listenerManager.onFailure(name, df.queryExecution, failure)
-        throw failure
-    }
-  }
-  private def withTypedCallback[A, B](name: String, ds: Dataset[A])(action: Dataset[A] => B) = {
-    try {
-      // Clear metrics left over from earlier runs of the same physical plan.
-      ds.queryExecution.executedPlan.foreach(_.resetMetrics())
-      // Time only the action itself.
-      val startedAt = System.nanoTime()
-      val outcome = action(ds)
-      val elapsedNanos = System.nanoTime() - startedAt
-      sqlContext.listenerManager.onSuccess(name, ds.queryExecution, elapsedNanos)
-      outcome
-    } catch {
-      // Report the failure to listeners, then rethrow so the caller still sees it.
-      case failure: Exception =>
-        sqlContext.listenerManager.onFailure(name, ds.queryExecution, failure)
-        throw failure
-    }
-  }
-  private def sortInternal(global: Boolean, sortExprs: Seq[Column]): Dataset[T] = {
-    // Convert every sort column into a SortOrder: columns that already carry an explicit
-    // ordering are kept as-is, any other expression is sorted ascending.
-    val sortOrder: Seq[SortOrder] = sortExprs.map { col =>
-      col.expr match {
-        case expr: SortOrder =>
-          expr
-        case expr: Expression =>
-          SortOrder(expr, Ascending)
-      }
-    }
-    withTypedPlan {
-      Sort(sortOrder, global = global, logicalPlan)
-    }
-  }
-  /** A convenient function to wrap a logical plan and produce a DataFrame. */
-  @inline private def withPlan(logicalPlan: => LogicalPlan): DataFrame = {
-    // The by-name plan is forwarded in argument position, exactly as before; the parameter
-    // shadows the member of the same name.
-    val context = sqlContext
-    Dataset.newDataFrame(context, logicalPlan)
-  }
-  /** A convenient function to wrap a logical plan and produce a [[Dataset]] of the same type. */
-  @inline private def withTypedPlan(logicalPlan: => LogicalPlan): Dataset[T] = {
-    // Reuse this Dataset's context and encoder; the by-name plan is forwarded in argument
-    // position, exactly as before.
-    val context = sqlContext
-    new Dataset[T](context, logicalPlan, encoder)
-  }
-  private[sql] def withTypedPlan[R](
-      other: Dataset[_], encoder: Encoder[R])(
-      f: (LogicalPlan, LogicalPlan) => LogicalPlan): Dataset[R] = {
-    // Combine this Dataset's plan (first argument) with `other`'s plan (second argument)
-    // through `f`, and type the result with the caller-supplied encoder.
-    val combined: LogicalPlan = f(logicalPlan, other.logicalPlan)
-    new Dataset[R](sqlContext, combined, encoder)
-  }

