Posted to reviews@spark.apache.org by "zhengruifeng (via GitHub)" <gi...@apache.org> on 2023/05/26 10:20:49 UTC

[GitHub] [spark] zhengruifeng opened a new pull request, #41330: [WIP][SPARK-43603][PS][CONNECT][TEST] Reorganize ps.DataFrame unit tests

zhengruifeng opened a new pull request, #41330:
URL: https://github.com/apache/spark/pull/41330

   ### What changes were proposed in this pull request?
   Reorganize the `ps.DataFrame` unit tests to follow the categories in
   https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/frame.html
   and https://pandas.pydata.org/docs/reference/frame.html
   
   
   ### Why are the changes needed?
   1. Even after https://github.com/apache/spark/commit/58b6535cbf76f2b26fc08b94905a57dcc4d955f6, `pyspark.pandas.tests.test_dataframe` and `pyspark.pandas.tests.test_dataframe_slow` are still too slow: each one normally takes more than 5 minutes, and sometimes more than 10 minutes.
   
   2. The tests were not well organized. [Tests in pandas](https://github.com/pandas-dev/pandas/tree/main/pandas/tests) are grouped by category, and we should follow the same layout to make the PS tests more maintainable.
   
   ### Does this PR introduce _any_ user-facing change?
   No, test-only.
   
   ### How was this patch tested?
   updated CI
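
The diff quoted later in this thread defines a `FrameApplyFunctionMixin`, and the traceback names a concrete `FrameApplyFunctionTests` class, which suggests the reorganized tests are written as mixins that concrete suites inherit. A minimal sketch of that layout, using only the standard library, might look like the following. The class names, the `make_values` fixture, and `run_case` are hypothetical and are not taken from the PR.

```python
import unittest


class ApplyFunctionMixin:
    """Hypothetical mixin: holds test methods only, with no TestCase base."""

    def make_values(self):
        # Fixture hook that concrete suites may override.
        return [1, 2, 3]

    def test_add_one(self):
        self.assertEqual([v + 1 for v in self.make_values()], [2, 3, 4])


class ApplyFunctionTests(ApplyFunctionMixin, unittest.TestCase):
    """Concrete suite that runs the mixin's tests with the default fixture."""


class ApplyFunctionTupleTests(ApplyFunctionMixin, unittest.TestCase):
    """Second suite that reuses the same tests with a different fixture."""

    def make_values(self):
        return (1, 2, 3)


def run_case(case):
    """Run every test in one TestCase class and return the TestResult."""
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(case)
    result = unittest.TestResult()
    suite.run(result)
    return result
```

Because the mixin does not inherit from `unittest.TestCase`, the test loader does not collect it on its own. Only the concrete classes run, which keeps each test file small while letting more than one environment share the same test bodies.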


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] zhengruifeng commented on pull request #41330: [SPARK-43603][PS][CONNECT][TEST] Reorganize ps.DataFrame unit tests

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on PR #41330:
URL: https://github.com/apache/spark/pull/41330#issuecomment-1565047089

   cc @HyukjinKwon @itholic 


[GitHub] [spark] zhengruifeng commented on pull request #41330: [SPARK-43603][PS][CONNECT][TEST] Reorganize ps.DataFrame unit tests

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on PR #41330:
URL: https://github.com/apache/spark/pull/41330#issuecomment-1565763264

   merged to master


[GitHub] [spark] zhengruifeng closed pull request #41330: [SPARK-43603][PS][CONNECT][TEST] Reorganize ps.DataFrame unit tests

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng closed pull request #41330: [SPARK-43603][PS][CONNECT][TEST] Reorganize ps.DataFrame unit tests
URL: https://github.com/apache/spark/pull/41330


Re: [PR] [SPARK-43603][PS][CONNECT][TEST] Reorganize ps.DataFrame unit tests [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #41330:
URL: https://github.com/apache/spark/pull/41330#discussion_r1398290749


##########
python/pyspark/pandas/tests/computation/test_apply_func.py:
##########
@@ -0,0 +1,575 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from datetime import datetime
+from distutils.version import LooseVersion
+import sys
+import unittest
+from typing import List
+
+import numpy as np
+import pandas as pd
+
+from pyspark import pandas as ps
+from pyspark.pandas.config import option_context
+from pyspark.testing.pandasutils import ComparisonTestBase
+from pyspark.testing.sqlutils import SQLTestUtils
+
+
+# This file contains test cases for 'Function application, GroupBy & Window'
+# https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/frame.html#function-application-groupby-window
+# as well as 'apply_batch*' and 'transform_batch*'.
+class FrameApplyFunctionMixin:
+    @property
+    def pdf(self):
+        return pd.DataFrame(
+            {"a": [1, 2, 3, 4, 5, 6, 7, 8, 9], "b": [4, 5, 6, 3, 2, 1, 0, 0, 0]},
+            index=np.random.rand(9),
+        )
+
+    @property
+    def df_pair(self):
+        pdf = self.pdf
+        psdf = ps.from_pandas(pdf)
+        return pdf, psdf
+
+    def test_apply(self):
+        pdf = pd.DataFrame(
+            {
+                "a": [1, 2, 3, 4, 5, 6] * 100,
+                "b": [1.0, 1.0, 2.0, 3.0, 5.0, 8.0] * 100,
+                "c": [1, 4, 9, 16, 25, 36] * 100,
+            },
+            columns=["a", "b", "c"],
+            index=np.random.rand(600),
+        )
+        psdf = ps.DataFrame(pdf)
+
+        self.assert_eq(
+            psdf.apply(lambda x: x + 1).sort_index(), pdf.apply(lambda x: x + 1).sort_index()
+        )
+        self.assert_eq(
+            psdf.apply(lambda x, b: x + b, args=(1,)).sort_index(),
+            pdf.apply(lambda x, b: x + b, args=(1,)).sort_index(),
+        )
+        self.assert_eq(
+            psdf.apply(lambda x, b: x + b, b=1).sort_index(),
+            pdf.apply(lambda x, b: x + b, b=1).sort_index(),
+        )
+
+        with option_context("compute.shortcut_limit", 500):
+            self.assert_eq(
+                psdf.apply(lambda x: x + 1).sort_index(), pdf.apply(lambda x: x + 1).sort_index()
+            )
+            self.assert_eq(
+                psdf.apply(lambda x, b: x + b, args=(1,)).sort_index(),
+                pdf.apply(lambda x, b: x + b, args=(1,)).sort_index(),
+            )
+            self.assert_eq(
+                psdf.apply(lambda x, b: x + b, b=1).sort_index(),
+                pdf.apply(lambda x, b: x + b, b=1).sort_index(),
+            )
+
+        # returning a Series
+        self.assert_eq(
+            psdf.apply(lambda x: len(x), axis=1).sort_index(),
+            pdf.apply(lambda x: len(x), axis=1).sort_index(),
+        )
+        self.assert_eq(
+            psdf.apply(lambda x, c: len(x) + c, axis=1, c=100).sort_index(),
+            pdf.apply(lambda x, c: len(x) + c, axis=1, c=100).sort_index(),
+        )
+        with option_context("compute.shortcut_limit", 500):
+            self.assert_eq(
+                psdf.apply(lambda x: len(x), axis=1).sort_index(),
+                pdf.apply(lambda x: len(x), axis=1).sort_index(),
+            )
+            self.assert_eq(
+                psdf.apply(lambda x, c: len(x) + c, axis=1, c=100).sort_index(),
+                pdf.apply(lambda x, c: len(x) + c, axis=1, c=100).sort_index(),
+            )
+
+        with self.assertRaisesRegex(AssertionError, "the first argument should be a callable"):
+            psdf.apply(1)
+
+        with self.assertRaisesRegex(TypeError, "The given function.*1 or 'column'; however"):
+
+            def f1(_) -> ps.DataFrame[int]:
+                pass
+
+            psdf.apply(f1, axis=0)
+
+        with self.assertRaisesRegex(TypeError, "The given function.*0 or 'index'; however"):
+
+            def f2(_) -> ps.Series[int]:
+                pass
+
+            psdf.apply(f2, axis=1)
+
+        # multi-index columns
+        columns = pd.MultiIndex.from_tuples([("x", "a"), ("x", "b"), ("y", "c")])
+        pdf.columns = columns
+        psdf.columns = columns
+
+        self.assert_eq(
+            psdf.apply(lambda x: x + 1).sort_index(), pdf.apply(lambda x: x + 1).sort_index()
+        )
+        with option_context("compute.shortcut_limit", 500):
+            self.assert_eq(
+                psdf.apply(lambda x: x + 1).sort_index(), pdf.apply(lambda x: x + 1).sort_index()
+            )
+
+        # returning a Series
+        self.assert_eq(
+            psdf.apply(lambda x: len(x), axis=1).sort_index(),
+            pdf.apply(lambda x: len(x), axis=1).sort_index(),
+        )
+        with option_context("compute.shortcut_limit", 500):
+            self.assert_eq(
+                psdf.apply(lambda x: len(x), axis=1).sort_index(),
+                pdf.apply(lambda x: len(x), axis=1).sort_index(),
+            )
+
+    def test_apply_with_type(self):
+        pdf = self.pdf
+        psdf = ps.from_pandas(pdf)
+
+        def identify1(x) -> ps.DataFrame[int, int]:
+            return x
+
+        # Type hints set the default column names, and we use default index for
+        # pandas API on Spark. Here we ignore both diff.
+        actual = psdf.apply(identify1, axis=1)
+        expected = pdf.apply(identify1, axis=1)
+        self.assert_eq(sorted(actual["c0"].to_numpy()), sorted(expected["a"].to_numpy()))
+        self.assert_eq(sorted(actual["c1"].to_numpy()), sorted(expected["b"].to_numpy()))
+
+        def identify2(x) -> ps.DataFrame[slice("a", int), slice("b", int)]:  # noqa: F405
+            return x
+
+        actual = psdf.apply(identify2, axis=1)
+        expected = pdf.apply(identify2, axis=1)
+        self.assert_eq(sorted(actual["a"].to_numpy()), sorted(expected["a"].to_numpy()))
+        self.assert_eq(sorted(actual["b"].to_numpy()), sorted(expected["b"].to_numpy()))
+
+    def test_apply_batch(self):
+        pdf = pd.DataFrame(
+            {
+                "a": [1, 2, 3, 4, 5, 6] * 100,
+                "b": [1.0, 1.0, 2.0, 3.0, 5.0, 8.0] * 100,
+                "c": [1, 4, 9, 16, 25, 36] * 100,
+            },
+            columns=["a", "b", "c"],
+            index=np.random.rand(600),
+        )
+        psdf = ps.DataFrame(pdf)
+
+        self.assert_eq(
+            psdf.pandas_on_spark.apply_batch(lambda pdf, a: pdf + a, args=(1,)).sort_index(),
+            (pdf + 1).sort_index(),
+        )
+        with option_context("compute.shortcut_limit", 500):
+            self.assert_eq(
+                psdf.pandas_on_spark.apply_batch(lambda pdf: pdf + 1).sort_index(),
+                (pdf + 1).sort_index(),
+            )
+            self.assert_eq(
+                psdf.pandas_on_spark.apply_batch(lambda pdf, b: pdf + b, b=1).sort_index(),
+                (pdf + 1).sort_index(),
+            )
+
+        with self.assertRaisesRegex(AssertionError, "the first argument should be a callable"):
+            psdf.pandas_on_spark.apply_batch(1)
+
+        with self.assertRaisesRegex(TypeError, "The given function.*frame as its type hints"):
+
+            def f2(_) -> ps.Series[int]:
+                pass
+
+            psdf.pandas_on_spark.apply_batch(f2)
+
+        with self.assertRaisesRegex(ValueError, "The given function should return a frame"):
+            psdf.pandas_on_spark.apply_batch(lambda pdf: 1)
+
+        # multi-index columns
+        columns = pd.MultiIndex.from_tuples([("x", "a"), ("x", "b"), ("y", "c")])
+        pdf.columns = columns
+        psdf.columns = columns
+
+        self.assert_eq(
+            psdf.pandas_on_spark.apply_batch(lambda x: x + 1).sort_index(), (pdf + 1).sort_index()
+        )
+        with option_context("compute.shortcut_limit", 500):
+            self.assert_eq(
+                psdf.pandas_on_spark.apply_batch(lambda x: x + 1).sort_index(),
+                (pdf + 1).sort_index(),
+            )
+
+    def test_apply_batch_with_type(self):
+        pdf = self.pdf
+        psdf = ps.from_pandas(pdf)
+
+        def identify1(x) -> ps.DataFrame[int, int]:
+            return x
+
+        # Type hints set the default column names, and we use default index for
+        # pandas API on Spark. Here we ignore both diff.
+        actual = psdf.pandas_on_spark.apply_batch(identify1)
+        expected = pdf
+        self.assert_eq(sorted(actual["c0"].to_numpy()), sorted(expected["a"].to_numpy()))
+        self.assert_eq(sorted(actual["c1"].to_numpy()), sorted(expected["b"].to_numpy()))
+
+        def identify2(x) -> ps.DataFrame[slice("a", int), slice("b", int)]:  # noqa: F405
+            return x
+
+        actual = psdf.pandas_on_spark.apply_batch(identify2)
+        expected = pdf
+        self.assert_eq(sorted(actual["a"].to_numpy()), sorted(expected["a"].to_numpy()))
+        self.assert_eq(sorted(actual["b"].to_numpy()), sorted(expected["b"].to_numpy()))
+
+        pdf = pd.DataFrame(
+            {"a": [1, 2, 3, 4, 5, 6, 7, 8, 9], "b": [[e] for e in [4, 5, 6, 3, 2, 1, 0, 0, 0]]},
+            index=np.random.rand(9),
+        )
+        psdf = ps.from_pandas(pdf)
+
+        def identify3(x) -> ps.DataFrame[float, [int, List[int]]]:

Review Comment:
   Unfortunately, this seems to cause test case failures in the Python 3.11 environment. This test case originated from SPARK-36708 in Apache Spark 3.3.0.
   
   - https://github.com/apache/spark/actions/workflows/build_python.yml
   
   ```
   ======================================================================
   ERROR [0.686s]: test_apply_batch_with_type (pyspark.pandas.tests.computation.test_apply_func.FrameApplyFunctionTests.test_apply_batch_with_type)
   ----------------------------------------------------------------------
   Traceback (most recent call last):
     File "/__w/spark/spark/python/pyspark/pandas/tests/computation/test_apply_func.py", line 248, in test_apply_batch_with_type
       def identify3(x) -> ps.DataFrame[float, [int, List[int]]]:
                           ~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/__w/spark/spark/python/pyspark/pandas/frame.py", line 13540, in __class_getitem__
       return create_tuple_for_frame_type(params)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/__w/spark/spark/python/pyspark/pandas/typedef/typehints.py", line 721, in create_tuple_for_frame_type
       return Tuple[_to_type_holders(params)]
                    ^^^^^^^^^^^^^^^^^^^^^^^^
     File "/__w/spark/spark/python/pyspark/pandas/typedef/typehints.py", line 766, in _to_type_holders
       data_types = _new_type_holders(data_types, NameTypeHolder)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/__w/spark/spark/python/pyspark/pandas/typedef/typehints.py", line 832, in _new_type_holders
       raise TypeError(
   TypeError: Type hints should be specified as one of:
     - DataFrame[type, type, ...]
     - DataFrame[name: type, name: type, ...]
     - DataFrame[dtypes instance]
     - DataFrame[zip(names, types)]
     - DataFrame[index_type, [type, ...]]
     - DataFrame[(index_name, index_type), [(name, type), ...]]
     - DataFrame[dtype instance, dtypes instance]
     - DataFrame[(index_name, index_type), zip(names, types)]
     - DataFrame[[index_type, ...], [type, ...]]
     - DataFrame[[(index_name, index_type), ...], [(name, type), ...]]
     - DataFrame[dtypes instance, dtypes instance]
     - DataFrame[zip(index_names, index_types), zip(names, types)]
   However, got (<class 'int'>, typing.List[int]).
   
   ----------------------------------------------------------------------
   Ran 10 tests in 34.327s
   
   FAILED (errors=1)
   ```
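
The thread does not identify the root cause of this failure. One possibility, stated here as an assumption and not confirmed by the traceback, is that generic aliases such as `List[int]` are treated as iterable on newer Python versions. A validator that separates plain types from nested lists with an `Iterable` check would then stop recognizing `(int, List[int])` as a list of unnamed types. The sketch below uses only the standard library. `looks_iterable` and `classify` are hypothetical helpers, not pyspark functions, and the result is printed rather than assumed because it depends on the interpreter.

```python
import sys
from collections.abc import Iterable
from typing import List


def looks_iterable(hint) -> bool:
    """Hypothetical helper: does `hint` pass an isinstance(..., Iterable) check?"""
    return isinstance(hint, Iterable)


def classify(params):
    """Hypothetical stand-in for a validator that accepts only
    all-named or all-unnamed type parameters."""
    if all(isinstance(p, (slice, tuple)) for p in params):
        return "named"
    if all(not isinstance(p, slice) and not looks_iterable(p) for p in params):
        return "unnamed"
    return "rejected"


# Report what this interpreter does with the hint from the failing test.
print(sys.version_info[:2], looks_iterable(List[int]), classify((int, List[int])))
```

Running this under both a Python 3.10 and a Python 3.11 interpreter would show whether the iterability of `List[int]` differs between them. If it does, the check would be a reasonable place to start investigating.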



Re: [PR] [SPARK-43603][PS][CONNECT][TEST] Reorganize ps.DataFrame unit tests [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #41330:
URL: https://github.com/apache/spark/pull/41330#discussion_r1398290749


##########
python/pyspark/pandas/tests/computation/test_apply_func.py:
##########
@@ -0,0 +1,575 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from datetime import datetime
+from distutils.version import LooseVersion
+import sys
+import unittest
+from typing import List
+
+import numpy as np
+import pandas as pd
+
+from pyspark import pandas as ps
+from pyspark.pandas.config import option_context
+from pyspark.testing.pandasutils import ComparisonTestBase
+from pyspark.testing.sqlutils import SQLTestUtils
+
+
+# This file contains test cases for 'Function application, GroupBy & Window'
+# https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/frame.html#function-application-groupby-window
+# as well as 'apply_batch*' and 'transform_batch*'.
+class FrameApplyFunctionMixin:
+    @property
+    def pdf(self):
+        return pd.DataFrame(
+            {"a": [1, 2, 3, 4, 5, 6, 7, 8, 9], "b": [4, 5, 6, 3, 2, 1, 0, 0, 0]},
+            index=np.random.rand(9),
+        )
+
+    @property
+    def df_pair(self):
+        pdf = self.pdf
+        psdf = ps.from_pandas(pdf)
+        return pdf, psdf
+
+    def test_apply(self):
+        pdf = pd.DataFrame(
+            {
+                "a": [1, 2, 3, 4, 5, 6] * 100,
+                "b": [1.0, 1.0, 2.0, 3.0, 5.0, 8.0] * 100,
+                "c": [1, 4, 9, 16, 25, 36] * 100,
+            },
+            columns=["a", "b", "c"],
+            index=np.random.rand(600),
+        )
+        psdf = ps.DataFrame(pdf)
+
+        self.assert_eq(
+            psdf.apply(lambda x: x + 1).sort_index(), pdf.apply(lambda x: x + 1).sort_index()
+        )
+        self.assert_eq(
+            psdf.apply(lambda x, b: x + b, args=(1,)).sort_index(),
+            pdf.apply(lambda x, b: x + b, args=(1,)).sort_index(),
+        )
+        self.assert_eq(
+            psdf.apply(lambda x, b: x + b, b=1).sort_index(),
+            pdf.apply(lambda x, b: x + b, b=1).sort_index(),
+        )
+
+        with option_context("compute.shortcut_limit", 500):
+            self.assert_eq(
+                psdf.apply(lambda x: x + 1).sort_index(), pdf.apply(lambda x: x + 1).sort_index()
+            )
+            self.assert_eq(
+                psdf.apply(lambda x, b: x + b, args=(1,)).sort_index(),
+                pdf.apply(lambda x, b: x + b, args=(1,)).sort_index(),
+            )
+            self.assert_eq(
+                psdf.apply(lambda x, b: x + b, b=1).sort_index(),
+                pdf.apply(lambda x, b: x + b, b=1).sort_index(),
+            )
+
+        # returning a Series
+        self.assert_eq(
+            psdf.apply(lambda x: len(x), axis=1).sort_index(),
+            pdf.apply(lambda x: len(x), axis=1).sort_index(),
+        )
+        self.assert_eq(
+            psdf.apply(lambda x, c: len(x) + c, axis=1, c=100).sort_index(),
+            pdf.apply(lambda x, c: len(x) + c, axis=1, c=100).sort_index(),
+        )
+        with option_context("compute.shortcut_limit", 500):
+            self.assert_eq(
+                psdf.apply(lambda x: len(x), axis=1).sort_index(),
+                pdf.apply(lambda x: len(x), axis=1).sort_index(),
+            )
+            self.assert_eq(
+                psdf.apply(lambda x, c: len(x) + c, axis=1, c=100).sort_index(),
+                pdf.apply(lambda x, c: len(x) + c, axis=1, c=100).sort_index(),
+            )
+
+        with self.assertRaisesRegex(AssertionError, "the first argument should be a callable"):
+            psdf.apply(1)
+
+        with self.assertRaisesRegex(TypeError, "The given function.*1 or 'column'; however"):
+
+            def f1(_) -> ps.DataFrame[int]:
+                pass
+
+            psdf.apply(f1, axis=0)
+
+        with self.assertRaisesRegex(TypeError, "The given function.*0 or 'index'; however"):
+
+            def f2(_) -> ps.Series[int]:
+                pass
+
+            psdf.apply(f2, axis=1)
+
+        # multi-index columns
+        columns = pd.MultiIndex.from_tuples([("x", "a"), ("x", "b"), ("y", "c")])
+        pdf.columns = columns
+        psdf.columns = columns
+
+        self.assert_eq(
+            psdf.apply(lambda x: x + 1).sort_index(), pdf.apply(lambda x: x + 1).sort_index()
+        )
+        with option_context("compute.shortcut_limit", 500):
+            self.assert_eq(
+                psdf.apply(lambda x: x + 1).sort_index(), pdf.apply(lambda x: x + 1).sort_index()
+            )
+
+        # returning a Series
+        self.assert_eq(
+            psdf.apply(lambda x: len(x), axis=1).sort_index(),
+            pdf.apply(lambda x: len(x), axis=1).sort_index(),
+        )
+        with option_context("compute.shortcut_limit", 500):
+            self.assert_eq(
+                psdf.apply(lambda x: len(x), axis=1).sort_index(),
+                pdf.apply(lambda x: len(x), axis=1).sort_index(),
+            )
+
+    def test_apply_with_type(self):
+        pdf = self.pdf
+        psdf = ps.from_pandas(pdf)
+
+        def identify1(x) -> ps.DataFrame[int, int]:
+            return x
+
+        # Type hints set the default column names, and we use default index for
+        # pandas API on Spark. Here we ignore both diff.
+        actual = psdf.apply(identify1, axis=1)
+        expected = pdf.apply(identify1, axis=1)
+        self.assert_eq(sorted(actual["c0"].to_numpy()), sorted(expected["a"].to_numpy()))
+        self.assert_eq(sorted(actual["c1"].to_numpy()), sorted(expected["b"].to_numpy()))
+
+        def identify2(x) -> ps.DataFrame[slice("a", int), slice("b", int)]:  # noqa: F405
+            return x
+
+        actual = psdf.apply(identify2, axis=1)
+        expected = pdf.apply(identify2, axis=1)
+        self.assert_eq(sorted(actual["a"].to_numpy()), sorted(expected["a"].to_numpy()))
+        self.assert_eq(sorted(actual["b"].to_numpy()), sorted(expected["b"].to_numpy()))
+
+    def test_apply_batch(self):
+        pdf = pd.DataFrame(
+            {
+                "a": [1, 2, 3, 4, 5, 6] * 100,
+                "b": [1.0, 1.0, 2.0, 3.0, 5.0, 8.0] * 100,
+                "c": [1, 4, 9, 16, 25, 36] * 100,
+            },
+            columns=["a", "b", "c"],
+            index=np.random.rand(600),
+        )
+        psdf = ps.DataFrame(pdf)
+
+        self.assert_eq(
+            psdf.pandas_on_spark.apply_batch(lambda pdf, a: pdf + a, args=(1,)).sort_index(),
+            (pdf + 1).sort_index(),
+        )
+        with option_context("compute.shortcut_limit", 500):
+            self.assert_eq(
+                psdf.pandas_on_spark.apply_batch(lambda pdf: pdf + 1).sort_index(),
+                (pdf + 1).sort_index(),
+            )
+            self.assert_eq(
+                psdf.pandas_on_spark.apply_batch(lambda pdf, b: pdf + b, b=1).sort_index(),
+                (pdf + 1).sort_index(),
+            )
+
+        with self.assertRaisesRegex(AssertionError, "the first argument should be a callable"):
+            psdf.pandas_on_spark.apply_batch(1)
+
+        with self.assertRaisesRegex(TypeError, "The given function.*frame as its type hints"):
+
+            def f2(_) -> ps.Series[int]:
+                pass
+
+            psdf.pandas_on_spark.apply_batch(f2)
+
+        with self.assertRaisesRegex(ValueError, "The given function should return a frame"):
+            psdf.pandas_on_spark.apply_batch(lambda pdf: 1)
+
+        # multi-index columns
+        columns = pd.MultiIndex.from_tuples([("x", "a"), ("x", "b"), ("y", "c")])
+        pdf.columns = columns
+        psdf.columns = columns
+
+        self.assert_eq(
+            psdf.pandas_on_spark.apply_batch(lambda x: x + 1).sort_index(), (pdf + 1).sort_index()
+        )
+        with option_context("compute.shortcut_limit", 500):
+            self.assert_eq(
+                psdf.pandas_on_spark.apply_batch(lambda x: x + 1).sort_index(),
+                (pdf + 1).sort_index(),
+            )
+
+    def test_apply_batch_with_type(self):
+        pdf = self.pdf
+        psdf = ps.from_pandas(pdf)
+
+        def identify1(x) -> ps.DataFrame[int, int]:
+            return x
+
+        # Type hints set the default column names, and we use default index for
+        # pandas API on Spark. Here we ignore both diff.
+        actual = psdf.pandas_on_spark.apply_batch(identify1)
+        expected = pdf
+        self.assert_eq(sorted(actual["c0"].to_numpy()), sorted(expected["a"].to_numpy()))
+        self.assert_eq(sorted(actual["c1"].to_numpy()), sorted(expected["b"].to_numpy()))
+
+        def identify2(x) -> ps.DataFrame[slice("a", int), slice("b", int)]:  # noqa: F405
+            return x
+
+        actual = psdf.pandas_on_spark.apply_batch(identify2)
+        expected = pdf
+        self.assert_eq(sorted(actual["a"].to_numpy()), sorted(expected["a"].to_numpy()))
+        self.assert_eq(sorted(actual["b"].to_numpy()), sorted(expected["b"].to_numpy()))
+
+        pdf = pd.DataFrame(
+            {"a": [1, 2, 3, 4, 5, 6, 7, 8, 9], "b": [[e] for e in [4, 5, 6, 3, 2, 1, 0, 0, 0]]},
+            index=np.random.rand(9),
+        )
+        psdf = ps.from_pandas(pdf)
+
+        def identify3(x) -> ps.DataFrame[float, [int, List[int]]]:

Review Comment:
   Unfortunately, this seems to cause a test case failure in the Python 3.11 environment.
   
   ```
   ======================================================================
   ERROR [0.686s]: test_apply_batch_with_type (pyspark.pandas.tests.computation.test_apply_func.FrameApplyFunctionTests.test_apply_batch_with_type)
   ----------------------------------------------------------------------
   Traceback (most recent call last):
     File "/__w/spark/spark/python/pyspark/pandas/tests/computation/test_apply_func.py", line 248, in test_apply_batch_with_type
       def identify3(x) -> ps.DataFrame[float, [int, List[int]]]:
                           ~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/__w/spark/spark/python/pyspark/pandas/frame.py", line 13540, in __class_getitem__
       return create_tuple_for_frame_type(params)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/__w/spark/spark/python/pyspark/pandas/typedef/typehints.py", line 721, in create_tuple_for_frame_type
       return Tuple[_to_type_holders(params)]
                    ^^^^^^^^^^^^^^^^^^^^^^^^
     File "/__w/spark/spark/python/pyspark/pandas/typedef/typehints.py", line 766, in _to_type_holders
       data_types = _new_type_holders(data_types, NameTypeHolder)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/__w/spark/spark/python/pyspark/pandas/typedef/typehints.py", line 832, in _new_type_holders
       raise TypeError(
   TypeError: Type hints should be specified as one of:
     - DataFrame[type, type, ...]
     - DataFrame[name: type, name: type, ...]
     - DataFrame[dtypes instance]
     - DataFrame[zip(names, types)]
     - DataFrame[index_type, [type, ...]]
     - DataFrame[(index_name, index_type), [(name, type), ...]]
     - DataFrame[dtype instance, dtypes instance]
     - DataFrame[(index_name, index_type), zip(names, types)]
     - DataFrame[[index_type, ...], [type, ...]]
     - DataFrame[[(index_name, index_type), ...], [(name, type), ...]]
     - DataFrame[dtypes instance, dtypes instance]
     - DataFrame[zip(index_names, index_types), zip(names, types)]
   However, got (<class 'int'>, typing.List[int]).
   
   ----------------------------------------------------------------------
   Ran 10 tests in 34.327s
   
   FAILED (errors=1)
   ```
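
One possible stopgap, sketched here under assumptions and not taken from this PR, is to guard the affected test with a version check until the type hint handling is fixed. The helper name `is_affected_python` and the test class below are hypothetical, and treating every version from 3.11 onward as affected is an assumption: the log above only confirms the failure on Python 3.11.

```python
import sys
import unittest


def is_affected_python(version_info=sys.version_info):
    """Hypothetical helper: True for Python 3.11 and newer.

    Assumption: the report above only confirms 3.11; later versions
    are treated as affected here purely as a precaution.
    """
    return tuple(version_info[:2]) >= (3, 11)


class ApplyBatchTypeHintSketch(unittest.TestCase):
    # Skip (not delete) the test so it is easy to re-enable later.
    @unittest.skipIf(
        is_affected_python(),
        "nested List[int] frame type hint reported failing on Python 3.11",
    )
    def test_nested_list_type_hint(self):
        # Placeholder body. The real test defines a function annotated with
        # ps.DataFrame[float, [int, List[int]]], which needs pyspark.
        self.assertTrue(True)
```

On an affected interpreter the test is reported as skipped instead of as an error, so the rest of the module still runs and the skip reason stays visible in the test output.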


