Posted to user@spark.apache.org by Abdeali Kothari <ab...@gmail.com> on 2019/07/17 04:18:59 UTC

Usage of PyArrow in Spark

Hi,
In Spark 2.3+ I saw that pyarrow was being used in a bunch of places in
Spark, and I was trying to understand the benefit in terms of
serialization / deserialization it provides.
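
(One place I'm aware of is the Arrow-backed toPandas() path -- a rough
sketch of what I mean, with the Spark 2.3/2.4 config name as I understand
it:)

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Spark 2.3/2.4: opt in to Arrow-based conversion for toPandas()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

sdf = spark.createDataFrame([[1.1], [2.2]], ['val'])
pdf = sdf.toPandas()  # transferred as Arrow record batches when enabled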

I understand that the new pandas_udf works only if pyarrow is installed.
But what about the plain old PythonUDFs which can be used in map()-style
operations?
Do they also use pyarrow under the hood to reduce the cost of serde, or do
they remain as before, so that no performance gain should be expected
there?
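
(To be concrete, by a plain old PythonUDF I mean something like the sketch
below -- plus_one is just an illustrative name; as far as I know the values
go through the regular pickle-based serializer here, not Arrow:)

from pyspark.sql.functions import udf

# A plain Python UDF: values are shipped to the Python worker with the
# pickle-based serializer rather than Arrow (as far as I know).
plus_one = udf(lambda x: x + 1.0, 'double')

# e.g. sdf.select(plus_one(sdf['val'])).show()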

If I'm not mistaken, plain old PythonUDFs could also benefit from Arrow,
as the data transfer cost to serialize/deserialize between Java and Python
still exists and could potentially be reduced by using Arrow.
Is my understanding correct? Are there any plans to implement this?

Pointers to any notes or Jira about this would be appreciated.

Re: Usage of PyArrow in Spark

Posted by Bryan Cutler <cu...@gmail.com>.
It would be possible to use Arrow for regular Python UDFs and avoid pandas,
and there would probably be some performance improvement. The difficult
part will be to ensure that the data remains consistent in the conversions
between Arrow and Python; e.g. timestamps are a bit tricky. Given that we
already have pandas_udfs, I'm not sure it would be worth the effort, but it
might be a good experiment to see how much improvement it would bring.
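
(As a rough illustration of the kind of Arrow-to-Python round-trip that
would need checking -- not how Spark does it internally, just plain
pyarrow with made-up example values:)

import datetime
import pyarrow as pa

naive = datetime.datetime(2019, 7, 17, 12, 0)
aware = datetime.datetime(2019, 7, 17, 12, 0, tzinfo=datetime.timezone.utc)

# Two Arrow arrays holding "the same" wall-clock time: one without a time
# zone, one with tz=UTC
arr_naive = pa.array([naive], type=pa.timestamp('us'))
arr_utc = pa.array([aware], type=pa.timestamp('us', tz='UTC'))

# Convert back to Python objects and compare: whether the values come back
# naive or timezone-aware, and how they relate to Spark's session time
# zone, is exactly the consistency question above.
print(arr_naive.to_pylist())
print(arr_utc.to_pylist())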

Bryan

Re: Usage of PyArrow in Spark

Posted by Abdeali Kothari <ab...@gmail.com>.
I was thinking of implementing that, but quickly realized that doing a
conversion of Spark -> Pandas -> Python causes errors.

A quick example is "None" in numeric data types:
Pandas supports only NaN, while Spark supports both NULL and NaN.
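
(A tiny illustration outside Spark, just pandas:)

import pandas as pd

# In a float64 Series there is no separate "missing" marker: None is
# coerced to NaN, so NULL and NaN become indistinguishable.
s = pd.Series([None, float('nan'), 1.1])
print(s.isnull().tolist())  # [True, True, False] -- the distinction is gone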

This is just one of the issues I ran into.
I'm also not sure about some of the more complex types like Array, Map, and
Struct, which are internally converted to a pd.Series with dtype object.
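
(Again a small standalone illustration: pandas falls back to object dtype
for nested values, while Arrow has a dedicated list type:)

import pandas as pd
import pyarrow as pa

nested = [[1, 2], [3], None]

# pandas: nested values end up in an object-dtype Series
print(pd.Series(nested).dtype)  # object

# Arrow: a proper list<int64> type, and the None stays a null
arr = pa.array(nested)
print(arr.type)        # list<item: int64>
print(arr.null_count)  # 1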

I think that avoiding pandas in between and doing something from Arrow to
Python would be more efficient as, if I understand right, Arrow has a wider
range of types and can handle this better.
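
(For example, as far as I understand it, a plain pyarrow array keeps the
two cases apart:)

import pyarrow as pa

arr = pa.array([None, float('nan'), 1.1], type=pa.float64())

print(arr.null_count)   # 1 -- only the None is a null
print(arr.to_pylist())  # [None, nan, 1.1] -- NaN stays a value, NULL stays a null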

>>> from pyspark.sql import functions as F
>>> sdf = spark.createDataFrame([ [None], [float('nan')], [1.1] ], ['val'])

# Return the column with no change
>>> udf = F.pandas_udf('double', F.PandasUDFType.SCALAR)(lambda col: col)
>>> sdf.select(sdf['val'], udf(sdf['val'])).show()
+----+-------------+
| val|<lambda>(val)|
+----+-------------+
|null|         null|
| NaN|         null|
| 1.1|          1.1|
+----+-------------+

# isnull()
>>> udf = F.pandas_udf('double', F.PandasUDFType.SCALAR)(lambda col: col.isnull())
>>> sdf.select(sdf['val'], udf(sdf['val'])).show()
+----+-------------+
| val|<lambda>(val)|
+----+-------------+
|null|          1.0|
| NaN|          1.0|
| 1.1|          0.0|
+----+-------------+

# Check for "is None"
>>> udf = F.pandas_udf('double', F.PandasUDFType.SCALAR)(lambda col: col.apply(lambda x: x is None))
>>> sdf.select(sdf['val'], udf(sdf['val'])).show()
+----+-------------+
| val|<lambda>(val)|
+----+-------------+
|null|          0.0|
| NaN|          0.0|
| 1.1|          0.0|
+----+-------------+

Re: Usage of PyArrow in Spark

Posted by Hyukjin Kwon <gu...@gmail.com>.
Regular Python UDFs don't use PyArrow under the hood.
Yes, they could potentially benefit, but this can easily be worked around
via Pandas UDFs.

For instance, the two definitions below are virtually identical.

@udf(...)
def func(col):
    return col

@pandas_udf(...)
def pandas_func(col):
    return col.apply(lambda x: x)
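
(So, as a rough and untested sketch, an existing scalar Python function can
already be routed through Arrow by wrapping it like this -- arrow_wrapped
below is just a hypothetical helper name, not an existing API:)

from pyspark.sql.functions import pandas_udf, PandasUDFType

def arrow_wrapped(scalar_func, return_type):
    # Wrap a plain scalar Python function in a pandas_udf: the data moves
    # between the JVM and Python via Arrow, while the function itself still
    # runs element by element through Series.apply.
    @pandas_udf(return_type, PandasUDFType.SCALAR)
    def wrapper(col):
        return col.apply(scalar_func)
    return wrapper

add_one = arrow_wrapped(lambda x: x + 1.0, 'double')
# e.g. df.select(add_one(df['val'])).show()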

If only a minimal change were needed, I would be positive about adding
Arrow support to regular Python UDFs. Otherwise, I am not sure yet.

