Posted to user@spark.apache.org by Gautham Acharya <ga...@alleninstitute.org> on 2020/05/06 00:07:58 UTC

PyArrow Exception in Pandas UDF GROUPEDAGG()

Hi everyone,

I'm running a job that uses a Pandas UDF to GROUP BY a large matrix.

The first column of the dataset contains string labels that are grouped on; the remaining columns are numeric values that are aggregated in the Pandas UDF. The dataset is very wide, with 50,000 columns and 3 million rows.

| label_col | num_col_0 | num_col_1 | num_col_2 | ... | num_col_50000 |
| label_a   | 2.0       | 5.6       | 7.123     | ... | ...           |
| label_b   | 11.0      | 1.4       | 2.345     | ... | ...           |
| label_a   | 3.1       | 6.2       | 5.444     | ... | ...           |

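For reference, the aggregation looks roughly like this. It's a simplified sketch rather than the exact production code - the wide DataFrame is assumed to be loaded as `df`, `mean_udf` and `numeric_cols` are stand-ins for the real names, and the real job aggregates all 50,000 numeric columns:

import pandas as pd
from pyspark.sql.functions import col, pandas_udf, PandasUDFType

@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def mean_udf(v: pd.Series) -> float:
    # Receives all values of one column for one group as a pandas Series
    # and returns a single scalar per group.
    return v.mean()

# Placeholder for the ~50,000 numeric column names.
numeric_cols = [c for c in df.columns if c != "label_col"]

result = df.groupBy("label_col").agg(
    *[mean_udf(col(c)).alias(c) for c in numeric_cols]
)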


My job runs fine on smaller datasets, with the same number of columns but fewer rows. However, when run on a dataset with 3 million rows, I see the following exception:

20/05/05 23:36:27 ERROR Executor: Exception in task 66.1 in stage 12.0 (TID 2358)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/mnt/yarn/usercache/hadoop/appcache/application_1588710822953_0001/container_1588710822953_0001_01_034974/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/mnt/yarn/usercache/hadoop/appcache/application_1588710822953_0001/container_1588710822953_0001_01_034974/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1588710822953_0001/container_1588710822953_0001_01_034974/pyspark.zip/pyspark/serializers.py", line 286, in dump_stream
    for series in iterator:
  File "/mnt/yarn/usercache/hadoop/appcache/application_1588710822953_0001/container_1588710822953_0001_01_034974/pyspark.zip/pyspark/serializers.py", line 303, in load_stream
    for batch in reader:
  File "pyarrow/ipc.pxi", line 266, in __iter__
  File "pyarrow/ipc.pxi", line 282, in pyarrow.lib._CRecordBatchReader.read_next_batch
  File "pyarrow/error.pxi", line 83, in pyarrow.lib.check_status
pyarrow.lib.ArrowIOError: read length must be positive or -1

Looking at this issue<https://issues.apache.org/jira/browse/ARROW-4890>, it looks like PyArrow has a 2GB limit for each shard that is sent to the grouping function.

I'm currently running this job on 4 nodes with 16 cores and 64GB of memory each.
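For completeness, the Spark session is configured roughly along these lines (values approximate; the app name is just a placeholder):

from pyspark.sql import SparkSession

# Approximate sizing for 4 nodes x 16 cores x 64GB each; 48g leaves
# headroom for YARN and OS overhead. Exact values may differ.
spark = (
    SparkSession.builder
    .appName("wide-groupby-agg")  # placeholder app name
    .config("spark.executor.instances", "4")
    .config("spark.executor.cores", "16")
    .config("spark.executor.memory", "48g")
    .getOrCreate()
)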

I've attached the full error log here as well. What workarounds can I try to get this job running? Unfortunately, we are coming up on a production release and this is becoming a severe blocker.

Thanks,
Gautham





RE: PyArrow Exception in Pandas UDF GROUPEDAGG()

Posted by Gautham Acharya <ga...@alleninstitute.org>.
Thanks for the quick reply, Zhang.

I don't think we have much data skew, and if we do, there isn't much of a way around it - we need to group by this specific column in order to run the aggregates.

I'm running this with PySpark, and it doesn't look like DataFrame.groupBy() takes a numPartitions parameter. What other options can I explore?
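Would adjusting the shuffle partition count be the right direction? A rough sketch of what I could try (2000 is just a guess, not a tuned value):

# Raise the number of shuffle partitions used by groupBy() (default is 200).
spark.conf.set("spark.sql.shuffle.partitions", "2000")

# Or explicitly repartition by the grouping key before aggregating.
df = df.repartition(2000, "label_col")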

--gautham

-----Original Message-----
From: ZHANG Wei <we...@outlook.com> 
Sent: Thursday, May 7, 2020 1:34 AM
To: Gautham Acharya <ga...@alleninstitute.org>
Cc: user@spark.apache.org
Subject: Re: PyArrow Exception in Pandas UDF GROUPEDAGG()


AFAICT, there might be data skew: some partitions got too many rows, which ran into the memory limit. Trying .groupBy().count() or .aggregateByKey().count() may help check the data size of each partition.
If there is no data skew, increasing the .groupBy() parameter `numPartitions` is worth a try.

--
Cheers,
-z

On Wed, 6 May 2020 00:07:58 +0000
Gautham Acharya <ga...@alleninstitute.org> wrote:

> [original message quoted in full; trimmed]

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: PyArrow Exception in Pandas UDF GROUPEDAGG()

Posted by ZHANG Wei <we...@outlook.com>.
AFAICT, there might be data skew: some partitions got too many rows,
which ran into the memory limit. Trying .groupBy().count()
or .aggregateByKey().count() may help check the data size of each
partition. If there is no data skew, increasing the .groupBy()
parameter `numPartitions` is worth a try.
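For example, something like this may show whether a few labels dominate
(just a sketch, assuming the grouping column is named `label_col`):

# Rows per group - a few very large groups would indicate skew.
df.groupBy("label_col").count().orderBy("count", ascending=False).show(20)

# Rows per partition, to see how evenly the data is spread.
print(df.rdd.mapPartitions(lambda it: [sum(1 for _ in it)]).collect())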

-- 
Cheers,
-z

On Wed, 6 May 2020 00:07:58 +0000
Gautham Acharya <ga...@alleninstitute.org> wrote:

> [original message quoted in full; trimmed]

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org