Posted to dev@spark.apache.org by Justin Uang <ju...@gmail.com> on 2015/06/24 00:27:11 UTC

Python UDF performance at large scale

BLUF: BatchPythonEvaluation's implementation is unusable at large scale,
but I have a proof-of-concept implementation that avoids caching the entire
dataset.

Hi,

We have been running into performance problems using Python UDFs with
DataFrames at large scale.

From the implementation of BatchPythonEvaluation, it looks like the goal
was to reuse the PythonRDD code. It caches the entire child RDD so that it
can do two passes over the data: one to feed the PythonRDD, then one to
join the Python lambda results with the original rows (which may have Java
objects that should be passed through).

In addition, it caches all the columns, even the ones that don't need to be
processed by the Python UDF. In the case I was working with, I had a
500-column table and wanted to use a Python UDF on one column, and it ended
up caching all 500 columns.

I have a working solution over here that does it in one pass over the data,
avoiding caching (
https://github.com/justinuang/spark/commit/c1a415a18d31226ac580f1a9df7985571d03199b).
With this patch, I go from a job that takes 20 minutes and then OOMs to a
job that finishes completely in 3 minutes. It is admittedly quite hacky and
prone to deadlocks, since there is buffering in many locations:

    - NEW: the ForkingIterator LinkedBlockingDeque
    - batching the rows before pickling them
    - os buffers on both sides
    - pyspark.serializers.BatchedSerializer
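
To make the one-pass idea concrete, here is a minimal single-threaded sketch in Python. It is not the code from the patch: the names `fork` and `evaluate` are hypothetical, a plain `deque` stands in for the LinkedBlockingDeque, and a local `map` call stands in for the round trip to the Python worker. It only illustrates how each row is read once, with the UDF input sent one way and the full row held back for the join.

```python
from collections import deque


def fork(rows, udf_column):
    """Split one pass over `rows` into UDF inputs plus a buffer of full rows."""
    pending = deque()  # stand-in for the bounded LinkedBlockingDeque

    def udf_inputs():
        for row in rows:
            pending.append(row)    # keep the full row for the later join
            yield row[udf_column]  # only this column would go to the worker

    return udf_inputs(), pending


def evaluate(rows, udf_column, udf):
    """Join each UDF result with its original row without caching the dataset."""
    inputs, pending = fork(iter(rows), udf_column)
    for result in map(udf, inputs):  # stand-in for the Python worker
        yield pending.popleft() + (result,)
```

In this sketch the buffer never holds more than one row, because the stand-in worker answers immediately. With real batching on both sides, the buffer has to hold every row whose result has not come back yet, which is where the deadlock and memory concerns come from.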

We can avoid deadlock by being very disciplined. For example, we can have
the ForkingIterator always check whether the LinkedBlockingDeque is full
and, if so:

Java
    - flush the java pickling buffer
    - send a flush command to the python process
    - os.flush the java side

Python
    - flush BatchedSerializer
    - os.flush()

I haven't added this yet, and it is getting very complex. Another
model would be to change the protocol between the Java side and the
worker to a synchronous request/response. This has the disadvantage that
the CPU isn't doing anything while a batch is being sent across, but it
has the huge advantage of simplicity. In addition, I imagine that the
bottleneck isn't the actual IO between the processes, but rather the
serialization of Java objects into pickled bytes, and the
deserialization/serialization + Python loops on the Python side. Another
advantage is that we won't be taking more than 100% CPU, since only one
thread is doing CPU work at a time between the executor and the Python
interpreter.
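
A rough sketch of what the synchronous request/response model could look like, written in Python for brevity. Everything here is an assumption for illustration: a thread stands in for the separate worker process, `multiprocessing.Pipe` stands in for the socket, `None` is a made-up end-of-stream marker, and the function names are hypothetical. `Connection.send` pickles the object it is given, so the pickling cost is still paid on each batch.

```python
import threading
from itertools import islice
from multiprocessing import Pipe


def worker_loop(conn, udf):
    """Worker side: answer every batch of inputs with a batch of results."""
    while True:
        batch = conn.recv()
        if batch is None:  # assumed end-of-stream marker
            break
        conn.send([udf(value) for value in batch])


def evaluate_sync(rows, udf_column, conn, batch_size=100):
    """Executor side: send one batch, block for its results, join, repeat."""
    rows = iter(rows)
    while True:
        batch = list(islice(rows, batch_size))
        if not batch:
            break
        conn.send([row[udf_column] for row in batch])  # request
        results = conn.recv()                          # blocking response
        for row, result in zip(batch, results):
            yield row + (result,)
    conn.send(None)


def run(rows, udf_column, udf, batch_size=100):
    """Wire both ends together, using a thread in place of the worker process."""
    executor_end, worker_end = Pipe()
    worker = threading.Thread(
        target=worker_loop, args=(worker_end, udf), daemon=True
    )
    worker.start()
    out = list(evaluate_sync(rows, udf_column, executor_end, batch_size))
    worker.join()
    return out
```

At most one batch of rows is held on the executor side at any time, so memory use is bounded by `batch_size`, and there is no way for the two sides to block on each other's buffers.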

Any thoughts would be much appreciated =)

Other improvements:
    - extract some of the worker code out of PythonRDD so that we can do a
      mapPartitions directly in BatchPythonEvaluation without resorting to
      the hackery in ForkedRDD.compute(), which uses a cache to ensure that
      the other RDD can get a handle to the same iterator
    - read elements and use a size estimator to size the BlockingQueue, to
      make sure that we don't store too many things in memory when batching
    - patch Unpickler to not use StopException for control flow, which is
      slowing down the Java side
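
For the size-estimator item, one possible shape is to close a batch once its estimated size reaches a byte budget instead of after a fixed row count. The sketch below is Python and purely illustrative: `batches_by_size` is a hypothetical helper, and `sys.getsizeof` is only a shallow estimate, so a real implementation would need a better estimator.

```python
import sys


def batches_by_size(rows, max_bytes, estimate=sys.getsizeof):
    """Group rows into batches whose estimated total size stays within max_bytes.

    A single row larger than max_bytes still gets a batch of its own.
    """
    batch, used = [], 0
    for row in rows:
        size = estimate(row)
        if batch and used + size > max_bytes:
            yield batch  # budget exceeded: close the current batch
            batch, used = [], 0
        batch.append(row)
        used += size
    if batch:
        yield batch
```

Passing `estimate=len` for strings, for example, caps each batch by total character count, which makes the behavior easy to check by hand.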

Re: Python UDF performance at large scale

Posted by Justin Uang <ju...@gmail.com>.
Sweet, filed here: https://issues.apache.org/jira/browse/SPARK-8632

On Thu, Jun 25, 2015 at 3:05 AM Davies Liu <da...@databricks.com> wrote:

> I'm thinking that the batched synchronous version will be too slow
> (with a small batch size) or easy to OOM (with a large batch size). If
> it's not that hard, you can give it a try.
>

Re: Python UDF performance at large scale

Posted by Davies Liu <da...@databricks.com>.
I'm thinking that the batched synchronous version will be too slow
(with a small batch size) or easy to OOM (with a large batch size). If
it's not that hard, you can give it a try.

On Wed, Jun 24, 2015 at 4:39 PM, Justin Uang <ju...@gmail.com> wrote:
> Correct, I was running with a batch size of about 100 when I did the tests,
> because I was worried about deadlocks. Do you have any concerns regarding
> the batched synchronous version of communication between the Java and Python
> processes, and if not, should I file a ticket and start writing it?



Re: Python UDF performance at large scale

Posted by Justin Uang <ju...@gmail.com>.
Correct, I was running with a batch size of about 100 when I did the tests,
because I was worried about deadlocks. Do you have any concerns regarding
the batched synchronous version of communication between the Java and
Python processes, and if not, should I file a ticket and start writing
it?
On Wed, Jun 24, 2015 at 7:27 PM Davies Liu <da...@databricks.com> wrote:

> From your comment, the 2x improvement only happens when you have the
> batch size as 1, right?
>

Re: Python UDF performance at large scale

Posted by Davies Liu <da...@databricks.com>.
From your comment, the 2x improvement only happens when you have the
batch size as 1, right?

On Wed, Jun 24, 2015 at 12:11 PM, Justin Uang <ju...@gmail.com> wrote:
> FYI, just submitted a PR to Pyrolite to remove their StopException.
> https://github.com/irmen/Pyrolite/pull/30
>
> With my benchmark, removing it basically made it about 2x faster.
>
> On Wed, Jun 24, 2015 at 8:33 AM Punyashloka Biswal <pu...@gmail.com>
> wrote:
>>
>> Hi Davies,
>>
>> In general, do we expect people to use CPython only for "heavyweight" UDFs
>> that invoke an external library? Are there any examples of using Jython,
>> especially performance comparisons to Java/Scala and CPython? When using
>> Jython, do you expect the driver to send code to the executor as a string,
>> or is there a good way to serialize Jython lambdas?
>>
>> (For context, I was unable to serialize Nashorn lambdas when I tried to
>> use them in Spark.)
>>
>> Punya
>> On Wed, Jun 24, 2015 at 2:26 AM Davies Liu <da...@databricks.com> wrote:
>>>
>>> Fair points, I also like simpler solutions.
>>>
>>> The overhead of a Python task could be a few milliseconds, which
>>> means we should also eval them in batches (one Python task per batch).
>>>
>>> Decreasing the batch size for UDFs sounds reasonable to me, together
>>> with other tricks to reduce the data in the socket/pipe buffer.
>>>
>>> BTW, what does your UDF look like? How about using Jython to run
>>> simple Python UDFs (those without external libraries)?
>>>
>>> On Tue, Jun 23, 2015 at 8:21 PM, Justin Uang <ju...@gmail.com>
>>> wrote:
>>> > // + punya
>>> >
>>> > Thanks for your quick response!
>>> >
>>> > I'm not sure that using an unbounded buffer is a good solution to the
>>> > locking problem. For example, in the situation where I had 500 columns,
>>> > I am in fact storing 499 extra columns on the java side, which might
>>> > make me OOM if I have to store many rows. In addition, if I am using an
>>> > AutoBatchedSerializer, the java side might have to write 1 << 16 ==
>>> > 65536 rows before python starts outputting elements, in which case, the
>>> > Java side has to buffer 65536 complete rows. In general it seems fragile
>>> > to rely on blocking behavior in the Python coprocess. By contrast, it's
>>> > very easy to verify the correctness and performance characteristics of
>>> > the synchronous blocking solution.
>>> >
>>> >
>>> > On Tue, Jun 23, 2015 at 7:21 PM Davies Liu <da...@databricks.com>
>>> > wrote:
>>> >>
>>> >> Thanks for looking into it, I'd like the idea of having
>>> >> ForkingIterator. If we have unlimited buffer in it, then will not have
>>> >> the problem of deadlock, I think. The writing thread will be blocked
>>> >> by Python process, so there will be not much rows be buffered(still be
>>> >> a reason to OOM). At least, this approach is better than current one.
>>> >>
>>> >> Could you create a JIRA and sending out the PR?
>>> >>
>>> >> On Tue, Jun 23, 2015 at 3:27 PM, Justin Uang <ju...@gmail.com>
>>> >> wrote:
>>> >> > BLUF: BatchPythonEvaluation's implementation is unusable at large
>>> >> > scale,
>>> >> > but
>>> >> > I have a proof-of-concept implementation that avoids caching the
>>> >> > entire
>>> >> > dataset.
>>> >> >
>>> >> > Hi,
>>> >> >
>>> >> > We have been running into performance problems using Python UDFs
>>> >> > with
>>> >> > DataFrames at large scale.
>>> >> >
>>> >> > From the implementation of BatchPythonEvaluation, it looks like the
>>> >> > goal
>>> >> > was
>>> >> > to reuse the PythonRDD code. It caches the entire child RDD so that
>>> >> > it
>>> >> > can
>>> >> > do two passes over the data. One to give to the PythonRDD, then one
>>> >> > to
>>> >> > join
>>> >> > the python lambda results with the original row (which may have java
>>> >> > objects
>>> >> > that should be passed through).
>>> >> >
>>> >> > In addition, it caches all the columns, even the ones that don't
>>> >> > need to
>>> >> > be
>>> >> > processed by the Python UDF. In the cases I was working with, I had
>>> >> > a
>>> >> > 500
>>> >> > column table, and i wanted to use a python UDF for one column, and
>>> >> > it
>>> >> > ended
>>> >> > up caching all 500 columns.
>>> >> >
>>> >> > I have a working solution over here that does it in one pass over
>>> >> > the
>>> >> > data,
>>> >> > avoiding caching
>>> >> >
>>> >> >
>>> >> > (https://github.com/justinuang/spark/commit/c1a415a18d31226ac580f1a9df7985571d03199b).
>>> >> > With this patch, I go from a job that takes 20 minutes then OOMs, to
>>> >> > a
>>> >> > job
>>> >> > that finishes completely in 3 minutes. It is indeed quite hacky and
>>> >> > prone to
>>> >> > deadlocks since there is buffering in many locations:
>>> >> >
>>> >> >     - NEW: the ForkingIterator LinkedBlockingDeque
>>> >> >     - batching the rows before pickling them
>>> >> >     - os buffers on both sides
>>> >> >     - pyspark.serializers.BatchedSerializer
>>> >> >
>>> >> > We can avoid deadlock by being very disciplined. For example, we can
>>> >> > have
>>> >> > the ForkingIterator instead always do a check of whether the
>>> >> > LinkedBlockingDeque is full and if so:
>>> >> >
>>> >> > Java
>>> >> >     - flush the java pickling buffer
>>> >> >     - send a flush command to the python process
>>> >> >     - os.flush the java side
>>> >> >
>>> >> > Python
>>> >> >     - flush BatchedSerializer
>>> >> >     - os.flush()
>>> >> >
>>> >> > I haven't added this yet. This is getting very complex however.
>>> >> > Another
>>> >> > model would just be to change the protocol between the java side and
>>> >> > the
>>> >> > worker to be a synchronous request/response. This has the
>>> >> > disadvantage
>>> >> > that
>>> >> > the CPU isn't doing anything when the batch is being sent across,
>>> >> > but it
>>> >> > has
>>> >> > the huge advantage of simplicity. In addition, I imagine that the
>>> >> > actual
>>> >> > IO
>>> >> > between the processes isn't that slow, but rather the serialization
>>> >> > of
>>> >> > java
>>> >> > objects into pickled bytes, and the deserialization/serialization +
>>> >> > python
>>> >> > loops on the python side. Another advantage is that we won't be
>>> >> > taking
>>> >> > more
>>> >> > than 100% CPU since only one thread is doing CPU work at a time
>>> >> > between
>>> >> > the
>>> >> > executor and the python interpreter.
>>> >> >
>>> >> > Any thoughts would be much appreciated =)
>>> >> >
>>> >> > Other improvements:
>>> >> >     - extract some code of the worker out of PythonRDD so that we
>>> >> > can do
>>> >> > a
>>> >> > mapPartitions directly in BatchedPythonEvaluation without resorting
>>> >> > to
>>> >> > the
>>> >> > hackery in ForkedRDD.compute(), which uses a cache to ensure that
>>> >> > the
>>> >> > other
>>> >> > RDD can get a handle to the same iterator.
>>> >> >     - read elements and use a size estimator to create the
>>> >> > BlockingQueue
>>> >> > to
>>> >> > make sure that we don't store too many things in memory when
>>> >> > batching
>>> >> >     - patch Unpickler to not use StopException for control flow,
>>> >> > which
>>> >> > is
>>> >> > slowing down the java side
>>> >> >
>>> >> >

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
For additional commands, e-mail: dev-help@spark.apache.org


Re: Python UDF performance at large scale

Posted by Justin Uang <ju...@gmail.com>.
FYI, I just submitted a PR to Pyrolite that removes its StopException:
https://github.com/irmen/Pyrolite/pull/30

In my benchmark, removing it gave about a 2x speedup.
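
To illustrate the kind of change involved (this is not Pyrolite's actual
code, which is Java), here is a minimal Python sketch of a dispatch loop
that ends on a plain return value instead of raising an exception. The toy
opcodes and both run_* helpers are hypothetical. In Java the cost is larger
than this sketch suggests, because constructing an exception normally also
captures a stack trace.

```python
# Hypothetical toy opcodes; these are not real pickle opcodes.
PUSH, STOP = "PUSH", "STOP"


class StopSignal(Exception):
    """Raised by the exception-based variant to end the loop."""


def run_with_exception(ops):
    """End the loop by raising and catching an exception."""
    stack = []

    def dispatch(op, arg):
        if op == STOP:
            raise StopSignal()
        stack.append(arg)

    try:
        for op, arg in ops:
            dispatch(op, arg)
    except StopSignal:
        pass
    return stack


def run_with_sentinel(ops):
    """End the loop on a plain return value; no exception is created."""
    stack = []

    def dispatch(op, arg):
        if op == STOP:
            return False  # tell the caller to stop
        stack.append(arg)
        return True

    for op, arg in ops:
        if not dispatch(op, arg):
            break
    return stack


ops = [(PUSH, 1), (PUSH, 2), (STOP, None), (PUSH, 3)]
assert run_with_exception(ops) == run_with_sentinel(ops) == [1, 2]
```

Both variants produce the same result; only the way the loop learns that it
should stop differs.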


Re: Python UDF performance at large scale

Posted by Punyashloka Biswal <pu...@gmail.com>.
Hi Davies,

In general, do we expect people to use CPython only for "heavyweight" UDFs
that invoke an external library? Are there any examples of using Jython,
especially performance comparisons to Java/Scala and CPython? When using
Jython, do you expect the driver to send code to the executor as a string,
or is there a good way to serialize Jython lambdas?

(For context, I was unable to serialize Nashorn lambdas when I tried to use
them in Spark.)

Punya

Re: Python UDF performance at large scale

Posted by Davies Liu <da...@databricks.com>.
Fair points, I also like simpler solutions.

The overhead of a Python task could be a few milliseconds, which
means we should still evaluate them in batches (one Python task per batch).

Decreasing the batch size for UDFs sounds reasonable to me, together
with other tricks to reduce the data in the socket/pipe buffer.

BTW, what does your UDF look like? How about using Jython to run
simple Python UDFs (ones that don't need external libraries)?
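
A minimal sketch of evaluating a UDF in fixed-size batches, so that the
per-task overhead is paid once per batch rather than once per row. The
helper names and the default batch size of 100 are assumptions made for
illustration; they are not taken from Spark.

```python
from itertools import islice


def batched(rows, batch_size):
    """Yield lists of at most batch_size rows from any iterable."""
    it = iter(rows)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch


def eval_udf_in_batches(values, udf, batch_size=100):
    """Apply udf batch by batch; each batch stands in for one Python task."""
    for batch in batched(values, batch_size):
        # In a real system this is where the batch would cross the socket.
        for result in [udf(v) for v in batch]:
            yield result


doubled = list(eval_udf_in_batches(range(5), lambda x: x * 2, batch_size=2))
```

A smaller batch size means less data sitting in buffers at any one time, at
the price of paying the per-batch overhead more often.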




Re: Python UDF performance at large scale

Posted by Justin Uang <ju...@gmail.com>.
// + punya

Thanks for your quick response!

I'm not sure that an unbounded buffer is a good solution to the deadlock
problem. For example, in the situation where I had 500 columns, I am in
fact storing 499 extra columns on the Java side, which might make me OOM
if I have to store many rows. In addition, if I am using an
AutoBatchedSerializer, the Java side might have to write 1 << 16 == 65536
rows before Python starts outputting elements, in which case the Java side
has to buffer 65536 complete rows. In general it seems fragile to rely on
blocking behavior in the Python coprocess. By contrast, it's very easy to
verify the correctness and performance characteristics of the synchronous
blocking solution.
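
A minimal sketch of the synchronous request/response idea, assuming a
worker thread and a multiprocessing.Pipe stand in for the Python worker
process and its socket. The function names, the None end-of-stream marker,
and the batch size are hypothetical, and error handling is omitted (a UDF
that raises would leave the caller waiting).

```python
import threading
from itertools import islice
from multiprocessing import Pipe


def worker_loop(conn, udf):
    """Stand-in for the Python worker: one response per request."""
    while True:
        batch = conn.recv()
        if batch is None:  # hypothetical end-of-stream marker
            break
        conn.send([udf(value) for value in batch])


def eval_synchronously(rows, udf_column, udf, batch_size=2):
    """Yield each row (a tuple) with the UDF result appended.

    Only the current batch is held on the caller's side, so the number
    of buffered rows never exceeds batch_size.
    """
    parent, child = Pipe()
    thread = threading.Thread(target=worker_loop, args=(child, udf))
    thread.start()
    try:
        it = iter(rows)
        while True:
            batch = list(islice(it, batch_size))
            if not batch:
                break
            # Send only the column the UDF needs, then block for the reply.
            parent.send([row[udf_column] for row in batch])
            results = parent.recv()
            for row, result in zip(batch, results):
                yield row + (result,)
    finally:
        parent.send(None)
        thread.join()
        parent.close()
        child.close()


rows = [("a", 1, "x"), ("b", 2, "y"), ("c", 3, "z")]
out = list(eval_synchronously(rows, 1, lambda v: v * 10))
```

Because the caller blocks on every reply, there is nothing to reason about
beyond one batch in flight, which is what makes the memory bound easy to
verify.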



Re: Python UDF performance at large scale

Posted by Davies Liu <da...@databricks.com>.
Thanks for looking into it. I like the idea of having a
ForkingIterator. If it has an unbounded buffer, then I think we will not
have the deadlock problem. The writing thread will be blocked
by the Python process, so not many rows will be buffered (although this
could still be a reason to OOM). At least, this approach is better than
the current one.

Could you create a JIRA and send out a PR?
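
A single-threaded sketch of a forking iterator with an unbounded buffer.
This is a simplification for illustration, not the code in the linked
commit, which passes rows between threads through a LinkedBlockingDeque.
The class and its method names are hypothetical.

```python
from collections import deque


class ForkingIterator:
    """Split one iterator into a 'feed' side and a 'replay' side.

    Rows pulled by feed() are remembered in an unbounded deque until
    replay() hands them back, so memory grows with how far the feed
    side runs ahead of the replay side.
    """

    def __init__(self, rows):
        self._rows = iter(rows)
        self._pending = deque()
        self.max_pending = 0  # high-water mark, for illustration only

    def feed(self):
        """Yield rows for the UDF while remembering them for the join."""
        for row in self._rows:
            self._pending.append(row)
            self.max_pending = max(self.max_pending, len(self._pending))
            yield row

    def replay(self):
        """Yield the remembered rows in their original order."""
        while self._pending:
            yield self._pending.popleft()


fork = ForkingIterator([("a", 1), ("b", 2), ("c", 3)])
# Here the "worker" consumes every row before any result is joined back.
results = [n * 10 for _, n in fork.feed()]
joined = [row + (r,) for row, r in zip(fork.replay(), results)]
```

In this run the feed side gets all the way ahead, so max_pending equals the
full input size. That is the case where an unbounded buffer can still run
out of memory.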

