You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@arrow.apache.org by "Dmitry Kravchuk (Jira)" <ji...@apache.org> on 2021/03/01 08:06:00 UTC

[jira] [Commented] (ARROW-10957) Expanding pyarrow buffer size more than 2GB for pandas_udf functions

    [ https://issues.apache.org/jira/browse/ARROW-10957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17292705#comment-17292705 ] 

Dmitry Kravchuk commented on ARROW-10957:
-----------------------------------------

[~emkornfield] [~fan_li_ya]

Hello.

I've tested code at another cluster with spark 3.0.2 env and still have error.

Can you help me out?

 

Environment:
 !spark3 env.png!

Python env:
 !python env.png!

Error:
 !spark3 error.png!

Testing code:
{code:java}
from pyspark.sql import functions as F, types as T
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
import pandas as pd
import pyarrow as pa

spark = SparkSession.builder.appName('spark3_example').getOrCreate()
spark.sparkContext.setLogLevel('ERROR')
spark.conf.set('spark.sql.shuffle.partitions', '200')
spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', 'true')

pdf1 = pd.DataFrame(
[[1234567, 0.0, 'abcdefghij', '2000-01-01T00:00:00.000Z']],
columns=['df1_c1', 'df1_c2', 'df1_c3', 'df1_c4']
)
df1 = spark.createDataFrame(pd.concat([pdf1 for i in range(500)]).reset_index()).drop('index')

pdf2 = pd.DataFrame(
[[1234567, 0.0, 'abcdefghijklmno', '2000-01-01', 'abcdefghijklmno', 'abcdefghijklmno']],
columns=['df2_c1', 'df2_c2', 'df2_c3', 'df2_c4', 'df2_c5', 'df2_c6']
)
df2 = spark.createDataFrame(pd.concat([pdf2 for i in range(50000)]).reset_index()).drop('index')
df3 = df1.join(df2, df1['df1_c1'] == df2['df2_c1'], how='inner')

def myudf(df):
    return df

df4 = df3 \
.withColumn('df1_c1', F.col('df1_c1').cast(T.IntegerType())) \
.withColumn('df1_c2', F.col('df1_c2').cast(T.DoubleType())) \
.withColumn('df1_c3', F.col('df1_c3').cast(T.StringType())) \
.withColumn('df1_c4', F.col('df1_c4').cast(T.StringType())) \
.withColumn('df2_c1', F.col('df2_c1').cast(T.IntegerType())) \
.withColumn('df2_c2', F.col('df2_c2').cast(T.DoubleType())) \
.withColumn('df2_c3', F.col('df2_c3').cast(T.StringType())) \
.withColumn('df2_c4', F.col('df2_c4').cast(T.StringType())) \
.withColumn('df2_c5', F.col('df2_c5').cast(T.StringType())) \
.withColumn('df2_c6', F.col('df2_c6').cast(T.StringType()))

df5 = df4.groupBy('df1_c1').applyInPandas(myudf, df4.schema)
df5.show()
{code}
spark-submit:

export SPARK_HOME=/opt/spark31/spark-3.0.2-bin-hadoop3.2 && \
 export HADOOP_CONF_DIR=/etc/hadoop && \
 export YARN_CONF_DIR=/etc/hadoop && \
 export PYSPARK_DRIVER_PYTHON=/home/zeppelin/envs/env/bin/python && \
 export PYSPARK_PYTHON=/usr/bin/python3 && \
 export HADOOP_USER_NAME=hdfs && \
 spark-submit \
 --master yarn \
 --deploy-mode client \
 --num-executors 7 \
 --executor-cores 2 \
 --driver-memory 11G \
 --executor-memory 11G \
 --archives /home/zeppelin/env.tar.gz#env \
 --py-files /home/zeppelin/code/spark3_testing.py /home/zeppelin/code/spark3_testing.py

> Expanding pyarrow buffer size more than 2GB for pandas_udf functions
> --------------------------------------------------------------------
>
>                 Key: ARROW-10957
>                 URL: https://issues.apache.org/jira/browse/ARROW-10957
>             Project: Apache Arrow
>          Issue Type: Improvement
>          Components: C++, Java, Python
>    Affects Versions: 2.0.0
>         Environment: Spark: 2.4.4
> Python:
> Dcycler (0.10.0)
> glmnet-py (0.1.0b2)
> joblib (1.0.0)
> kiwisolver (1.3.1)
> lightgbm (3.1.1) EPRECATION
> matplotlib (3.0.3)
> numpy (1.19.4)
> pandas (1.1.5)
> pip (9.0.3: The default format will switch to columns in the future. You can)
> pyarrow 2.0.0
> pyparsing (2.4.7) use --format=(legacy|columns) (or define a format=(python-dateutil (2.8.1)
> pytz (202legacy|columns) in yo0.4)
> scikit-learn (0.23.2)
> scipy (1.5.4)
> setuptools (51.0.0) ur pip.conf under the [list] section) to disable this warnsix (1.15.0)
> sklearn (0.0)
> threadpoolctl (2.1.0)
> venv-paing. ck (0.2.0)
> wheel (0.36.2)
>            Reporter: Dmitry Kravchuk
>            Priority: Major
>              Labels: features, patch, performance
>             Fix For: 2.0.1
>
>         Attachments: python env.png, spark3 env.png, spark3 error.png
>
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> There is 2GB limit for data that can be passed to any pandas_udf function and the aim of this issue is to expand this limit. It's very small buffer size if we use pyspark and our goal is fitting machine learning models.
> Steps to reproduce - just use following spark-submit for executing following after python function.
> {code:java}
> %sh
> cd /home/zeppelin/code && \
> export PYSPARK_DRIVER_PYTHON=/home/zeppelin/envs/env3/bin/python && \
> export PYSPARK_PYTHON=./env3/bin/python && \
> export ARROW_PRE_0_15_IPC_FORMAT=1 && \
> spark-submit \
> --master yarn \
> --deploy-mode client \
> --num-executors 5 \
> --executor-cores 5 \
> --driver-memory 8G \
> --executor-memory 8G \
> --conf spark.executor.memoryOverhead=4G \
> --conf spark.driver.memoryOverhead=4G \
> --archives /home/zeppelin/env3.tar.gz#env3 \
> --jars "/opt/deltalake/delta-core_2.11-0.5.0.jar" \
> --py-files jobs.zip,"/opt/deltalake/delta-core_2.11-0.5.0.jar" main.py \
> --job temp
> {code}
>  
> {code:java|title=Bar.Python|borderStyle=solid}
> import pyspark
> from pyspark.sql import functions as F, types as T
> import pandas as pd
> def analyze(spark):
>     pdf1 = pd.DataFrame(
>         [[1234567, 0.0, "abcdefghij", "2000-01-01T00:00:00.000Z"]],
>         columns=['df1_c1', 'df1_c2', 'df1_c3', 'df1_c4']
>     )
>     df1 = spark.createDataFrame(pd.concat([pdf1 for i in range(429)]).reset_index()).drop('index')
>     pdf2 = pd.DataFrame(
>         [[1234567, 0.0, "abcdefghijklmno", "2000-01-01", "abcdefghijklmno", "abcdefghijklmno"]],
>         columns=['df2_c1', 'df2_c2', 'df2_c3', 'df2_c4', 'df2_c5', 'df2_c6']
>     )
>     df2 = spark.createDataFrame(pd.concat([pdf2 for i in range(48993)]).reset_index()).drop('index')
>     df3 = df1.join(df2, df1['df1_c1'] == df2['df2_c1'], how='inner')
>     def myudf(df):
>         import os
>         os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"
>         return df
>     df4 = df3 \
>         .withColumn('df1_c1', F.col('df1_c1').cast(T.IntegerType())) \
>         .withColumn('df1_c2', F.col('df1_c2').cast(T.DoubleType())) \
>         .withColumn('df1_c3', F.col('df1_c3').cast(T.StringType())) \
>         .withColumn('df1_c4', F.col('df1_c4').cast(T.StringType())) \
>         .withColumn('df2_c1', F.col('df2_c1').cast(T.IntegerType())) \
>         .withColumn('df2_c2', F.col('df2_c2').cast(T.DoubleType())) \
>         .withColumn('df2_c3', F.col('df2_c3').cast(T.StringType())) \
>         .withColumn('df2_c4', F.col('df2_c4').cast(T.StringType())) \
>         .withColumn('df2_c5', F.col('df2_c5').cast(T.StringType())) \
>         .withColumn('df2_c6', F.col('df2_c6').cast(T.StringType()))
>     print(df4.printSchema())
>     udf = F.pandas_udf(df4.schema, F.PandasUDFType.GROUPED_MAP)(myudf)
>     df5 = df4.groupBy('df1_c1').apply(udf)
>     print('df5.count()', df5.count())
> {code}
> If you need more details please let me know.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)