Posted to jira@arrow.apache.org by "Dmitry Kravchuk (Jira)" <ji...@apache.org> on 2021/02/11 21:38:00 UTC

[jira] [Commented] (ARROW-10957) Expanding pyarrow buffer size more than 2GB for pandas_udf functions

    [ https://issues.apache.org/jira/browse/ARROW-10957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17283388#comment-17283388 ] 

Dmitry Kravchuk commented on ARROW-10957:
-----------------------------------------

[~liyafan] sorry for the late reply...
I don't have an environment or any practical way to run the Java code from your example.
Can you help me with the metadata size calculation?
Maybe it can be done from the known schema of my data?
I mean, it should be possible to calculate the metadata size from the schema columns and their types, am I right?
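
For example, is something like this the right direction? Just a rough sketch on my side: I am assuming pyarrow's Schema.serialize() returns the encoded schema message, and the column names/types are simply taken from the reproduction code below.

{code:python}
import pyarrow as pa

# Schema of the grouped data passed to the pandas_udf in the reproduction
# code below (column names/types copied from df4).
schema = pa.schema([
    ('df1_c1', pa.int32()),
    ('df1_c2', pa.float64()),
    ('df1_c3', pa.string()),
    ('df1_c4', pa.string()),
    ('df2_c1', pa.int32()),
    ('df2_c2', pa.float64()),
    ('df2_c3', pa.string()),
    ('df2_c4', pa.string()),
    ('df2_c5', pa.string()),
    ('df2_c6', pa.string()),
])

# Schema.serialize() returns the schema as an encoded IPC message buffer,
# so its length should give the size of the schema portion of the metadata.
print('serialized schema size in bytes:', len(schema.serialize()))
{code}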

> Expanding pyarrow buffer size more than 2GB for pandas_udf functions
> --------------------------------------------------------------------
>
>                 Key: ARROW-10957
>                 URL: https://issues.apache.org/jira/browse/ARROW-10957
>             Project: Apache Arrow
>          Issue Type: Improvement
>          Components: C++, Java, Python
>    Affects Versions: 2.0.0
>         Environment: Spark: 2.4.4
> Python:
> cycler (0.10.0)
> glmnet-py (0.1.0b2)
> joblib (1.0.0)
> kiwisolver (1.3.1)
> lightgbm (3.1.1)
> matplotlib (3.0.3)
> numpy (1.19.4)
> pandas (1.1.5)
> pip (9.0.3)
> pyarrow (2.0.0)
> pyparsing (2.4.7)
> python-dateutil (2.8.1)
> pytz (2020.4)
> scikit-learn (0.23.2)
> scipy (1.5.4)
> setuptools (51.0.0)
> six (1.15.0)
> sklearn (0.0)
> threadpoolctl (2.1.0)
> venv-pack (0.2.0)
> wheel (0.36.2)
>            Reporter: Dmitry Kravchuk
>            Priority: Major
>              Labels: features, patch, performance
>             Fix For: 2.0.1
>
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> There is a 2GB limit on the data that can be passed to any pandas_udf function, and the aim of this issue is to expand that limit. This buffer size is very small when we use pyspark and our goal is to fit machine learning models.
> Steps to reproduce: use the following spark-submit command to execute the Python function shown after it.
> {code:java}
> %sh
> cd /home/zeppelin/code && \
> export PYSPARK_DRIVER_PYTHON=/home/zeppelin/envs/env3/bin/python && \
> export PYSPARK_PYTHON=./env3/bin/python && \
> export ARROW_PRE_0_15_IPC_FORMAT=1 && \
> spark-submit \
> --master yarn \
> --deploy-mode client \
> --num-executors 5 \
> --executor-cores 5 \
> --driver-memory 8G \
> --executor-memory 8G \
> --conf spark.executor.memoryOverhead=4G \
> --conf spark.driver.memoryOverhead=4G \
> --archives /home/zeppelin/env3.tar.gz#env3 \
> --jars "/opt/deltalake/delta-core_2.11-0.5.0.jar" \
> --py-files jobs.zip,"/opt/deltalake/delta-core_2.11-0.5.0.jar" main.py \
> --job temp
> {code}
>  
> {code:java|title=Bar.Python|borderStyle=solid}
> import pyspark
> from pyspark.sql import functions as F, types as T
> import pandas as pd
> def analyze(spark):
>     pdf1 = pd.DataFrame(
>         [[1234567, 0.0, "abcdefghij", "2000-01-01T00:00:00.000Z"]],
>         columns=['df1_c1', 'df1_c2', 'df1_c3', 'df1_c4']
>     )
>     df1 = spark.createDataFrame(pd.concat([pdf1 for i in range(429)]).reset_index()).drop('index')
>     pdf2 = pd.DataFrame(
>         [[1234567, 0.0, "abcdefghijklmno", "2000-01-01", "abcdefghijklmno", "abcdefghijklmno"]],
>         columns=['df2_c1', 'df2_c2', 'df2_c3', 'df2_c4', 'df2_c5', 'df2_c6']
>     )
>     df2 = spark.createDataFrame(pd.concat([pdf2 for i in range(48993)]).reset_index()).drop('index')
>     df3 = df1.join(df2, df1['df1_c1'] == df2['df2_c1'], how='inner')
>     def myudf(df):
>         import os
>         os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"
>         return df
>     df4 = df3 \
>         .withColumn('df1_c1', F.col('df1_c1').cast(T.IntegerType())) \
>         .withColumn('df1_c2', F.col('df1_c2').cast(T.DoubleType())) \
>         .withColumn('df1_c3', F.col('df1_c3').cast(T.StringType())) \
>         .withColumn('df1_c4', F.col('df1_c4').cast(T.StringType())) \
>         .withColumn('df2_c1', F.col('df2_c1').cast(T.IntegerType())) \
>         .withColumn('df2_c2', F.col('df2_c2').cast(T.DoubleType())) \
>         .withColumn('df2_c3', F.col('df2_c3').cast(T.StringType())) \
>         .withColumn('df2_c4', F.col('df2_c4').cast(T.StringType())) \
>         .withColumn('df2_c5', F.col('df2_c5').cast(T.StringType())) \
>         .withColumn('df2_c6', F.col('df2_c6').cast(T.StringType()))
>     df4.printSchema()
>     udf = F.pandas_udf(df4.schema, F.PandasUDFType.GROUPED_MAP)(myudf)
>     df5 = df4.groupBy('df1_c1').apply(udf)
>     print('df5.count()', df5.count())
> {code}
> If you need more details, please let me know.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)