Posted to jira@arrow.apache.org by "Dmitry Kravchuk (Jira)" <ji...@apache.org> on 2020/12/18 11:16:00 UTC

[jira] [Commented] (ARROW-10957) Expanding pyarrow buffer size more than 2GB for pandas_udf functions

    [ https://issues.apache.org/jira/browse/ARROW-10957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17251704#comment-17251704 ] 

Dmitry Kravchuk commented on ARROW-10957:
-----------------------------------------

[~fan_li_ya] here we go.

> Expanding pyarrow buffer size more than 2GB for pandas_udf functions
> --------------------------------------------------------------------
>
>                 Key: ARROW-10957
>                 URL: https://issues.apache.org/jira/browse/ARROW-10957
>             Project: Apache Arrow
>          Issue Type: Improvement
>          Components: C++, Java, Python
>    Affects Versions: 2.0.0
>         Environment: Spark: 2.4.4
> Python:
> cycler (0.10.0)
> glmnet-py (0.1.0b2)
> joblib (1.0.0)
> kiwisolver (1.3.1)
> lightgbm (3.1.1)
> matplotlib (3.0.3)
> numpy (1.19.4)
> pandas (1.1.5)
> pip (9.0.3)
> pyarrow (2.0.0)
> pyparsing (2.4.7)
> python-dateutil (2.8.1)
> pytz (2020.4)
> scikit-learn (0.23.2)
> scipy (1.5.4)
> setuptools (51.0.0)
> six (1.15.0)
> sklearn (0.0)
> threadpoolctl (2.1.0)
> venv-pack (0.2.0)
> wheel (0.36.2)
>            Reporter: Dmitry Kravchuk
>            Priority: Major
>              Labels: features, patch, performance
>             Fix For: 2.0.1
>
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> There is a 2GB limit on the amount of data that can be passed to any pandas_udf function, and the aim of this issue is to expand that limit. This buffer size is far too small when we use pyspark to fit machine learning models.
> Steps to reproduce: run the following spark-submit command to execute the Python function shown after it.
> {code:bash}
> %sh
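> # pyarrow >= 0.15 changed the Arrow IPC stream format, but Spark 2.4 still expects the old one,
> # so ARROW_PRE_0_15_IPC_FORMAT=1 is exported below (and again inside the UDF) for compatibility.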
> cd /home/zeppelin/code && \
> export PYSPARK_DRIVER_PYTHON=/home/zeppelin/envs/env3/bin/python && \
> export PYSPARK_PYTHON=./env3/bin/python && \
> export ARROW_PRE_0_15_IPC_FORMAT=1 && \
> spark-submit \
> --master yarn \
> --deploy-mode client \
> --num-executors 5 \
> --executor-cores 5 \
> --driver-memory 8G \
> --executor-memory 8G \
> --conf spark.executor.memoryOverhead=4G \
> --conf spark.driver.memoryOverhead=4G \
> --archives /home/zeppelin/env3.tar.gz#env3 \
> --jars "/opt/deltalake/delta-core_2.11-0.5.0.jar" \
> --py-files jobs.zip,"/opt/deltalake/delta-core_2.11-0.5.0.jar" main.py \
> --job temp
> {code}
>  
> {code:python|borderStyle=solid}
> import pyspark
> from pyspark.sql import functions as F, types as T
> import pandas as pd
> def analyze(spark):
>     pdf1 = pd.DataFrame(
>         [[1234567, 0.0, "abcdefghij", "2000-01-01T00:00:00.000Z"]],
>         columns=['df1_c1', 'df1_c2', 'df1_c3', 'df1_c4']
>     )
>     df1 = spark.createDataFrame(pd.concat([pdf1 for i in range(429)]).reset_index()).drop('index')
>     pdf2 = pd.DataFrame(
>         [[1234567, 0.0, "abcdefghijklmno", "2000-01-01", "abcdefghijklmno", "abcdefghijklmno"]],
>         columns=['df2_c1', 'df2_c2', 'df2_c3', 'df2_c4', 'df2_c5', 'df2_c6']
>     )
>     df2 = spark.createDataFrame(pd.concat([pdf2 for i in range(48993)]).reset_index()).drop('index')
>     df3 = df1.join(df2, df1['df1_c1'] == df2['df2_c1'], how='inner')
>     def myudf(df):
>         import os
>         os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"
>         return df
>     df4 = df3 \
>         .withColumn('df1_c1', F.col('df1_c1').cast(T.IntegerType())) \
>         .withColumn('df1_c2', F.col('df1_c2').cast(T.DoubleType())) \
>         .withColumn('df1_c3', F.col('df1_c3').cast(T.StringType())) \
>         .withColumn('df1_c4', F.col('df1_c4').cast(T.StringType())) \
>         .withColumn('df2_c1', F.col('df2_c1').cast(T.IntegerType())) \
>         .withColumn('df2_c2', F.col('df2_c2').cast(T.DoubleType())) \
>         .withColumn('df2_c3', F.col('df2_c3').cast(T.StringType())) \
>         .withColumn('df2_c4', F.col('df2_c4').cast(T.StringType())) \
>         .withColumn('df2_c5', F.col('df2_c5').cast(T.StringType())) \
>         .withColumn('df2_c6', F.col('df2_c6').cast(T.StringType()))
>     df4.printSchema()
>     udf = F.pandas_udf(df4.schema, F.PandasUDFType.GROUPED_MAP)(myudf)
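>     # Every row shares df1_c1 == 1234567, so the join above yields 429 * 48993 (about
>     # 21 million) rows in a single group; that one grouped-map Arrow batch exceeds 2GB.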
>     df5 = df4.groupBy('df1_c1').apply(udf)
>     print('df5.count()', df5.count())
> {code}
> If you need more details please let me know.
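
For reference, a minimal sketch of a stand-alone driver for the reproducer above, assuming a local pyspark 2.4.x installation; the local master, app name and import path are illustrative assumptions, and the driver still needs several GB of heap to hold the joined data:

{code:python}
import os

from pyspark.sql import SparkSession

# Spark 2.4 expects the pre-0.15 Arrow IPC format, while pyarrow 2.0 defaults
# to the newer one, so this flag is set before any pandas_udf work starts.
os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"

spark = (
    SparkSession.builder
    .master("local[4]")            # local master is an assumption
    .appName("arrow-2gb-repro")    # hypothetical app name
    .getOrCreate()
)

# analyze() is the function from the reproducer above (import path is hypothetical):
# from jobs.temp import analyze
# analyze(spark)
{code}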
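
Until the limit itself is expanded, a possible interim workaround (not part of the original report, and only applicable when the per-group logic does not need to see a whole key at once) is to split each key into smaller sub-groups with a salt column, so that no single GROUPED_MAP batch approaches 2GB. A sketch using the names from the reproducer; N_SALT and apply_salted are hypothetical:

{code:python}
from pyspark.sql import functions as F

N_SALT = 16  # assumed number of sub-groups per key


def apply_salted(df, key_col, udf_fn, n_salt=N_SALT):
    # Add a random salt column, run the grouped-map UDF per (key, salt)
    # sub-group, then drop the salt again.
    salted = df.withColumn("salt", F.floor(F.rand(seed=42) * n_salt).cast("int"))
    udf = F.pandas_udf(salted.schema, F.PandasUDFType.GROUPED_MAP)(udf_fn)
    return salted.groupBy(key_col, "salt").apply(udf).drop("salt")


# With the names from the reproducer above:
# df5 = apply_salted(df4, "df1_c1", myudf)
{code}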



--
This message was sent by Atlassian Jira
(v8.3.4#803005)