You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "xinrong-meng (via GitHub)" <gi...@apache.org> on 2024/02/06 22:07:54 UTC

[PR] [SPARK-46689][SPARK-46690][PYTHON][CONNECT] Support v2 profiling in group/cogroup applyInPandas [spark]

xinrong-meng opened a new pull request, #45050:
URL: https://github.com/apache/spark/pull/45050

   ### What changes were proposed in this pull request?
   Support v2 (perf, memory) profiling in group/cogroup applyInPandas, which rely on physical plan nodes FlatMapGroupsInBatchExec and FlatMapCoGroupsInBatchExec.
   
   ### Why are the changes needed?
   Complete v2 profiling support.
   
   ### Does this PR introduce _any_ user-facing change?
   Yes. V2 profiling in group/cogroup applyInPandas is supported.
   
   ### How was this patch tested?
   Unit tests.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46689][SPARK-46690][PYTHON][CONNECT] Support v2 profiling in group/cogroup applyInPandas [spark]

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on code in PR #45050:
URL: https://github.com/apache/spark/pull/45050#discussion_r1481939580


##########
python/pyspark/sql/tests/test_udf_profiler.py:
##########
@@ -333,6 +333,69 @@ def filter_func(iterator):
 
         self.assertEqual(0, len(self.profile_results), str(self.profile_results.keys()))
 
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_perf_profiler_group_apply_in_pandas(self):

Review Comment:
   Shall we have a test for `applyInArrow` as well?



##########
python/pyspark/sql/tests/test_udf_profiler.py:
##########
@@ -333,6 +333,69 @@ def filter_func(iterator):
 
         self.assertEqual(0, len(self.profile_results), str(self.profile_results.keys()))
 
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_perf_profiler_group_apply_in_pandas(self):
+        # FlatMapGroupsInBatchExec
+        df = self.spark.createDataFrame(
+            [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+        )
+
+        def normalize(pdf):
+            v = pdf.v
+            return pdf.assign(v=(v - v.mean()) / v.std())
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+            df.groupby("id").applyInPandas(normalize, schema="id long, v double").show()
+
+        self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
+
+        for id in self.profile_results:
+            with self.trap_stdout() as io:
+                self.spark.showPerfProfiles(id)
+
+            self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+            self.assertRegex(
+                io.getvalue(), f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
+            )
+
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_perf_profiler_cogroup_apply_in_pandas(self):

Review Comment:
   ditto.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46689][SPARK-46690][PYTHON][CONNECT] Support v2 profiling in group/cogroup applyInPandas/applyInArrow [spark]

Posted by "xinrong-meng (via GitHub)" <gi...@apache.org>.
xinrong-meng commented on code in PR #45050:
URL: https://github.com/apache/spark/pull/45050#discussion_r1482315739


