Posted to commits@spark.apache.org by gu...@apache.org on 2018/02/11 09:55:44 UTC

spark git commit: [SPARK-23084][PYTHON] Add unboundedPreceding(), unboundedFollowing() and currentRow() to PySpark

Repository: spark
Updated Branches:
  refs/heads/master a34fce19b -> 8acb51f08


[SPARK-23084][PYTHON] Add unboundedPreceding(), unboundedFollowing() and currentRow() to PySpark

## What changes were proposed in this pull request?

Added unboundedPreceding(), unboundedFollowing(), and currentRow() to PySpark, and updated the rangeBetween API to accept the Column boundaries these functions return.
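
A minimal usage sketch (assuming a Spark build with this patch and an active SparkSession; it mirrors the doctest added to window.py below):

```python
from pyspark.sql import functions as F, SparkSession, Window

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")],
    ["id", "category"])

# Frame from the current row up to a range value of current + 1, using the
# new column-based boundaries instead of raw integers.
window = Window.partitionBy("category").orderBy("id").rangeBetween(
    F.currentRow(), F.lit(1))
df.withColumn("sum", F.sum("id").over(window)).show()
```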

## How was this patch tested?

Ran the unit tests locally. Please let me know if I need to add unit tests in tests.py.
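
For reference, a rough sketch of what such a test could look like (the test name and class placement are assumptions; the expected sums mirror the doctest added to window.py):

```python
# Hypothetical test sketch for pyspark/sql/tests.py; assumes a test class
# that provides self.spark, as the existing SQL tests do.
def test_range_between_with_column_boundaries(self):
    from pyspark.sql import functions as F, Window
    df = self.spark.createDataFrame(
        [(1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")],
        ["id", "category"])
    w = Window.partitionBy("category").orderBy("id").rangeBetween(
        F.currentRow(), F.lit(1))
    actual = {(r.category, r.id, r["sum"])
              for r in df.withColumn("sum", F.sum("id").over(w)).collect()}
    expected = {("a", 1, 4), ("a", 2, 2), ("b", 1, 3), ("b", 2, 5), ("b", 3, 3)}
    self.assertEqual(expected, actual)
```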

Author: Huaxin Gao <hu...@us.ibm.com>

Closes #20400 from huaxingao/spark_23084.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8acb51f0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8acb51f0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8acb51f0

Branch: refs/heads/master
Commit: 8acb51f08b448628b65e90af3b268994f9550e45
Parents: a34fce1
Author: Huaxin Gao <hu...@us.ibm.com>
Authored: Sun Feb 11 18:55:38 2018 +0900
Committer: hyukjinkwon <gu...@gmail.com>
Committed: Sun Feb 11 18:55:38 2018 +0900

----------------------------------------------------------------------
 python/pyspark/sql/functions.py | 30 ++++++++++++++++
 python/pyspark/sql/window.py    | 70 ++++++++++++++++++++++++++----------
 2 files changed, 82 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8acb51f0/python/pyspark/sql/functions.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 05031f5..9bb9c32 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -809,6 +809,36 @@ def ntile(n):
     return Column(sc._jvm.functions.ntile(int(n)))
 
 
+@since(2.4)
+def unboundedPreceding():
+    """
+    Window function: returns the special frame boundary that represents the first row
+    in the window partition.
+    """
+    sc = SparkContext._active_spark_context
+    return Column(sc._jvm.functions.unboundedPreceding())
+
+
+@since(2.4)
+def unboundedFollowing():
+    """
+    Window function: returns the special frame boundary that represents the last row
+    in the window partition.
+    """
+    sc = SparkContext._active_spark_context
+    return Column(sc._jvm.functions.unboundedFollowing())
+
+
+@since(2.4)
+def currentRow():
+    """
+    Window function: returns the special frame boundary that represents the current row
+    in the window partition.
+    """
+    sc = SparkContext._active_spark_context
+    return Column(sc._jvm.functions.currentRow())
+
+
 # ---------------------- Date/Timestamp functions ------------------------------
 
 @since(1.5)

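Note on the additions above: each of the three new functions simply wraps the corresponding JVM boundary expression in a Column, so the integer sentinels on Window and the new column-based functions describe the same frame. A quick equivalence sketch (assuming an active SparkSession):

    from pyspark.sql import functions as F, Window

    # These two window specs describe the same frame after this patch:
    w1 = Window.orderBy("id").rangeBetween(Window.unboundedPreceding,
                                           Window.currentRow)
    w2 = Window.orderBy("id").rangeBetween(F.unboundedPreceding(),
                                           F.currentRow())
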
http://git-wip-us.apache.org/repos/asf/spark/blob/8acb51f0/python/pyspark/sql/window.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/window.py b/python/pyspark/sql/window.py
index 7ce27f9..bb841a9 100644
--- a/python/pyspark/sql/window.py
+++ b/python/pyspark/sql/window.py
@@ -16,9 +16,11 @@
 #
 
 import sys
+if sys.version >= '3':
+    long = int
 
 from pyspark import since, SparkContext
-from pyspark.sql.column import _to_seq, _to_java_column
+from pyspark.sql.column import Column, _to_seq, _to_java_column
 
 __all__ = ["Window", "WindowSpec"]
 
@@ -120,20 +122,45 @@ class Window(object):
         and "5" means the five off after the current row.
 
         We recommend users use ``Window.unboundedPreceding``, ``Window.unboundedFollowing``,
-        and ``Window.currentRow`` to specify special boundary values, rather than using integral
-        values directly.
+        ``Window.currentRow``, ``pyspark.sql.functions.unboundedPreceding``,
+        ``pyspark.sql.functions.unboundedFollowing`` and ``pyspark.sql.functions.currentRow``
+        to specify special boundary values, rather than using integral values directly.
 
         :param start: boundary start, inclusive.
-                      The frame is unbounded if this is ``Window.unboundedPreceding``, or
+                      The frame is unbounded if this is ``Window.unboundedPreceding``,
+                      a column returned by ``pyspark.sql.functions.unboundedPreceding``, or
                       any value less than or equal to max(-sys.maxsize, -9223372036854775808).
         :param end: boundary end, inclusive.
-                    The frame is unbounded if this is ``Window.unboundedFollowing``, or
+                    The frame is unbounded if this is ``Window.unboundedFollowing``,
+                    a column returned by ``pyspark.sql.functions.unboundedFollowing``, or
                     any value greater than or equal to min(sys.maxsize, 9223372036854775807).
+
+        >>> from pyspark.sql import functions as F, SparkSession, Window
+        >>> spark = SparkSession.builder.getOrCreate()
+        >>> df = spark.createDataFrame(
+        ...     [(1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")], ["id", "category"])
+        >>> window = Window.orderBy("id").partitionBy("category").rangeBetween(
+        ...     F.currentRow(), F.lit(1))
+        >>> df.withColumn("sum", F.sum("id").over(window)).show()
+        +---+--------+---+
+        | id|category|sum|
+        +---+--------+---+
+        |  1|       b|  3|
+        |  2|       b|  5|
+        |  3|       b|  3|
+        |  1|       a|  4|
+        |  1|       a|  4|
+        |  2|       a|  2|
+        +---+--------+---+
         """
-        if start <= Window._PRECEDING_THRESHOLD:
-            start = Window.unboundedPreceding
-        if end >= Window._FOLLOWING_THRESHOLD:
-            end = Window.unboundedFollowing
+        if isinstance(start, (int, long)) and isinstance(end, (int, long)):
+            if start <= Window._PRECEDING_THRESHOLD:
+                start = Window.unboundedPreceding
+            if end >= Window._FOLLOWING_THRESHOLD:
+                end = Window.unboundedFollowing
+        elif isinstance(start, Column) and isinstance(end, Column):
+            start = start._jc
+            end = end._jc
         sc = SparkContext._active_spark_context
         jspec = sc._jvm.org.apache.spark.sql.expressions.Window.rangeBetween(start, end)
         return WindowSpec(jspec)
@@ -208,27 +235,34 @@ class WindowSpec(object):
         and "5" means the five off after the current row.
 
         We recommend users use ``Window.unboundedPreceding``, ``Window.unboundedFollowing``,
-        and ``Window.currentRow`` to specify special boundary values, rather than using integral
-        values directly.
+        ``Window.currentRow``, ``pyspark.sql.functions.unboundedPreceding``,
+        ``pyspark.sql.functions.unboundedFollowing`` and ``pyspark.sql.functions.currentRow``
+        to specify special boundary values, rather than using integral values directly.
 
         :param start: boundary start, inclusive.
-                      The frame is unbounded if this is ``Window.unboundedPreceding``, or
+                      The frame is unbounded if this is ``Window.unboundedPreceding``,
+                      a column returned by ``pyspark.sql.functions.unboundedPreceding``, or
                       any value less than or equal to max(-sys.maxsize, -9223372036854775808).
         :param end: boundary end, inclusive.
-                    The frame is unbounded if this is ``Window.unboundedFollowing``, or
+                    The frame is unbounded if this is ``Window.unboundedFollowing``,
+                    a column returned by ``pyspark.sql.functions.unboundedFollowing``, or
                     any value greater than or equal to min(sys.maxsize, 9223372036854775807).
         """
-        if start <= Window._PRECEDING_THRESHOLD:
-            start = Window.unboundedPreceding
-        if end >= Window._FOLLOWING_THRESHOLD:
-            end = Window.unboundedFollowing
+        if isinstance(start, (int, long)) and isinstance(end, (int, long)):
+            if start <= Window._PRECEDING_THRESHOLD:
+                start = Window.unboundedPreceding
+            if end >= Window._FOLLOWING_THRESHOLD:
+                end = Window.unboundedFollowing
+        elif isinstance(start, Column) and isinstance(end, Column):
+            start = start._jc
+            end = end._jc
         return WindowSpec(self._jspec.rangeBetween(start, end))
 
 
 def _test():
     import doctest
     SparkContext('local[4]', 'PythonTest')
-    (failure_count, test_count) = doctest.testmod()
+    (failure_count, test_count) = doctest.testmod(optionflags=doctest.NORMALIZE_WHITESPACE)
     if failure_count:
         exit(-1)
 
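Two side notes on the window.py changes above (observations about the diff, not part of the commit message): the new dispatch in both rangeBetween overloads only converts boundaries when start and end are both ints/longs or both Columns, so a mixed pair falls through to the JVM call unchanged; and _test() now passes doctest.NORMALIZE_WHITESPACE so the show() table in the new doctest is compared without being sensitive to whitespace differences. A minimal sketch of the two supported call shapes (assuming an active SparkSession):

    from pyspark.sql import functions as F, Window

    base = Window.partitionBy("category").orderBy("id")
    # 1) Both boundaries as ints/longs; values beyond the thresholds are
    #    clamped to the unbounded sentinels.
    w_int = base.rangeBetween(Window.unboundedPreceding, Window.currentRow)
    # 2) Both boundaries as Columns; each is unwrapped to its JVM expression.
    w_col = base.rangeBetween(F.unboundedPreceding(), F.currentRow())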


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org