Posted to issues@spark.apache.org by "bnaya (JIRA)" <ji...@apache.org> on 2018/11/04 07:13:00 UTC

[jira] [Commented] (SPARK-25491) pandas_udf(GROUPED_MAP) fails when using ArrayType(ArrayType(DoubleType()))

    [ https://issues.apache.org/jira/browse/SPARK-25491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16674328#comment-16674328 ] 

bnaya commented on SPARK-25491:
-------------------------------

It seems this issue is resolved in pyarrow 0.11.1:
{code:python}
import pandas as pd
import numpy as np
import pyarrow as pa
import pyspark
from pyspark.sql import SparkSession
from pyspark.context import SparkContext, SparkConf
from pyspark.sql.types import *
import pyspark.sql.functions as sprk_func

# Python 2 print statements (the reported environment is python 2.7.9)
print pyspark.__version__
print pd.__version__
print pa.__version__

sp_conf = SparkConf().setAppName("test").setMaster("local[1]").set('spark.driver.memory','4g')
sc = SparkContext(conf=sp_conf)
spark = SparkSession(sc)
pd_data = pd.DataFrame({'id':(np.random.rand(20)*10).astype(int)})
data_df = spark.createDataFrame(pd_data,StructType([StructField('id', IntegerType(), True)]))


@sprk_func.pandas_udf(StructType([StructField('mat', ArrayType(ArrayType(DoubleType())), True)]), sprk_func.PandasUDFType.GROUPED_MAP)
def return_mat_group(group):
    pd_data = pd.DataFrame({'mat': np.random.rand(7, 4, 4).tolist()})
    return pd_data

data_df.groupby(data_df.id).apply(return_mat_group).show()

Output:

'2.3.1'
'0.19.2'
'0.11.1'

+--------------------+                                                          
|                 mat|
+--------------------+
|[[0.0089076143159...|
|[[0.1723721167023...|
|[[0.5005937826042...|
|[[0.0950025489277...|
|[[0.4570371676204...|
|[[0.0368978707619...|
|[[0.8505389658189...|
|[[0.7559247374754...|
|[[0.5019783543632...|
|[[0.8007640474503...|
|[[0.9834015655664...|
|[[0.0144050696578...|
|[[0.6987339923336...|
|[[0.0928546895693...|
|[[0.5950922750587...|
|[[0.9228797057154...|
|[[0.3801700823230...|
|[[0.5848840463692...|
|[[0.1784268979122...|
|[[0.5865915923077...|
+--------------------+
only showing top 20 rows
{code}
 

> pandas_udf(GROUPED_MAP) fails when using ArrayType(ArrayType(DoubleType()))  
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-25491
>                 URL: https://issues.apache.org/jira/browse/SPARK-25491
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.3.1
>         Environment: Linux
> python 2.7.9
> pyspark 2.3.1 (also reproduces on pyspark 2.3.0)
> pyarrow 0.9.0 (working OK when using pyarrow 0.8.0)
>            Reporter: Ofer Fridman
>            Priority: Major
>
> After upgrading from pyarrow-0.8.0 to pyarrow-0.9.0, using pandas_udf (with PandasUDFType.GROUPED_MAP) results in an error:
> {quote}Caused by: java.io.EOFException
>  at java.io.DataInputStream.readInt(DataInputStream.java:392)
>  at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:158)
>  ... 24 more
> {quote}
> The problem occurs only when using a complex type like ArrayType(ArrayType(DoubleType())); usage of ArrayType(DoubleType()) did not reproduce this issue.
> Here is a simple example to reproduce the issue:
> {quote}import pandas as pd
> import numpy as np
> from pyspark.sql import SparkSession
> from pyspark.context import SparkContext, SparkConf
> from pyspark.sql.types import *
> import pyspark.sql.functions as sprk_func
> sp_conf = SparkConf().setAppName("stam").setMaster("local[1]").set('spark.driver.memory','4g')
> sc = SparkContext(conf=sp_conf)
> spark = SparkSession(sc)
> pd_data = pd.DataFrame({'id':(np.random.rand(20)*10).astype(int)})
> data_df = spark.createDataFrame(pd_data,StructType([StructField('id', IntegerType(), True)]))
> @sprk_func.pandas_udf(StructType([StructField('mat', ArrayType(ArrayType(DoubleType())), True)]), sprk_func.PandasUDFType.GROUPED_MAP)
> def return_mat_group(group):
>     pd_data = pd.DataFrame({'mat': np.random.rand(7, 4, 4).tolist()})
>     return pd_data
> data_df.groupby(data_df.id).apply(return_mat_group).show(){quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