##########
python/pyspark/sql/tests/test_udf_profiler.py:
##########
@@ -394,6 +394,129 @@ def min_udf(v: pd.Series) -> float:
                 io.getvalue(), f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
             )
 
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_perf_profiler_group_apply_in_pandas(self):
+        # FlatMapGroupsInBatchExec
+        df = self.spark.createDataFrame(
+            [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+        )
+
+        def normalize(pdf):
+            v = pdf.v
+            return pdf.assign(v=(v - v.mean()) / v.std())
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+            df.groupby("id").applyInPandas(normalize, schema="id long, v double").show()
+
+        self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
+
+        for id in self.profile_results:
+            with self.trap_stdout() as io:
+                self.spark.showPerfProfiles(id)
+
+            self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+            self.assertRegex(
+                io.getvalue(), f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
+            )
+
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_perf_profiler_cogroup_apply_in_pandas(self):
+        # FlatMapCoGroupsInBatchExec
+        import pandas as pd
+
+        df1 = self.spark.createDataFrame(
+            [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
+            ("time", "id", "v1"),
+        )
+        df2 = self.spark.createDataFrame(
+            [(20000101, 1, "x"), (20000101, 2, "y")], ("time", "id", "v2")
+        )
+
+        def asof_join(left, right):
+            return pd.merge_asof(left, right, on="time", by="id")
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+            df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
+                asof_join, schema="time int, id int, v1 double, v2 string"
+            ).show()
+
+        self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
+
+        for id in self.profile_results:
+            with self.trap_stdout() as io:
+                self.spark.showPerfProfiles(id)
+
+            self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+            self.assertRegex(
+                io.getvalue(), f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
+            )
+
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_perf_profiler_group_apply_in_arrow(self):
+        # FlatMapGroupsInBatchExec
+        import pyarrow.compute as pc
+
+        df = self.spark.createDataFrame(
+            [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+        )
+
+        def normalize(table):
+            v = table.column("v")
+            norm = pc.divide(pc.subtract(v, pc.mean(v)), pc.stddev(v, ddof=1))
+            return table.set_column(1, "v", norm)
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+            df.groupby("id").applyInArrow(normalize, schema="id long, v double").show()
+
+        self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
+
+        for id in self.profile_results:
+            with self.trap_stdout() as io:
+                self.spark.showPerfProfiles(id)
+
+            self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+            self.assertRegex(
+                io.getvalue(), f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
+            )
+
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_perf_profiler_cogroup_apply_in_arrow(self):
+        import pyarrow as pa
+
+        df1 = self.spark.createDataFrame([(1, 1.0), (2, 2.0), (1, 3.0), (2, 4.0)], ("id", "v1"))
+        df2 = self.spark.createDataFrame([(1, "x"), (2, "y")], ("id", "v2"))
+
+        def summarize(l, r):

Review Comment:
   I was wondering the reason and saw "In some fonts, these characters are indistinguishable from the numerals one and zero. When tempted to use 'l', use 'L' instead." That's good to learn :p 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46689][SPARK-46690][PYTHON][CONNECT] Support v2 profiling in group/cogroup applyInPandas/applyInArrow [spark]

Posted by "xinrong-meng (via GitHub)" <gi...@apache.org>.
xinrong-meng closed pull request #45050: [SPARK-46689][SPARK-46690][PYTHON][CONNECT] Support v2 profiling in group/cogroup applyInPandas/applyInArrow
URL: https://github.com/apache/spark/pull/45050


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46689][SPARK-46690][PYTHON][CONNECT] Support v2 profiling in group/cogroup applyInPandas/applyInArrow [spark]

Posted by "xinrong-meng (via GitHub)" <gi...@apache.org>.
xinrong-meng commented on PR #45050:
URL: https://github.com/apache/spark/pull/45050#issuecomment-1934752703

   Merged to master, thank you!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46689][SPARK-46690][PYTHON][CONNECT] Support v2 profiling in group/cogroup applyInPandas/applyInArrow [spark]

Posted by "xinrong-meng (via GitHub)" <gi...@apache.org>.
xinrong-meng commented on code in PR #45050:
URL: https://github.com/apache/spark/pull/45050#discussion_r1482146994


##########
python/pyspark/sql/tests/test_udf_profiler.py:
##########
@@ -333,6 +333,69 @@ def filter_func(iterator):
 
         self.assertEqual(0, len(self.profile_results), str(self.profile_results.keys()))
 
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_perf_profiler_group_apply_in_pandas(self):

Review Comment:
   Good catch!



##########
python/pyspark/sql/tests/test_udf_profiler.py:
##########
@@ -333,6 +333,69 @@ def filter_func(iterator):
 
         self.assertEqual(0, len(self.profile_results), str(self.profile_results.keys()))
 
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_perf_profiler_group_apply_in_pandas(self):
+        # FlatMapGroupsInBatchExec
+        df = self.spark.createDataFrame(
+            [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+        )
+
+        def normalize(pdf):
+            v = pdf.v
+            return pdf.assign(v=(v - v.mean()) / v.std())
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+            df.groupby("id").applyInPandas(normalize, schema="id long, v double").show()
+
+        self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
+
+        for id in self.profile_results:
+            with self.trap_stdout() as io:
+                self.spark.showPerfProfiles(id)
+
+            self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+            self.assertRegex(
+                io.getvalue(), f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
+            )
+
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_perf_profiler_cogroup_apply_in_pandas(self):

Review Comment:
   Sounds good.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46689][SPARK-46690][PYTHON][CONNECT] Support v2 profiling in group/cogroup applyInPandas/applyInArrow [spark]

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on code in PR #45050:
URL: https://github.com/apache/spark/pull/45050#discussion_r1482312796


