You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "dongjoon-hyun (via GitHub)" <gi...@apache.org> on 2023/11/18 23:55:21 UTC

Re: [PR] [SPARK-43603][PS][CONNECT][TEST] Reorganize ps.DataFrame unit tests [spark]

dongjoon-hyun commented on code in PR #41330:
URL: https://github.com/apache/spark/pull/41330#discussion_r1398290749


##########
python/pyspark/pandas/tests/computation/test_apply_func.py:
##########
@@ -0,0 +1,575 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from datetime import datetime
+from distutils.version import LooseVersion
+import sys
+import unittest
+from typing import List
+
+import numpy as np
+import pandas as pd
+
+from pyspark import pandas as ps
+from pyspark.pandas.config import option_context
+from pyspark.testing.pandasutils import ComparisonTestBase
+from pyspark.testing.sqlutils import SQLTestUtils
+
+
+# This file contains test cases for 'Function application, GroupBy & Window'
+# https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/frame.html#function-application-groupby-window
+# as well as 'apply_batch*' and 'transform_batch*'.
+class FrameApplyFunctionMixin:
+    @property
+    def pdf(self):
+        return pd.DataFrame(
+            {"a": [1, 2, 3, 4, 5, 6, 7, 8, 9], "b": [4, 5, 6, 3, 2, 1, 0, 0, 0]},
+            index=np.random.rand(9),
+        )
+
+    @property
+    def df_pair(self):
+        pdf = self.pdf
+        psdf = ps.from_pandas(pdf)
+        return pdf, psdf
+
+    def test_apply(self):
+        pdf = pd.DataFrame(
+            {
+                "a": [1, 2, 3, 4, 5, 6] * 100,
+                "b": [1.0, 1.0, 2.0, 3.0, 5.0, 8.0] * 100,
+                "c": [1, 4, 9, 16, 25, 36] * 100,
+            },
+            columns=["a", "b", "c"],
+            index=np.random.rand(600),
+        )
+        psdf = ps.DataFrame(pdf)
+
+        self.assert_eq(
+            psdf.apply(lambda x: x + 1).sort_index(), pdf.apply(lambda x: x + 1).sort_index()
+        )
+        self.assert_eq(
+            psdf.apply(lambda x, b: x + b, args=(1,)).sort_index(),
+            pdf.apply(lambda x, b: x + b, args=(1,)).sort_index(),
+        )
+        self.assert_eq(
+            psdf.apply(lambda x, b: x + b, b=1).sort_index(),
+            pdf.apply(lambda x, b: x + b, b=1).sort_index(),
+        )
+
+        with option_context("compute.shortcut_limit", 500):
+            self.assert_eq(
+                psdf.apply(lambda x: x + 1).sort_index(), pdf.apply(lambda x: x + 1).sort_index()
+            )
+            self.assert_eq(
+                psdf.apply(lambda x, b: x + b, args=(1,)).sort_index(),
+                pdf.apply(lambda x, b: x + b, args=(1,)).sort_index(),
+            )
+            self.assert_eq(
+                psdf.apply(lambda x, b: x + b, b=1).sort_index(),
+                pdf.apply(lambda x, b: x + b, b=1).sort_index(),
+            )
+
+        # returning a Series
+        self.assert_eq(
+            psdf.apply(lambda x: len(x), axis=1).sort_index(),
+            pdf.apply(lambda x: len(x), axis=1).sort_index(),
+        )
+        self.assert_eq(
+            psdf.apply(lambda x, c: len(x) + c, axis=1, c=100).sort_index(),
+            pdf.apply(lambda x, c: len(x) + c, axis=1, c=100).sort_index(),
+        )
+        with option_context("compute.shortcut_limit", 500):
+            self.assert_eq(
+                psdf.apply(lambda x: len(x), axis=1).sort_index(),
+                pdf.apply(lambda x: len(x), axis=1).sort_index(),
+            )
+            self.assert_eq(
+                psdf.apply(lambda x, c: len(x) + c, axis=1, c=100).sort_index(),
+                pdf.apply(lambda x, c: len(x) + c, axis=1, c=100).sort_index(),
+            )
+
+        with self.assertRaisesRegex(AssertionError, "the first argument should be a callable"):
+            psdf.apply(1)
+
+        with self.assertRaisesRegex(TypeError, "The given function.*1 or 'column'; however"):
+
+            def f1(_) -> ps.DataFrame[int]:
+                pass
+
+            psdf.apply(f1, axis=0)
+
+        with self.assertRaisesRegex(TypeError, "The given function.*0 or 'index'; however"):
+
+            def f2(_) -> ps.Series[int]:
+                pass
+
+            psdf.apply(f2, axis=1)
+
+        # multi-index columns
+        columns = pd.MultiIndex.from_tuples([("x", "a"), ("x", "b"), ("y", "c")])
+        pdf.columns = columns
+        psdf.columns = columns
+
+        self.assert_eq(
+            psdf.apply(lambda x: x + 1).sort_index(), pdf.apply(lambda x: x + 1).sort_index()
+        )
+        with option_context("compute.shortcut_limit", 500):
+            self.assert_eq(
+                psdf.apply(lambda x: x + 1).sort_index(), pdf.apply(lambda x: x + 1).sort_index()
+            )
+
+        # returning a Series
+        self.assert_eq(
+            psdf.apply(lambda x: len(x), axis=1).sort_index(),
+            pdf.apply(lambda x: len(x), axis=1).sort_index(),
+        )
+        with option_context("compute.shortcut_limit", 500):
+            self.assert_eq(
+                psdf.apply(lambda x: len(x), axis=1).sort_index(),
+                pdf.apply(lambda x: len(x), axis=1).sort_index(),
+            )
+
+    def test_apply_with_type(self):
+        pdf = self.pdf
+        psdf = ps.from_pandas(pdf)
+
+        def identify1(x) -> ps.DataFrame[int, int]:
+            return x
+
+        # Type hints set the default column names, and we use default index for
+        # pandas API on Spark. Here we ignore both diff.
+        actual = psdf.apply(identify1, axis=1)
+        expected = pdf.apply(identify1, axis=1)
+        self.assert_eq(sorted(actual["c0"].to_numpy()), sorted(expected["a"].to_numpy()))
+        self.assert_eq(sorted(actual["c1"].to_numpy()), sorted(expected["b"].to_numpy()))
+
+        def identify2(x) -> ps.DataFrame[slice("a", int), slice("b", int)]:  # noqa: F405
+            return x
+
+        actual = psdf.apply(identify2, axis=1)
+        expected = pdf.apply(identify2, axis=1)
+        self.assert_eq(sorted(actual["a"].to_numpy()), sorted(expected["a"].to_numpy()))
+        self.assert_eq(sorted(actual["b"].to_numpy()), sorted(expected["b"].to_numpy()))
+
+    def test_apply_batch(self):
+        pdf = pd.DataFrame(
+            {
+                "a": [1, 2, 3, 4, 5, 6] * 100,
+                "b": [1.0, 1.0, 2.0, 3.0, 5.0, 8.0] * 100,
+                "c": [1, 4, 9, 16, 25, 36] * 100,
+            },
+            columns=["a", "b", "c"],
+            index=np.random.rand(600),
+        )
+        psdf = ps.DataFrame(pdf)
+
+        self.assert_eq(
+            psdf.pandas_on_spark.apply_batch(lambda pdf, a: pdf + a, args=(1,)).sort_index(),
+            (pdf + 1).sort_index(),
+        )
+        with option_context("compute.shortcut_limit", 500):
+            self.assert_eq(
+                psdf.pandas_on_spark.apply_batch(lambda pdf: pdf + 1).sort_index(),
+                (pdf + 1).sort_index(),
+            )
+            self.assert_eq(
+                psdf.pandas_on_spark.apply_batch(lambda pdf, b: pdf + b, b=1).sort_index(),
+                (pdf + 1).sort_index(),
+            )
+
+        with self.assertRaisesRegex(AssertionError, "the first argument should be a callable"):
+            psdf.pandas_on_spark.apply_batch(1)
+
+        with self.assertRaisesRegex(TypeError, "The given function.*frame as its type hints"):
+
+            def f2(_) -> ps.Series[int]:
+                pass
+
+            psdf.pandas_on_spark.apply_batch(f2)
+
+        with self.assertRaisesRegex(ValueError, "The given function should return a frame"):
+            psdf.pandas_on_spark.apply_batch(lambda pdf: 1)
+
+        # multi-index columns
+        columns = pd.MultiIndex.from_tuples([("x", "a"), ("x", "b"), ("y", "c")])
+        pdf.columns = columns
+        psdf.columns = columns
+
+        self.assert_eq(
+            psdf.pandas_on_spark.apply_batch(lambda x: x + 1).sort_index(), (pdf + 1).sort_index()
+        )
+        with option_context("compute.shortcut_limit", 500):
+            self.assert_eq(
+                psdf.pandas_on_spark.apply_batch(lambda x: x + 1).sort_index(),
+                (pdf + 1).sort_index(),
+            )
+
+    def test_apply_batch_with_type(self):
+        pdf = self.pdf
+        psdf = ps.from_pandas(pdf)
+
+        def identify1(x) -> ps.DataFrame[int, int]:
+            return x
+
+        # Type hints set the default column names, and we use default index for
+        # pandas API on Spark. Here we ignore both diff.
+        actual = psdf.pandas_on_spark.apply_batch(identify1)
+        expected = pdf
+        self.assert_eq(sorted(actual["c0"].to_numpy()), sorted(expected["a"].to_numpy()))
+        self.assert_eq(sorted(actual["c1"].to_numpy()), sorted(expected["b"].to_numpy()))
+
+        def identify2(x) -> ps.DataFrame[slice("a", int), slice("b", int)]:  # noqa: F405
+            return x
+
+        actual = psdf.pandas_on_spark.apply_batch(identify2)
+        expected = pdf
+        self.assert_eq(sorted(actual["a"].to_numpy()), sorted(expected["a"].to_numpy()))
+        self.assert_eq(sorted(actual["b"].to_numpy()), sorted(expected["b"].to_numpy()))
+
+        pdf = pd.DataFrame(
+            {"a": [1, 2, 3, 4, 5, 6, 7, 8, 9], "b": [[e] for e in [4, 5, 6, 3, 2, 1, 0, 0, 0]]},
+            index=np.random.rand(9),
+        )
+        psdf = ps.from_pandas(pdf)
+
+        def identify3(x) -> ps.DataFrame[float, [int, List[int]]]:

