You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Kaushalya (JIRA)" <ji...@apache.org> on 2018/10/31 17:05:00 UTC

[jira] [Updated] (SPARK-25898) pandas_udf not catching errors early on

     [ https://issues.apache.org/jira/browse/SPARK-25898?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kaushalya updated SPARK-25898:
------------------------------
    Summary: pandas_udf not catching errors early on  (was: pandas_udf )

> pandas_udf not catching errors early on
> ---------------------------------------
>
>                 Key: SPARK-25898
>                 URL: https://issues.apache.org/jira/browse/SPARK-25898
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.2
>            Reporter: Kaushalya
>            Priority: Major
>
> The usage of pandas_udfs do not show the possible errors in implementation at first point of invocation. The errors manifest down the line when accessing the returned dataframe. The issue is not found in small datasets, only when the datasets are distributed across servers. 
> {code:java}
> import pandas as pd
> import numpy as np
> import matplotlib.pyplot as plt
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import pandas_udf,PandasUDFType
> from pyspark.sql.types import *
> spark = SparkSession.builder.appName("myapp").getOrCreate()
> schema = StructType(one_second_df.schema.fields + [StructField("gradient_order_rate", DoubleType())])
> one_second_df = spark.read.parquet("data")
> @pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
> def gradient(df):
> df["gradient"] = np.gradient(df.val_count)
> return df
> agg_df = one_second_df.groupby("id").apply(gradient)
> agg_df.show(2)
> #### THE FOLLOWING LINES THREW ERRORS such as : 
> "Shape of array too small to calculate a numerical gradient, "
> ValueError: Shape of array too small to calculate a numerical gradient, at least (edge_order + 1) elements are required."
> fd = agg_df.filter(agg_df["id"] == "a")
> fd.show(5)
> Weirdly this error is thrown after the second show command which should technically have run through the entire dataset but probably doesn't do that. I think this should be fixed, as it would be difficult to catch errors that manifest down the line if they are not identifiable earlier on. 
> {code}
>  
> Please also find working code for a similar workflow but with a much smaller dataset
> {code:java}
> from pyspark.sql.functions import pandas_udf,PandasUDFType
> df3 = spark.createDataFrame(
> [("a", 1, 0), ("a", -1, 42), ("b", 3, -1), ("b", 10, -2)],
> ("key", "value1", "value2")
> )
> from pyspark.sql.types import *
> schema = StructType([
> StructField("key", StringType()),
> StructField("avg_value1", DoubleType()),
> StructField("avg_value2", DoubleType()),
> StructField("sum_avg", DoubleType()),
> StructField("sub_avg", DoubleType())
> ])
> @pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
> def g(df):
> gr = df['key'].iloc[0]
> x = df.value1.mean()
> y = df.value2.mean()
> w = df.value1.mean() + df.value2.mean()
> z = df.value1.mean() - df.value2.mean()
> return pd.DataFrame([[gr]+[x]+[y]+[w]+[z]])
> ss = df3.groupby("key").apply(g)
> ss.show(5)
> my_filter = ss.filter(ss["key"] == 'a')
> my_filter.show(2)
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org