##########
python/pyspark/sql/tests/test_udf_profiler.py:
##########
@@ -394,6 +394,129 @@ def min_udf(v: pd.Series) -> float:
                 io.getvalue(), f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
             )
 
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_perf_profiler_group_apply_in_pandas(self):
+        # FlatMapGroupsInBatchExec
+        df = self.spark.createDataFrame(
+            [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+        )
+
+        def normalize(pdf):
+            v = pdf.v
+            return pdf.assign(v=(v - v.mean()) / v.std())
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+            df.groupby("id").applyInPandas(normalize, schema="id long, v double").show()
+
+        self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
+
+        for id in self.profile_results:
+            with self.trap_stdout() as io:
+                self.spark.showPerfProfiles(id)
+
+            self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+            self.assertRegex(
+                io.getvalue(), f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
+            )
+
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_perf_profiler_cogroup_apply_in_pandas(self):
+        # FlatMapCoGroupsInBatchExec
+        import pandas as pd
+
+        df1 = self.spark.createDataFrame(
+            [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
+            ("time", "id", "v1"),
+        )
+        df2 = self.spark.createDataFrame(
+            [(20000101, 1, "x"), (20000101, 2, "y")], ("time", "id", "v2")
+        )
+
+        def asof_join(left, right):
+            return pd.merge_asof(left, right, on="time", by="id")
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+            df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
+                asof_join, schema="time int, id int, v1 double, v2 string"
+            ).show()
+
+        self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
+
+        for id in self.profile_results:
+            with self.trap_stdout() as io:
+                self.spark.showPerfProfiles(id)
+
+            self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+            self.assertRegex(
+                io.getvalue(), f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
+            )
+
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_perf_profiler_group_apply_in_arrow(self):
+        # FlatMapGroupsInBatchExec
+        import pyarrow.compute as pc
+
+        df = self.spark.createDataFrame(
+            [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+        )
+
+        def normalize(table):
+            v = table.column("v")
+            norm = pc.divide(pc.subtract(v, pc.mean(v)), pc.stddev(v, ddof=1))
+            return table.set_column(1, "v", norm)
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+            df.groupby("id").applyInArrow(normalize, schema="id long, v double").show()
+
+        self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
+
+        for id in self.profile_results:
+            with self.trap_stdout() as io:
+                self.spark.showPerfProfiles(id)
+
+            self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+            self.assertRegex(
+                io.getvalue(), f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
+            )
+
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_perf_profiler_cogroup_apply_in_arrow(self):
+        import pyarrow as pa
+
+        df1 = self.spark.createDataFrame([(1, 1.0), (2, 2.0), (1, 3.0), (2, 4.0)], ("id", "v1"))
+        df2 = self.spark.createDataFrame([(1, "x"), (2, "y")], ("id", "v2"))
+
+        def summarize(l, r):

Review Comment:
   Shall we rename to `left` and `right` according to the lint error?



##########
python/pyspark/sql/tests/test_udf_profiler.py:
##########
@@ -394,6 +394,129 @@ def min_udf(v: pd.Series) -> float:
                 io.getvalue(), f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
             )
 
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_perf_profiler_group_apply_in_pandas(self):
+        # FlatMapGroupsInBatchExec
+        df = self.spark.createDataFrame(
+            [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+        )
+
+        def normalize(pdf):
+            v = pdf.v
+            return pdf.assign(v=(v - v.mean()) / v.std())
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+            df.groupby("id").applyInPandas(normalize, schema="id long, v double").show()
+
+        self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
+
+        for id in self.profile_results:
+            with self.trap_stdout() as io:
+                self.spark.showPerfProfiles(id)
+
+            self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+            self.assertRegex(
+                io.getvalue(), f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
+            )
+
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_perf_profiler_cogroup_apply_in_pandas(self):
+        # FlatMapCoGroupsInBatchExec
+        import pandas as pd
+
+        df1 = self.spark.createDataFrame(
+            [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
+            ("time", "id", "v1"),
+        )
+        df2 = self.spark.createDataFrame(
+            [(20000101, 1, "x"), (20000101, 2, "y")], ("time", "id", "v2")
+        )
+
+        def asof_join(left, right):
+            return pd.merge_asof(left, right, on="time", by="id")
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+            df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
+                asof_join, schema="time int, id int, v1 double, v2 string"
+            ).show()
+
+        self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
+
+        for id in self.profile_results:
+            with self.trap_stdout() as io:
+                self.spark.showPerfProfiles(id)
+
+            self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+            self.assertRegex(
+                io.getvalue(), f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
+            )
+
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_perf_profiler_group_apply_in_arrow(self):
+        # FlatMapGroupsInBatchExec
+        import pyarrow.compute as pc
+
+        df = self.spark.createDataFrame(
+            [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+        )
+
+        def normalize(table):
+            v = table.column("v")
+            norm = pc.divide(pc.subtract(v, pc.mean(v)), pc.stddev(v, ddof=1))
+            return table.set_column(1, "v", norm)
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+            df.groupby("id").applyInArrow(normalize, schema="id long, v double").show()
+
+        self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
+
+        for id in self.profile_results:
+            with self.trap_stdout() as io:
+                self.spark.showPerfProfiles(id)
+
+            self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+            self.assertRegex(
+                io.getvalue(), f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
+            )
+
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_perf_profiler_cogroup_apply_in_arrow(self):
+        import pyarrow as pa
+
+        df1 = self.spark.createDataFrame([(1, 1.0), (2, 2.0), (1, 3.0), (2, 4.0)], ("id", "v1"))
+        df2 = self.spark.createDataFrame([(1, "x"), (2, "y")], ("id", "v2"))
+
+        def summarize(l, r):

Review Comment:
   Shall we rename `left` and `right` according to the lint error?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46689][SPARK-46690][PYTHON][CONNECT] Support v2 profiling in group/cogroup applyInPandas/applyInArrow [spark]

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on code in PR #45050:
URL: https://github.com/apache/spark/pull/45050#discussion_r1482312940


##########
python/pyspark/tests/test_memory_profiler.py:
##########
@@ -441,6 +441,129 @@ def min_udf(v: pd.Series) -> float:
                 io.getvalue(), f"Filename.*{os.path.basename(inspect.getfile(_do_computation))}"
             )
 
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_memory_profiler_group_apply_in_pandas(self):
+        # FlatMapGroupsInBatchExec
+        df = self.spark.createDataFrame(
+            [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+        )
+
+        def normalize(pdf):
+            v = pdf.v
+            return pdf.assign(v=(v - v.mean()) / v.std())
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}):
+            df.groupby("id").applyInPandas(normalize, schema="id long, v double").show()
+
+        self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
+
+        for id in self.profile_results:
+            with self.trap_stdout() as io:
+                self.spark.showMemoryProfiles(id)
+
+            self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+            self.assertRegex(
+                io.getvalue(), f"Filename.*{os.path.basename(inspect.getfile(_do_computation))}"
+            )
+
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_memory_profiler_cogroup_apply_in_pandas(self):
+        # FlatMapCoGroupsInBatchExec
+        import pandas as pd
+
+        df1 = self.spark.createDataFrame(
+            [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
+            ("time", "id", "v1"),
+        )
+        df2 = self.spark.createDataFrame(
+            [(20000101, 1, "x"), (20000101, 2, "y")], ("time", "id", "v2")
+        )
+
+        def asof_join(left, right):
+            return pd.merge_asof(left, right, on="time", by="id")
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}):
+            df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
+                asof_join, schema="time int, id int, v1 double, v2 string"
+            ).show()
+
+        self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
+
+        for id in self.profile_results:
+            with self.trap_stdout() as io:
+                self.spark.showMemoryProfiles(id)
+
+            self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+            self.assertRegex(
+                io.getvalue(), f"Filename.*{os.path.basename(inspect.getfile(_do_computation))}"
+            )
+
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_memory_profiler_group_apply_in_arrow(self):
+        # FlatMapGroupsInBatchExec
+        import pyarrow.compute as pc
+
+        df = self.spark.createDataFrame(
+            [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+        )
+
+        def normalize(table):
+            v = table.column("v")
+            norm = pc.divide(pc.subtract(v, pc.mean(v)), pc.stddev(v, ddof=1))
+            return table.set_column(1, "v", norm)
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}):
+            df.groupby("id").applyInArrow(normalize, schema="id long, v double").show()
+
+        self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
+
+        for id in self.profile_results:
+            with self.trap_stdout() as io:
+                self.spark.showMemoryProfiles(id)
+
+            self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+            self.assertRegex(
+                io.getvalue(), f"Filename.*{os.path.basename(inspect.getfile(_do_computation))}"
+            )
+
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_memory_profiler_cogroup_apply_in_arrow(self):
+        import pyarrow as pa
+
+        df1 = self.spark.createDataFrame([(1, 1.0), (2, 2.0), (1, 3.0), (2, 4.0)], ("id", "v1"))
+        df2 = self.spark.createDataFrame([(1, "x"), (2, "y")], ("id", "v2"))
+
+        def summarize(l, r):

Review Comment:
   ditto.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org