Posted to commits@spark.apache.org by jo...@apache.org on 2021/11/19 02:06:27 UTC

[spark] branch python-udf-accumulator created (now 9213a85)

This is an automated email from the ASF dual-hosted git repository.

joshrosen pushed a change to branch python-udf-accumulator
in repository https://gitbox.apache.org/repos/asf/spark.git.


      at 9213a85  hacky wip towards python udf profiling

This branch includes the following new commits:

     new 9213a85  hacky wip towards python udf profiling

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[spark] 01/01: hacky wip towards python udf profiling

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joshrosen pushed a commit to branch python-udf-accumulator
in repository https://gitbox.apache.org/repos/asf/spark.git

commit 9213a85a40499fc7f0e24ea14c5051c45a022ef2
Author: Josh Rosen <jo...@databricks.com>
AuthorDate: Wed Oct 20 16:17:44 2021 -0700

    hacky wip towards python udf profiling
---
 python/pyspark/profiler.py |  1 +
 python/pyspark/sql/udf.py  | 15 ++++++++++++---
 python/pyspark/worker.py   | 37 +++++++++++++++++++------------------
 3 files changed, 32 insertions(+), 21 deletions(-)
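
In rough terms, the WIP threads an optional pyspark.profiler.Profiler through
the pickled UDF command in udf.py and hands it back out of read_udfs in
worker.py, so the worker's existing profiling hook can fire for SQL UDFs
instead of always receiving None. A minimal usage sketch of how this would
presumably be exercised once finished, relying only on the existing
"spark.python.profile" config and SparkContext.show_profiles() API (none of
this code is part of the diff):

    from pyspark import SparkConf
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import udf

    # Profiling must be enabled before the SparkContext is created.
    conf = SparkConf().set("spark.python.profile", "true")
    spark = SparkSession.builder.config(conf=conf).getOrCreate()

    @udf("int")
    def plus_one(x):
        return x + 1

    spark.range(10).select(plus_one("id")).collect()

    # Prints the accumulated cProfile stats; today this only covers the RDD
    # API, which is the gap this WIP starts to close for SQL UDFs.
    spark.sparkContext.show_profiles()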

diff --git a/python/pyspark/profiler.py b/python/pyspark/profiler.py
index 99cfe71..8d8458c 100644
--- a/python/pyspark/profiler.py
+++ b/python/pyspark/profiler.py
@@ -142,6 +142,7 @@ class PStatsParam(AccumulatorParam):
 
     @staticmethod
     def addInPlace(value1, value2):
+        print("ACCUM UPDATE PARAM")
         if value1 is None:
             return value2
         value1.add(value2)
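
The print above is presumably a quick way to confirm that worker-side profile
stats actually reach PStatsParam when accumulator updates are combined. For
reference, a minimal custom AccumulatorParam sketch (not from this commit)
showing the same zero()/addInPlace() contract that PStatsParam implements:

    from pyspark import SparkContext
    from pyspark.accumulators import AccumulatorParam

    class ListParam(AccumulatorParam):
        # zero() supplies the identity value for a fresh accumulator copy.
        def zero(self, value):
            return []

        # addInPlace() merges one update into the running value; for
        # PStatsParam the values are pstats.Stats objects instead of lists.
        def addInPlace(self, value1, value2):
            value1.extend(value2)
            return value1

    sc = SparkContext.getOrCreate()
    acc = sc.accumulator([], ListParam())
    sc.parallelize([1, 2, 3]).foreach(lambda x: acc.add([x]))
    print(sorted(acc.value))  # [1, 2, 3]
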
diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py
index 752ccca..164588f 100644
--- a/python/pyspark/sql/udf.py
+++ b/python/pyspark/sql/udf.py
@@ -24,6 +24,7 @@ from typing import Callable, Any, TYPE_CHECKING, Optional, cast, Union
 from py4j.java_gateway import JavaObject
 
 from pyspark import SparkContext
+from pyspark.profiler import Profiler
 from pyspark.rdd import _prepare_for_python_RDD, PythonEvalType  # type: ignore[attr-defined]
 from pyspark.sql.column import Column, _to_java_column, _to_seq
 from pyspark.sql.types import (  # type: ignore[attr-defined]
@@ -44,9 +45,10 @@ __all__ = ["UDFRegistration"]
 def _wrap_function(
     sc: SparkContext,
     func: Callable[..., Any],
-    returnType: "DataTypeOrString"
+    returnType: "DataTypeOrString",
+    profiler: Optional[Profiler] = None
 ) -> JavaObject:
-    command = (func, returnType)
+    command = (func, returnType, profiler)
     pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
     return sc._jvm.PythonFunction(  # type: ignore[attr-defined]
         bytearray(pickled_command),
@@ -199,7 +201,14 @@ class UserDefinedFunction(object):
         spark = SparkSession.builder.getOrCreate()
         sc = spark.sparkContext
 
-        wrapped_func = _wrap_function(sc, self.func, self.returnType)
+        if sc.profiler_collector:
+            profiler = sc.profiler_collector.new_profiler(sc)
+            # TODO: better ID
+            sc.profiler_collector.add_profiler(0, profiler)
+        else:
+            profiler = None
+
+        wrapped_func = _wrap_function(sc, self.func, self.returnType, profiler)
         jdt = spark._jsparkSession.parseDataType(self.returnType.json())
         judf = (
             sc._jvm.org.apache.spark.sql.execution.python  # type: ignore[attr-defined]
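
One thing the TODO above points at: every UDF currently registers its profiler
under the hard-coded id 0, so all UDFs would be lumped into a single entry when
the collector reports results. A hypothetical sketch of the "better ID", e.g. a
simple per-UDF counter (the helper name and counter are my own, not part of the
diff):

    import itertools

    from pyspark import SparkContext

    _udf_profiler_ids = itertools.count()

    def _register_udf_profiler(sc: SparkContext):
        """Create and register a profiler for one UDF, if profiling is enabled."""
        if sc.profiler_collector is None:
            return None
        profiler = sc.profiler_collector.new_profiler(sc)
        # Key each UDF's profiler on a distinct id instead of the constant 0.
        sc.profiler_collector.add_profiler(next(_udf_profiler_ids), profiler)
        return profiler
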
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index ad6c003..8f13822 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -247,8 +247,9 @@ def read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index):
     num_arg = read_int(infile)
     arg_offsets = [read_int(infile) for i in range(num_arg)]
     chained_func = None
+    profiler = None
     for i in range(read_int(infile)):
-        f, return_type = read_command(pickleSer, infile)
+        f, return_type, profiler = read_command(pickleSer, infile)
         if chained_func is None:
             chained_func = f
         else:
@@ -263,28 +264,29 @@ def read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index):
 
     # the last returnType will be the return type of UDF
     if eval_type == PythonEvalType.SQL_SCALAR_PANDAS_UDF:
-        return arg_offsets, wrap_scalar_pandas_udf(func, return_type)
+        return arg_offsets, profiler, wrap_scalar_pandas_udf(func, return_type)
     elif eval_type == PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF:
-        return arg_offsets, wrap_pandas_iter_udf(func, return_type)
+        return arg_offsets, profiler, wrap_pandas_iter_udf(func, return_type)
     elif eval_type == PythonEvalType.SQL_MAP_PANDAS_ITER_UDF:
-        return arg_offsets, wrap_pandas_iter_udf(func, return_type)
+        return arg_offsets, profiler, wrap_pandas_iter_udf(func, return_type)
     elif eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF:
         argspec = getfullargspec(chained_func)  # signature was lost when wrapping it
-        return arg_offsets, wrap_grouped_map_pandas_udf(func, return_type, argspec)
+        return arg_offsets, profiler, wrap_grouped_map_pandas_udf(func, return_type, argspec)
     elif eval_type == PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF:
         argspec = getfullargspec(chained_func)  # signature was lost when wrapping it
-        return arg_offsets, wrap_cogrouped_map_pandas_udf(func, return_type, argspec)
+        return arg_offsets, profiler, wrap_cogrouped_map_pandas_udf(func, return_type, argspec)
     elif eval_type == PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF:
-        return arg_offsets, wrap_grouped_agg_pandas_udf(func, return_type)
+        return arg_offsets, profiler, wrap_grouped_agg_pandas_udf(func, return_type)
     elif eval_type == PythonEvalType.SQL_WINDOW_AGG_PANDAS_UDF:
-        return arg_offsets, wrap_window_agg_pandas_udf(func, return_type, runner_conf, udf_index)
+        return arg_offsets, profiler, wrap_window_agg_pandas_udf(func, return_type, runner_conf, udf_index)
     elif eval_type == PythonEvalType.SQL_BATCHED_UDF:
-        return arg_offsets, wrap_udf(func, return_type)
+        return arg_offsets, profiler, wrap_udf(func, return_type)
     else:
         raise ValueError("Unknown eval type: {}".format(eval_type))
 
 
 def read_udfs(pickleSer, infile, eval_type):
+    profiler = None
     runner_conf = {}
 
     if eval_type in (PythonEvalType.SQL_SCALAR_PANDAS_UDF,
@@ -335,7 +337,7 @@ def read_udfs(pickleSer, infile, eval_type):
         if is_map_iter:
             assert num_udfs == 1, "One MAP_ITER UDF expected here."
 
-        arg_offsets, udf = read_single_udf(
+        arg_offsets, profiler, udf = read_single_udf(
             pickleSer, infile, eval_type, runner_conf, udf_index=0)
 
         def func(_, iterator):
@@ -381,8 +383,7 @@ def read_udfs(pickleSer, infile, eval_type):
                         "the same with the input's; however, the length of output was %d and the "
                         "length of input was %d." % (num_output_rows, num_input_rows))
 
-        # profiling is not supported for UDF
-        return func, None, ser, ser
+        return func, profiler, ser, ser
 
     def extract_key_value_indexes(grouped_arg_offsets):
         """
@@ -420,7 +421,7 @@ def read_udfs(pickleSer, infile, eval_type):
 
         # See FlatMapGroupsInPandasExec for how arg_offsets are used to
         # distinguish between grouping attributes and data attributes
-        arg_offsets, f = read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=0)
+        arg_offsets, profiler, f = read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=0)
         parsed_offsets = extract_key_value_indexes(arg_offsets)
 
         # Create function like this:
@@ -433,7 +434,7 @@ def read_udfs(pickleSer, infile, eval_type):
         # We assume there is only one UDF here because cogrouped map doesn't
         # support combining multiple UDFs.
         assert num_udfs == 1
-        arg_offsets, f = read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=0)
+        arg_offsets, profiler, f = read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=0)
 
         parsed_offsets = extract_key_value_indexes(arg_offsets)
 
@@ -446,7 +447,8 @@ def read_udfs(pickleSer, infile, eval_type):
     else:
         udfs = []
         for i in range(num_udfs):
-            udfs.append(read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=i))
+            arg_offsets, profiler, f = read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=i)
+            udfs.append((arg_offsets, f))
 
         def mapper(a):
             result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
@@ -459,8 +461,7 @@ def read_udfs(pickleSer, infile, eval_type):
 
     func = lambda _, it: map(mapper, it)
 
-    # profiling is not supported for UDF
-    return func, None, ser, ser
+    return func, profiler, ser, ser
 
 
 def main(infile, outfile):
@@ -599,7 +600,7 @@ def main(infile, outfile):
         _accumulatorRegistry.clear()
         eval_type = read_int(infile)
         if eval_type == PythonEvalType.NON_UDF:
-            func, profiler, deserializer, serializer = read_command(pickleSer, infile)
+            func, profiler, deserializer, serializer = read_command(pickleSer, infile)
         else:
             func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
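
For context (not shown in the hunk above), main() goes on to wrap the task body
in the returned profiler, roughly along the lines below; because read_udfs
previously hard-coded None, SQL UDF tasks never took that branch. This is a
paraphrased sketch with illustrative names, not a copy of worker.py:

    # Paraphrased sketch of how main() consumes the tuple returned by
    # read_udfs()/read_command().
    def run_task(func, profiler, deserializer, serializer, split_index, infile, outfile):
        def process():
            iterator = deserializer.load_stream(infile)
            out_iter = func(split_index, iterator)
            serializer.dump_stream(out_iter, outfile)

        if profiler is not None:
            # BasicProfiler.profile() runs the callable under cProfile and adds
            # the resulting pstats.Stats to its accumulator, which PStatsParam
            # combines via addInPlace (hence the debug print added in profiler.py).
            profiler.profile(process)
        else:
            process()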
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org