Posted to commits@spark.apache.org by yh...@apache.org on 2015/05/23 17:30:13 UTC

spark git commit: [SPARK-7322, SPARK-7836, SPARK-7822][SQL] DataFrame window function related updates

Repository: spark
Updated Branches:
  refs/heads/master ad0badba1 -> efe3bfdf4


[SPARK-7322, SPARK-7836, SPARK-7822][SQL] DataFrame window function related updates

1. ntile now takes an integer as its parameter, rather than a column.
2. Added the Python API for window functions (based on #6364); see the usage sketch below.
3. Updated the documentation of various DataFrame Python functions.
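
As a quick illustration of the API after this change (a minimal sketch, not part
of the patch; it assumes a HiveContext bound to `sqlContext`, since window
functions require Hive support in 1.4, and the data is made up):

    from pyspark.sql import Window
    from pyspark.sql import functions as F

    df = sqlContext.createDataFrame(
        [(1, "a"), (2, "a"), (3, "b")], ["key", "value"])

    w = Window.partitionBy("value").orderBy("key")
    df.select(
        df.key,
        F.ntile(2).over(w),       # ntile now takes an integer bucket count
        F.lag("key", 1).over(w),  # part of the new Python API
        F.rowNumber().over(w)).show()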

Author: Davies Liu <da...@databricks.com>
Author: Reynold Xin <rx...@databricks.com>

Closes #6374 from rxin/window-final and squashes the following commits:

69004c7 [Reynold Xin] Style fix.
288cea9 [Reynold Xin] Update documentation.
7cb8985 [Reynold Xin] Merge pull request #6364 from davies/window
66092b4 [Davies Liu] update docs
ed73cb4 [Reynold Xin] [SPARK-7322][SQL] Improve DataFrame window function documentation.
ef55132 [Davies Liu] Merge branch 'master' of github.com:apache/spark into window4
8936ade [Davies Liu] fix maxint in python 3
2649358 [Davies Liu] update docs
778e2c0 [Davies Liu] SPARK-7836 and SPARK-7822: Python API of window functions


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/efe3bfdf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/efe3bfdf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/efe3bfdf

Branch: refs/heads/master
Commit: efe3bfdf496aa6206ace2697e31dd4c0c3c824fb
Parents: ad0badb
Author: Davies Liu <da...@databricks.com>
Authored: Sat May 23 08:30:05 2015 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Sat May 23 08:30:05 2015 -0700

----------------------------------------------------------------------
 python/pyspark/sql/__init__.py                  |  25 +--
 python/pyspark/sql/column.py                    |  54 +++--
 python/pyspark/sql/context.py                   |   2 -
 python/pyspark/sql/dataframe.py                 |   2 +
 python/pyspark/sql/functions.py                 | 147 +++++++++++---
 python/pyspark/sql/group.py                     |   2 +
 python/pyspark/sql/tests.py                     |  31 ++-
 python/pyspark/sql/window.py                    | 158 +++++++++++++++
 .../scala/org/apache/spark/sql/functions.scala  | 197 +++++++++----------
 .../sql/hive/HiveDataFrameWindowSuite.scala     |  20 +-
 10 files changed, 464 insertions(+), 174 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/efe3bfdf/python/pyspark/sql/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/__init__.py b/python/pyspark/sql/__init__.py
index 66b0bff..8fee92a 100644
--- a/python/pyspark/sql/__init__.py
+++ b/python/pyspark/sql/__init__.py
@@ -18,26 +18,28 @@
 """
 Important classes of Spark SQL and DataFrames:
 
-    - L{SQLContext}
+    - :class:`pyspark.sql.SQLContext`
       Main entry point for :class:`DataFrame` and SQL functionality.
-    - L{DataFrame}
+    - :class:`pyspark.sql.DataFrame`
       A distributed collection of data grouped into named columns.
-    - L{Column}
+    - :class:`pyspark.sql.Column`
       A column expression in a :class:`DataFrame`.
-    - L{Row}
+    - :class:`pyspark.sql.Row`
       A row of data in a :class:`DataFrame`.
-    - L{HiveContext}
+    - :class:`pyspark.sql.HiveContext`
       Main entry point for accessing data stored in Apache Hive.
-    - L{GroupedData}
+    - :class:`pyspark.sql.GroupedData`
       Aggregation methods, returned by :func:`DataFrame.groupBy`.
-    - L{DataFrameNaFunctions}
+    - :class:`pyspark.sql.DataFrameNaFunctions`
       Methods for handling missing data (null values).
-    - L{DataFrameStatFunctions}
+    - :class:`pyspark.sql.DataFrameStatFunctions`
       Methods for statistics functionality.
-    - L{functions}
+    - :class:`pyspark.sql.functions`
       List of built-in functions available for :class:`DataFrame`.
-    - L{types}
+    - :class:`pyspark.sql.types`
       List of data types available.
+    - :class:`pyspark.sql.Window`
+      For working with window functions.
 """
 from __future__ import absolute_import
 
@@ -66,8 +68,9 @@ from pyspark.sql.column import Column
 from pyspark.sql.dataframe import DataFrame, SchemaRDD, DataFrameNaFunctions, DataFrameStatFunctions
 from pyspark.sql.group import GroupedData
 from pyspark.sql.readwriter import DataFrameReader, DataFrameWriter
+from pyspark.sql.window import Window, WindowSpec
 
 __all__ = [
     'SQLContext', 'HiveContext', 'DataFrame', 'GroupedData', 'Column', 'Row',
-    'DataFrameNaFunctions', 'DataFrameStatFunctions'
+    'DataFrameNaFunctions', 'DataFrameStatFunctions', 'Window', 'WindowSpec',
 ]

http://git-wip-us.apache.org/repos/asf/spark/blob/efe3bfdf/python/pyspark/sql/column.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/column.py b/python/pyspark/sql/column.py
index baf1ecb..8dc5039 100644
--- a/python/pyspark/sql/column.py
+++ b/python/pyspark/sql/column.py
@@ -116,6 +116,8 @@ class Column(object):
         df.colName + 1
         1 / df.colName
 
+    .. note:: Experimental
+
     .. versionadded:: 1.3
     """
 
@@ -164,8 +166,9 @@ class Column(object):
 
     @since(1.3)
     def getItem(self, key):
-        """An expression that gets an item at position `ordinal` out of a list,
-         or gets an item by key out of a dict.
+        """
+        An expression that gets an item at position ``key`` out of a list,
+        or gets an item by key out of a dict.
 
         >>> df = sc.parallelize([([1, 2], {"key": "value"})]).toDF(["l", "d"])
         >>> df.select(df.l.getItem(0), df.d.getItem("key")).show()
@@ -185,7 +188,8 @@ class Column(object):
 
     @since(1.3)
     def getField(self, name):
-        """An expression that gets a field by name in a StructField.
+        """
+        An expression that gets a field by name in a StructType.
 
         >>> from pyspark.sql import Row
         >>> df = sc.parallelize([Row(r=Row(a=1, b="b"))]).toDF()
@@ -219,7 +223,7 @@ class Column(object):
     @since(1.3)
     def substr(self, startPos, length):
         """
-        Return a :class:`Column` which is a substring of the column
+        Return a :class:`Column` which is a substring of the column.
 
         :param startPos: start position (int or Column)
         :param length:  length of the substring (int or Column)
@@ -242,7 +246,8 @@ class Column(object):
     @ignore_unicode_prefix
     @since(1.3)
     def inSet(self, *cols):
-        """ A boolean expression that is evaluated to true if the value of this
+        """
+        A boolean expression that is evaluated to true if the value of this
         expression is contained by the evaluated values of the arguments.
 
         >>> df[df.name.inSet("Bob", "Mike")].collect()
@@ -268,7 +273,8 @@ class Column(object):
 
     @since(1.3)
     def alias(self, *alias):
-        """Returns this column aliased with a new name or names (in the case of expressions that
+        """
+        Returns this column aliased with a new name or names (in the case of expressions that
         return more than one column, such as explode).
 
         >>> df.select(df.age.alias("age2")).collect()
@@ -284,7 +290,7 @@ class Column(object):
     @ignore_unicode_prefix
     @since(1.3)
     def cast(self, dataType):
-        """ Convert the column into type `dataType`
+        """ Convert the column into type ``dataType``.
 
         >>> df.select(df.age.cast("string").alias('ages')).collect()
         [Row(ages=u'2'), Row(ages=u'5')]
@@ -304,25 +310,24 @@ class Column(object):
 
     astype = cast
 
-    @ignore_unicode_prefix
     @since(1.3)
     def between(self, lowerBound, upperBound):
-        """ A boolean expression that is evaluated to true if the value of this
+        """
+        A boolean expression that is evaluated to true if the value of this
         expression is between the given columns.
         """
         return (self >= lowerBound) & (self <= upperBound)
 
-    @ignore_unicode_prefix
     @since(1.4)
     def when(self, condition, value):
-        """Evaluates a list of conditions and returns one of multiple possible result expressions.
+        """
+        Evaluates a list of conditions and returns one of multiple possible result expressions.
         If :func:`Column.otherwise` is not invoked, None is returned for unmatched conditions.
 
         See :func:`pyspark.sql.functions.when` for example usage.
 
         :param condition: a boolean :class:`Column` expression.
         :param value: a literal value, or a :class:`Column` expression.
-
         """
         sc = SparkContext._active_spark_context
         if not isinstance(condition, Column):
@@ -331,10 +336,10 @@ class Column(object):
         jc = sc._jvm.functions.when(condition._jc, v)
         return Column(jc)
 
-    @ignore_unicode_prefix
     @since(1.4)
     def otherwise(self, value):
-        """Evaluates a list of conditions and returns one of multiple possible result expressions.
+        """
+        Evaluates a list of conditions and returns one of multiple possible result expressions.
         If :func:`Column.otherwise` is not invoked, None is returned for unmatched conditions.
 
         See :func:`pyspark.sql.functions.when` for example usage.
@@ -345,6 +350,27 @@ class Column(object):
         jc = self._jc.otherwise(value)
         return Column(jc)
 
+    @since(1.4)
+    def over(self, window):
+        """
+        Define a windowing column.
+
+        :param window: a :class:`WindowSpec`
+        :return: a Column
+
+        >>> from pyspark.sql import Window
+        >>> window = Window.partitionBy("name").orderBy("age").rowsBetween(-1, 1)
+        >>> from pyspark.sql.functions import rank, min
+        >>> # df.select(rank().over(window), min('age').over(window))
+
+        .. note:: Window functions are only supported with HiveContext in 1.4
+        """
+        from pyspark.sql.window import WindowSpec
+        if not isinstance(window, WindowSpec):
+            raise TypeError("window should be WindowSpec")
+        jc = self._jc.over(window._jspec)
+        return Column(jc)
+
     def __repr__(self):
         return 'Column<%s>' % self._jc.toString().encode('utf8')
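
For reference, a hedged sketch of how the new `over` method composes with
aggregate and ranking functions (it assumes a HiveContext and a DataFrame `df`
with `name` and `age` columns, as in the doctest above):

    from pyspark.sql import Window
    from pyspark.sql.functions import rank, min

    # Frame covering the previous, current, and next row per name partition.
    window = Window.partitionBy("name").orderBy("age").rowsBetween(-1, 1)
    # Requires a HiveContext in 1.4; a plain SQLContext fails at analysis time.
    df.select(rank().over(window), min("age").over(window)).show()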
 

http://git-wip-us.apache.org/repos/asf/spark/blob/efe3bfdf/python/pyspark/sql/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index 51f12c5..22f6257 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -585,8 +585,6 @@ class SQLContext(object):
         Returns a :class:`DataFrameReader` that can be used to read data
         in as a :class:`DataFrame`.
 
-        .. note:: Experimental
-
         >>> sqlContext.read
         <pyspark.sql.readwriter.DataFrameReader object at ...>
         """

http://git-wip-us.apache.org/repos/asf/spark/blob/efe3bfdf/python/pyspark/sql/dataframe.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 132db90..55cad82 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -62,6 +62,8 @@ class DataFrame(object):
         people.filter(people.age > 30).join(department, people.deptId == department.id) \
           .groupBy(department.name, "gender").agg({"salary": "avg", "age": "max"})
 
+    .. note:: Experimental
+
     .. versionadded:: 1.3
     """
 

http://git-wip-us.apache.org/repos/asf/spark/blob/efe3bfdf/python/pyspark/sql/functions.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 9b0d7f3..bbf465a 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -32,16 +32,21 @@ from pyspark.sql.column import Column, _to_java_column, _to_seq
 
 
 __all__ = [
+    'array',
     'approxCountDistinct',
     'coalesce',
     'countDistinct',
+    'explode',
     'monotonicallyIncreasingId',
     'rand',
     'randn',
     'sparkPartitionId',
+    'struct',
     'udf',
     'when']
 
+__all__ += ['lag', 'lead', 'ntile']
+
 
 def _create_function(name, doc=""):
     """ Create a function for aggregator by name"""
@@ -67,6 +72,17 @@ def _create_binary_mathfunction(name, doc=""):
     return _
 
 
+def _create_window_function(name, doc=''):
+    """ Create a window function by name """
+    def _():
+        sc = SparkContext._active_spark_context
+        jc = getattr(sc._jvm.functions, name)()
+        return Column(jc)
+    _.__name__ = name
+    _.__doc__ = 'Window function: ' + doc
+    return _
+
+
 _functions = {
     'lit': 'Creates a :class:`Column` of literal value.',
     'col': 'Returns a :class:`Column` based on the given column name.',
@@ -130,15 +146,53 @@ _binary_mathfunctions = {
     'pow': 'Returns the value of the first argument raised to the power of the second argument.'
 }
 
+_window_functions = {
+    'rowNumber':
+        """returns a sequential number starting at 1 within a window partition.
+
+        This is equivalent to the ROW_NUMBER function in SQL.""",
+    'denseRank':
+        """returns the rank of rows within a window partition, without any gaps.
+
+        The difference between rank and denseRank is that denseRank leaves no gaps in ranking
+        sequence when there are ties. That is, if you were ranking a competition using denseRank
+        and had three people tie for second place, you would say that all three were in second
+        place and that the next person came in third.
+
+        This is equivalent to the DENSE_RANK function in SQL.""",
+    'rank':
+        """returns the rank of rows within a window partition.
+
+        The difference between rank and denseRank is that denseRank leaves no gaps in ranking
+        sequence when there are ties. That is, if you were ranking a competition using denseRank
+        and had three people tie for second place, you would say that all three were in second
+        place and that the next person came in third.
+
+        This is equivalent to the RANK function in SQL.""",
+    'cumeDist':
+        """returns the cumulative distribution of values within a window partition,
+        i.e. the fraction of rows that are at or below the current row.
+
+        This is equivalent to the CUME_DIST function in SQL.""",
+    'percentRank':
+        """returns the relative rank (i.e. percentile) of rows within a window partition.
+
+        This is equivalent to the PERCENT_RANK function in SQL.""",
+}
+
 for _name, _doc in _functions.items():
     globals()[_name] = since(1.3)(_create_function(_name, _doc))
 for _name, _doc in _functions_1_4.items():
     globals()[_name] = since(1.4)(_create_function(_name, _doc))
 for _name, _doc in _binary_mathfunctions.items():
     globals()[_name] = since(1.4)(_create_binary_mathfunction(_name, _doc))
+for _name, _doc in _window_functions.items():
+    globals()[_name] = since(1.4)(_create_window_function(_name, _doc))
 del _name, _doc
 __all__ += _functions.keys()
+__all__ += _functions_1_4.keys()
 __all__ += _binary_mathfunctions.keys()
+__all__ += _window_functions.keys()
 __all__.sort()
 
 
@@ -177,27 +231,6 @@ def approxCountDistinct(col, rsd=None):
 
 
 @since(1.4)
-def explode(col):
-    """Returns a new row for each element in the given array or map.
-
-    >>> from pyspark.sql import Row
-    >>> eDF = sqlContext.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})])
-    >>> eDF.select(explode(eDF.intlist).alias("anInt")).collect()
-    [Row(anInt=1), Row(anInt=2), Row(anInt=3)]
-
-    >>> eDF.select(explode(eDF.mapfield).alias("key", "value")).show()
-    +---+-----+
-    |key|value|
-    +---+-----+
-    |  a|    b|
-    +---+-----+
-    """
-    sc = SparkContext._active_spark_context
-    jc = sc._jvm.functions.explode(_to_java_column(col))
-    return Column(jc)
-
-
-@since(1.4)
 def coalesce(*cols):
     """Returns the first column that is not null.
 
@@ -250,6 +283,27 @@ def countDistinct(col, *cols):
 
 
 @since(1.4)
+def explode(col):
+    """Returns a new row for each element in the given array or map.
+
+    >>> from pyspark.sql import Row
+    >>> eDF = sqlContext.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})])
+    >>> eDF.select(explode(eDF.intlist).alias("anInt")).collect()
+    [Row(anInt=1), Row(anInt=2), Row(anInt=3)]
+
+    >>> eDF.select(explode(eDF.mapfield).alias("key", "value")).show()
+    +---+-----+
+    |key|value|
+    +---+-----+
+    |  a|    b|
+    +---+-----+
+    """
+    sc = SparkContext._active_spark_context
+    jc = sc._jvm.functions.explode(_to_java_column(col))
+    return Column(jc)
+
+
+@since(1.4)
 def monotonicallyIncreasingId():
     """A column that generates monotonically increasing 64-bit integers.
 
@@ -258,7 +312,7 @@ def monotonicallyIncreasingId():
     within each partition in the lower 33 bits. The assumption is that the data frame has
     less than 1 billion partitions, and each partition has less than 8 billion records.
 
-    As an example, consider a [[DataFrame]] with two partitions, each with 3 records.
+    As an example, consider a :class:`DataFrame` with two partitions, each with 3 records.
     This expression would return the following IDs:
     0, 1, 2, 8589934592 (1L << 33), 8589934593, 8589934594.
 
@@ -349,6 +403,55 @@ def when(condition, value):
     return Column(jc)
 
 
+@since(1.4)
+def lag(col, count=1, default=None):
+    """
+    Window function: returns the value that is `count` rows before the current row, and
+    `default` if there are fewer than `count` rows before the current row. For example,
+    a `count` of one will return the previous row at any given point in the window partition.
+
+    This is equivalent to the LAG function in SQL.
+
+    :param col: name of column or expression
+    :param count: number of rows to look back (defaults to 1)
+    :param default: value to return when there is no such row
+    """
+    sc = SparkContext._active_spark_context
+    return Column(sc._jvm.functions.lag(_to_java_column(col), count, default))
+
+
+@since(1.4)
+def lead(col, count=1, default=None):
+    """
+    Window function: returns the value that is `count` rows after the current row, and
+    `default` if there are fewer than `count` rows after the current row. For example,
+    a `count` of one will return the next row at any given point in the window partition.
+
+    This is equivalent to the LEAD function in SQL.
+
+    :param col: name of column or expression
+    :param count: number of rows to look ahead (defaults to 1)
+    :param default: value to return when there is no such row
+    """
+    sc = SparkContext._active_spark_context
+    return Column(sc._jvm.functions.lead(_to_java_column(col), count, default))
+
+
+@since(1.4)
+def ntile(n):
+    """
+    Window function: returns the ntile group id (from 1 to `n` inclusive) in an ordered window
+    partition. For example, if `n` is 4, the first quarter of the rows will get 1, the second
+    quarter will get 2, the third quarter will get 3, and the last quarter will get 4.
+
+    This is equivalent to the NTILE function in SQL.
+
+    :param n: an integer
+    """
+    sc = SparkContext._active_spark_context
+    return Column(sc._jvm.functions.ntile(int(n)))
+
+
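
A hedged sketch of how the three functions above combine with a window
specification (illustrative only; the DataFrame `df` and its `group`, `day`,
and `value` columns are hypothetical):

    from pyspark.sql import Window
    from pyspark.sql import functions as F

    w = Window.partitionBy("group").orderBy("day")
    df.select(
        F.lag("value").over(w),         # previous row's value, None on the first row
        F.lead("value", 1, 0).over(w),  # next row's value, 0 on the last row
        F.ntile(4).over(w))             # bucket 1..4 within each ordered partition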
 class UserDefinedFunction(object):
     """
     User defined function in Python

http://git-wip-us.apache.org/repos/asf/spark/blob/efe3bfdf/python/pyspark/sql/group.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/group.py b/python/pyspark/sql/group.py
index 4da472a..5a37a67 100644
--- a/python/pyspark/sql/group.py
+++ b/python/pyspark/sql/group.py
@@ -49,6 +49,8 @@ class GroupedData(object):
     A set of methods for aggregations on a :class:`DataFrame`,
     created by :func:`DataFrame.groupBy`.
 
+    .. note:: Experimental
+
     .. versionadded:: 1.3
     """
 

http://git-wip-us.apache.org/repos/asf/spark/blob/efe3bfdf/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 7e34996..5c53c3a 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -44,6 +44,7 @@ from pyspark.sql.types import *
 from pyspark.sql.types import UserDefinedType, _infer_type
 from pyspark.tests import ReusedPySparkTestCase
 from pyspark.sql.functions import UserDefinedFunction
+from pyspark.sql.window import Window
 
 
 class ExamplePointUDT(UserDefinedType):
@@ -743,11 +744,9 @@ class HiveContextSQLTests(ReusedPySparkTestCase):
         try:
             cls.sc._jvm.org.apache.hadoop.hive.conf.HiveConf()
         except py4j.protocol.Py4JError:
-            cls.sqlCtx = None
-            return
+            raise unittest.SkipTest("Hive is not available")
         except TypeError:
-            cls.sqlCtx = None
-            return
+            raise unittest.SkipTest("Hive is not available")
         os.unlink(cls.tempdir.name)
         _scala_HiveContext =\
             cls.sc._jvm.org.apache.spark.sql.hive.test.TestHiveContext(cls.sc._jsc.sc())
@@ -761,9 +760,6 @@ class HiveContextSQLTests(ReusedPySparkTestCase):
         shutil.rmtree(cls.tempdir.name, ignore_errors=True)
 
     def test_save_and_load_table(self):
-        if self.sqlCtx is None:
-            return  # no hive available, skipped
-
         df = self.df
         tmpPath = tempfile.mkdtemp()
         shutil.rmtree(tmpPath)
@@ -805,6 +801,27 @@ class HiveContextSQLTests(ReusedPySparkTestCase):
 
         shutil.rmtree(tmpPath)
 
+    def test_window_functions(self):
+        df = self.sqlCtx.createDataFrame([(1, "1"), (2, "2"), (1, "2"), (1, "2")], ["key", "value"])
+        w = Window.partitionBy("value").orderBy("key")
+        from pyspark.sql import functions as F
+        sel = df.select(df.value, df.key,
+                        F.max("key").over(w.rowsBetween(0, 1)),
+                        F.min("key").over(w.rowsBetween(0, 1)),
+                        F.count("key").over(w.rowsBetween(float('-inf'), float('inf'))),
+                        F.rowNumber().over(w),
+                        F.rank().over(w),
+                        F.denseRank().over(w),
+                        F.ntile(2).over(w))
+        rs = sorted(sel.collect())
+        expected = [
+            ("1", 1, 1, 1, 1, 1, 1, 1, 1),
+            ("2", 1, 1, 1, 3, 1, 1, 1, 1),
+            ("2", 1, 2, 1, 3, 2, 1, 1, 1),
+            ("2", 2, 2, 2, 3, 3, 3, 2, 2)
+        ]
+        for r, ex in zip(rs, expected):
+            self.assertEqual(tuple(r), ex[:len(r)])
 
 if __name__ == "__main__":
     unittest.main()

http://git-wip-us.apache.org/repos/asf/spark/blob/efe3bfdf/python/pyspark/sql/window.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/window.py b/python/pyspark/sql/window.py
new file mode 100644
index 0000000..0a0e006
--- /dev/null
+++ b/python/pyspark/sql/window.py
@@ -0,0 +1,158 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import sys
+
+from pyspark import SparkContext
+from pyspark.sql import since
+from pyspark.sql.column import _to_seq, _to_java_column
+
+__all__ = ["Window", "WindowSpec"]
+
+
+def _to_java_cols(cols):
+    sc = SparkContext._active_spark_context
+    if len(cols) == 1 and isinstance(cols[0], list):
+        cols = cols[0]
+    return _to_seq(sc, cols, _to_java_column)
+
+
+class Window(object):
+
+    """
+    Utility functions for defining windows in DataFrames.
+
+    For example:
+
+    >>> # PARTITION BY country ORDER BY date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+    >>> window = Window.partitionBy("country").orderBy("date").rowsBetween(-sys.maxsize, 0)
+
+    >>> # PARTITION BY country ORDER BY date RANGE BETWEEN 3 PRECEDING AND 3 FOLLOWING
+    >>> window = Window.orderBy("date").partitionBy("country").rangeBetween(-3, 3)
+
+    .. note:: Experimental
+
+    .. versionadded:: 1.4
+    """
+    @staticmethod
+    @since(1.4)
+    def partitionBy(*cols):
+        """
+        Creates a :class:`WindowSpec` with the partitioning defined.
+        """
+        sc = SparkContext._active_spark_context
+        jspec = sc._jvm.org.apache.spark.sql.expressions.Window.partitionBy(_to_java_cols(cols))
+        return WindowSpec(jspec)
+
+    @staticmethod
+    @since(1.4)
+    def orderBy(*cols):
+        """
+        Creates a :class:`WindowSpec` with the ordering defined.
+        """
+        sc = SparkContext._active_spark_context
+        jspec = sc._jvm.org.apache.spark.sql.expressions.Window.orderBy(_to_java_cols(cols))
+        return WindowSpec(jspec)
+
+
+class WindowSpec(object):
+    """
+    A window specification that defines the partitioning, ordering,
+    and frame boundaries.
+
+    Use the static methods in :class:`Window` to create a :class:`WindowSpec`.
+
+    .. note:: Experimental
+
+    .. versionadded:: 1.4
+    """
+
+    _JAVA_MAX_LONG = (1 << 63) - 1
+    _JAVA_MIN_LONG = - (1 << 63)
+
+    def __init__(self, jspec):
+        self._jspec = jspec
+
+    @since(1.4)
+    def partitionBy(self, *cols):
+        """
+        Defines the partitioning columns in a :class:`WindowSpec`.
+
+        :param cols: names of columns or expressions
+        """
+        return WindowSpec(self._jspec.partitionBy(_to_java_cols(cols)))
+
+    @since(1.4)
+    def orderBy(self, *cols):
+        """
+        Defines the ordering columns in a :class:`WindowSpec`.
+
+        :param cols: names of columns or expressions
+        """
+        return WindowSpec(self._jspec.orderBy(_to_java_cols(cols)))
+
+    @since(1.4)
+    def rowsBetween(self, start, end):
+        """
+        Defines the frame boundaries, from `start` (inclusive) to `end` (inclusive).
+
+        Both `start` and `end` are relative positions from the current row.
+        For example, "0" means "current row", while "-1" means the row before
+        the current row, and "5" means the fifth row after the current row.
+
+        :param start: boundary start, inclusive.
+                      The frame is unbounded if this is ``-sys.maxsize`` (or lower).
+        :param end: boundary end, inclusive.
+                    The frame is unbounded if this is ``sys.maxsize`` (or higher).
+        """
+        if start <= -sys.maxsize:
+            start = self._JAVA_MIN_LONG
+        if end >= sys.maxsize:
+            end = self._JAVA_MAX_LONG
+        return WindowSpec(self._jspec.rowsBetween(start, end))
+
+    @since(1.4)
+    def rangeBetween(self, start, end):
+        """
+        Defines the frame boundaries, from `start` (inclusive) to `end` (inclusive).
+
+        Both `start` and `end` are offsets relative to the current row's value. For example,
+        "0" means "current row", while "-1" means one less than the current row's value,
+        and "5" means five greater than the current row's value.
+
+        :param start: boundary start, inclusive.
+                      The frame is unbounded if this is ``-sys.maxsize`` (or lower).
+        :param end: boundary end, inclusive.
+                    The frame is unbounded if this is ``sys.maxsize`` (or higher).
+        """
+        if start <= -sys.maxsize:
+            start = self._JAVA_MIN_LONG
+        if end >= sys.maxsize:
+            end = self._JAVA_MAX_LONG
+        return WindowSpec(self._jspec.rangeBetween(start, end))
+
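
Because values at or beyond +/- ``sys.maxsize`` are translated to the extreme
Java long values, unbounded frames can be written directly. A small sketch of
the two common cases (the columns `k` and `v` are hypothetical):

    import sys
    from pyspark.sql import Window

    # ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: a running frame.
    running = Window.partitionBy("k").orderBy("v").rowsBetween(-sys.maxsize, 0)
    # RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: whole partition.
    whole = Window.partitionBy("k").orderBy("v").rangeBetween(-sys.maxsize,
                                                              sys.maxsize)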
+
+def _test():
+    import doctest
+    SparkContext('local[4]', 'PythonTest')
+    (failure_count, test_count) = doctest.testmod()
+    if failure_count:
+        exit(-1)
+
+
+if __name__ == "__main__":
+    _test()

http://git-wip-us.apache.org/repos/asf/spark/blob/efe3bfdf/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 8775be7..9a23cfb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -326,168 +326,135 @@ object functions {
   //////////////////////////////////////////////////////////////////////////////////////////////
 
   /**
-   * Window function: returns the lag value of current row of the expression,
-   * null when the current row extends before the beginning of the window.
+   * Window function: returns the value that is `offset` rows before the current row, and
+   * `null` if there are fewer than `offset` rows before the current row. For example,
+   * an `offset` of one will return the previous row at any given point in the window partition.
    *
-   * @group window_funcs
-   * @since 1.4.0
-   */
-  def lag(columnName: String): Column = {
-    lag(columnName, 1)
-  }
-
-  /**
-   * Window function: returns the lag value of current row of the column,
-   * null when the current row extends before the beginning of the window.
+   * This is equivalent to the LAG function in SQL.
    *
    * @group window_funcs
    * @since 1.4.0
    */
-  def lag(e: Column): Column = {
-    lag(e, 1)
+  def lag(e: Column, offset: Int): Column = {
+    lag(e, offset, null)
   }
 
   /**
-   * Window function: returns the lag values of current row of the expression,
-   * null when the current row extends before the beginning of the window.
+   * Window function: returns the value that is `offset` rows before the current row, and
+   * `null` if there are fewer than `offset` rows before the current row. For example,
+   * an `offset` of one will return the previous row at any given point in the window partition.
    *
-   * @group window_funcs
-   * @since 1.4.0
-   */
-  def lag(e: Column, count: Int): Column = {
-    lag(e, count, null)
-  }
-
-  /**
-   * Window function: returns the lag values of current row of the column,
-   * null when the current row extends before the beginning of the window.
+   * This is equivalent to the LAG function in SQL.
    *
    * @group window_funcs
    * @since 1.4.0
    */
-  def lag(columnName: String, count: Int): Column = {
-    lag(columnName, count, null)
+  def lag(columnName: String, offset: Int): Column = {
+    lag(columnName, offset, null)
   }
 
   /**
-   * Window function: returns the lag values of current row of the column,
-   * given default value when the current row extends before the beginning
-   * of the window.
+   * Window function: returns the value that is `offset` rows before the current row, and
+   * `defaultValue` if there are fewer than `offset` rows before the current row. For example,
+   * an `offset` of one will return the previous row at any given point in the window partition.
    *
-   * @group window_funcs
-   * @since 1.4.0
-   */
-  def lag(columnName: String, count: Int, defaultValue: Any): Column = {
-    lag(Column(columnName), count, defaultValue)
-  }
-
-  /**
-   * Window function: returns the lag values of current row of the expression,
-   * given default value when the current row extends before the beginning
-   * of the window.
+   * This is equivalent to the LAG function in SQL.
    *
    * @group window_funcs
    * @since 1.4.0
    */
-  def lag(e: Column, count: Int, defaultValue: Any): Column = {
-    UnresolvedWindowFunction("lag", e.expr :: Literal(count) :: Literal(defaultValue) :: Nil)
+  def lag(columnName: String, offset: Int, defaultValue: Any): Column = {
+    lag(Column(columnName), offset, defaultValue)
   }
 
   /**
-   * Window function: returns the lead value of current row of the column,
-   * null when the current row extends before the end of the window.
+   * Window function: returns the value that is `offset` rows before the current row, and
+   * `defaultValue` if there are fewer than `offset` rows before the current row. For example,
+   * an `offset` of one will return the previous row at any given point in the window partition.
    *
-   * @group window_funcs
-   * @since 1.4.0
-   */
-  def lead(columnName: String): Column = {
-    lead(columnName, 1)
-  }
-
-  /**
-   * Window function: returns the lead value of current row of the expression,
-   * null when the current row extends before the end of the window.
+   * This is equivalent to the LAG function in SQL.
    *
    * @group window_funcs
    * @since 1.4.0
    */
-  def lead(e: Column): Column = {
-    lead(e, 1)
+  def lag(e: Column, offset: Int, defaultValue: Any): Column = {
+    UnresolvedWindowFunction("lag", e.expr :: Literal(offset) :: Literal(defaultValue) :: Nil)
   }
 
   /**
-   * Window function: returns the lead values of current row of the column,
-   * null when the current row extends before the end of the window.
+   * Window function: returns the value that is `offset` rows after the current row, and
+   * `null` if there are fewer than `offset` rows after the current row. For example,
+   * an `offset` of one will return the next row at any given point in the window partition.
    *
-   * @group window_funcs
-   * @since 1.4.0
-   */
-  def lead(columnName: String, count: Int): Column = {
-    lead(columnName, count, null)
-  }
-
-  /**
-   * Window function: returns the lead values of current row of the expression,
-   * null when the current row extends before the end of the window.
+   * This is equivalent to the LEAD function in SQL.
    *
    * @group window_funcs
    * @since 1.4.0
    */
-  def lead(e: Column, count: Int): Column = {
-    lead(e, count, null)
+  def lead(columnName: String, offset: Int): Column = {
+    lead(columnName, offset, null)
   }
 
   /**
-   * Window function: returns the lead values of current row of the column,
-   * given default value when the current row extends before the end of the window.
+   * Window function: returns the value that is `offset` rows after the current row, and
+   * `null` if there are fewer than `offset` rows after the current row. For example,
+   * an `offset` of one will return the next row at any given point in the window partition.
+   *
+   * This is equivalent to the LEAD function in SQL.
    *
    * @group window_funcs
    * @since 1.4.0
    */
-  def lead(columnName: String, count: Int, defaultValue: Any): Column = {
-    lead(Column(columnName), count, defaultValue)
+  def lead(e: Column, offset: Int): Column = {
+    lead(e, offset, null)
   }
 
   /**
-   * Window function: returns the lead values of current row of the expression,
-   * given default value when the current row extends before the end of the window.
+   * Window function: returns the value that is `offset` rows after the current row, and
+   * `defaultValue` if there are fewer than `offset` rows after the current row. For example,
+   * an `offset` of one will return the next row at any given point in the window partition.
+   *
+   * This is equivalent to the LEAD function in SQL.
    *
    * @group window_funcs
    * @since 1.4.0
    */
-  def lead(e: Column, count: Int, defaultValue: Any): Column = {
-    UnresolvedWindowFunction("lead", e.expr :: Literal(count) :: Literal(defaultValue) :: Nil)
+  def lead(columnName: String, offset: Int, defaultValue: Any): Column = {
+    lead(Column(columnName), offset, defaultValue)
   }
 
   /**
-   * NTILE for specified expression.
-   * NTILE allows easy calculation of tertiles, quartiles, deciles and other
-   * common summary statistics. This function divides an ordered partition into a specified
-   * number of groups called buckets and assigns a bucket number to each row in the partition.
+   * Window function: returns the value that is `offset` rows after the current row, and
+   * `defaultValue` if there are fewer than `offset` rows after the current row. For example,
+   * an `offset` of one will return the next row at any given point in the window partition.
+   *
+   * This is equivalent to the LEAD function in SQL.
    *
    * @group window_funcs
    * @since 1.4.0
    */
-  def ntile(e: Column): Column = {
-    UnresolvedWindowFunction("ntile", e.expr :: Nil)
+  def lead(e: Column, offset: Int, defaultValue: Any): Column = {
+    UnresolvedWindowFunction("lead", e.expr :: Literal(offset) :: Literal(defaultValue) :: Nil)
   }
 
   /**
-   * NTILE for specified column.
-   * NTILE allows easy calculation of tertiles, quartiles, deciles and other
-   * common summary statistics. This function divides an ordered partition into a specified
-   * number of groups called buckets and assigns a bucket number to each row in the partition.
+   * Window function: returns the ntile group id (from 1 to `n` inclusive) in an ordered window
+   * partition. For example, if `n` is 4, the first quarter of the rows will get value 1, the second
+   * quarter will get 2, the third quarter will get 3, and the last quarter will get 4.
+   *
+   * This is equivalent to the NTILE function in SQL.
    *
    * @group window_funcs
    * @since 1.4.0
    */
-  def ntile(columnName: String): Column = {
-    ntile(Column(columnName))
+  def ntile(n: Int): Column = {
+    UnresolvedWindowFunction("ntile", lit(n).expr :: Nil)
   }
 
   /**
-   * Assigns a unique number (sequentially, starting from 1, as defined by ORDER BY) to each
-   * row within the partition.
+   * Window function: returns a sequential number starting at 1 within a window partition.
+   *
+   * This is equivalent to the ROW_NUMBER function in SQL.
    *
    * @group window_funcs
    * @since 1.4.0
@@ -497,11 +464,15 @@ object functions {
   }
 
   /**
-   * The difference between RANK and DENSE_RANK is that DENSE_RANK leaves no gaps in ranking
-   * sequence when there are ties. That is, if you were ranking a competition using DENSE_RANK
+   * Window function: returns the rank of rows within a window partition, without any gaps.
+   *
+   * The difference between rank and denseRank is that denseRank leaves no gaps in ranking
+   * sequence when there are ties. That is, if you were ranking a competition using denseRank
    * and had three people tie for second place, you would say that all three were in second
    * place and that the next person came in third.
    *
+   * This is equivalent to the DENSE_RANK function in SQL.
+   *
    * @group window_funcs
    * @since 1.4.0
    */
@@ -510,11 +481,15 @@ object functions {
   }
 
   /**
-   * The difference between RANK and DENSE_RANK is that DENSE_RANK leaves no gaps in ranking
-   * sequence when there are ties. That is, if you were ranking a competition using DENSE_RANK
+   * Window function: returns the rank of rows within a window partition.
+   *
+   * The difference between rank and denseRank is that denseRank leaves no gaps in ranking
+   * sequence when there are ties. That is, if you were ranking a competition using denseRank
    * and had three people tie for second place, you would say that all three were in second
    * place and that the next person came in third.
    *
+   * This is equivalent to the RANK function in SQL.
+   *
    * @group window_funcs
    * @since 1.4.0
    */
@@ -523,10 +498,16 @@ object functions {
   }
 
   /**
-   * CUME_DIST (defined as the inverse of percentile in some statistical books) computes
-   * the position of a specified value relative to a set of values.
-   * To compute the CUME_DIST of a value x in a set S of size N, you use the formula:
-   * CUME_DIST(x) = number of values in S coming before and including x in the specified order / N
+   * Window function: returns the cumulative distribution of values within a window partition,
+   * i.e. the fraction of rows that are at or below the current row.
+   *
+   * {{{
+   *   N = total number of rows in the partition
+   *   cumeDist(x) = number of values before (and including) x / N
+   * }}}
+   *
+   * This is equivalent to the CUME_DIST function in SQL.
    *
    * @group window_funcs
    * @since 1.4.0
@@ -536,10 +517,14 @@ object functions {
   }
 
   /**
-   * PERCENT_RANK is similar to CUME_DIST, but it uses rank values rather than row counts
-   * in its numerator.
-   * The formula:
-   * (rank of row in its partition - 1) / (number of rows in the partition - 1)
+   * Window function: returns the relative rank (i.e. percentile) of rows within a window partition.
+   *
+   * This is computed by:
+   * {{{
+   *   (rank of row in its partition - 1) / (number of rows in the partition - 1)
+   * }}}
+   *
+   * This is equivalent to the PERCENT_RANK function in SQL.
    *
    * @group window_funcs
    * @since 1.4.0
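
To sanity-check the cumeDist and percentRank formulas above outside Spark, a
plain-Python sketch (illustrative only, not part of the patch):

    # One ordered window partition with a tie; N = 4.
    values = [10, 20, 20, 30]
    N = len(values)

    def cume_dist(x):
        # fraction of rows at or below x
        return sum(1 for v in values if v <= x) / float(N)

    def percent_rank(rank):
        # (rank of row - 1) / (N - 1), with ranks starting at 1
        return (rank - 1) / float(N - 1)

    assert cume_dist(20) == 0.75   # 3 of the 4 rows are <= 20
    assert percent_rank(4) == 1.0  # the last-ranked row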

http://git-wip-us.apache.org/repos/asf/spark/blob/efe3bfdf/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala
index 6cea677..efb3f25 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala
@@ -31,8 +31,8 @@ class HiveDataFrameWindowSuite extends QueryTest {
 
     checkAnswer(
       df.select(
-        lead("key").over(w),
-        lead("value").over(w)),
+        lead("key", 1).over(w),
+        lead("value", 1).over(w)),
       Row(1, "1") :: Row(2, "2") :: Row(null, null) :: Row(null, null) :: Nil)
   }
 
@@ -42,8 +42,8 @@ class HiveDataFrameWindowSuite extends QueryTest {
 
     checkAnswer(
       df.select(
-        lead("key").over(w),
-        lead("value").over(w)),
+        lead("key", 1).over(w),
+        lead("value", 1).over(w)),
       Row(1, "1") :: Row(2, "2") :: Row(null, null) :: Row(null, null) :: Nil)
   }
 
@@ -53,7 +53,7 @@ class HiveDataFrameWindowSuite extends QueryTest {
 
     checkAnswer(
       df.select(
-        lead("value").over(Window.partitionBy($"key").orderBy($"value"))),
+        lead("value", 1).over(Window.partitionBy($"key").orderBy($"value"))),
       sql(
         """SELECT
           | lead(value) OVER (PARTITION BY key ORDER BY value)
@@ -66,9 +66,7 @@ class HiveDataFrameWindowSuite extends QueryTest {
 
     checkAnswer(
       df.select(
-        lag("value").over(
-          Window.partitionBy($"key")
-          .orderBy($"value"))),
+        lag("value", 1).over(Window.partitionBy($"key").orderBy($"value"))),
       sql(
         """SELECT
           | lag(value) OVER (PARTITION BY key ORDER BY value)
@@ -112,8 +110,7 @@ class HiveDataFrameWindowSuite extends QueryTest {
         mean("key").over(Window.partitionBy("value").orderBy("key")),
         count("key").over(Window.partitionBy("value").orderBy("key")),
         sum("key").over(Window.partitionBy("value").orderBy("key")),
-        ntile("key").over(Window.partitionBy("value").orderBy("key")),
-        ntile($"key").over(Window.partitionBy("value").orderBy("key")),
+        ntile(2).over(Window.partitionBy("value").orderBy("key")),
         rowNumber().over(Window.partitionBy("value").orderBy("key")),
         denseRank().over(Window.partitionBy("value").orderBy("key")),
         rank().over(Window.partitionBy("value").orderBy("key")),
@@ -127,8 +124,7 @@ class HiveDataFrameWindowSuite extends QueryTest {
            |avg(key) over (partition by value order by key),
            |count(key) over (partition by value order by key),
            |sum(key) over (partition by value order by key),
-           |ntile(key) over (partition by value order by key),
-           |ntile(key) over (partition by value order by key),
+           |ntile(2) over (partition by value order by key),
            |row_number() over (partition by value order by key),
            |dense_rank() over (partition by value order by key),
            |rank() over (partition by value order by key),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org