Posted to issues@spark.apache.org by "Dongjoon Hyun (Jira)" <ji...@apache.org> on 2020/09/29 04:55:00 UTC

[jira] [Resolved] (SPARK-33021) Move functions related test cases into test_functions.py

     [ https://issues.apache.org/jira/browse/SPARK-33021?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dongjoon Hyun resolved SPARK-33021.
-----------------------------------
    Fix Version/s: 3.1.0
       Resolution: Fixed

Issue resolved by pull request 29898
[https://github.com/apache/spark/pull/29898]

> Move functions related test cases into test_functions.py
> --------------------------------------------------------
>
>                 Key: SPARK-33021
>                 URL: https://issues.apache.org/jira/browse/SPARK-33021
>             Project: Spark
>          Issue Type: Test
>          Components: PySpark
>    Affects Versions: 3.1.0
>            Reporter: Hyukjin Kwon
>            Assignee: Hyukjin Kwon
>            Priority: Minor
>             Fix For: 3.1.0
>
>
> Function-related test cases should be located in {{test_functions.py}}; however, the tests below are currently located in {{test_context.py}}.
> {code}
>     def test_window_functions(self):
>         df = self.spark.createDataFrame([(1, "1"), (2, "2"), (1, "2"), (1, "2")], ["key", "value"])
>         w = Window.partitionBy("value").orderBy("key")
>         from pyspark.sql import functions as F
>         sel = df.select(df.value, df.key,
>                         F.max("key").over(w.rowsBetween(0, 1)),
>                         F.min("key").over(w.rowsBetween(0, 1)),
>                         F.count("key").over(w.rowsBetween(float('-inf'), float('inf'))),
>                         F.row_number().over(w),
>                         F.rank().over(w),
>                         F.dense_rank().over(w),
>                         F.ntile(2).over(w),
>                         F.nth_value("key", 2).over(w))
>         rs = sorted(sel.collect())
>         expected = [
>             ("1", 1, 1, 1, 1, 1, 1, 1, 1, None),
>             ("2", 1, 1, 1, 3, 1, 1, 1, 1, 1),
>             ("2", 1, 2, 1, 3, 2, 1, 1, 1, 1),
>             ("2", 2, 2, 2, 3, 3, 3, 2, 2, 1)
>         ]
>         for r, ex in zip(rs, expected):
>             self.assertEqual(tuple(r), ex[:len(r)])
>     def test_window_functions_without_partitionBy(self):
>         df = self.spark.createDataFrame([(1, "1"), (2, "2"), (1, "2"), (1, "2")], ["key", "value"])
>         w = Window.orderBy("key", df.value)
>         from pyspark.sql import functions as F
>         sel = df.select(df.value, df.key,
>                         F.max("key").over(w.rowsBetween(0, 1)),
>                         F.min("key").over(w.rowsBetween(0, 1)),
>                         F.count("key").over(w.rowsBetween(float('-inf'), float('inf'))),
>                         F.row_number().over(w),
>                         F.rank().over(w),
>                         F.dense_rank().over(w),
>                         F.ntile(2).over(w))
>         rs = sorted(sel.collect())
>         expected = [
>             ("1", 1, 1, 1, 4, 1, 1, 1, 1),
>             ("2", 1, 1, 1, 4, 2, 2, 2, 1),
>             ("2", 1, 2, 1, 4, 3, 2, 2, 2),
>             ("2", 2, 2, 2, 4, 4, 4, 3, 2)
>         ]
>         for r, ex in zip(rs, expected):
>             self.assertEqual(tuple(r), ex[:len(r)])
>     def test_window_functions_cumulative_sum(self):
>         df = self.spark.createDataFrame([("one", 1), ("two", 2)], ["key", "value"])
>         from pyspark.sql import functions as F
>         # Test cumulative sum
>         sel = df.select(
>             df.key,
>             F.sum(df.value).over(Window.rowsBetween(Window.unboundedPreceding, 0)))
>         rs = sorted(sel.collect())
>         expected = [("one", 1), ("two", 3)]
>         for r, ex in zip(rs, expected):
>             self.assertEqual(tuple(r), ex[:len(r)])
>         # Test boundary values less than JVM's Long.MinValue and make sure we don't overflow
>         sel = df.select(
>             df.key,
>             F.sum(df.value).over(Window.rowsBetween(Window.unboundedPreceding - 1, 0)))
>         rs = sorted(sel.collect())
>         expected = [("one", 1), ("two", 3)]
>         for r, ex in zip(rs, expected):
>             self.assertEqual(tuple(r), ex[:len(r)])
>         # Test boundary values greater than JVM's Long.MaxValue and make sure we don't overflow
>         frame_end = Window.unboundedFollowing + 1
>         sel = df.select(
>             df.key,
>             F.sum(df.value).over(Window.rowsBetween(Window.currentRow, frame_end)))
>         rs = sorted(sel.collect())
>         expected = [("one", 3), ("two", 2)]
>         for r, ex in zip(rs, expected):
>             self.assertEqual(tuple(r), ex[:len(r)])
>     def test_collect_functions(self):
>         df = self.spark.createDataFrame([(1, "1"), (2, "2"), (1, "2"), (1, "2")], ["key", "value"])
>         from pyspark.sql import functions
>         self.assertEqual(
>             sorted(df.select(functions.collect_set(df.key).alias('r')).collect()[0].r),
>             [1, 2])
>         self.assertEqual(
>             sorted(df.select(functions.collect_list(df.key).alias('r')).collect()[0].r),
>             [1, 1, 1, 2])
>         self.assertEqual(
>             sorted(df.select(functions.collect_set(df.value).alias('r')).collect()[0].r),
>             ["1", "2"])
>         self.assertEqual(
>             sorted(df.select(functions.collect_list(df.value).alias('r')).collect()[0].r),
>             ["1", "2", "2", "2"])
>     def test_datetime_functions(self):
>         from pyspark.sql import functions
>         from datetime import date
>         df = self.spark.range(1).selectExpr("'2017-01-22' as dateCol")
>         parse_result = df.select(functions.to_date(functions.col("dateCol"))).first()
>         self.assertEqual(date(2017, 1, 22), parse_result['to_date(dateCol)'])
> {code}
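
For reference, the window-function behavior exercised by the quoted tests can also be tried outside the unittest harness with a short standalone script. The sketch below is illustrative only and is not part of the patch; the local SparkSession setup and the column aliases are assumptions added for readability.

{code}
# Minimal standalone sketch of the window-function usage quoted above.
# Assumes a local PySpark 3.1+ installation; the SparkSession setup and the
# column aliases are illustrative and not part of the moved test code.
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local[1]").appName("window-sketch").getOrCreate()

# Same input data as the quoted tests: four (key, value) rows.
df = spark.createDataFrame([(1, "1"), (2, "2"), (1, "2"), (1, "2")], ["key", "value"])
w = Window.partitionBy("value").orderBy("key")

sel = df.select(
    df.value,
    df.key,
    F.max("key").over(w.rowsBetween(0, 1)).alias("max_01"),
    F.min("key").over(w.rowsBetween(0, 1)).alias("min_01"),
    F.count("key").over(w.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)).alias("cnt"),
    F.row_number().over(w).alias("row_number"),
    F.rank().over(w).alias("rank"),
    F.dense_rank().over(w).alias("dense_rank"),
    F.ntile(2).over(w).alias("ntile"),
    F.nth_value("key", 2).over(w).alias("nth_2"),  # nth_value is available from Spark 3.1
)
sel.orderBy("value", "key", "row_number").show()

spark.stop()
{code}

The four printed rows should correspond to the expected tuples asserted in {{test_window_functions}} above, with the window columns in the same order.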



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
