Posted to issues@spark.apache.org by "Apache Spark (JIRA)" <ji...@apache.org> on 2018/11/27 11:25:00 UTC

[jira] [Assigned] (SPARK-26147) Python UDFs in join condition fail even when using columns from only one side of join

     [ https://issues.apache.org/jira/browse/SPARK-26147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-26147:
------------------------------------

    Assignee: Apache Spark

> Python UDFs in join condition fail even when using columns from only one side of join
> -------------------------------------------------------------------------------------
>
>                 Key: SPARK-26147
>                 URL: https://issues.apache.org/jira/browse/SPARK-26147
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.4.0
>            Reporter: Ala Luszczak
>            Assignee: Apache Spark
>            Priority: Major
>
> The rule {{PullOutPythonUDFInJoinCondition}} was implemented in [https://github.com/apache/spark/commit/2a8cbfddba2a59d144b32910c68c22d0199093fe]
> As far as I understand, this rule was intended to reject Python UDFs in a join condition when they take arguments from both sides of the join, since such a condition cannot be evaluated for certain join types (e.g. outer joins).
> The rule {{PullOutPythonUDFInJoinCondition}} seems to assume that if a given UDF only uses columns from a single side of the join, it has already been pushed down below the join by the time this rule runs.
> However, this is not always the case. Here's a simple example that fails, even though it looks like it should run just fine (and it does in earlier versions of Spark):
> {code:python}
> from pyspark.sql import Row
> from pyspark.sql.types import StringType
> from pyspark.sql.functions import udf
> cars_list = [ Row("NL", "1234AB"), Row("UK", "987654") ]
> insurance_list = [ Row("NL-1234AB"), Row("BE-112233") ]
> spark.createDataFrame(data=cars_list, schema=["country", "plate_nr"]).createOrReplaceTempView("cars")
> spark.createDataFrame(data=insurance_list, schema=["insurance_code"]).createOrReplaceTempView("insurance")
> to_insurance_code = udf(lambda x, y: x + "-" + y, StringType())
> spark.udf.register('to_insurance_code', to_insurance_code)
> spark.conf.set("spark.sql.crossJoin.enabled", "true")
> # This query runs just fine.
> spark.sql("""
>   SELECT country, plate_nr, insurance_code
>   FROM cars LEFT OUTER JOIN insurance
>   ON CONCAT(country, '-', plate_nr) = insurance_code
> """).show()
> # This equivalent query fails with:
> # pyspark.sql.utils.AnalysisException: u'Using PythonUDF in join condition of join type LeftOuter is not supported.;'
> spark.sql("""
>   SELECT country, plate_nr, insurance_code
>   FROM cars LEFT OUTER JOIN insurance
>   ON to_insurance_code(country, plate_nr) = insurance_code
> """).show()
> {code}
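> A possible workaround (a minimal sketch on top of the setup above, assuming the views and the registered UDF): evaluate the UDF in a projection before the join, so the join condition only compares plain columns and never contains a PythonUDF. The view name {{cars_with_code}} is only for illustration.
> {code:python}
> # Precompute the UDF result on one side of the join first.
> spark.sql("""
>   SELECT country, plate_nr, to_insurance_code(country, plate_nr) AS code
>   FROM cars
> """).createOrReplaceTempView("cars_with_code")
> # The join condition now references only plain columns, so the
> # PullOutPythonUDFInJoinCondition rule has nothing to reject.
> spark.sql("""
>   SELECT country, plate_nr, insurance_code
>   FROM cars_with_code LEFT OUTER JOIN insurance
>   ON code = insurance_code
> """).show()
> {code}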
> [~cloud_fan] [~XuanYuan] fyi



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org