Review Comment:
   Unfortunately, this seems to cause test case failures in Python 3.11 environment.
   
   ```
   ======================================================================
   ERROR [0.686s]: test_apply_batch_with_type (pyspark.pandas.tests.computation.test_apply_func.FrameApplyFunctionTests.test_apply_batch_with_type)
   ----------------------------------------------------------------------
   Traceback (most recent call last):
     File "/__w/spark/spark/python/pyspark/pandas/tests/computation/test_apply_func.py", line 248, in test_apply_batch_with_type
       def identify3(x) -> ps.DataFrame[float, [int, List[int]]]:
                           ~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/__w/spark/spark/python/pyspark/pandas/frame.py", line 13540, in __class_getitem__
       return create_tuple_for_frame_type(params)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/__w/spark/spark/python/pyspark/pandas/typedef/typehints.py", line 721, in create_tuple_for_frame_type
       return Tuple[_to_type_holders(params)]
                    ^^^^^^^^^^^^^^^^^^^^^^^^
     File "/__w/spark/spark/python/pyspark/pandas/typedef/typehints.py", line 766, in _to_type_holders
       data_types = _new_type_holders(data_types, NameTypeHolder)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/__w/spark/spark/python/pyspark/pandas/typedef/typehints.py", line 832, in _new_type_holders
       raise TypeError(
   TypeError: Type hints should be specified as one of:
     - DataFrame[type, type, ...]
     - DataFrame[name: type, name: type, ...]
     - DataFrame[dtypes instance]
     - DataFrame[zip(names, types)]
     - DataFrame[index_type, [type, ...]]
     - DataFrame[(index_name, index_type), [(name, type), ...]]
     - DataFrame[dtype instance, dtypes instance]
     - DataFrame[(index_name, index_type), zip(names, types)]
     - DataFrame[[index_type, ...], [type, ...]]
     - DataFrame[[(index_name, index_type), ...], [(name, type), ...]]
     - DataFrame[dtypes instance, dtypes instance]
     - DataFrame[zip(index_names, index_types), zip(names, types)]
   However, got (<class 'int'>, typing.List[int]).
   
   ----------------------------------------------------------------------
   Ran 10 tests in 34.327s
   
   FAILED (errors=1)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org