You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2015/06/03 09:23:39 UTC

spark git commit: [SPARK-8060] Improve DataFrame Python test coverage and documentation.

Repository: spark
Updated Branches:
  refs/heads/master 452eb82dd -> ce320cb2d


[SPARK-8060] Improve DataFrame Python test coverage and documentation.

Author: Reynold Xin <rx...@databricks.com>

Closes #6601 from rxin/python-read-write-test-and-doc and squashes the following commits:

baa8ad5 [Reynold Xin] Code review feedback.
f081d47 [Reynold Xin] More documentation updates.
c9902fa [Reynold Xin] [SPARK-8060] Improve DataFrame Python reader/writer interface doc and testing.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ce320cb2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ce320cb2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ce320cb2

Branch: refs/heads/master
Commit: ce320cb2dbf28825f80795ce569735888f98d6e8
Parents: 452eb82
Author: Reynold Xin <rx...@databricks.com>
Authored: Wed Jun 3 00:23:34 2015 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Wed Jun 3 00:23:34 2015 -0700

----------------------------------------------------------------------
 .rat-excludes                                   |   1 +
 python/pyspark/sql/__init__.py                  |  13 +-
 python/pyspark/sql/context.py                   |  89 ++++----
 python/pyspark/sql/dataframe.py                 |  82 +++----
 python/pyspark/sql/readwriter.py                | 217 +++++++++----------
 python/pyspark/sql/tests.py                     |   2 +
 .../sql/parquet_partitioned/_SUCCESS            |   0
 .../sql/parquet_partitioned/_common_metadata    | Bin 0 -> 210 bytes
 .../sql/parquet_partitioned/_metadata           | Bin 0 -> 743 bytes
 .../month=9/day=1/.part-r-00008.gz.parquet.crc  | Bin 0 -> 12 bytes
 .../month=9/day=1/part-r-00008.gz.parquet       | Bin 0 -> 322 bytes
 .../day=25/.part-r-00002.gz.parquet.crc         | Bin 0 -> 12 bytes
 .../day=25/.part-r-00004.gz.parquet.crc         | Bin 0 -> 12 bytes
 .../month=10/day=25/part-r-00002.gz.parquet     | Bin 0 -> 343 bytes
 .../month=10/day=25/part-r-00004.gz.parquet     | Bin 0 -> 343 bytes
 .../day=26/.part-r-00005.gz.parquet.crc         | Bin 0 -> 12 bytes
 .../month=10/day=26/part-r-00005.gz.parquet     | Bin 0 -> 333 bytes
 .../month=9/day=1/.part-r-00007.gz.parquet.crc  | Bin 0 -> 12 bytes
 .../month=9/day=1/part-r-00007.gz.parquet       | Bin 0 -> 343 bytes
 python/test_support/sql/people.json             |   3 +
 20 files changed, 180 insertions(+), 227 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ce320cb2/.rat-excludes
