Posted to reviews@spark.apache.org by "ueshin (via GitHub)" <gi...@apache.org> on 2024/01/12 01:03:56 UTC

[PR] [SPARK-46686][PYTHON][CONNECT] Basic support of SparkSession based Python UDF profiler [spark]

ueshin opened a new pull request, #44697:
URL: https://github.com/apache/spark/pull/44697

   ### What changes were proposed in this pull request?
   
   Basic support of SparkSession based Python UDF profiler.
   
   To enable the profiler, use a SQL conf `spark.sql.pyspark.udf.profiler`:
   
   - `"perf"`: enable cProfile
   - `"memory"`: enable memory-profiler (TODO: [SPARK-46687](https://issues.apache.org/jira/browse/SPARK-46687))
   
   ```py
   from pyspark.sql.functions import col, pandas_udf, udf
   
   # Assumes `spark` is an existing SparkSession.
   spark.conf.set("spark.sql.pyspark.udf.profiler", "perf")  # enable cProfile
   
   @udf("string")
   def f(x):
       return str(x)
   
   df = spark.range(10).select(f(col("id")))
   df.collect()
   
   @pandas_udf("string")
   def g(x):
       return x.astype("string")
   
   df = spark.range(10).select(g(col("id")))
   
   spark.conf.unset("spark.sql.pyspark.udf.profiler")  # disable
   
   df.collect()  # won't profile
   
   spark.show_perf_profiles()  # show the result for only the first collect.
   ```
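
For readers unfamiliar with what the `"perf"` setting measures, here is a minimal standalone sketch of profiling one Python function with the standard-library `cProfile` and `pstats` modules. It is not this PR's implementation: the helper name `profile_call` is hypothetical, and the code runs locally without Spark.

```python
import cProfile
import io
import pstats


def profile_call(func, *args, **kwargs):
    """Run func under cProfile; return (result, formatted stats report)."""
    profiler = cProfile.Profile()
    result = profiler.runcall(func, *args, **kwargs)
    buf = io.StringIO()
    stats = pstats.Stats(profiler, stream=buf)
    # Sort by cumulative time and keep the report short.
    stats.sort_stats("cumulative").print_stats(5)
    return result, buf.getvalue()


def f(x):
    # Same body as the UDF `f` in the example above.
    return str(x)


result, report = profile_call(f, 42)
print(report)
```

The report has the same general shape as any `pstats` output: a call count summary followed by one row per profiled function.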
   
   ### Why are the changes needed?
   
   The existing UDF profilers are SparkContext-based, so they can't support Spark Connect.
   
   We should introduce SparkSession based profilers and support Spark Connect.
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes, SparkSession-based UDF profilers will be available.
   
   ### How was this patch tested?
   
   Added related tests, tested manually, and ran the existing tests.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46686][PYTHON][CONNECT] Basic support of SparkSession based Python UDF profiler [spark]

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on code in PR #44697:
URL: https://github.com/apache/spark/pull/44697#discussion_r1456443138


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala:
##########
@@ -371,6 +373,14 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
   private[connect] def listListenerIds(): Seq[String] = {
     listenerCache.keySet().asScala.toSeq
   }
+
+  /**
+   * An accumulator for Python executors.
+   *
+   * The accumulated results will be sent to the Python client via observed_metrics message.
+   */
+  private[connect] val pythonAccumulator: Option[PythonAccumulator] =
+    Try(session.sparkContext.collectionAccumulator[Array[Byte]]).toOption

Review Comment:
   > looks like we don't need `Try(...)` here? 
   
   In some tests, mocks of `session` or `sparkContext` are used and they throw an exception when creating accumulators.
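
A rough Python analogue of the `Try(...).toOption` pattern discussed here, as a sketch only: the helper `try_to_option` and the attribute name `collection_accumulator` are hypothetical and are not part of Spark or of this PR. It shows a mocked context whose factory raises being turned into "no accumulator" instead of a failure.

```python
from unittest import mock


def try_to_option(factory):
    """Return factory(), or None if it raises (loosely like Try(...).toOption)."""
    try:
        return factory()
    except Exception:
        return None


# A mocked context whose accumulator factory throws, as described for some tests.
mocked_context = mock.Mock()
mocked_context.collection_accumulator.side_effect = RuntimeError("mocked")

accumulator = try_to_option(mocked_context.collection_accumulator)  # None
working = try_to_option(lambda: [])  # the factory's value when nothing raises
```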
   





Re: [PR] [SPARK-46686][PYTHON][CONNECT] Basic support of SparkSession based Python UDF profiler [spark]

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on code in PR #44697:
URL: https://github.com/apache/spark/pull/44697#discussion_r1456470029


##########
python/pyspark/sql/profiler.py:
##########
@@ -0,0 +1,176 @@
+#

Review Comment:
   No, the new config and `spark.showPerfProfiles` should be the new user facing API, and [SPARK-46687](https://issues.apache.org/jira/browse/SPARK-46687) will add `spark.showMemoryProfiles`.





Re: [PR] [SPARK-46686][PYTHON][CONNECT] Basic support of SparkSession based Python UDF profiler [spark]

Posted by "xinrong-meng (via GitHub)" <gi...@apache.org>.
xinrong-meng commented on code in PR #44697:
URL: https://github.com/apache/spark/pull/44697#discussion_r1453876420


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2927,6 +2927,17 @@ object SQLConf {
       // show full stacktrace in tests but hide in production by default.
       .createWithDefault(Utils.isTesting)
 
+  val PYTHON_UDF_PROFILER =
+    buildConf("spark.sql.pyspark.udf.profiler")
+      .doc("Configure the Python/Pandas UDF profiler by enabling or disabling it " +
+        "with the option to choose between \"perf\" and \"memory\" types, " +
+        "or unsetting the config disables the profiler. This is disabled by default.")

Review Comment:
   I'm wondering if "setting the config to ... disables the profiler" is clearer than "unsetting the config".





Re: [PR] [SPARK-46686][PYTHON][CONNECT] Basic support of SparkSession based Python UDF profiler [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon closed pull request #44697: [SPARK-46686][PYTHON][CONNECT] Basic support of SparkSession based Python UDF profiler
URL: https://github.com/apache/spark/pull/44697




Re: [PR] [SPARK-46686][PYTHON][CONNECT] Basic support of SparkSession based Python UDF profiler [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #44697:
URL: https://github.com/apache/spark/pull/44697#discussion_r1454489134


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala:
##########
@@ -371,6 +373,14 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
   private[connect] def listListenerIds(): Seq[String] = {
     listenerCache.keySet().asScala.toSeq
   }
+
+  /**
+   * An accumulator for Python executors.
+   *
+   * The accumulated results will be sent to the Python client via observed_metrics message.
+   */
+  private[connect] val pythonAccumulator: Option[PythonAccumulator] =
+    Try(session.sparkContext.collectionAccumulator[Array[Byte]]).toOption

Review Comment:
   BTW, if the profiler is disabled, we probably shouldn't create this accumulator, to avoid performance issues.





Re: [PR] [SPARK-46686][PYTHON][CONNECT] Basic support of SparkSession based Python UDF profiler [spark]

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on code in PR #44697:
URL: https://github.com/apache/spark/pull/44697#discussion_r1453948329


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2927,6 +2927,17 @@ object SQLConf {
       // show full stacktrace in tests but hide in production by default.
       .createWithDefault(Utils.isTesting)
 
+  val PYTHON_UDF_PROFILER =
+    buildConf("spark.sql.pyspark.udf.profiler")
+      .doc("Configure the Python/Pandas UDF profiler by enabling or disabling it " +
+        "with the option to choose between \"perf\" and \"memory\" types, " +
+        "or unsetting the config disables the profiler. This is disabled by default.")

Review Comment:
   I'm not sure. What do you think the `...` should be in that case?





Re: [PR] [SPARK-46686][PYTHON][CONNECT] Basic support of SparkSession based Python UDF profiler [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #44697:
URL: https://github.com/apache/spark/pull/44697#discussion_r1454293613


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2927,6 +2927,17 @@ object SQLConf {
       // show full stacktrace in tests but hide in production by default.
       .createWithDefault(Utils.isTesting)
 
+  val PYTHON_UDF_PROFILER =
+    buildConf("spark.sql.pyspark.udf.profiler")
+      .doc("Configure the Python/Pandas UDF profiler by enabling or disabling it " +
+        "with the option to choose between \"perf\" and \"memory\" types, " +
+        "or unsetting the config disables the profiler. This is disabled by default.")
+      .version("4.0.0")
+      .stringConf
+      .transform(_.toLowerCase(Locale.ROOT))
+      .checkValues(Set("perf", "memory"))

Review Comment:
   I wonder if it's more straightforward to use the module name. e.g., `cProfile` and `memory-profiler`
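
To make the behavior of `.transform(_.toLowerCase(Locale.ROOT))` followed by `.checkValues(Set("perf", "memory"))` concrete, here is a small Python sketch of the same normalize-then-validate step. The function name `normalize_profiler` and the error message are hypothetical; the real check lives in the Scala `SQLConf` entry shown in the diff.

```python
ALLOWED_PROFILERS = {"perf", "memory"}


def normalize_profiler(value):
    """Lowercase the value, then reject anything outside the allowed set."""
    normalized = value.lower()
    if normalized not in ALLOWED_PROFILERS:
        raise ValueError(
            f"invalid profiler {value!r}; expected one of {sorted(ALLOWED_PROFILERS)}"
        )
    return normalized


print(normalize_profiler("PERF"))  # accepted case-insensitively

try:
    normalize_profiler("cProfile")  # a module name is not an accepted value
except ValueError as exc:
    print(exc)
```

Under the values in the diff, a module name such as `cProfile` would be rejected, which is what this comment proposes to reconsider.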





Re: [PR] [SPARK-46686][PYTHON][CONNECT] Basic support of SparkSession based Python UDF profiler [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #44697:
URL: https://github.com/apache/spark/pull/44697#issuecomment-1897513386

   Merged to master.






Re: [PR] [SPARK-46686][PYTHON][CONNECT] Basic support of SparkSession based Python UDF profiler [spark]

Posted by "xinrong-meng (via GitHub)" <gi...@apache.org>.
xinrong-meng commented on code in PR #44697:
URL: https://github.com/apache/spark/pull/44697#discussion_r1454242920


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2927,6 +2927,17 @@ object SQLConf {
       // show full stacktrace in tests but hide in production by default.
       .createWithDefault(Utils.isTesting)
 
+  val PYTHON_UDF_PROFILER =
+    buildConf("spark.sql.pyspark.udf.profiler")
+      .doc("Configure the Python/Pandas UDF profiler by enabling or disabling it " +
+        "with the option to choose between \"perf\" and \"memory\" types, " +
+        "or unsetting the config disables the profiler. This is disabled by default.")

Review Comment:
   Sorry I was looking at pyspark.SparkConfig, thanks!





Re: [PR] [SPARK-46686][PYTHON][CONNECT] Basic support of SparkSession based Python UDF profiler [spark]

Posted by "xinrong-meng (via GitHub)" <gi...@apache.org>.
xinrong-meng commented on code in PR #44697:
URL: https://github.com/apache/spark/pull/44697#discussion_r1456338510


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2927,6 +2927,17 @@ object SQLConf {
       // show full stacktrace in tests but hide in production by default.
       .createWithDefault(Utils.isTesting)
 
+  val PYTHON_UDF_PROFILER =
+    buildConf("spark.sql.pyspark.udf.profiler")
+      .doc("Configure the Python/Pandas UDF profiler by enabling or disabling it " +
+        "with the option to choose between \"perf\" and \"memory\" types, " +
+        "or unsetting the config disables the profiler. This is disabled by default.")
+      .version("4.0.0")
+      .stringConf
+      .transform(_.toLowerCase(Locale.ROOT))
+      .checkValues(Set("perf", "memory"))

Review Comment:
   I noticed there are multiple user-facing references to the current "perf" profiler: [Python Profilers for UDFs](https://spark.apache.org/docs/latest/api/python/development/debugging.html#id7), [Workers profiling](https://www.databricks.com/blog/how-profile-pyspark). It would be great if we could make them consistent.





Re: [PR] [SPARK-46686][PYTHON][CONNECT] Basic support of SparkSession based Python UDF profiler [spark]

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on code in PR #44697:
URL: https://github.com/apache/spark/pull/44697#discussion_r1456469906


##########
python/pyspark/sql/tests/test_udf_profiler.py:
##########
@@ -145,6 +155,190 @@ def test_unsupported(self):
             )
 
 
+class UDFProfiler2TestsMixin:
+    @contextmanager
+    def trap_stdout(self):
+        old_stdout = sys.stdout
+        sys.stdout = io = StringIO()
+        try:
+            yield io
+        finally:
+            sys.stdout = old_stdout
+
+    @property
+    def profile_results(self):
+        return self.spark._profiler_collector._perf_profile_results
+
+    def test_perf_profiler_udf(self):
+        _do_computation(self.spark)
+
+        # Without the conf enabled, no profile results are collected.
+        self.assertEqual(0, len(self.profile_results), str(list(self.profile_results)))
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+            _do_computation(self.spark)
+
+        self.assertEqual(3, len(self.profile_results), str(list(self.profile_results)))
+
+        with self.trap_stdout() as io_all:
+            self.spark.show_perf_profiles()

Review Comment:
   Sure, let me change it to `showPerfProfiles`.
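
The `trap_stdout` helper in the test diff above can be tried on its own. The sketch below copies its logic into a module-level function (the test version is a method on the mixin) and captures a placeholder `print` instead of real profiler output.

```python
import sys
from contextlib import contextmanager
from io import StringIO


@contextmanager
def trap_stdout():
    """Temporarily replace sys.stdout with a StringIO and yield it."""
    old_stdout = sys.stdout
    sys.stdout = buf = StringIO()
    try:
        yield buf
    finally:
        # Always restore the real stdout, even if the body raises.
        sys.stdout = old_stdout


with trap_stdout() as captured:
    print("profile output would go here")

text = captured.getvalue()
```

The standard library's `contextlib.redirect_stdout` offers the same effect and could be used instead of a hand-written helper.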



##########
python/pyspark/sql/profiler.py:
##########
@@ -0,0 +1,176 @@
+#

Review Comment:
   No, the new config and `spark.showPerfProfiles` should be the new user facing API.





Re: [PR] [SPARK-46686][PYTHON][CONNECT] Basic support of SparkSession based Python UDF profiler [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #44697:
URL: https://github.com/apache/spark/pull/44697#discussion_r1456590439


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala:
##########
@@ -371,6 +373,14 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
   private[connect] def listListenerIds(): Seq[String] = {
     listenerCache.keySet().asScala.toSeq
   }
+
+  /**
+   * An accumulator for Python executors.
+   *
+   * The accumulated results will be sent to the Python client via observed_metrics message.
+   */
+  private[connect] val pythonAccumulator: Option[PythonAccumulator] =
+    Try(session.sparkContext.collectionAccumulator[Array[Byte]]).toOption

Review Comment:
   My concern is about registering too many accumulators, because calling this will create and register an accumulator for each session. Especially with Spark Connect, there could be a lot of Spark sessions.





Re: [PR] [SPARK-46686][PYTHON][CONNECT] Basic support of SparkSession based Python UDF profiler [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #44697:
URL: https://github.com/apache/spark/pull/44697#discussion_r1454492024


##########
python/pyspark/sql/profiler.py:
##########
@@ -0,0 +1,176 @@
+#

Review Comment:
   qq: do we want to expose anything in this file as an API?







Re: [PR] [SPARK-46686][PYTHON][CONNECT] Basic support of SparkSession based Python UDF profiler [spark]

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on code in PR #44697:
URL: https://github.com/apache/spark/pull/44697#discussion_r1456391588


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2927,6 +2927,17 @@ object SQLConf {
       // show full stacktrace in tests but hide in production by default.
       .createWithDefault(Utils.isTesting)
 
+  val PYTHON_UDF_PROFILER =
+    buildConf("spark.sql.pyspark.udf.profiler")
+      .doc("Configure the Python/Pandas UDF profiler by enabling or disabling it " +
+        "with the option to choose between \"perf\" and \"memory\" types, " +
+        "or unsetting the config disables the profiler. This is disabled by default.")
+      .version("4.0.0")
+      .stringConf
+      .transform(_.toLowerCase(Locale.ROOT))
+      .checkValues(Set("perf", "memory"))

Review Comment:
   @xinrong-meng what's the suggestion? could you elaborate?





Re: [PR] [SPARK-46686][PYTHON][CONNECT] Basic support of SparkSession based Python UDF profiler [spark]

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on code in PR #44697:
URL: https://github.com/apache/spark/pull/44697#discussion_r1456459431


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala:
##########
@@ -371,6 +373,14 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
   private[connect] def listListenerIds(): Seq[String] = {
     listenerCache.keySet().asScala.toSeq
   }
+
+  /**
+   * An accumulator for Python executors.
+   *
+   * The accumulated results will be sent to the Python client via observed_metrics message.
+   */
+  private[connect] val pythonAccumulator: Option[PythonAccumulator] =
+    Try(session.sparkContext.collectionAccumulator[Array[Byte]]).toOption

Review Comment:
   > if the profile is disabled, we shouldn't probably create this accumulator to avoid performance issue.
   
   The accumulator always needs to exist because:
   - it can't know whether or when the profiler is enabled
   - registered UDFs need to be supported
   
   What kind of performance issue are you concerned about?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46686][PYTHON][CONNECT] Basic support of SparkSession based Python UDF profiler [spark]

Posted by "xinrong-meng (via GitHub)" <gi...@apache.org>.
xinrong-meng commented on code in PR #44697:
URL: https://github.com/apache/spark/pull/44697#discussion_r1454127640


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2927,6 +2927,17 @@ object SQLConf {
       // show full stacktrace in tests but hide in production by default.
       .createWithDefault(Utils.isTesting)
 
+  val PYTHON_UDF_PROFILER =
+    buildConf("spark.sql.pyspark.udf.profiler")
+      .doc("Configure the Python/Pandas UDF profiler by enabling or disabling it " +
+        "with the option to choose between \"perf\" and \"memory\" types, " +
+        "or unsetting the config disables the profiler. This is disabled by default.")

Review Comment:
   I'm not sure what `...` should be; I guess that is why I suggested not using "unsetting". How do we expect users to "unset" a config? Is there a specific API?





Re: [PR] [SPARK-46686][PYTHON][CONNECT] Basic support of SparkSession based Python UDF profiler [spark]

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on code in PR #44697:
URL: https://github.com/apache/spark/pull/44697#discussion_r1454150862


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2927,6 +2927,17 @@ object SQLConf {
       // show full stacktrace in tests but hide in production by default.
       .createWithDefault(Utils.isTesting)
 
+  val PYTHON_UDF_PROFILER =
+    buildConf("spark.sql.pyspark.udf.profiler")
+      .doc("Configure the Python/Pandas UDF profiler by enabling or disabling it " +
+        "with the option to choose between \"perf\" and \"memory\" types, " +
+        "or unsetting the config disables the profiler. This is disabled by default.")

Review Comment:
   Yes, there is:
   
   https://github.com/apache/spark/blob/e7a27e94e02d24a8ea5917b6e2b9c00239e5bdd2/python/pyspark/sql/conf.py#L96-L118
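
As an illustration of the set/unset semantics being discussed, here is a toy model in plain Python. `TinyConf` is a hypothetical stand-in written for this sketch and is not PySpark's runtime conf class; it only shows that unsetting a key makes it fall back to its default, which for this config means the profiler is disabled.

```python
class TinyConf:
    """Hypothetical stand-in for a runtime conf with per-key defaults."""

    def __init__(self, defaults):
        self._defaults = dict(defaults)
        self._overrides = {}

    def set(self, key, value):
        self._overrides[key] = value

    def unset(self, key):
        # Removing the override makes the key fall back to its default.
        self._overrides.pop(key, None)

    def get(self, key):
        return self._overrides.get(key, self._defaults.get(key))


KEY = "spark.sql.pyspark.udf.profiler"
conf = TinyConf({KEY: None})  # None models "disabled by default"

conf.set(KEY, "perf")
enabled = conf.get(KEY)

conf.unset(KEY)
disabled = conf.get(KEY)
```

In PySpark itself the corresponding calls are `spark.conf.set(...)` and `spark.conf.unset(...)`, as used in the PR description.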
   





Re: [PR] [SPARK-46686][PYTHON][CONNECT] Basic support of SparkSession based Python UDF profiler [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #44697:
URL: https://github.com/apache/spark/pull/44697#discussion_r1456637641


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala:
##########
@@ -371,6 +373,14 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
   private[connect] def listListenerIds(): Seq[String] = {
     listenerCache.keySet().asScala.toSeq
   }
+
+  /**
+   * An accumulator for Python executors.
+   *
+   * The accumulated results will be sent to the Python client via observed_metrics message.
+   */
+  private[connect] val pythonAccumulator: Option[PythonAccumulator] =
+    Try(session.sparkContext.collectionAccumulator[Array[Byte]]).toOption

Review Comment:
   👌 





Re: [PR] [SPARK-46686][PYTHON][CONNECT] Basic support of SparkSession based Python UDF profiler [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #44697:
URL: https://github.com/apache/spark/pull/44697#discussion_r1454440376


##########
python/pyspark/sql/tests/test_udf_profiler.py:
##########
@@ -145,6 +155,190 @@ def test_unsupported(self):
             )
 
 
+class UDFProfiler2TestsMixin:
+    @contextmanager
+    def trap_stdout(self):
+        old_stdout = sys.stdout
+        sys.stdout = io = StringIO()
+        try:
+            yield io
+        finally:
+            sys.stdout = old_stdout
+
+    @property
+    def profile_results(self):
+        return self.spark._profiler_collector._perf_profile_results
+
+    def test_perf_profiler_udf(self):
+        _do_computation(self.spark)
+
+        # Without the conf enabled, no profile results are collected.
+        self.assertEqual(0, len(self.profile_results), str(list(self.profile_results)))
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+            _do_computation(self.spark)
+
+        self.assertEqual(3, len(self.profile_results), str(list(self.profile_results)))
+
+        with self.trap_stdout() as io_all:
+            self.spark.show_perf_profiles()

Review Comment:
   I wonder if we should name it something like `showPerfProfiles`, `spark.profile.show()`, or `spark.showprofile`.
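
   For context on what the test above captures: the PR description says the `"perf"` mode enables cProfile, and `trap_stdout` collects whatever report gets printed. The following is a minimal sketch of that pattern without Spark. `work` is a hypothetical stand-in for a UDF body, and nothing here calls the PySpark API under review.

```python
import cProfile
import pstats
import sys
from contextlib import contextmanager
from io import StringIO


@contextmanager
def trap_stdout():
    """Temporarily swap sys.stdout for an in-memory buffer."""
    old_stdout = sys.stdout
    sys.stdout = buffer = StringIO()
    try:
        yield buffer
    finally:
        # Always restore the real stdout, even if the body raises.
        sys.stdout = old_stdout


def work(n):
    """Hypothetical stand-in for a UDF body."""
    return sum(i * i for i in range(n))


profiler = cProfile.Profile()
profiler.runcall(work, 1000)

with trap_stdout() as captured:
    # pstats.Stats binds its output stream at construction time, so it is
    # created inside the block to pick up the swapped sys.stdout.
    pstats.Stats(profiler).sort_stats("cumulative").print_stats()

report = captured.getvalue()
```

   Whatever name the display method ends up with, a test written this way only depends on the report being printed to `sys.stdout`.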





Re: [PR] [SPARK-46686][PYTHON][CONNECT] Basic support of SparkSession based Python UDF profiler [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #44697:
URL: https://github.com/apache/spark/pull/44697#discussion_r1454486826


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala:
##########
@@ -371,6 +373,14 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
   private[connect] def listListenerIds(): Seq[String] = {
     listenerCache.keySet().asScala.toSeq
   }
+
+  /**
+   * An accumulator for Python executors.
+   *
+   * The accumulated results will be sent to the Python client via observed_metrics message.
+   */
+  private[connect] val pythonAccumulator: Option[PythonAccumulator] =
+    Try(session.sparkContext.collectionAccumulator[Array[Byte]]).toOption

Review Comment:
   Hm, it looks like we don't need `Try(...)` here? I took a cursory look, and it seems it won't throw an exception.





Re: [PR] [SPARK-46686][PYTHON][CONNECT] Basic support of SparkSession based Python UDF profiler [spark]

Posted by "xinrong-meng (via GitHub)" <gi...@apache.org>.
xinrong-meng commented on PR #44697:
URL: https://github.com/apache/spark/pull/44697#issuecomment-1894330650

   LGTM after the conflicts are resolved. Thanks for the nice work!




Re: [PR] [SPARK-46686][PYTHON][CONNECT] Basic support of SparkSession based Python UDF profiler [spark]

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on code in PR #44697:
URL: https://github.com/apache/spark/pull/44697#discussion_r1456611223


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala:
##########
@@ -371,6 +373,14 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
   private[connect] def listListenerIds(): Seq[String] = {
     listenerCache.keySet().asScala.toSeq
   }
+
+  /**
+   * An accumulator for Python executors.
+   *
+   * The accumulated results will be sent to the Python client via observed_metrics message.
+   */
+  private[connect] val pythonAccumulator: Option[PythonAccumulator] =
+    Try(session.sparkContext.collectionAccumulator[Array[Byte]]).toOption

Review Comment:
   There are already many more accumulators registered for each query, as `SQLMetrics`. I don't think one more accumulator per session would be an issue.





Re: [PR] [SPARK-46686][PYTHON][CONNECT] Basic support of SparkSession based Python UDF profiler [spark]

Posted by "xinrong-meng (via GitHub)" <gi...@apache.org>.
xinrong-meng commented on code in PR #44697:
URL: https://github.com/apache/spark/pull/44697#discussion_r1453896168


##########
python/pyspark/sql/profiler.py:
##########
@@ -0,0 +1,176 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from abc import ABC, abstractmethod
+import pstats
+from threading import RLock
+from typing import Dict, Optional, Tuple
+
+from pyspark.accumulators import (
+    Accumulator,
+    AccumulatorParam,
+    SpecialAccumulatorIds,
+    _accumulatorRegistry,
+)
+from pyspark.profiler import CodeMapDict, MemoryProfiler, MemUsageParam, PStatsParam
+
+
+ProfileResults = Dict[int, Tuple[Optional[pstats.Stats], Optional[CodeMapDict]]]

Review Comment:
   I'm wondering if it's better to put it in "python/pyspark/sql/_typing.pyi".
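
   To make the shape of the alias concrete, here is a rough sketch of how a mapping of that type could be filled in. It assumes the integer key identifies a UDF and that perf results for the same key are merged. `ToyCollector`, `profile_call`, and `CodeMapStandIn` are hypothetical names for illustration (the last one stands in for `pyspark.profiler.CodeMapDict`), not the classes added by this PR.

```python
import cProfile
import pstats
from threading import RLock
from typing import Any, Dict, Optional, Tuple

# Hypothetical stand-in for pyspark.profiler.CodeMapDict.
CodeMapStandIn = Dict[str, Any]
ProfileResults = Dict[int, Tuple[Optional[pstats.Stats], Optional[CodeMapStandIn]]]


class ToyCollector:
    """Illustrative only: merges perf stats per id under a lock."""

    def __init__(self) -> None:
        self._lock = RLock()
        self.results: ProfileResults = {}

    def add_perf(self, udf_id: int, stats: pstats.Stats) -> None:
        with self._lock:
            existing, mem = self.results.get(udf_id, (None, None))
            # pstats.Stats.add merges in place and returns the receiver.
            merged = stats if existing is None else existing.add(stats)
            self.results[udf_id] = (merged, mem)


def profile_call(func, *args) -> pstats.Stats:
    """Run func under cProfile and wrap the result in pstats.Stats."""
    profiler = cProfile.Profile()
    profiler.runcall(func, *args)
    return pstats.Stats(profiler)


def square_sum(n: int) -> int:
    return sum(i * i for i in range(n))


first = profile_call(square_sum, 10)
second = profile_call(square_sum, 20)
expected_calls = first.total_calls + second.total_calls

collector = ToyCollector()
collector.add_perf(1, first)
collector.add_perf(1, second)
merged, memory_part = collector.results[1]
```

   Both slots of the tuple are `Optional` in the alias, so an entry can carry only one kind of result; in this sketch the memory slot stays `None`.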





Re: [PR] [SPARK-46686][PYTHON][CONNECT] Basic support of SparkSession based Python UDF profiler [spark]

Posted by "xinrong-meng (via GitHub)" <gi...@apache.org>.
xinrong-meng commented on code in PR #44697:
URL: https://github.com/apache/spark/pull/44697#discussion_r1456645856


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2927,6 +2927,17 @@ object SQLConf {
       // show full stacktrace in tests but hide in production by default.
       .createWithDefault(Utils.isTesting)
 
+  val PYTHON_UDF_PROFILER =
+    buildConf("spark.sql.pyspark.udf.profiler")
+      .doc("Configure the Python/Pandas UDF profiler by enabling or disabling it " +
+        "with the option to choose between \"perf\" and \"memory\" types, " +
+        "or unsetting the config disables the profiler. This is disabled by default.")
+      .version("4.0.0")
+      .stringConf
+      .transform(_.toLowerCase(Locale.ROOT))
+      .checkValues(Set("perf", "memory"))

Review Comment:
   I could adjust those references once we decide on a standard name.
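
   As a reading aid for the hunk above, this is a small sketch of the validation the builder chain appears to describe, assuming the value is lower-cased before it is checked against the allowed set and that an unset value means the profiler is disabled. `normalize_profiler_conf` is a hypothetical helper, not part of Spark.

```python
from typing import Optional

# Allowed values as listed in the checkValues(...) call in the hunk.
VALID_PROFILERS = frozenset({"perf", "memory"})


def normalize_profiler_conf(raw: Optional[str]) -> Optional[str]:
    """Return the normalized profiler name, or None when unset (disabled)."""
    if raw is None:
        return None
    # Mirrors the lower-casing transform; str.lower() does not depend on locale.
    value = raw.lower()
    if value not in VALID_PROFILERS:
        raise ValueError(
            f"Unsupported profiler {raw!r}; expected one of {sorted(VALID_PROFILERS)}"
        )
    return value
```

   Under these assumptions, `"PERF"` and `"perf"` select the same profiler, while any other string is rejected when the conf is set.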


