Posted to issues@spark.apache.org by "Bryan Cutler (JIRA)" <ji...@apache.org> on 2019/07/16 19:22:00 UTC

[jira] [Commented] (SPARK-28269) Pandas Grouped Map UDF can get deadlocked

    [ https://issues.apache.org/jira/browse/SPARK-28269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16886400#comment-16886400 ] 

Bryan Cutler commented on SPARK-28269:
--------------------------------------

cc [~icexelloss]

> Pandas Grouped Map UDF can get deadlocked
> -----------------------------------------
>
>                 Key: SPARK-28269
>                 URL: https://issues.apache.org/jira/browse/SPARK-28269
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.4.3
>            Reporter: Modi Tamam
>            Priority: Major
>         Attachments: Untitled.xcf
>
>
> I'm working with PySpark version 2.4.3.
> I have a big data frame:
>  * ~15M rows
>  * ~130 columns
>  * ~2.5 GB - I converted it to a Pandas data frame and pickled it (pandas_df.to_pickle()), which resulted in a file of about 2.5 GB (size measurement sketched below).
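> A rough sketch of how that size was measured (the DataFrame name and file path here are only illustrative):
> {code:java}
> import os
>
> pandas_df = big_spark_df.toPandas()  # big_spark_df: the ~15M-row Spark DataFrame described above (illustrative name)
> pandas_df.to_pickle("/tmp/pandas_df.pkl")  # serialize the Pandas frame to disk
> print(os.path.getsize("/tmp/pandas_df.pkl") / 1024.0 ** 3)  # pickled size in GB, ~2.5 in this case
> {code}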
> I have some code that groups this data frame and applies a Pandas UDF:
>  
> {code:java}
> from pyspark.sql import Row
> from pyspark.sql.functions import lit, pandas_udf, PandasUDFType, to_json
> from pyspark.sql.types import *
> from pyspark.sql import functions as F
> initial_list = range(4500)
> rdd = sc.parallelize(initial_list)
> rdd = rdd.map(lambda x: Row(val=x))
> initial_spark_df = spark.createDataFrame(rdd)
> cols_count = 132
> rows = 1000
> # ------------------- Start Generating the big data frame-------------------
> # Generating the schema
> schema = StructType([StructField(str(i), IntegerType()) for i in range(cols_count)])
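> # The UDF below returns rows (=1000) x cols_count (=132) random ints per group;
> # with 4500 groups, full_spark_df ends up with roughly 4.5M rows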
> @pandas_udf(returnType=schema,functionType=PandasUDFType.GROUPED_MAP)
> def random_pd_df_generator(df):
>     import numpy as np
>     import pandas as pd
>     return pd.DataFrame(np.random.randint(0, 100, size=(rows, cols_count)), columns=range(cols_count))
> full_spark_df = initial_spark_df.groupBy("val").apply(random_pd_df_generator)
> # ------------------- End Generating the big data frame-------------------
> # -------------------Start the bug reproduction---------------------------
> grouped_col = "col_0"
> @pandas_udf("%s string" %grouped_col, PandasUDFType.GROUPED_MAP)
> def very_simpl_udf(pdf):
>     import pandas as pd
>     ret_val = pd.DataFrame({grouped_col: [str(pdf[grouped_col].iloc[0])]})
>     return ret_val
> # In order to create a huge group, set grouped_col to the same value for every row, so all rows land in a single group.
> # Here is where the program gets stuck
> full_spark_df.withColumn(grouped_col,F.lit('0')).groupBy(grouped_col).apply(very_simpl_udf).show()
> assert False, "If we got here, it means the issue wasn't reproduced"
> {code}
>  
> The above code gets stuck in the ArrowStreamPandasSerializer, on the first line of the loop below, when reading a batch from the reader:
>  
> {code:java}
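> # this loop is in ArrowStreamPandasSerializer.load_stream (pyspark/serializers.py in Spark 2.4)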
> for batch in reader:
>     yield [self.arrow_to_pandas(c) for c in pa.Table.from_batches([batch]).itercolumns()]
> {code}
>  
>  To reproduce, open a PySpark shell with the following configuration and run the first code snippet above:
> {code:java}
> pyspark --conf "spark.python.worker.memory=3G" --conf "spark.executor.memory=20G" --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC" --conf "spark.driver.memory=10G"{code}
>  
> Versions:
>  * pandas - 0.24.2
>  * pyarrow - 0.13.0
>  * Spark - 2.4.2
>  * Python - 2.7.16


