Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/07/11 04:52:00 UTC
[jira] [Comment Edited] (SPARK-28269) ArrowStreamPandasSerializer get stack
[ https://issues.apache.org/jira/browse/SPARK-28269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16882639#comment-16882639 ]
Hyukjin Kwon edited comment on SPARK-28269 at 7/11/19 4:51 AM:
---------------------------------------------------------------
The workaround for me was to call `copy()` on this line:
{code:python}
- return pd.DataFrame(np.random.randint(0, 100, size=(rows, cols_count)), columns=range(cols_count))
+ return pd.DataFrame(np.random.randint(0, 100, size=(rows, cols_count)), columns=range(cols_count)).copy()
{code}
Alternatively, convert it to a list so that memoryview is not used. In my experience, however, this is less performant.
{code:python}
- return pd.DataFrame(np.random.randint(0, 100, size=(rows, cols_count)), columns=range(cols_count))
+ return pd.DataFrame(list(np.random.randint(0, 100, size=(rows, cols_count))), columns=range(cols_count))
{code}
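The two workarounds can be compared outside Spark. Below is a minimal pandas-only sketch, assuming numpy and pandas are installed. The function names `random_pd_df_generator_copy` and `random_pd_df_generator_list` are hypothetical, and their bodies mirror the changed return lines above. Either body could be wrapped with the same `@pandas_udf(returnType=schema, functionType=PandasUDFType.GROUPED_MAP)` decorator used in the issue's reproduction.

{code:python}
import numpy as np
import pandas as pd

# Sizes taken from the reproduction in the issue description.
rows = 1000
cols_count = 132


def random_pd_df_generator_copy(df):
    # Workaround 1 (hypothetical name): build the frame from the 2-D ndarray,
    # then return an explicit copy of it.
    data = np.random.randint(0, 100, size=(rows, cols_count))
    return pd.DataFrame(data, columns=range(cols_count)).copy()


def random_pd_df_generator_list(df):
    # Workaround 2 (hypothetical name): pass a list of 1-D row arrays instead
    # of the 2-D ndarray. The comment reports this variant as less performant.
    data = np.random.randint(0, 100, size=(rows, cols_count))
    return pd.DataFrame(list(data), columns=range(cols_count))


# The group argument is ignored, as in the original generator.
print(random_pd_df_generator_copy(pd.DataFrame()).shape)  # (1000, 132)
print(random_pd_df_generator_list(pd.DataFrame()).shape)  # (1000, 132)
{code}

Both variants return frames of the same shape, column labels and value range. Only the way the frame is constructed differs.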
> ArrowStreamPandasSerializer get stack
> -------------------------------------
>
> Key: SPARK-28269
> URL: https://issues.apache.org/jira/browse/SPARK-28269
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 2.4.3
> Reporter: Modi Tamam
> Priority: Major
> Attachments: Untitled.xcf
>
>
> I'm working with PySpark version 2.4.3.
> I have a big data frame:
> * ~15M rows
> * ~130 columns
> * ~2.5 GB: I converted it to a pandas DataFrame, and pickling it (`pandas_df.to_pickle()`) produced a 2.5 GB file.
> I have some code that groups this DataFrame and applies a pandas UDF:
>
> {code:python}
> from pyspark.sql import Row
> from pyspark.sql.functions import lit, pandas_udf, PandasUDFType, to_json
> from pyspark.sql.types import *
> from pyspark.sql import functions as F
>
> # `sc` and `spark` are provided by the PySpark shell.
> initial_list = range(4500)
> rdd = sc.parallelize(initial_list)
> rdd = rdd.map(lambda x: Row(val=x))
> initial_spark_df = spark.createDataFrame(rdd)
> cols_count = 132
> rows = 1000
>
> # ------------------- Start generating the big data frame -------------------
> # Generating the schema
> schema = StructType([StructField(str(i), IntegerType()) for i in range(cols_count)])
>
> @pandas_udf(returnType=schema, functionType=PandasUDFType.GROUPED_MAP)
> def random_pd_df_generator(df):
>     import numpy as np
>     import pandas as pd
>     return pd.DataFrame(np.random.randint(0, 100, size=(rows, cols_count)), columns=range(cols_count))
>
> full_spark_df = initial_spark_df.groupBy("val").apply(random_pd_df_generator)
> # ------------------- End generating the big data frame -------------------
>
> # ------------------- Start the bug reproduction ---------------------------
> grouped_col = "col_0"
>
> @pandas_udf("%s string" % grouped_col, PandasUDFType.GROUPED_MAP)
> def very_simpl_udf(pdf):
>     import pandas as pd
>     ret_val = pd.DataFrame({grouped_col: [str(pdf[grouped_col].iloc[0])]})
>     return ret_val
>
> # To create a huge dataset, set every grouped_col value to the same value, so grouping yields a single group.
> # This is where the program gets stuck.
> full_spark_df.withColumn(grouped_col, F.lit('0')).groupBy(grouped_col).apply(very_simpl_udf).show()
> assert False, "If we got here, the issue wasn't reproduced"
> {code}
>
> The above code gets stuck in ArrowStreamPandasSerializer, on the first line of this loop, when reading a batch from the reader:
>
> {code:python}
> for batch in reader:
>     yield [self.arrow_to_pandas(c) for c in pa.Table.from_batches([batch]).itercolumns()]
> {code}
>
> Running the first code snippet is enough to reproduce the issue.
> Open a PySpark shell with this configuration:
> {code:bash}
> pyspark --conf "spark.python.worker.memory=3G" --conf "spark.executor.memory=20G" --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC" --conf "spark.driver.memory=10G"
> {code}
>
> Versions:
> * pandas - 0.24.2
> * pyarrow - 0.13.0
> * Spark - 2.4.2
> * Python - 2.7.16
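The serializer loop quoted in the description reads Arrow record batches from a stream one at a time and converts each column to pandas. Below is a minimal standalone sketch of that read pattern. It assumes a recent pyarrow that provides `pa.ipc.new_stream` and `pa.ipc.open_stream`, and it substitutes `ChunkedArray.to_pandas()` for Spark's internal `arrow_to_pandas` helper. The in-memory stream and the two-column schema are made up for illustration, and the sketch does not reproduce the hang.

{code:python}
import pyarrow as pa

# Hypothetical two-column schema standing in for the 132 integer columns.
names = ["0", "1"]
schema = pa.schema([("0", pa.int32()), ("1", pa.int32())])

# Write 10 rows as an Arrow IPC stream of record batches with at most 4 rows each.
sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, schema) as writer:
    for start in range(0, 10, 4):
        values = list(range(start, min(start + 4, 10)))
        batch = pa.RecordBatch.from_arrays(
            [pa.array(values, type=pa.int32()),
             pa.array([v * 10 for v in values], type=pa.int32())],
            names=names,
        )
        writer.write_batch(batch)

# Read the stream back the way the quoted loop does: one batch at a time,
# turning each batch into a list of pandas Series (one per column).
reader = pa.ipc.open_stream(sink.getvalue())
converted = []
for batch in reader:
    converted.append([c.to_pandas() for c in pa.Table.from_batches([batch]).itercolumns()])

print([len(cols[0]) for cols in converted])  # [4, 4, 2]
{code}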
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)