Posted to dev@spark.apache.org by Davies Liu <da...@databricks.com> on 2016/04/01 01:41:32 UTC

Re: Making BatchPythonEvaluation actually Batch

@Justin, it's fixed by https://github.com/apache/spark/pull/12057

On Thu, Feb 11, 2016 at 11:26 AM, Davies Liu <da...@databricks.com> wrote:
> I had a quick look at your commit; I think that makes sense. Could you
> send a PR for that, then we can review it.
>
> In order to support 2), we need to change the serialized Python
> function from `f(iter)` to `f(x)`, processing one row at a time (not a
> partition); then we can easily combine them together:
>
> For f1(f2(x)) and g1(g2(x)), we can do this in Python:
>
> for row in reading_stream:
>    x1, x2 = row
>    y1 = f1(f2(x1))
>    y2 = g1(g2(x2))
>    yield (y1, y2)
>
> For RDD, we still need to use `f(iter)`, but for SQL UDF, use `f(x)`.
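>
> To make the `f(x)` shape concrete, a rough sketch (not the actual worker
> code; `compose` and `evaluate` are made-up helper names) of how row-level
> UDFs could be composed and evaluated in a single pass:
>
>     def compose(*fs):
>         # compose(f1, f2)(x) == f1(f2(x)): apply the innermost UDF first
>         def composed(x):
>             for f in reversed(fs):
>                 x = f(x)
>             return x
>         return composed
>
>     def evaluate(reading_stream, pipelines):
>         # pipelines: one composed row-level callable per output column
>         for row in reading_stream:
>             yield tuple(p(col) for p, col in zip(pipelines, row))
>
> With `f(iter)` functions, the whole partition iterator would have to be
> threaded through each UDF instead, which is why the row-at-a-time form is
> easier to combine.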
>
> On Sun, Jan 31, 2016 at 1:37 PM, Justin Uang <ju...@gmail.com> wrote:
>> Hey guys,
>>
>> BLUF: sorry for the length of this email; I'm trying to figure out how to
>> batch Python UDF executions, and since this is my first time messing with
>> Catalyst, I would like any feedback.
>>
>> My team is starting to use PySpark UDFs quite heavily, and performance is a
>> huge blocker. The extra roundtrip serialization from Java to Python is not a
>> huge concern if we only incur it ~once per column for most workflows, since
>> it'll be in the same order of magnitude as reading files from disk. However,
>> right now each Python UDF leads to its own roundtrip. There is definitely a
>> lot we can do regarding this:
>>
>> (all the prototyping code is here:
>> https://github.com/justinuang/spark/commit/8176749f8a6e6dc5a49fbbb952735ff40fb309fc)
>>
>> 1. We can't chain Python UDFs.
>>
>>     df.select(python_times_2(python_times_2("col1")))
>>
>> throws an exception saying that the inner expression isn't evaluable. The
>> workaround is to do
>>
>>
>>     df.select(python_times_2("col1").alias("tmp")).select(python_times_2("tmp"))
>>
>> This can be solved in ExtractPythonUDFs by always extracting the innermost
>> Python UDF first.
>>
>>          // Pick the UDF we are going to evaluate
>>          // (TODO: Support evaluating multiple UDFs at a time)
>>          // If there is more than one, we will add another evaluation
>>          // operator in a subsequent pass.
>> -        udfs.find(_.resolved) match {
>> +        udfs.find { udf =>
>> +          udf.resolved && udf.children.map { child: Expression =>
>> +            // really hacky way to check whether a child of the UDF
>> +            // contains another PythonUDF node
>> +            child.find {
>> +              case p: PythonUDF => true
>> +              case _ => false
>> +            }.isEmpty
>> +          }.reduce((x, y) => x && y)
>> +        } match {
>>            case Some(udf) =>
>>              var evaluation: EvaluatePython = null
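>>
>> (For intuition: extracting the innermost UDF first effectively rewrites the
>> chained call into the same two-step shape as the manual workaround above;
>> hypothetically, with _tmp0 standing in for a generated column name:)
>>
>>     # the inner UDF is evaluated into a temporary column in one
>>     # EvaluatePython pass, and the outer UDF runs over that column
>>     # in a second pass
>>     inner = df.select(python_times_2("col1").alias("_tmp0"))
>>     result = inner.select(python_times_2("_tmp0"))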
>>
>> 2. If we have a Python UDF applied to many different columns, where they
>> don’t depend on each other, we can optimize them by collapsing them down
>> into a single Python worker. Although we have to serialize and send the same
>> amount of data to the Python interpreter, in the case where I am applying
>> the same function to 20 columns, the overhead and context switches of having
>> 20 interpreters running at the same time cause a huge performance hit. I have
>> confirmed this by manually taking the 20 columns, converting them to a
>> struct, and then writing a UDF that processes the struct all at once, and
>> the speed difference is 2x. My approach to adding this to Catalyst is
>> basically to write an optimizer rule called CombinePython, which joins
>> adjacent EvaluatePython nodes that don’t depend on each other’s variables,
>> and then to have BatchPythonEvaluation run multiple lambdas at once. I would
>> also like to be able to handle the case
>> df.select(python_times_2("col1").alias("col1x2")).select(F.col("col1x2"),
>> python_times_2("col1x2").alias("col1x4")). To handle that, I add a
>> PushDownPythonEvaluation optimizer rule that pushes the evaluation through a
>> select/project, so that the CombinePython rule can join the two.
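>>
>> To make the intended worker-side behaviour concrete, here is a rough sketch
>> (evaluate_batch and its arguments are made-up names, not the actual
>> BatchPythonEvaluation protocol) of one Python worker evaluating several
>> independent UDFs per row instead of one worker per UDF:
>>
>>     def evaluate_batch(reading_stream, udfs):
>>         # udfs: list of (function, input column index) pairs collapsed
>>         # from adjacent EvaluatePython nodes; one pass, one interpreter
>>         for row in reading_stream:
>>             yield tuple(f(row[i]) for f, i in udfs)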
>>
>> 3. I would like CombinePython to be able to handle UDFs that chain off of
>> each other.
>>
>>     df.select(python_times_2(python_times_2("col1")))
>>
>> I haven’t prototyped this yet, since it’s a lot more complex. The way I’m
>> thinking about this is to still have a rule called CombinePython, except
>> that BatchPythonEvaluation will need to be smart enough to build up the
>> DAG of dependencies and then feed that information to the Python
>> interpreter, so it can compute things in the right order and reuse the
>> in-memory objects that it has already computed. Does this seem right? Should
>> the code mainly be in BatchPythonEvaluation? In addition, we will need to
>> change the protocol between the Java and Python sides to support sending
>> this information. What is acceptable?
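>>
>> For what it's worth, the per-row evaluation I have in mind would look
>> roughly like this on the Python side (purely illustrative; evaluate_dag and
>> plan are made-up names, and I'm assuming the Java side ships a topologically
>> sorted list of (output name, function, input names), where the input names
>> refer either to source columns or to earlier UDF outputs):
>>
>>     def evaluate_dag(reading_stream, plan):
>>         # plan: [(name, func, input_names)] in topological order; shared
>>         # intermediate results are computed once per row and then reused
>>         for row in reading_stream:
>>             values = dict(row)  # row as {column_name: value}
>>             for name, func, input_names in plan:
>>                 values[name] = func(*(values[n] for n in input_names))
>>             yield tuple(values[name] for name, _, _ in plan)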
>>
>> Any help would be much appreciated! Especially w.r.t. the design choices,
>> so that the PR has a chance of being accepted.
>>
>> Justin
