Posted to issues@spark.apache.org by "Abdeali Kothari (JIRA)" <ji...@apache.org> on 2018/11/15 03:49:00 UTC

[jira] [Updated] (SPARK-26067) Pandas GROUPED_MAP udf breaks if DF has >255 columns

     [ https://issues.apache.org/jira/browse/SPARK-26067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Abdeali Kothari updated SPARK-26067:
------------------------------------
    Description: 
When I run Spark's Pandas GROUPED_MAP UDFs to apply a UDAF I wrote in Python/pandas on a grouped DataFrame in Spark, it fails if the number of columns is greater than 255 on Python 3.6 and lower.


{code:python}
import pyspark
from pyspark.sql import types as T, functions as F

spark = pyspark.sql.SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [[i for i in range(256)], [i+1 for i in range(256)]], schema=["a" + str(i) for i in range(256)])

new_schema = T.StructType([
    field for field in df.schema] + [T.StructField("new_row", T.DoubleType())])

def myfunc(df):
    df['new_row'] = 1
    return df

myfunc_udf = F.pandas_udf(new_schema, F.PandasUDFType.GROUPED_MAP)(myfunc)

df2 = df.groupBy(["a1"]).apply(myfunc_udf)

print(df2.count())  # This FAILS
# ERROR:
# Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
#   File "/usr/local/hadoop/spark2.3.1/python/lib/pyspark.zip/pyspark/worker.py", line 219, in main
#     func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
#   File "/usr/local/hadoop/spark2.3.1/python/lib/pyspark.zip/pyspark/worker.py", line 148, in read_udfs
#     mapper = eval(mapper_str, udfs)
#   File "<string>", line 1
# SyntaxError: more than 255 arguments
{code}

Note: In Python 3.7 the 255-argument limit was removed, but I have not tried with Python 3.7: https://docs.python.org/3.7/whatsnew/3.7.html#other-language-changes
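
For reference, the 255 limit can be reproduced without Spark at all. The sketch below is my own illustration (not Spark code); it approximates the shape of the mapper string that pyspark/worker.py eval()s, i.e. a lambda whose body calls the UDF with one argument per column:

{code:python}
# Standalone illustration of the CPython limit (no Spark involved).
# This roughly mimics the mapper string built by pyspark/worker.py:
# a lambda whose body calls the UDF with one argument per column.
call_args = ", ".join("a[%d]" % i for i in range(256))
mapper_str = "lambda a: f(%s)" % call_args

try:
    eval(mapper_str)
    print("compiled fine: Python >= 3.7 removed the 255-argument limit")
except SyntaxError as exc:
    print("SyntaxError:", exc)  # "more than 255 arguments" on CPython <= 3.6
{code}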

I was using Python 3.5 (from Anaconda) and Spark 2.3.1 to reproduce this on my Hadoop Linux cluster and also on my standalone Spark installation on Mac.

  was:
When I run Spark's Pandas GROUPED_MAP UDFs to apply a UDAF I wrote in Python/pandas on a grouped DataFrame in Spark, it fails if the number of columns is greater than 255 on Python 3.6 and lower.


{code:python}
import pyspark
from pyspark.sql import types as T, functions as F

spark = pyspark.sql.SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [[i for i in range(256)], [i+1 for i in range(256)]], schema=["a" + str(i) for i in range(256)])

new_schema = T.StructType([
    field for field in df.schema] + [T.StructField("new_row", T.DoubleType())])

def myfunc(df):
    df['new_row'] = 1
    return df

myfunc_udf = F.pandas_udf(new_schema, F.PandasUDFType.GROUPED_MAP)(myfunc)

df2 = df.groupBy(["a1"]).apply(myfunc_udf)

print(df2.count())  # This FAILS
# ERROR:
# Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
#   File "/usr/local/hadoop/spark2.3.1/python/lib/pyspark.zip/pyspark/worker.py", line 219, in main
#     func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
#   File "/usr/local/hadoop/spark2.3.1/python/lib/pyspark.zip/pyspark/worker.py", line 148, in read_udfs
#     mapper = eval(mapper_str, udfs)
#   File "<string>", line 1
# SyntaxError: more than 255 arguments
{code}


I believe this is happening because internally this creates a UDF whose inputs are every column in the DF.
https://github.com/apache/spark/blob/41c2227a2318029709553a588e44dee28f106350/python/pyspark/sql/group.py#L274
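
Paraphrasing the linked line (this is a sketch, not the verbatim Spark source), the grouped-map apply roughly does the following, which is why every DataFrame column becomes one argument of the generated UDF call on the Python worker:

{code:python}
# Rough paraphrase of GroupedData.apply from the group.py line linked above:
# the wrapped pandas UDF is called with one Column per DataFrame column,
# so a 256-column DataFrame turns into a 256-argument call.
udf_column = myfunc_udf(*[df[col] for col in df.columns])
{code}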

Note: In Python 3.7 the 255-argument limit was removed, but I have not tried with Python 3.7: https://docs.python.org/3.7/whatsnew/3.7.html#other-language-changes

I was using Python 3.5 (from Anaconda) and Spark 2.3.1 to reproduce this on my Hadoop Linux cluster and also on my standalone Spark installation on Mac.


> Pandas GROUPED_MAP udf breaks if DF has >255 columns
> ----------------------------------------------------
>
>                 Key: SPARK-26067
>                 URL: https://issues.apache.org/jira/browse/SPARK-26067
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.3.2, 2.4.0
>            Reporter: Abdeali Kothari
>            Priority: Major
>
> When I run Spark's Pandas GROUPED_MAP UDFs to apply a UDAF I wrote in Python/pandas on a grouped DataFrame in Spark, it fails if the number of columns is greater than 255 on Python 3.6 and lower.
> {code:python}
> import pyspark
> from pyspark.sql import types as T, functions as F
> spark = pyspark.sql.SparkSession.builder.getOrCreate()
> df = spark.createDataFrame(
>     [[i for i in range(256)], [i+1 for i in range(256)]], schema=["a" + str(i) for i in range(256)])
> new_schema = T.StructType([
>     field for field in df.schema] + [T.StructField("new_row", T.DoubleType())])
> def myfunc(df):
>     df['new_row'] = 1
>     return df
> myfunc_udf = F.pandas_udf(new_schema, F.PandasUDFType.GROUPED_MAP)(myfunc)
> df2 = df.groupBy(["a1"]).apply(myfunc_udf)
> print(df2.count())  # This FAILS
> # ERROR:
> # Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
> #   File "/usr/local/hadoop/spark2.3.1/python/lib/pyspark.zip/pyspark/worker.py", line 219, in main
> #     func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
> #   File "/usr/local/hadoop/spark2.3.1/python/lib/pyspark.zip/pyspark/worker.py", line 148, in read_udfs
> #     mapper = eval(mapper_str, udfs)
> #   File "<string>", line 1
> # SyntaxError: more than 255 arguments
> {code}
> Note: In Python 3.7 the 255-argument limit was removed, but I have not tried with Python 3.7: https://docs.python.org/3.7/whatsnew/3.7.html#other-language-changes
> I was using Python 3.5 (from Anaconda) and Spark 2.3.1 to reproduce this on my Hadoop Linux cluster and also on my standalone Spark installation on Mac.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org