Posted to commits@spark.apache.org by rx...@apache.org on 2016/03/18 08:57:26 UTC

spark git commit: [SPARK-13826][SQL] Addendum: update documentation for Datasets

Repository: spark
Updated Branches:
  refs/heads/master 750ed64cd -> bb1fda01f


[SPARK-13826][SQL] Addendum: update documentation for Datasets

## What changes were proposed in this pull request?
This patch updates documentations for Datasets. I also updated some internal documentation for exchange/broadcast.

## How was this patch tested?
Just documentation/api stability update.

Author: Reynold Xin <rx...@databricks.com>

Closes #11814 from rxin/dataset-docs.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bb1fda01
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bb1fda01
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bb1fda01

Branch: refs/heads/master
Commit: bb1fda01fe620422c442b305f5ceeb552871a490
Parents: 750ed64
Author: Reynold Xin <rx...@databricks.com>
Authored: Fri Mar 18 00:57:23 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Fri Mar 18 00:57:23 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/DataFrameWriter.scala  | 11 ++-
 .../scala/org/apache/spark/sql/Dataset.scala    | 81 +++++++++++++-------
 .../execution/exchange/BroadcastExchange.scala  |  3 +-
 .../spark/sql/execution/exchange/Exchange.scala |  6 +-
 4 files changed, 70 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bb1fda01/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 9951f0f..7ed1c51 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -138,7 +138,16 @@ final class DataFrameWriter private[sql](df: DataFrame) {
 
   /**
    * Partitions the output by the given columns on the file system. If specified, the output is
-   * laid out on the file system similar to Hive's partitioning scheme.
+   * laid out on the file system similar to Hive's partitioning scheme. As an example, when we
+   * partition a dataset by year and then month, the directory layout would look like:
+   *
+   *   - year=2016/month=01/
+   *   - year=2016/month=02/
+   *
+   * Partitioning is one of the most widely used techniques to optimize physical data layout.
+   * It provides a coarse-grained index for skipping unnecessary data reads when queries have
+   * predicates on the partitioned columns. In order for partitioning to work well, the number
+   * of distinct values in each column should typically be less than tens of thousands.
    *
    * This was initially applicable for Parquet but in 1.5+ covers JSON, text, ORC and avro as well.
    *

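As a usage illustration of the partitionBy() documentation added above, here is a minimal
Scala sketch. The DataFrame, column names and output path are hypothetical and not part of
this patch; the point is only the resulting directory layout (a `sqlContext` in scope, as in
the spark-shell of this era, is assumed).

  // Writes Parquet files under directories such as
  //   /tmp/events/year=2016/month=01/
  // matching the layout described in the new Scaladoc.
  import sqlContext.implicits._
  val events = Seq((2016, "01", "a"), (2016, "02", "b")).toDF("year", "month", "value")
  events.write.partitionBy("year", "month").parquet("/tmp/events")
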
http://git-wip-us.apache.org/repos/asf/spark/blob/bb1fda01/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 209bac3..39f7f35 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -61,36 +61,48 @@ private[sql] object Dataset {
 }
 
 /**
- * A [[Dataset]] is a strongly typed collection of objects that can be transformed in parallel
- * using functional or relational operations.
+ * A [[Dataset]] is a strongly typed collection of domain-specific objects that can be transformed
+ * in parallel using functional or relational operations. Each Dataset also has an untyped view
+ * called a [[DataFrame]], which is a Dataset of [[Row]].
  *
- * A [[Dataset]] differs from an [[RDD]] in the following ways:
+ * Operations available on Datasets are divided into transformations and actions. Transformations
+ * are the ones that produce new Datasets, and actions are the ones that trigger computation and
+ * return results. Example transformations include map, filter, select, and aggregate (groupBy).
+ * Example actions include count, show, or writing data out to file systems.
  *
- *  - Internally, a [[Dataset]] is represented by a Catalyst logical plan and the data is stored
- *    in the encoded form.  This representation allows for additional logical operations and
- *    enables many operations (sorting, shuffling, etc.) to be performed without deserializing to
- *    an object.
- *  - The creation of a [[Dataset]] requires the presence of an explicit [[Encoder]] that can be
- *    used to serialize the object into a binary format.  Encoders are also capable of mapping the
- *    schema of a given object to the Spark SQL type system.  In contrast, RDDs rely on runtime
- *    reflection based serialization. Operations that change the type of object stored in the
- *    dataset also need an encoder for the new type.
+ * Datasets are "lazy", i.e. computations are only triggered when an action is invoked. Internally,
+ * a Dataset represents a logical plan that describes the computation required to produce the data.
+ * When an action is invoked, Spark's query optimizer optimizes the logical plan and generates a
+ * physical plan for efficient execution in a parallel or distributed manner. To explore the
+ * logical plan as well as the optimized physical plan, use the `explain` function.
  *
- * Different from DataFrame in Spark 1.6.0 and earlier versions, a [[Dataset]] can be thought of as
- * a specialized DataFrame, where the elements map to a specific JVM object type, instead of to a
- * generic [[Row]] container.  Since Spark 2.0.0, DataFrame is simply a type alias of
- * `Dataset[Row]`.
+ * To efficiently support domain-specific objects, an [[Encoder]] is required. The encoder maps
+ * the domain-specific type T to Spark's internal type system. For example, given a class Person
+ * with two fields, name (string) and age (int), an encoder is used to tell Spark to generate code
+ * at runtime to serialize the Person object into a binary structure. This binary structure often
+ * has a much lower memory footprint and is optimized for efficiency in data processing
+ * (e.g. in a columnar format). To understand the internal binary representation for data, use the
+ * `schema` function.
  *
- * The following example creates a `Dataset[Row]` by pointing Spark SQL to a Parquet data set.
+ * There are typically two ways to create a Dataset. The most common way is by pointing Spark
+ * to some files on storage systems, using the `read` function available on a `SparkSession`.
  * {{{
- *   val people = sqlContext.read.parquet("...")  // in Scala
- *   Dataset<Row> people = sqlContext.read().parquet("...")  // in Java
+ *   val people = session.read.parquet("...").as[Person]  // Scala
+ *   Dataset<Person> people = session.read().parquet("...").as(Encoders.bean(Person.class))  // Java
  * }}}
  *
- * Once created, it can be manipulated using the various domain-specific-language (DSL) functions
- * defined in: [[Dataset]] (this class), [[Column]], and [[functions]].
+ * Datasets can also be created through transformations available on existing Datasets. For example,
+ * the following creates a new Dataset by applying a map transformation on the existing one:
+ * {{{
+ *   val names = people.map(_.name)  // in Scala; names is a Dataset[String]
+ *   Dataset<String> names = people.map((Person p) -> p.name, Encoders.STRING())  // in Java 8
+ * }}}
  *
- * To select a column from the data frame, use `apply` method in Scala and `col` in Java.
+ * Dataset operations can also be untyped, through the various domain-specific-language (DSL)
+ * functions defined in: [[Dataset]] (this class), [[Column]], and [[functions]]. These operations
+ * are very similar to the operations available in the data frame abstraction in R or Python.
+ *
+ * To select a column from the Dataset, use the `apply` method in Scala and `col` in Java.
  * {{{
  *   val ageCol = people("age")  // in Scala
  *   Column ageCol = people.col("age")  // in Java
@@ -241,7 +253,6 @@ class Dataset[T] private[sql](
   }
 
   /**
-   * :: Experimental ::
    * Converts this strongly typed collection of data to generic Dataframe.  In contrast to the
    * strongly typed objects that Dataset operations work on, a Dataframe returns generic [[Row]]
    * objects that allow fields to be accessed by ordinal or name.
@@ -251,7 +262,6 @@ class Dataset[T] private[sql](
    */
   // This is declared with parentheses to prevent the Scala compiler from treating
   // `ds.toDF("1")` as invoking this toDF and then apply on the returned DataFrame.
-  @Experimental
   def toDF(): DataFrame = new Dataset[Row](sqlContext, queryExecution, RowEncoder(schema))
 
   /**
@@ -1094,7 +1104,7 @@ class Dataset[T] private[sql](
   def cube(cols: Column*): GroupedData = GroupedData(toDF(), cols.map(_.expr), GroupedData.CubeType)
 
   /**
-   * Groups the [[Dataset]] 2.0.0
+   * Groups the [[Dataset]] using the specified columns, so we can run aggregation on them.
    * See [[GroupedData]] for all the available aggregate functions.
    *
    * This is a variant of groupBy that can only group by existing columns using column names
@@ -1314,7 +1324,8 @@ class Dataset[T] private[sql](
 
   /**
    * Returns a new [[Dataset]] by taking the first `n` rows. The difference between this function
-   * and `head` is that `head` returns an array while `limit` returns a new [[Dataset]].
+   * and `head` is that `head` is an action and returns an array (by triggering query execution)
+   * while `limit` returns a new [[Dataset]].
    *
    * @group typedrel
    * @since 2.0.0
@@ -1327,6 +1338,9 @@ class Dataset[T] private[sql](
    * Returns a new [[Dataset]] containing union of rows in this frame and another frame.
    * This is equivalent to `UNION ALL` in SQL.
    *
+   * To do a SQL-style set union (that does deduplication of elements), use this function followed
+   * by a [[distinct]].
+   *
    * @group typedrel
    * @since 2.0.0
    */
@@ -1349,6 +1363,9 @@ class Dataset[T] private[sql](
    * Returns a new [[Dataset]] containing rows only in both this frame and another frame.
    * This is equivalent to `INTERSECT` in SQL.
    *
+   * Note that equality checking is performed directly on the encoded representation of the data
+   * and thus is not affected by a custom `equals` function defined on `T`.
+   *
    * @group typedrel
    * @since 1.6.0
    */
@@ -1360,6 +1377,9 @@ class Dataset[T] private[sql](
    * Returns a new [[Dataset]] containing rows in this frame but not in another frame.
    * This is equivalent to `EXCEPT` in SQL.
    *
+   * Note that equality checking is performed directly on the encoded representation of the data
+   * and thus is not affected by a custom `equals` function defined on `T`.
+   *
    * @group typedrel
    * @since 2.0.0
    */
@@ -1448,6 +1468,7 @@ class Dataset[T] private[sql](
   }
 
   /**
+   * :: Experimental ::
    * (Scala-specific) Returns a new [[Dataset]] where each row has been expanded to zero or more
    * rows by the provided function.  This is similar to a `LATERAL VIEW` in HiveQL. The columns of
    * the input row are implicitly joined with each row that is output by the function.
@@ -1470,6 +1491,7 @@ class Dataset[T] private[sql](
    * @group untypedrel
    * @since 2.0.0
    */
+  @Experimental
   def explode[A <: Product : TypeTag](input: Column*)(f: Row => TraversableOnce[A]): DataFrame = {
     val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType]
 
@@ -1489,6 +1511,7 @@ class Dataset[T] private[sql](
   }
 
   /**
+   * :: Experimental ::
    * (Scala-specific) Returns a new [[Dataset]] where a single column has been expanded to zero
    * or more rows by the provided function.  This is similar to a `LATERAL VIEW` in HiveQL. All
    * columns of the input row are implicitly joined with each value that is output by the function.
@@ -1500,6 +1523,7 @@ class Dataset[T] private[sql](
    * @group untypedrel
    * @since 2.0.0
    */
+  @Experimental
   def explode[A, B : TypeTag](inputColumn: String, outputColumn: String)(f: A => TraversableOnce[B])
     : DataFrame = {
     val dataType = ScalaReflection.schemaFor[B].dataType
@@ -1770,7 +1794,7 @@ class Dataset[T] private[sql](
   /**
    * Concise syntax for chaining custom transformations.
    * {{{
-   *   def featurize(ds: Dataset[T]) = ...
+   *   def featurize(ds: Dataset[T]): Dataset[U] = ...
    *
    *   ds
    *     .transform(featurize)
@@ -2051,6 +2075,9 @@ class Dataset[T] private[sql](
    * Returns a new [[Dataset]] that contains only the unique rows from this [[Dataset]].
    * This is an alias for `dropDuplicates`.
    *
+   * Note that equality checking is performed directly on the encoded representation of the data
+   * and thus is not affected by a custom `equals` function defined on `T`.
+   *
    * @group typedrel
    * @since 2.0.0
    */

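To make the reworked class-level Scaladoc above concrete, a short Scala sketch of the
lifecycle it describes: lazy transformations, an action that triggers execution, and
explain() for plan inspection. The Person case class, the input path and the `session`
value mirror the examples in the Scaladoc and are assumptions, not code from this patch.

  case class Person(name: String, age: Long)

  // Reading files yields an untyped DataFrame (Dataset[Row]); as[Person] attaches an
  // encoder and turns it into a strongly typed Dataset[Person].
  import session.implicits._
  val people = session.read.parquet("/tmp/people").as[Person]

  // Transformations are lazy: no job has run yet.
  val names = people.filter(_.age >= 21).map(_.name)

  // Print the logical plan and the optimized physical plan.
  names.explain(true)

  // Actions such as count() or show() trigger query execution.
  println(names.count())
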
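Similarly, a small sketch of the UNION ALL vs. set-union distinction called out in the
updated union() Scaladoc; ds1 and ds2 stand for any two Datasets with the same element
type and are not defined in this patch.

  // union keeps duplicate rows, like UNION ALL in SQL.
  val bag = ds1.union(ds2)

  // Follow with distinct() for a SQL-style set union. As the new notes above point out,
  // deduplication (like intersect and except) compares the encoded representation of the
  // rows, not a custom equals method on the element type.
  val set = ds1.union(ds2).distinct()
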
http://git-wip-us.apache.org/repos/asf/spark/blob/bb1fda01/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala
index 1a5c6a6..102a935 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala
@@ -23,9 +23,8 @@ import scala.concurrent.duration._
 import org.apache.spark.broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, BroadcastPartitioning, Partitioning}
-import org.apache.spark.sql.execution.{SparkPlan, SQLExecution, UnaryNode}
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
 import org.apache.spark.util.ThreadUtils
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/bb1fda01/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
index 9eaadea..df7ad48 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
@@ -30,7 +30,11 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 
 /**
- * An interface for exchanges.
+ * Base class for operators that exchange data among multiple threads or processes.
+ *
+ * Exchanges are the key class of operators that enable parallelism. Although the implementation
+ * differs significantly, the concept is similar to the exchange operator described in
+ * "Volcano -- An Extensible and Parallel Query Evaluation System" by Goetz Graefe.
  */
 abstract class Exchange extends UnaryNode {
   override def output: Seq[Attribute] = child.output
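
One way to see an Exchange operator of the kind described in the new comment is to force a
shuffle and inspect the physical plan; the exact plan text varies by version, so treat this
as an illustrative sketch only (again assuming a `session` entry point as in the Dataset
Scaladoc).

  // repartition() requires redistributing data across partitions, so the physical plan
  // printed by explain() contains an Exchange node.
  val ds = session.range(0, 1000)
  ds.repartition(8).explain()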

