You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2018/02/11 09:55:44 UTC
spark git commit: [SPARK-23084][PYTHON] Add unboundedPreceding(),
unboundedFollowing() and currentRow() to PySpark
Repository: spark
Updated Branches:
refs/heads/master a34fce19b -> 8acb51f08
[SPARK-23084][PYTHON] Add unboundedPreceding(), unboundedFollowing() and currentRow() to PySpark
## What changes were proposed in this pull request?
Added unboundedPreceding(), unboundedFollowing() and currentRow() to PySpark, and updated the rangeBetween API to accept the Column boundaries they return.
## How was this patch tested?
Ran the unit tests locally. Please let me know if I need to add a unit test in tests.py.
Author: Huaxin Gao <hu...@us.ibm.com>
Closes #20400 from huaxingao/spark_23084.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8acb51f0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8acb51f0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8acb51f0
Branch: refs/heads/master
Commit: 8acb51f08b448628b65e90af3b268994f9550e45
Parents: a34fce1
Author: Huaxin Gao <hu...@us.ibm.com>
Authored: Sun Feb 11 18:55:38 2018 +0900
Committer: hyukjinkwon <gu...@gmail.com>
Committed: Sun Feb 11 18:55:38 2018 +0900
----------------------------------------------------------------------
python/pyspark/sql/functions.py | 30 ++++++++++++++++
python/pyspark/sql/window.py | 70 ++++++++++++++++++++++++++----------
2 files changed, 82 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/8acb51f0/python/pyspark/sql/functions.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 05031f5..9bb9c32 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -809,6 +809,36 @@ def ntile(n):
return Column(sc._jvm.functions.ntile(int(n)))
+@since(2.4)
+def unboundedPreceding():
+ """
+ Window function: returns the special frame boundary that represents the first row
+ in the window partition.
+ """
+ sc = SparkContext._active_spark_context
+ return Column(sc._jvm.functions.unboundedPreceding())
+
+
+@since(2.4)
+def unboundedFollowing():
+ """
+ Window function: returns the special frame boundary that represents the last row
+ in the window partition.
+ """
+ sc = SparkContext._active_spark_context
+ return Column(sc._jvm.functions.unboundedFollowing())
+
+
+@since(2.4)
+def currentRow():
+ """
+ Window function: returns the special frame boundary that represents the current row
+ in the window partition.
+ """
+ sc = SparkContext._active_spark_context
+ return Column(sc._jvm.functions.currentRow())
+
+
# ---------------------- Date/Timestamp functions ------------------------------
@since(1.5)
http://git-wip-us.apache.org/repos/asf/spark/blob/8acb51f0/python/pyspark/sql/window.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/window.py b/python/pyspark/sql/window.py
index 7ce27f9..bb841a9 100644
--- a/python/pyspark/sql/window.py
+++ b/python/pyspark/sql/window.py
@@ -16,9 +16,11 @@
#
import sys
+if sys.version >= '3':
+ long = int
from pyspark import since, SparkContext
-from pyspark.sql.column import _to_seq, _to_java_column
+from pyspark.sql.column import Column, _to_seq, _to_java_column
__all__ = ["Window", "WindowSpec"]
@@ -120,20 +122,45 @@ class Window(object):
and "5" means the five off after the current row.
We recommend users use ``Window.unboundedPreceding``, ``Window.unboundedFollowing``,
- and ``Window.currentRow`` to specify special boundary values, rather than using integral
- values directly.
+ ``Window.currentRow``, ``pyspark.sql.functions.unboundedPreceding``,
+ ``pyspark.sql.functions.unboundedFollowing`` and ``pyspark.sql.functions.currentRow``
+ to specify special boundary values, rather than using integral values directly.
:param start: boundary start, inclusive.
- The frame is unbounded if this is ``Window.unboundedPreceding``, or
+ The frame is unbounded if this is ``Window.unboundedPreceding``,
+ a column returned by ``pyspark.sql.functions.unboundedPreceding``, or
any value less than or equal to max(-sys.maxsize, -9223372036854775808).
:param end: boundary end, inclusive.
- The frame is unbounded if this is ``Window.unboundedFollowing``, or
+ The frame is unbounded if this is ``Window.unboundedFollowing``,
+ a column returned by ``pyspark.sql.functions.unboundedFollowing``, or
any value greater than or equal to min(sys.maxsize, 9223372036854775807).
+
+ >>> from pyspark.sql import functions as F, SparkSession, Window
+ >>> spark = SparkSession.builder.getOrCreate()
+ >>> df = spark.createDataFrame(
+ ... [(1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")], ["id", "category"])
+ >>> window = Window.orderBy("id").partitionBy("category").rangeBetween(
+ ... F.currentRow(), F.lit(1))
+ >>> df.withColumn("sum", F.sum("id").over(window)).show()
+ +---+--------+---+
+ | id|category|sum|
+ +---+--------+---+
+ | 1| b| 3|
+ | 2| b| 5|
+ | 3| b| 3|
+ | 1| a| 4|
+ | 1| a| 4|
+ | 2| a| 2|
+ +---+--------+---+
"""
- if start <= Window._PRECEDING_THRESHOLD:
- start = Window.unboundedPreceding
- if end >= Window._FOLLOWING_THRESHOLD:
- end = Window.unboundedFollowing
+ if isinstance(start, (int, long)) and isinstance(end, (int, long)):
+ if start <= Window._PRECEDING_THRESHOLD:
+ start = Window.unboundedPreceding
+ if end >= Window._FOLLOWING_THRESHOLD:
+ end = Window.unboundedFollowing
+ elif isinstance(start, Column) and isinstance(end, Column):
+ start = start._jc
+ end = end._jc
sc = SparkContext._active_spark_context
jspec = sc._jvm.org.apache.spark.sql.expressions.Window.rangeBetween(start, end)
return WindowSpec(jspec)
@@ -208,27 +235,34 @@ class WindowSpec(object):
and "5" means the five off after the current row.
We recommend users use ``Window.unboundedPreceding``, ``Window.unboundedFollowing``,
- and ``Window.currentRow`` to specify special boundary values, rather than using integral
- values directly.
+ ``Window.currentRow``, ``pyspark.sql.functions.unboundedPreceding``,
+ ``pyspark.sql.functions.unboundedFollowing`` and ``pyspark.sql.functions.currentRow``
+ to specify special boundary values, rather than using integral values directly.
:param start: boundary start, inclusive.
- The frame is unbounded if this is ``Window.unboundedPreceding``, or
+ The frame is unbounded if this is ``Window.unboundedPreceding``,
+ a column returned by ``pyspark.sql.functions.unboundedPreceding``, or
any value less than or equal to max(-sys.maxsize, -9223372036854775808).
:param end: boundary end, inclusive.
- The frame is unbounded if this is ``Window.unboundedFollowing``, or
+ The frame is unbounded if this is ``Window.unboundedFollowing``,
+ a column returned by ``pyspark.sql.functions.unboundedFollowing``, or
any value greater than or equal to min(sys.maxsize, 9223372036854775807).
"""
- if start <= Window._PRECEDING_THRESHOLD:
- start = Window.unboundedPreceding
- if end >= Window._FOLLOWING_THRESHOLD:
- end = Window.unboundedFollowing
+ if isinstance(start, (int, long)) and isinstance(end, (int, long)):
+ if start <= Window._PRECEDING_THRESHOLD:
+ start = Window.unboundedPreceding
+ if end >= Window._FOLLOWING_THRESHOLD:
+ end = Window.unboundedFollowing
+ elif isinstance(start, Column) and isinstance(end, Column):
+ start = start._jc
+ end = end._jc
return WindowSpec(self._jspec.rangeBetween(start, end))
def _test():
import doctest
SparkContext('local[4]', 'PythonTest')
- (failure_count, test_count) = doctest.testmod()
+ (failure_count, test_count) = doctest.testmod(optionflags=doctest.NORMALIZE_WHITESPACE)
if failure_count:
exit(-1)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org