You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Hyukjin Kwon (Jira)" <ji...@apache.org> on 2020/08/31 05:52:00 UTC
[jira] [Commented] (SPARK-32746) Not able to run Pandas UDF
[ https://issues.apache.org/jira/browse/SPARK-32746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17187443#comment-17187443 ]
Hyukjin Kwon commented on SPARK-32746:
--------------------------------------
[~rahulbhatia], please share your analysis on the logs and more symptoms. You could try to profile and share the results as well.
> Not able to run Pandas UDF
> ---------------------------
>
> Key: SPARK-32746
> URL: https://issues.apache.org/jira/browse/SPARK-32746
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 3.0.0
> Environment: Pyspark 3.0.0
> PyArrow - 1.0.1(also tried with Pyarrrow 0.15.1, no progress there)
> Pandas - 0.25.3
>
> Reporter: Rahul Bhatia
> Priority: Major
> Attachments: Screenshot 2020-08-31 at 9.04.07 AM.png
>
>
> Hi,
> I am facing issues in running Pandas UDF on a yarn cluster with multiple nodes, I am trying to perform a simple DBSCAN algorithm to multiple groups in my dataframe, to start with, I am just using a simple example to test things out -
> {code:python}
> import pandas as pd
> from pyspark.sql.types import StructType, StructField, DoubleType, StringType, IntegerType
> from sklearn.cluster import DBSCAN
> from pyspark.sql.functions import pandas_udf, PandasUDFTypedata
> data = [(1, 11.6133, 48.1075),
> (1, 11.6142, 48.1066),
> (1, 11.6108, 48.1061),
> (1, 11.6207, 48.1192),
> (1, 11.6221, 48.1223),
> (1, 11.5969, 48.1276),
> (2, 11.5995, 48.1258),
> (2, 11.6127, 48.1066),
> (2, 11.6430, 48.1275),
> (2, 11.6368, 48.1278),
> (2, 11.5930, 48.1156)]
> df = spark.createDataFrame(data, ["id", "X", "Y"])
> output_schema = StructType(
> [
> StructField('id', IntegerType()),
> StructField('X', DoubleType()),
> StructField('Y', DoubleType()),
> StructField('cluster', IntegerType())
> ]
> )
> @pandas_udf(output_schema, PandasUDFType.GROUPED_MAP)
> def dbscan(data):
> data["cluster"] = DBSCAN(eps=5, min_samples=3).fit_predict(data[["X", "Y"]])
> result = pd.DataFrame(data, columns=["id", "X", "Y", "cluster"])
> return result
> res = df.groupby("id").apply(dbscan)
> res.show()
> {code}
>
> The code keeps running forever on the yarn cluster, I expect it to be finished within seconds(this works fine on standalone mode and finishes in 2-4 seconds), on checking the Spark UI, I can see that the Spark job is stuck(99/580) and doesn't make any progress forever.
>
> Also it doesn't run in parallel, am I missing something? !Screenshot 2020-08-31 at 9.04.07 AM.png!
>
>
> I am new to Spark, and still trying to understand a lot of things.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org