----------------------------------------------------------------------
diff --git a/.rat-excludes b/.rat-excludes
index c0f81b5..8f2722c 100644
--- a/.rat-excludes
+++ b/.rat-excludes
@@ -82,3 +82,4 @@ local-1426633911242/*
 local-1430917381534/*
 DESCRIPTION
 NAMESPACE
+test_support/*

http://git-wip-us.apache.org/repos/asf/spark/blob/ce320cb2/python/pyspark/sql/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/__init__.py b/python/pyspark/sql/__init__.py
index 726d288..ad9c891 100644
--- a/python/pyspark/sql/__init__.py
+++ b/python/pyspark/sql/__init__.py
@@ -45,11 +45,20 @@ from __future__ import absolute_import
 
 
 def since(version):
+    """
+    A decorator that annotates a function to append the version of Spark the function was added.
+    """
+    import re
+    indent_p = re.compile(r'\n( +)')
+
     def deco(f):
-        f.__doc__ = f.__doc__.rstrip() + "\n\n.. versionadded:: %s" % version
+        indents = indent_p.findall(f.__doc__)
+        indent = ' ' * (min(len(m) for m in indents) if indents else 0)
+        f.__doc__ = f.__doc__.rstrip() + "\n\n%s.. versionadded:: %s" % (indent, version)
         return f
     return deco
 
+
 from pyspark.sql.types import Row
 from pyspark.sql.context import SQLContext, HiveContext
 from pyspark.sql.column import Column
@@ -58,7 +67,9 @@ from pyspark.sql.group import GroupedData
 from pyspark.sql.readwriter import DataFrameReader, DataFrameWriter
 from pyspark.sql.window import Window, WindowSpec
 
+
 __all__ = [
     'SQLContext', 'HiveContext', 'DataFrame', 'GroupedData', 'Column', 'Row',
     'DataFrameNaFunctions', 'DataFrameStatFunctions', 'Window', 'WindowSpec',
+    'DataFrameReader', 'DataFrameWriter'
 ]

http://git-wip-us.apache.org/repos/asf/spark/blob/ce320cb2/python/pyspark/sql/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index 22f6257..9fdf43c 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -124,7 +124,10 @@ class SQLContext(object):
     @property
     @since("1.3.1")
     def udf(self):
-        """Returns a :class:`UDFRegistration` for UDF registration."""
+        """Returns a :class:`UDFRegistration` for UDF registration.
+
+        :return: :class:`UDFRegistration`
+        """
         return UDFRegistration(self)
 
     @since(1.4)
@@ -138,7 +141,7 @@ class SQLContext(object):
         :param end: the end value (exclusive)
         :param step: the incremental step (default: 1)
         :param numPartitions: the number of partitions of the DataFrame
-        :return: A new DataFrame
+        :return: :class:`DataFrame`
 
         >>> sqlContext.range(1, 7, 2).collect()
         [Row(id=1), Row(id=3), Row(id=5)]
@@ -195,8 +198,8 @@ class SQLContext(object):
             raise ValueError("The first row in RDD is empty, "
                              "can not infer schema")
         if type(first) is dict:
-            warnings.warn("Using RDD of dict to inferSchema is deprecated,"
-                          "please use pyspark.sql.Row instead")
+            warnings.warn("Using RDD of dict to inferSchema is deprecated. "
+                          "Use pyspark.sql.Row instead")
 
         if samplingRatio is None:
             schema = _infer_schema(first)
@@ -219,7 +222,7 @@ class SQLContext(object):
         """
         .. note:: Deprecated in 1.3, use :func:`createDataFrame` instead.
         """
-        warnings.warn("inferSchema is deprecated, please use createDataFrame instead")
+        warnings.warn("inferSchema is deprecated, please use createDataFrame instead.")
 
         if isinstance(rdd, DataFrame):
             raise TypeError("Cannot apply schema to DataFrame")
@@ -262,6 +265,7 @@ class SQLContext(object):
             :class:`list`, or :class:`pandas.DataFrame`.
         :param schema: a :class:`StructType` or list of column names. default None.
         :param samplingRatio: the sample ratio of rows used for inferring
+        :return: :class:`DataFrame`
 
         >>> l = [('Alice', 1)]
         >>> sqlContext.createDataFrame(l).collect()
@@ -359,18 +363,15 @@ class SQLContext(object):
         else:
             raise ValueError("Can only register DataFrame as table")
 
-    @since(1.0)
     def parquetFile(self, *paths):
         """Loads a Parquet file, returning the result as a :class:`DataFrame`.
 
-        >>> import tempfile, shutil
-        >>> parquetFile = tempfile.mkdtemp()
-        >>> shutil.rmtree(parquetFile)
-        >>> df.saveAsParquetFile(parquetFile)
-        >>> df2 = sqlContext.parquetFile(parquetFile)
-        >>> sorted(df.collect()) == sorted(df2.collect())
-        True
+        .. note:: Deprecated in 1.4, use :func:`DataFrameReader.parquet` instead.
+
+        >>> sqlContext.parquetFile('python/test_support/sql/parquet_partitioned').dtypes
+        [('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
         """
+        warnings.warn("parquetFile is deprecated. Use read.parquet() instead.")
         gateway = self._sc._gateway
         jpaths = gateway.new_array(gateway.jvm.java.lang.String, len(paths))
         for i in range(0, len(paths)):
@@ -378,39 +379,15 @@ class SQLContext(object):
         jdf = self._ssql_ctx.parquetFile(jpaths)
         return DataFrame(jdf, self)
 
-    @since(1.0)
     def jsonFile(self, path, schema=None, samplingRatio=1.0):
         """Loads a text file storing one JSON object per line as a :class:`DataFrame`.
 
-        If the schema is provided, applies the given schema to this JSON dataset.
-        Otherwise, it samples the dataset with ratio ``samplingRatio`` to determine the schema.
-
-        >>> import tempfile, shutil
-        >>> jsonFile = tempfile.mkdtemp()
-        >>> shutil.rmtree(jsonFile)
-        >>> with open(jsonFile, 'w') as f:
-        ...     f.writelines(jsonStrings)
-        >>> df1 = sqlContext.jsonFile(jsonFile)
-        >>> df1.printSchema()
-        root
-         |-- field1: long (nullable = true)
-         |-- field2: string (nullable = true)
-         |-- field3: struct (nullable = true)
-         |    |-- field4: long (nullable = true)
+        .. note:: Deprecated in 1.4, use :func:`DataFrameReader.json` instead.
 
-        >>> from pyspark.sql.types import *
-        >>> schema = StructType([
-        ...     StructField("field2", StringType()),
-        ...     StructField("field3",
-        ...         StructType([StructField("field5", ArrayType(IntegerType()))]))])
-        >>> df2 = sqlContext.jsonFile(jsonFile, schema)
-        >>> df2.printSchema()
-        root
-         |-- field2: string (nullable = true)
-         |-- field3: struct (nullable = true)
-         |    |-- field5: array (nullable = true)
-         |    |    |-- element: integer (containsNull = true)
+        >>> sqlContext.jsonFile('python/test_support/sql/people.json').dtypes
+        [('age', 'bigint'), ('name', 'string')]
         """
+        warnings.warn("jsonFile is deprecated. Use read.json() instead.")
         if schema is None:
             df = self._ssql_ctx.jsonFile(path, samplingRatio)
         else:
@@ -462,21 +439,16 @@ class SQLContext(object):
             df = self._ssql_ctx.jsonRDD(jrdd.rdd(), scala_datatype)
         return DataFrame(df, self)
 
-    @since(1.3)
     def load(self, path=None, source=None, schema=None, **options):
         """Returns the dataset in a data source as a :class:`DataFrame`.
 
-        The data source is specified by the ``source`` and a set of ``options``.
-        If ``source`` is not specified, the default data source configured by
-        ``spark.sql.sources.default`` will be used.
-
-        Optionally, a schema can be provided as the schema of the returned DataFrame.
+        .. note:: Deprecated in 1.4, use :func:`DataFrameReader.load` instead.
         """
+        warnings.warn("load is deprecated. Use read.load() instead.")
         return self.read.load(path, source, schema, **options)
 
     @since(1.3)
-    def createExternalTable(self, tableName, path=None, source=None,
-                            schema=None, **options):
+    def createExternalTable(self, tableName, path=None, source=None, schema=None, **options):
         """Creates an external table based on the dataset in a data source.
 
         It returns the DataFrame associated with the external table.
@@ -487,6 +459,8 @@ class SQLContext(object):
 
         Optionally, a schema can be provided as the schema of the returned :class:`DataFrame` and
         created external table.
+
+        :return: :class:`DataFrame`
         """
         if path is not None:
             options["path"] = path
@@ -508,6 +482,8 @@ class SQLContext(object):
     def sql(self, sqlQuery):
         """Returns a :class:`DataFrame` representing the result of the given query.
 
+        :return: :class:`DataFrame`
+
         >>> sqlContext.registerDataFrameAsTable(df, "table1")
         >>> df2 = sqlContext.sql("SELECT field1 AS f1, field2 as f2 from table1")
         >>> df2.collect()
@@ -519,6 +495,8 @@ class SQLContext(object):
     def table(self, tableName):
         """Returns the specified table as a :class:`DataFrame`.
 
+        :return: :class:`DataFrame`
+
         >>> sqlContext.registerDataFrameAsTable(df, "table1")
         >>> df2 = sqlContext.table("table1")
         >>> sorted(df.collect()) == sorted(df2.collect())
@@ -536,6 +514,9 @@ class SQLContext(object):
         The returned DataFrame has two columns: ``tableName`` and ``isTemporary``
         (a column with :class:`BooleanType` indicating if a table is a temporary one or not).
 
+        :param dbName: string, name of the database to use.
+        :return: :class:`DataFrame`
+
         >>> sqlContext.registerDataFrameAsTable(df, "table1")
         >>> df2 = sqlContext.tables()
         >>> df2.filter("tableName = 'table1'").first()
@@ -550,7 +531,8 @@ class SQLContext(object):
     def tableNames(self, dbName=None):
         """Returns a list of names of tables in the database ``dbName``.
 
-        If ``dbName`` is not specified, the current database will be used.
+        :param dbName: string, name of the database to use. Default to the current database.
+        :return: list of table names, in string
 
         >>> sqlContext.registerDataFrameAsTable(df, "table1")
         >>> "table1" in sqlContext.tableNames()
@@ -585,8 +567,7 @@ class SQLContext(object):
         Returns a :class:`DataFrameReader` that can be used to read data
         in as a :class:`DataFrame`.
 
-        >>> sqlContext.read
-        <pyspark.sql.readwriter.DataFrameReader object at ...>
+        :return: :class:`DataFrameReader`
         """
         return DataFrameReader(self)
 
@@ -644,10 +625,14 @@ class UDFRegistration(object):
 
 
 def _test():
+    import os
     import doctest
     from pyspark.context import SparkContext
     from pyspark.sql import Row, SQLContext
     import pyspark.sql.context
+
+    os.chdir(os.environ["SPARK_HOME"])
+
     globs = pyspark.sql.context.__dict__.copy()
     sc = SparkContext('local[4]', 'PythonTest')
     globs['sc'] = sc

http://git-wip-us.apache.org/repos/asf/spark/blob/ce320cb2/python/pyspark/sql/dataframe.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index a82b6b8..7673153 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -44,7 +44,7 @@ class DataFrame(object):
     A :class:`DataFrame` is equivalent to a relational table in Spark SQL,
     and can be created using various functions in :class:`SQLContext`::
 
-        people = sqlContext.parquetFile("...")
+        people = sqlContext.read.parquet("...")
 
     Once created, it can be manipulated using the various domain-specific-language
     (DSL) functions defined in: :class:`DataFrame`, :class:`Column`.
@@ -56,8 +56,8 @@ class DataFrame(object):
     A more concrete example::
 
         # To create DataFrame using SQLContext
-        people = sqlContext.parquetFile("...")
-        department = sqlContext.parquetFile("...")
+        people = sqlContext.read.parquet("...")
+        department = sqlContext.read.parquet("...")
 
         people.filter(people.age > 30).join(department, people.deptId == department.id)) \
           .groupBy(department.name, "gender").agg({"salary": "avg", "age": "max"})
@@ -120,21 +120,12 @@ class DataFrame(object):
         rdd = self._jdf.toJSON()
         return RDD(rdd.toJavaRDD(), self._sc, UTF8Deserializer(use_unicode))
 
-    @since(1.3)
     def saveAsParquetFile(self, path):
         """Saves the contents as a Parquet file, preserving the schema.
 
-        Files that are written out using this method can be read back in as
-        a :class:`DataFrame` using :func:`SQLContext.parquetFile`.
-
-        >>> import tempfile, shutil
-        >>> parquetFile = tempfile.mkdtemp()
-        >>> shutil.rmtree(parquetFile)
-        >>> df.saveAsParquetFile(parquetFile)
-        >>> df2 = sqlContext.parquetFile(parquetFile)
-        >>> sorted(df2.collect()) == sorted(df.collect())
-        True
+        .. note:: Deprecated in 1.4, use :func:`DataFrameWriter.parquet` instead.
         """
+        warnings.warn("saveAsParquetFile is deprecated. Use write.parquet() instead.")
         self._jdf.saveAsParquetFile(path)
 
     @since(1.3)
@@ -151,69 +142,45 @@ class DataFrame(object):
         """
         self._jdf.registerTempTable(name)
 
-    @since(1.3)
     def registerAsTable(self, name):
-        """DEPRECATED: use :func:`registerTempTable` instead"""
-        warnings.warn("Use registerTempTable instead of registerAsTable.", DeprecationWarning)
+        """
+        .. note:: Deprecated in 1.4, use :func:`registerTempTable` instead.
+        """
+        warnings.warn("Use registerTempTable instead of registerAsTable.")
         self.registerTempTable(name)
 
-    @since(1.3)
     def insertInto(self, tableName, overwrite=False):
         """Inserts the contents of this :class:`DataFrame` into the specified table.
 
-        Optionally overwriting any existing data.
+        .. note:: Deprecated in 1.4, use :func:`DataFrameWriter.insertInto` instead.
         """
+        warnings.warn("insertInto is deprecated. Use write.insertInto() instead.")
         self.write.insertInto(tableName, overwrite)
 
-    @since(1.3)
     def saveAsTable(self, tableName, source=None, mode="error", **options):
         """Saves the contents of this :class:`DataFrame` to a data source as a table.
 
-        The data source is specified by the ``source`` and a set of ``options``.
-        If ``source`` is not specified, the default data source configured by
-        ``spark.sql.sources.default`` will be used.
-
-        Additionally, mode is used to specify the behavior of the saveAsTable operation when
-        table already exists in the data source. There are four modes:
-
-        * `append`: Append contents of this :class:`DataFrame` to existing data.
-        * `overwrite`: Overwrite existing data.
-        * `error`: Throw an exception if data already exists.
-        * `ignore`: Silently ignore this operation if data already exists.
+        .. note:: Deprecated in 1.4, use :func:`DataFrameWriter.saveAsTable` instead.
         """
+        warnings.warn("insertInto is deprecated. Use write.saveAsTable() instead.")
         self.write.saveAsTable(tableName, source, mode, **options)
 
     @since(1.3)
     def save(self, path=None, source=None, mode="error", **options):
         """Saves the contents of the :class:`DataFrame` to a data source.
 
-        The data source is specified by the ``source`` and a set of ``options``.
-        If ``source`` is not specified, the default data source configured by
-        ``spark.sql.sources.default`` will be used.
-
-        Additionally, mode is used to specify the behavior of the save operation when
-        data already exists in the data source. There are four modes:
-
-        * `append`: Append contents of this :class:`DataFrame` to existing data.
-        * `overwrite`: Overwrite existing data.
-        * `error`: Throw an exception if data already exists.
-        * `ignore`: Silently ignore this operation if data already exists.
+        .. note:: Deprecated in 1.4, use :func:`DataFrameWriter.save` instead.
         """
+        warnings.warn("insertInto is deprecated. Use write.save() instead.")
         return self.write.save(path, source, mode, **options)
 
     @property
     @since(1.4)
     def write(self):
         """
-        Interface for saving the content of the :class:`DataFrame` out
-        into external storage.
-
-        :return :class:`DataFrameWriter`
+        Interface for saving the content of the :class:`DataFrame` out into external storage.
 
-        .. note:: Experimental
-
-        >>> df.write
-        <pyspark.sql.readwriter.DataFrameWriter object at ...>
+        :return: :class:`DataFrameWriter`
         """
         return DataFrameWriter(self)
 
@@ -636,6 +603,9 @@ class DataFrame(object):
         This include count, mean, stddev, min, and max. If no columns are
         given, this function computes statistics for all numerical columns.
 
+        .. note:: This function is meant for exploratory data analysis, as we make no \
+        guarantee about the backward compatibility of the schema of the resulting DataFrame.
+
         >>> df.describe().show()
         +-------+---+
         |summary|age|
@@ -653,9 +623,11 @@ class DataFrame(object):
     @ignore_unicode_prefix
     @since(1.3)
     def head(self, n=None):
-        """
-        Returns the first ``n`` rows as a list of :class:`Row`,
-        or the first :class:`Row` if ``n`` is ``None.``
+        """Returns the first ``n`` rows.
+
+        :param n: int, default 1. Number of rows to return.
+        :return: If n is greater than 1, return a list of :class:`Row`.
+            If n is 1, return a single Row.
 
         >>> df.head()
         Row(age=2, name=u'Alice')
@@ -1170,8 +1142,8 @@ class DataFrame(object):
         "http://dx.doi.org/10.1145/762471.762473, proposed by Karp, Schenker, and Papadimitriou".
         :func:`DataFrame.freqItems` and :func:`DataFrameStatFunctions.freqItems` are aliases.
 
-        This function is meant for exploratory data analysis, as we make no guarantee about the
-        backward compatibility of the schema of the resulting DataFrame.
+        .. note::  This function is meant for exploratory data analysis, as we make no \
+        guarantee about the backward compatibility of the schema of the resulting DataFrame.
 
         :param cols: Names of the columns to calculate frequent items for as a list or tuple of
             strings.

http://git-wip-us.apache.org/repos/asf/spark/blob/ce320cb2/python/pyspark/sql/readwriter.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index d17d874..f036644 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -45,18 +45,24 @@ class DataFrameReader(object):
 
     @since(1.4)
     def format(self, source):
-        """
-        Specifies the input data source format.
+        """Specifies the input data source format.
+
+        :param source: string, name of the data source, e.g. 'json', 'parquet'.
+
+        >>> df = sqlContext.read.format('json').load('python/test_support/sql/people.json')
+        >>> df.dtypes
+        [('age', 'bigint'), ('name', 'string')]
+
         """
         self._jreader = self._jreader.format(source)
         return self
 
     @since(1.4)
     def schema(self, schema):
-        """
-        Specifies the input schema. Some data sources (e.g. JSON) can
-        infer the input schema automatically from data. By specifying
-        the schema here, the underlying data source can skip the schema
+        """Specifies the input schema.
+
+        Some data sources (e.g. JSON) can infer the input schema automatically from data.
+        By specifying the schema here, the underlying data source can skip the schema
         inference step, and thus speed up data loading.
 
         :param schema: a StructType object
@@ -69,8 +75,7 @@ class DataFrameReader(object):
 
     @since(1.4)
     def options(self, **options):
-        """
-        Adds input options for the underlying data source.
+        """Adds input options for the underlying data source.
         """
         for k in options:
             self._jreader = self._jreader.option(k, options[k])
@@ -84,6 +89,10 @@ class DataFrameReader(object):
         :param format: optional string for format of the data source. Default to 'parquet'.
         :param schema: optional :class:`StructType` for the input schema.
         :param options: all other string options
+
+        >>> df = sqlContext.read.load('python/test_support/sql/parquet_partitioned')
+        >>> df.dtypes
+        [('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
         """
         if format is not None:
             self.format(format)
@@ -107,31 +116,10 @@ class DataFrameReader(object):
         :param path: string, path to the JSON dataset.
         :param schema: an optional :class:`StructType` for the input schema.
 
-        >>> import tempfile, shutil
-        >>> jsonFile = tempfile.mkdtemp()
-        >>> shutil.rmtree(jsonFile)
-        >>> with open(jsonFile, 'w') as f:
-        ...     f.writelines(jsonStrings)
-        >>> df1 = sqlContext.read.json(jsonFile)
-        >>> df1.printSchema()
-        root
-         |-- field1: long (nullable = true)
-         |-- field2: string (nullable = true)
-         |-- field3: struct (nullable = true)
-         |    |-- field4: long (nullable = true)
-
-        >>> from pyspark.sql.types import *
-        >>> schema = StructType([
-        ...     StructField("field2", StringType()),
-        ...     StructField("field3",
-        ...         StructType([StructField("field5", ArrayType(IntegerType()))]))])
-        >>> df2 = sqlContext.read.json(jsonFile, schema)
-        >>> df2.printSchema()
-        root
-         |-- field2: string (nullable = true)
-         |-- field3: struct (nullable = true)
-         |    |-- field5: array (nullable = true)
-         |    |    |-- element: integer (containsNull = true)
+        >>> df = sqlContext.read.json('python/test_support/sql/people.json')
+        >>> df.dtypes
+        [('age', 'bigint'), ('name', 'string')]
+
         """
         if schema is not None:
             self.schema(schema)
@@ -141,10 +129,12 @@ class DataFrameReader(object):
     def table(self, tableName):
         """Returns the specified table as a :class:`DataFrame`.
 
-        >>> sqlContext.registerDataFrameAsTable(df, "table1")
-        >>> df2 = sqlContext.read.table("table1")
-        >>> sorted(df.collect()) == sorted(df2.collect())
-        True
+        :param tableName: string, name of the table.
+
+        >>> df = sqlContext.read.parquet('python/test_support/sql/parquet_partitioned')
+        >>> df.registerTempTable('tmpTable')
+        >>> sqlContext.read.table('tmpTable').dtypes
+        [('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
         """
         return self._df(self._jreader.table(tableName))
 
@@ -152,13 +142,9 @@ class DataFrameReader(object):
     def parquet(self, *path):
         """Loads a Parquet file, returning the result as a :class:`DataFrame`.
 
-        >>> import tempfile, shutil
-        >>> parquetFile = tempfile.mkdtemp()
-        >>> shutil.rmtree(parquetFile)
-        >>> df.saveAsParquetFile(parquetFile)
-        >>> df2 = sqlContext.read.parquet(parquetFile)
-        >>> sorted(df.collect()) == sorted(df2.collect())
-        True
+        >>> df = sqlContext.read.parquet('python/test_support/sql/parquet_partitioned')
+        >>> df.dtypes
+        [('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
         """
         return self._df(self._jreader.parquet(_to_seq(self._sqlContext._sc, path)))
 
@@ -221,30 +207,34 @@ class DataFrameWriter(object):
 
     @since(1.4)
     def mode(self, saveMode):
-        """
-        Specifies the behavior when data or table already exists. Options include:
+        """Specifies the behavior when data or table already exists.
+
+        Options include:
 
         * `append`: Append contents of this :class:`DataFrame` to existing data.
         * `overwrite`: Overwrite existing data.
         * `error`: Throw an exception if data already exists.
         * `ignore`: Silently ignore this operation if data already exists.
+
+        >>> df.write.mode('append').parquet(os.path.join(tempfile.mkdtemp(), 'data'))
         """
         self._jwrite = self._jwrite.mode(saveMode)
         return self
 
     @since(1.4)
     def format(self, source):
-        """
-        Specifies the underlying output data source. Built-in options include
-        "parquet", "json", etc.
+        """Specifies the underlying output data source.
+
+        :param source: string, name of the data source, e.g. 'json', 'parquet'.
+
+        >>> df.write.format('json').save(os.path.join(tempfile.mkdtemp(), 'data'))
         """
         self._jwrite = self._jwrite.format(source)
         return self
 
     @since(1.4)
     def options(self, **options):
-        """
-        Adds output options for the underlying data source.
+        """Adds output options for the underlying data source.
         """
         for k in options:
             self._jwrite = self._jwrite.option(k, options[k])
@@ -252,12 +242,14 @@ class DataFrameWriter(object):
 
     @since(1.4)
     def partitionBy(self, *cols):
-        """
-        Partitions the output by the given columns on the file system.
+        """Partitions the output by the given columns on the file system.
+
         If specified, the output is laid out on the file system similar
         to Hive's partitioning scheme.
 
         :param cols: name of columns
+
+        >>> df.write.partitionBy('year', 'month').parquet(os.path.join(tempfile.mkdtemp(), 'data'))
         """
         if len(cols) == 1 and isinstance(cols[0], (list, tuple)):
             cols = cols[0]
@@ -266,25 +258,23 @@ class DataFrameWriter(object):
 
     @since(1.4)
     def save(self, path=None, format=None, mode="error", **options):
-        """
-        Saves the contents of the :class:`DataFrame` to a data source.
+        """Saves the contents of the :class:`DataFrame` to a data source.
 
         The data source is specified by the ``format`` and a set of ``options``.
         If ``format`` is not specified, the default data source configured by
         ``spark.sql.sources.default`` will be used.
 
-        Additionally, mode is used to specify the behavior of the save operation when
-        data already exists in the data source. There are four modes:
-
-        * `append`: Append contents of this :class:`DataFrame` to existing data.
-        * `overwrite`: Overwrite existing data.
-        * `error`: Throw an exception if data already exists.
-        * `ignore`: Silently ignore this operation if data already exists.
-
         :param path: the path in a Hadoop supported file system
         :param format: the format used to save
-        :param mode: one of `append`, `overwrite`, `error`, `ignore` (default: error)
+        :param mode: specifies the behavior of the save operation when data already exists.
+
+            * ``append``: Append contents of this :class:`DataFrame` to existing data.
+            * ``overwrite``: Overwrite existing data.
+            * ``ignore``: Silently ignore this operation if data already exists.
+            * ``error`` (default case): Throw an exception if data already exists.
         :param options: all other string options
+
+        >>> df.write.mode('append').parquet(os.path.join(tempfile.mkdtemp(), 'data'))
         """
         self.mode(mode).options(**options)
         if format is not None:
@@ -296,8 +286,8 @@ class DataFrameWriter(object):
 
     @since(1.4)
     def insertInto(self, tableName, overwrite=False):
-        """
-        Inserts the content of the :class:`DataFrame` to the specified table.
+        """Inserts the content of the :class:`DataFrame` to the specified table.
+
         It requires that the schema of the class:`DataFrame` is the same as the
         schema of the table.
 
@@ -307,8 +297,7 @@ class DataFrameWriter(object):
 
     @since(1.4)
     def saveAsTable(self, name, format=None, mode="error", **options):
-        """
-        Saves the content of the :class:`DataFrame` as the specified table.
+        """Saves the content of the :class:`DataFrame` as the specified table.
 
         In the case the table already exists, behavior of this function depends on the
         save mode, specified by the `mode` function (default to throwing an exception).
@@ -328,67 +317,58 @@ class DataFrameWriter(object):
         self.mode(mode).options(**options)
         if format is not None:
             self.format(format)
-        return self._jwrite.saveAsTable(name)
+        self._jwrite.saveAsTable(name)
 
     @since(1.4)
     def json(self, path, mode="error"):
-        """
-        Saves the content of the :class:`DataFrame` in JSON format at the
-        specified path.
+        """Saves the content of the :class:`DataFrame` in JSON format at the specified path.
 
-        Additionally, mode is used to specify the behavior of the save operation when
-        data already exists in the data source. There are four modes:
+        :param path: the path in any Hadoop supported file system
+        :param mode: specifies the behavior of the save operation when data already exists.
 
-        * `append`: Append contents of this :class:`DataFrame` to existing data.
-        * `overwrite`: Overwrite existing data.
-        * `error`: Throw an exception if data already exists.
-        * `ignore`: Silently ignore this operation if data already exists.
+            * ``append``: Append contents of this :class:`DataFrame` to existing data.
+            * ``overwrite``: Overwrite existing data.
+            * ``ignore``: Silently ignore this operation if data already exists.
+            * ``error`` (default case): Throw an exception if data already exists.
 
-        :param path: the path in any Hadoop supported file system
-        :param mode: one of `append`, `overwrite`, `error`, `ignore` (default: error)
+        >>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data'))
         """
-        return self._jwrite.mode(mode).json(path)
+        self._jwrite.mode(mode).json(path)
 
     @since(1.4)
     def parquet(self, path, mode="error"):
-        """
-        Saves the content of the :class:`DataFrame` in Parquet format at the
-        specified path.
+        """Saves the content of the :class:`DataFrame` in Parquet format at the specified path.
 
-        Additionally, mode is used to specify the behavior of the save operation when
-        data already exists in the data source. There are four modes:
+        :param path: the path in any Hadoop supported file system
+        :param mode: specifies the behavior of the save operation when data already exists.
 
-        * `append`: Append contents of this :class:`DataFrame` to existing data.
-        * `overwrite`: Overwrite existing data.
-        * `error`: Throw an exception if data already exists.
-        * `ignore`: Silently ignore this operation if data already exists.
+            * ``append``: Append contents of this :class:`DataFrame` to existing data.
+            * ``overwrite``: Overwrite existing data.
+            * ``ignore``: Silently ignore this operation if data already exists.
+            * ``error`` (default case): Throw an exception if data already exists.
 
-        :param path: the path in any Hadoop supported file system
-        :param mode: one of `append`, `overwrite`, `error`, `ignore` (default: error)
+        >>> df.write.parquet(os.path.join(tempfile.mkdtemp(), 'data'))
         """
-        return self._jwrite.mode(mode).parquet(path)
+        self._jwrite.mode(mode).parquet(path)
 
     @since(1.4)
     def jdbc(self, url, table, mode="error", properties={}):
-        """
-        Saves the content of the :class:`DataFrame` to a external database table
-        via JDBC.
-
-        In the case the table already exists in the external database,
-        behavior of this function depends on the save mode, specified by the `mode`
-        function (default to throwing an exception). There are four modes:
+        """Saves the content of the :class:`DataFrame` to a external database table via JDBC.
 
-        * `append`: Append contents of this :class:`DataFrame` to existing data.
-        * `overwrite`: Overwrite existing data.
-        * `error`: Throw an exception if data already exists.
-        * `ignore`: Silently ignore this operation if data already exists.
+        .. note:: Don't create too many partitions in parallel on a large cluster;\
+        otherwise Spark might crash your external database systems.
 
-        :param url: a JDBC URL of the form `jdbc:subprotocol:subname`
+        :param url: a JDBC URL of the form ``jdbc:subprotocol:subname``
         :param table: Name of the table in the external database.
-        :param mode: one of `append`, `overwrite`, `error`, `ignore` (default: error)
+        :param mode: specifies the behavior of the save operation when data already exists.
+
+            * ``append``: Append contents of this :class:`DataFrame` to existing data.
+            * ``overwrite``: Overwrite existing data.
+            * ``ignore``: Silently ignore this operation if data already exists.
+            * ``error`` (default case): Throw an exception if data already exists.
         :param properties: JDBC database connection arguments, a list of
-                                    arbitrary string tag/value. Normally at least a
-                                    "user" and "password" property should be included.
+                           arbitrary string tag/value. Normally at least a
+                           "user" and "password" property should be included.
         """
         jprop = JavaClass("java.util.Properties", self._sqlContext._sc._gateway._gateway_client)()
         for k in properties:
@@ -398,24 +378,23 @@ class DataFrameWriter(object):
 
 def _test():
     import doctest
+    import os
+    import tempfile
     from pyspark.context import SparkContext
     from pyspark.sql import Row, SQLContext
     import pyspark.sql.readwriter
+
+    os.chdir(os.environ["SPARK_HOME"])
+
     globs = pyspark.sql.readwriter.__dict__.copy()
     sc = SparkContext('local[4]', 'PythonTest')
+
+    globs['tempfile'] = tempfile
+    globs['os'] = os
     globs['sc'] = sc
     globs['sqlContext'] = SQLContext(sc)
-    globs['df'] = sc.parallelize([(2, 'Alice'), (5, 'Bob')]) \
-        .toDF(StructType([StructField('age', IntegerType()),
-                          StructField('name', StringType())]))
-    jsonStrings = [
-        '{"field1": 1, "field2": "row1", "field3":{"field4":11}}',
-        '{"field1" : 2, "field3":{"field4":22, "field5": [10, 11]},'
-        '"field6":[{"field7": "row2"}]}',
-        '{"field1" : null, "field2": "row3", '
-        '"field3":{"field4":33, "field5": []}}'
-    ]
-    globs['jsonStrings'] = jsonStrings
+    globs['df'] = globs['sqlContext'].read.parquet('python/test_support/sql/parquet_partitioned')
+
     (failure_count, test_count) = doctest.testmod(
         pyspark.sql.readwriter, globs=globs,
         optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF)

http://git-wip-us.apache.org/repos/asf/spark/blob/ce320cb2/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 76384d3..6e498f0 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -753,8 +753,10 @@ class HiveContextSQLTests(ReusedPySparkTestCase):
         try:
             cls.sc._jvm.org.apache.hadoop.hive.conf.HiveConf()
         except py4j.protocol.Py4JError:
+            cls.tearDownClass()
             raise unittest.SkipTest("Hive is not available")
         except TypeError:
+            cls.tearDownClass()
             raise unittest.SkipTest("Hive is not available")
         os.unlink(cls.tempdir.name)
         _scala_HiveContext =\

http://git-wip-us.apache.org/repos/asf/spark/blob/ce320cb2/python/test_support/sql/parquet_partitioned/_SUCCESS
----------------------------------------------------------------------
diff --git a/python/test_support/sql/parquet_partitioned/_SUCCESS b/python/test_support/sql/parquet_partitioned/_SUCCESS
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/spark/blob/ce320cb2/python/test_support/sql/parquet_partitioned/_common_metadata
----------------------------------------------------------------------
diff --git a/python/test_support/sql/parquet_partitioned/_common_metadata b/python/test_support/sql/parquet_partitioned/_common_metadata
new file mode 100644
index 0000000..7ef2320
Binary files /dev/null and b/python/test_support/sql/parquet_partitioned/_common_metadata differ

http://git-wip-us.apache.org/repos/asf/spark/blob/ce320cb2/python/test_support/sql/parquet_partitioned/_metadata
----------------------------------------------------------------------
diff --git a/python/test_support/sql/parquet_partitioned/_metadata b/python/test_support/sql/parquet_partitioned/_metadata
new file mode 100644
index 0000000..78a1ca7
Binary files /dev/null and b/python/test_support/sql/parquet_partitioned/_metadata differ

http://git-wip-us.apache.org/repos/asf/spark/blob/ce320cb2/python/test_support/sql/parquet_partitioned/year=2014/month=9/day=1/.part-r-00008.gz.parquet.crc
----------------------------------------------------------------------
diff --git a/python/test_support/sql/parquet_partitioned/year=2014/month=9/day=1/.part-r-00008.gz.parquet.crc b/python/test_support/sql/parquet_partitioned/year=2014/month=9/day=1/.part-r-00008.gz.parquet.crc
new file mode 100644
index 0000000..e93f42e
Binary files /dev/null and b/python/test_support/sql/parquet_partitioned/year=2014/month=9/day=1/.part-r-00008.gz.parquet.crc differ

http://git-wip-us.apache.org/repos/asf/spark/blob/ce320cb2/python/test_support/sql/parquet_partitioned/year=2014/month=9/day=1/part-r-00008.gz.parquet
----------------------------------------------------------------------
diff --git a/python/test_support/sql/parquet_partitioned/year=2014/month=9/day=1/part-r-00008.gz.parquet b/python/test_support/sql/parquet_partitioned/year=2014/month=9/day=1/part-r-00008.gz.parquet
new file mode 100644
index 0000000..461c382
Binary files /dev/null and b/python/test_support/sql/parquet_partitioned/year=2014/month=9/day=1/part-r-00008.gz.parquet differ

http://git-wip-us.apache.org/repos/asf/spark/blob/ce320cb2/python/test_support/sql/parquet_partitioned/year=2015/month=10/day=25/.part-r-00002.gz.parquet.crc
----------------------------------------------------------------------
diff --git a/python/test_support/sql/parquet_partitioned/year=2015/month=10/day=25/.part-r-00002.gz.parquet.crc b/python/test_support/sql/parquet_partitioned/year=2015/month=10/day=25/.part-r-00002.gz.parquet.crc
new file mode 100644
index 0000000..b63c4d6
Binary files /dev/null and b/python/test_support/sql/parquet_partitioned/year=2015/month=10/day=25/.part-r-00002.gz.parquet.crc differ

http://git-wip-us.apache.org/repos/asf/spark/blob/ce320cb2/python/test_support/sql/parquet_partitioned/year=2015/month=10/day=25/.part-r-00004.gz.parquet.crc
----------------------------------------------------------------------
diff --git a/python/test_support/sql/parquet_partitioned/year=2015/month=10/day=25/.part-r-00004.gz.parquet.crc b/python/test_support/sql/parquet_partitioned/year=2015/month=10/day=25/.part-r-00004.gz.parquet.crc
new file mode 100644
index 0000000..5bc0ebd
Binary files /dev/null and b/python/test_support/sql/parquet_partitioned/year=2015/month=10/day=25/.part-r-00004.gz.parquet.crc differ

http://git-wip-us.apache.org/repos/asf/spark/blob/ce320cb2/python/test_support/sql/parquet_partitioned/year=2015/month=10/day=25/part-r-00002.gz.parquet
----------------------------------------------------------------------
diff --git a/python/test_support/sql/parquet_partitioned/year=2015/month=10/day=25/part-r-00002.gz.parquet b/python/test_support/sql/parquet_partitioned/year=2015/month=10/day=25/part-r-00002.gz.parquet
new file mode 100644
index 0000000..62a6391
Binary files /dev/null and b/python/test_support/sql/parquet_partitioned/year=2015/month=10/day=25/part-r-00002.gz.parquet differ

http://git-wip-us.apache.org/repos/asf/spark/blob/ce320cb2/python/test_support/sql/parquet_partitioned/year=2015/month=10/day=25/part-r-00004.gz.parquet
----------------------------------------------------------------------
diff --git a/python/test_support/sql/parquet_partitioned/year=2015/month=10/day=25/part-r-00004.gz.parquet b/python/test_support/sql/parquet_partitioned/year=2015/month=10/day=25/part-r-00004.gz.parquet
new file mode 100644
index 0000000..67665a7
Binary files /dev/null and b/python/test_support/sql/parquet_partitioned/year=2015/month=10/day=25/part-r-00004.gz.parquet differ

http://git-wip-us.apache.org/repos/asf/spark/blob/ce320cb2/python/test_support/sql/parquet_partitioned/year=2015/month=10/day=26/.part-r-00005.gz.parquet.crc
----------------------------------------------------------------------
diff --git a/python/test_support/sql/parquet_partitioned/year=2015/month=10/day=26/.part-r-00005.gz.parquet.crc b/python/test_support/sql/parquet_partitioned/year=2015/month=10/day=26/.part-r-00005.gz.parquet.crc
new file mode 100644
index 0000000..ae94a15
Binary files /dev/null and b/python/test_support/sql/parquet_partitioned/year=2015/month=10/day=26/.part-r-00005.gz.parquet.crc differ

http://git-wip-us.apache.org/repos/asf/spark/blob/ce320cb2/python/test_support/sql/parquet_partitioned/year=2015/month=10/day=26/part-r-00005.gz.parquet
----------------------------------------------------------------------
diff --git a/python/test_support/sql/parquet_partitioned/year=2015/month=10/day=26/part-r-00005.gz.parquet b/python/test_support/sql/parquet_partitioned/year=2015/month=10/day=26/part-r-00005.gz.parquet
new file mode 100644
index 0000000..6cb8538
Binary files /dev/null and b/python/test_support/sql/parquet_partitioned/year=2015/month=10/day=26/part-r-00005.gz.parquet differ

http://git-wip-us.apache.org/repos/asf/spark/blob/ce320cb2/python/test_support/sql/parquet_partitioned/year=2015/month=9/day=1/.part-r-00007.gz.parquet.crc
----------------------------------------------------------------------
diff --git a/python/test_support/sql/parquet_partitioned/year=2015/month=9/day=1/.part-r-00007.gz.parquet.crc b/python/test_support/sql/parquet_partitioned/year=2015/month=9/day=1/.part-r-00007.gz.parquet.crc
new file mode 100644
index 0000000..58d9bb5
Binary files /dev/null and b/python/test_support/sql/parquet_partitioned/year=2015/month=9/day=1/.part-r-00007.gz.parquet.crc differ

http://git-wip-us.apache.org/repos/asf/spark/blob/ce320cb2/python/test_support/sql/parquet_partitioned/year=2015/month=9/day=1/part-r-00007.gz.parquet
----------------------------------------------------------------------
diff --git a/python/test_support/sql/parquet_partitioned/year=2015/month=9/day=1/part-r-00007.gz.parquet b/python/test_support/sql/parquet_partitioned/year=2015/month=9/day=1/part-r-00007.gz.parquet
new file mode 100644
index 0000000..9b00805
Binary files /dev/null and b/python/test_support/sql/parquet_partitioned/year=2015/month=9/day=1/part-r-00007.gz.parquet differ

http://git-wip-us.apache.org/repos/asf/spark/blob/ce320cb2/python/test_support/sql/people.json
----------------------------------------------------------------------
diff --git a/python/test_support/sql/people.json b/python/test_support/sql/people.json
new file mode 100644
index 0000000..50a859c
--- /dev/null
+++ b/python/test_support/sql/people.json
@@ -0,0 +1,3 @@
+{"name":"Michael"}
+{"name":"Andy", "age":30}
+{"name":"Justin", "age":19}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org