You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@arrow.apache.org by "Stanislav Stolpovskiy (JIRA)" <ji...@apache.org> on 2019/02/25 18:22:00 UTC
[jira] [Commented] (ARROW-2590) [Python] Pyspark python_udf
serialization error on grouped map (Amazon EMR)
[ https://issues.apache.org/jira/browse/ARROW-2590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16777173#comment-16777173 ]
Stanislav Stolpovskiy commented on ARROW-2590:
----------------------------------------------
Ross, thank you for your comment!
I have the same error:
{code:java}
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 253, in main
process()
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 248, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 268, in dump_stream
batch = _create_batch(series, self._timezone)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 246, in _create_batch
arrs = [create_array(s, t) for s, t in series]
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 246, in <listcomp>
arrs = [create_array(s, t) for s, t in series]
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 244, in create_array
return pa.Array.from_pandas(s, mask=mask, type=t)
File "pyarrow/array.pxi", line 531, in pyarrow.lib.Array.from_pandas
File "pyarrow/array.pxi", line 171, in pyarrow.lib.array
File "pyarrow/array.pxi", line 80, in pyarrow.lib._ndarray_to_array
File "pyarrow/error.pxi", line 81, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: 'utf-32-le' codec can't decode bytes in position 0-3: code point not in range(0x110000)```
{code}
After rechecking all fields in returned output dataframe I have found that field type is not equal with schema provided in UDF method.
After changing type issue gone.
> [Python] Pyspark python_udf serialization error on grouped map (Amazon EMR)
> ---------------------------------------------------------------------------
>
> Key: ARROW-2590
> URL: https://issues.apache.org/jira/browse/ARROW-2590
> Project: Apache Arrow
> Issue Type: Bug
> Components: Python
> Affects Versions: 0.9.0
> Environment: Amazon EMR 5.13
> Spark 2.3.0
> PyArrow 0.9.0 (and 0.8.0)
> Pandas 0.22.0 (and 0.21.1)
> Numpy 1.14.1
> Reporter: Daniel Fithian
> Priority: Critical
>
> I am writing a python_udf grouped map aggregation on Spark 2.3.0 in Amazon EMR. When I try to run any aggregation, I get the following Python stack trace:
> {quote}{{18/05/16 14:08:56 ERROR Utils: Aborting task}}
> {{ org.apache.spark.api.python.PythonException: Traceback (most recent call last):}}
> {{ \{{ File "/mnt/yarn/usercache/hadoop/appcache/application_1526400761989_0068/container_1526400761989_0068_01_000002/pyspark.zip/pyspark/worker.py", line 229, in m}}}}
> {{ ain}}
> {{ \{{ process()}}}}
> {{ \{{ File "/mnt/yarn/usercache/hadoop/appcache/application_1526400761989_0068/container_1526400761989_0068_01_000002/pyspark.zip/pyspark/worker.py", line 224, in p}}}}
> {{ rocess}}
> {{ \{{ serializer.dump_stream(func(split_index, iterator), outfile)}}}}
> {{ \{{ File "/mnt/yarn/usercache/hadoop/appcache/application_1526400761989_0068/container_1526400761989_0068_01_000002/pyspark.zip/pyspark/serializers.py", line 261,}}}}
> {{ \{{ in dump_stream}}}}
> {{ \{{ batch = _create_batch(series, self._timezone)}}}}
> {{ \{{ File "/mnt/yarn/usercache/hadoop/appcache/application_1526400761989_0068/container_1526400761989_0068_01_000002/pyspark.zip/pyspark/serializers.py", line 239,}}}}
> {{ \{{ in _create_batch}}}}
> {{ {{ arrs = [create_array(s, t) for s, t in series]}}}}
> {{ \{{ File "/mnt/yarn/usercache/hadoop/appcache/application_1526400761989_0068/container_1526400761989_0068_01_000002/pyspark.zip/pyspark/serializers.py", line 239,}}}}
> {{ \{{ in <listcomp>}}}}
> {{ {{ arrs = [create_array(s, t) for s, t in series]}}}}
> {{ \{{ File "/mnt/yarn/usercache/hadoop/appcache/application_1526400761989_0068/container_1526400761989_0068_01_000002/pyspark.zip/pyspark/serializers.py", line 237, in create_array}}}}
> {{ \{{ return pa.Array.from_pandas(s, mask=mask, type=t)}}}}
> {{ \{{ File "array.pxi", line 372, in pyarrow.lib.Array.from_pandas}}}}
> {{ \{{ File "array.pxi", line 177, in pyarrow.lib.array}}}}
> {{ \{{ File "array.pxi", line 77, in pyarrow.lib._ndarray_to_array}}}}
> {{ \{{ File "error.pxi", line 98, in pyarrow.lib.check_status}}}}
> {{ pyarrow.lib.ArrowException: Unknown error: 'utf-32-le' codec can't decode bytes in position 0-3: code point not in range(0x110000)}}{quote}
> To be clear, this happens when I run any aggregation, including the identity aggregation (return the Pandas DataFrame that was passed in). I do not get this error when I return an empty DataFrame, so it seems to be a symptom of the serialization of the Pandas DataFrame back to Spark.
> I have observed this behavior with the following versions:
> * Spark 2.3.0
> * PyArrow 0.9.0 (also 0.8.0)
> * Pandas 0.22.0 (also 0.22.1)
> * Numpy 1.14.1
> Here is some sample code:
> {quote}{{@func.pandas_udf(SCHEMA, func.PandasUDFType.GROUPED_MAP)}}{quote}
> {quote}{{def aggregation(df):}}{quote}
> {quote}{{ return df}}{quote}
> {quote}{{df.groupBy('a').apply(aggregation) # get error}}{quote}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)