You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/02/14 01:05:34 UTC

[GitHub] dilipbiswal opened a new pull request #23780: [SPARK-26864][SQL][BACKPORT-2.4] Query may return incorrect result when python udf is used as a join condition and the udf uses attributes from both legs of left semi join

dilipbiswal opened a new pull request #23780: [SPARK-26864][SQL][BACKPORT-2.4] Query may return incorrect result when python udf is used as a join condition and the udf uses attributes from both legs of left semi join
URL: https://github.com/apache/spark/pull/23780
 
 
   ## What changes were proposed in this pull request?
   n SPARK-25314, we supported the scenario of having a python UDF that refers to attributes from both legs of a join condition by rewriting the plan to convert an inner join or left semi join to a filter over a cross join. In case of left semi join, this transformation may cause incorrect results when the right leg of join condition produces duplicate rows based on the join condition. This fix disallows the rewrite for left semi join and raises an error in the case like we do for other types of join. In future, we should have separate rule in optimizer to convert left semi join to inner join (I am aware of one case we could do it if we leverage informational constraint i.e when we know the right side does not produce duplicates).
   
   **Python**
   
   ```SQL
   >>> from pyspark import SparkContext
   >>> from pyspark.sql import SparkSession, Column, Row
   >>> from pyspark.sql.functions import UserDefinedFunction, udf
   >>> from pyspark.sql.types import *
   >>> from pyspark.sql.utils import AnalysisException
   >>>
   >>> spark.conf.set("spark.sql.crossJoin.enabled", "True")
   >>> left = spark.createDataFrame([Row(lc1=1, lc2=1), Row(lc1=2, lc2=2)])
   >>> right = spark.createDataFrame([Row(rc1=1, rc2=1), Row(rc1=1, rc2=1)])
   >>> func = udf(lambda a, b: a == b, BooleanType())
   >>> df = left.join(right, func("lc1", "rc1"), "leftsemi").show()
   19/02/12 16:07:10 WARN PullOutPythonUDFInJoinCondition: The join condition:<lambda>(lc1#0L, rc1#4L) of the join plan contains PythonUDF only, it will be moved out and the join plan will be turned to cross join.
   +---+---+
   |lc1|lc2|
   +---+---+
   |  1|  1|
   |  1|  1|
   +---+---+
   ```
   
   **Scala**
   
   ```SQL
   scala> val left = Seq((1, 1), (2, 2)).toDF("lc1", "lc2")
   left: org.apache.spark.sql.DataFrame = [lc1: int, lc2: int]
   
   scala> val right = Seq((1, 1), (1, 1)).toDF("rc1", "rc2")
   right: org.apache.spark.sql.DataFrame = [rc1: int, rc2: int]
   
   scala> val equal = udf((p1: Integer, p2: Integer) => {
        |   p1 == p2
        | })
   equal: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$2141/1101629239@4666f1b5,BooleanType,List(Some(Schema(IntegerType,true)), Some(Schema(IntegerType,true))),None,false,true)
   
   scala> val df = left.join(right, equal(col("lc1"), col("rc1")), "leftsemi")
   df: org.apache.spark.sql.DataFrame = [lc1: int, lc2: int]
   
   scala> df.show()
   +---+---+
   |lc1|lc2|
   +---+---+
   |  1|  1|
   +---+---+
   
   ```
   ## How was this patch tested?
   Modified existing tests.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org