Posted to issues@spark.apache.org by "Kaushalya (JIRA)" <ji...@apache.org> on 2018/10/31 17:04:00 UTC
[jira] [Created] (SPARK-25898) pandas_udf
Kaushalya created SPARK-25898:
---------------------------------
Summary: pandas_udf
Key: SPARK-25898
URL: https://issues.apache.org/jira/browse/SPARK-25898
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 2.3.2
Reporter: Kaushalya
Using pandas_udfs does not surface possible errors in the implementation at the first point of invocation. The errors only manifest later, when the returned DataFrame is accessed again. The issue does not appear with small datasets, only when the data is distributed across servers.
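One plausible explanation for this deferred failure (an assumption, not something confirmed in this report) is lazy evaluation: Spark only runs the UDF on the groups that an action actually needs, and `show(n)` can stop once it has n rows. The plain-Python sketch below is an analogy, not Spark code. The group names and values in `groups` and the helper `lazy_gradients` are hypothetical. A generator applies `np.gradient` to one group at a time, so taking the first two results succeeds, while consuming every group reaches a single-row group and raises the same kind of `ValueError` as in the report.

```python
import numpy as np
from itertools import islice

# Hypothetical groups standing in for the "id" groups of one_second_df.
# Group "c" has a single row, which np.gradient cannot handle.
groups = {
    "a": [1.0, 3.0, 6.0],
    "b": [2.0, 2.0],
    "c": [5.0],
}


def lazy_gradients(groups):
    """Yield (key, gradient) per group, computing each only when requested."""
    for key, values in groups.items():
        yield key, np.gradient(np.asarray(values))


# Like a small show(), asking for only the first two results never touches "c".
first_two = list(islice(lazy_gradients(groups), 2))
print([key for key, _ in first_two])  # ['a', 'b']

# Consuming every group (as a later action might) reaches "c" and fails.
try:
    list(lazy_gradients(groups))
    failed = False
except ValueError:
    failed = True
print(failed)  # True
```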
{code:python}
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, DoubleType

spark = SparkSession.builder.appName("myapp").getOrCreate()

# Read the data before deriving the output schema from its columns.
one_second_df = spark.read.parquet("data")
# Output schema: the input columns plus the "gradient" column added by the UDF.
schema = StructType(one_second_df.schema.fields + [StructField("gradient", DoubleType())])

@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def gradient(df):
    # df holds every row of one "id" group as a pandas DataFrame.
    df["gradient"] = np.gradient(df.val_count)
    return df

agg_df = one_second_df.groupby("id").apply(gradient)
agg_df.show(2)
# The following lines threw errors such as:
#   ValueError: Shape of array too small to calculate a numerical gradient,
#   at least (edge_order + 1) elements are required.
fd = agg_df.filter(agg_df["id"] == "a")
fd.show(5)
{code}
Weirdly, this error is only thrown by the second show command, even though the first show command should technically have run through the entire dataset (it probably does not). I think this should be fixed, as errors that only manifest further down the line are difficult to catch if they cannot be identified earlier on.
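`np.gradient` with its default `edge_order=1` needs at least two values, so any `id` group containing a single row would raise the reported `ValueError` as soon as Spark evaluates that group. Assuming that short groups are the cause and that a NaN gradient is acceptable for them, a minimal guarded version of the UDF body might look like this. `safe_gradient` is a hypothetical name, and the sketch is plain pandas so it can be tried outside Spark.

```python
import numpy as np
import pandas as pd


def safe_gradient(pdf):
    """Hypothetical guarded variant of the `gradient` UDF body.

    np.gradient (default edge_order=1) needs at least edge_order + 1 = 2
    values, so single-row groups get NaN instead of raising ValueError.
    """
    pdf = pdf.copy()
    if len(pdf) < 2:
        pdf["gradient"] = np.nan
    else:
        pdf["gradient"] = np.gradient(np.asarray(pdf["val_count"], dtype=float))
    return pdf


# A single value reproduces the reported error with plain NumPy.
try:
    np.gradient(np.array([5.0]))
    single_row_error = None
except ValueError as exc:
    single_row_error = str(exc)
print(single_row_error)

two_rows = safe_gradient(pd.DataFrame({"id": ["a", "a"], "val_count": [1, 3]}))
one_row = safe_gradient(pd.DataFrame({"id": ["b"], "val_count": [7]}))
print(two_rows["gradient"].tolist())  # [2.0, 2.0]
print(one_row["gradient"].tolist())   # [nan]
```

To use it in Spark, the assumption is that it would be decorated with the same `@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)` as `gradient`, with a schema whose extra field is named `gradient`. Group sizes can also be inspected up front with `one_second_df.groupBy("id").count()`.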
Please also find below working code for a similar workflow with a much smaller dataset:
{code:python}
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

df3 = spark.createDataFrame(
    [("a", 1, 0), ("a", -1, 42), ("b", 3, -1), ("b", 10, -2)],
    ("key", "value1", "value2")
)

# One output row per key: the key, both means, and their sum and difference.
schema = StructType([
    StructField("key", StringType()),
    StructField("avg_value1", DoubleType()),
    StructField("avg_value2", DoubleType()),
    StructField("sum_avg", DoubleType()),
    StructField("sub_avg", DoubleType())
])

@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def g(df):
    # df holds every row of one "key" group as a pandas DataFrame.
    gr = df["key"].iloc[0]
    x = df.value1.mean()
    y = df.value2.mean()
    return pd.DataFrame([[gr, x, y, x + y, x - y]],
                        columns=["key", "avg_value1", "avg_value2", "sum_avg", "sub_avg"])

ss = df3.groupby("key").apply(g)
ss.show(5)
my_filter = ss.filter(ss["key"] == "a")
my_filter.show(2)
{code}
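For reference, the rows that `ss.show(5)` should display can be checked without Spark. The following is a plain-pandas approximation of `df3.groupby("key").apply(g)`, assuming the grouped map behaves like splitting the rows by `key`, applying the function to each group and concatenating the results (Spark itself does not guarantee the row order). `summarize` is a hypothetical name for the body of `g`.

```python
import pandas as pd

# Same rows as df3 in the example above.
pdf = pd.DataFrame(
    [("a", 1, 0), ("a", -1, 42), ("b", 3, -1), ("b", 10, -2)],
    columns=["key", "value1", "value2"],
)


def summarize(group):
    """Plain-pandas version of the body of g(): one summary row per key."""
    x = group["value1"].mean()
    y = group["value2"].mean()
    return pd.DataFrame(
        [[group["key"].iloc[0], x, y, x + y, x - y]],
        columns=["key", "avg_value1", "avg_value2", "sum_avg", "sub_avg"],
    )


# Split by key, summarize each group, and stack the per-group results.
result = pd.concat(
    [summarize(group) for _, group in pdf.groupby("key")],
    ignore_index=True,
)
print(result)
```

Both keys have two rows here, so, for example, key `a` yields `avg_value1 = 0.0`, `avg_value2 = 21.0`, `sum_avg = 21.0` and `sub_avg = -21.0`.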