Posted to issues@spark.apache.org by "Kaushalya (JIRA)" <ji...@apache.org> on 2018/10/31 17:04:00 UTC

[jira] [Created] (SPARK-25898) pandas_udf

Kaushalya created SPARK-25898:
---------------------------------

             Summary: pandas_udf 
                 Key: SPARK-25898
                 URL: https://issues.apache.org/jira/browse/SPARK-25898
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.3.2
            Reporter: Kaushalya


Errors inside a pandas_udf are not surfaced at the first point of invocation. They only manifest further down the line, when the returned dataframe is accessed again. The issue does not appear on small datasets, only when the data is distributed across servers.
{code:java}
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *

spark = SparkSession.builder.appName("myapp").getOrCreate()

one_second_df = spark.read.parquet("data")

# output schema: all input columns plus the computed gradient column
schema = StructType(one_second_df.schema.fields + [StructField("gradient_order_rate", DoubleType())])

@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def gradient(df):
    # compute the gradient of val_count within each group
    df["gradient_order_rate"] = np.gradient(df.val_count)
    return df

agg_df = one_second_df.groupby("id").apply(gradient)
agg_df.show(2)

#### THE FOLLOWING LINES THREW ERRORS such as:
#    "ValueError: Shape of array too small to calculate a numerical gradient,
#     at least (edge_order + 1) elements are required."

fd = agg_df.filter(agg_df["id"] == "a")
fd.show(5)
{code}
Weirdly, this error is only thrown at the second show command; the first show should technically have run through the entire dataset, but it probably does not. I think this should be fixed, as errors that manifest down the line are difficult to catch if they are not identifiable earlier on.
 

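For what it is worth, the error can be forced to surface at the first action by evaluating the whole dataset instead of relying on show(), which only computes as many partitions as it needs to produce the requested rows. A minimal sketch, assuming the agg_df from the snippet above:
{code:java}
# Sketch: force every group to be evaluated so UDF errors surface at the first action.
# count() touches all partitions, unlike show(n), which may stop as soon as it has n rows.
agg_df.cache()   # optional: avoid re-running the UDF for later actions
agg_df.count()   # the ValueError would be raised here if any group is too small
agg_df.show(2)
{code}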
Please also find below working code for a similar workflow, but with a much smaller dataset:
{code:java}
import pandas as pd

from pyspark.sql.functions import pandas_udf, PandasUDFType

df3 = spark.createDataFrame(
    [("a", 1, 0), ("a", -1, 42), ("b", 3, -1), ("b", 10, -2)],
    ("key", "value1", "value2")
)

from pyspark.sql.types import *

schema = StructType([
    StructField("key", StringType()),
    StructField("avg_value1", DoubleType()),
    StructField("avg_value2", DoubleType()),
    StructField("sum_avg", DoubleType()),
    StructField("sub_avg", DoubleType())
])

@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def g(df):
    # per-group aggregates: means of value1/value2 plus their sum and difference
    gr = df['key'].iloc[0]
    x = df.value1.mean()
    y = df.value2.mean()
    w = df.value1.mean() + df.value2.mean()
    z = df.value1.mean() - df.value2.mean()
    return pd.DataFrame([[gr] + [x] + [y] + [w] + [z]])

ss = df3.groupby("key").apply(g)
ss.show(5)
my_filter = ss.filter(ss["key"] == 'a')
my_filter.show(2)
{code}
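Coming back to the first example: my best guess is that on the distributed dataset some groups end up with fewer than two rows, which np.gradient cannot handle (it needs at least edge_order + 1 elements, i.e. two with the default edge_order=1). A rough sketch of a defensive variant of the gradient UDF, assuming the schema and one_second_df from the first snippet (the name gradient_safe is just illustrative):
{code:java}
# Sketch: guard against groups with too few rows for np.gradient.
@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def gradient_safe(df):
    if len(df) < 2:
        # np.gradient with the default edge_order=1 needs at least 2 elements
        df["gradient_order_rate"] = np.nan
    else:
        df["gradient_order_rate"] = np.gradient(df.val_count)
    return df

agg_df = one_second_df.groupby("id").apply(gradient_safe)
agg_df.filter(agg_df["id"] == "a").show(5)
{code}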
 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
