Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2017/10/08 15:24:00 UTC

[jira] [Resolved] (SPARK-16681) Optimizer changes order of filter predicates involving UDFs, which changes semantics

     [ https://issues.apache.org/jira/browse/SPARK-16681?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon resolved SPARK-16681.
----------------------------------
    Resolution: Cannot Reproduce

With the diff:

{code}
diff --git a/build.sbt b/build.sbt
index 39c4e71..5a382bc 100644
--- a/build.sbt
+++ b/build.sbt
@@ -4,5 +4,5 @@ version := "1.0"

 scalaVersion := "2.11.8"

-libraryDependencies += "org.apache.spark" %% "spark-core" % "2.0.0-preview"
-libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.0.0-preview"
+libraryDependencies += "org.apache.spark" %% "spark-core" % "2.2.0"
+libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.2.0"
{code}

I can't reproduce this as described in the JIRA with 2.2.0. It looks like it now passes and was fixed somewhere along the way. It would be nicer if someone could identify the JIRA that fixed this and backport it if applicable. I could not find that JIRA, so I am resolving this as {{Cannot Reproduce}} for now.
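For reference, here is a self-contained sketch along the lines of the reporter's {{SparkBug.scala}} (the object name and Spark session settings are illustrative, not from the original repo):

{code:scala}
import scala.collection.JavaConverters._

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

object Spark16681Repro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("SPARK-16681").getOrCreate()

    // Single row with a = 1. The first filter keeps only a = 2, so the UDF
    // should never see the value 1 if the predicates run in the written order.
    val ds = spark.createDataFrame(
      Seq(Row(1)).asJava,
      StructType(Seq(StructField("a", IntegerType))))

    // Deterministic but partial: throws for any argument other than 2.
    val throws = udf((x: Int) => if (x == 2) 2 else throw new Exception("unexpected input: " + x))

    val e = ds.filter(col("a") === 2)
      .select(throws(col("a")).as("foo"))
      .filter(col("foo") <= 2)

    e.explain(true) // check whether UDF(a) ends up below the a = 2 filter
    e.collect()     // threw on 2.0.0 per the report below
    spark.stop()
  }
}
{code}

With the 2.2.0 dependencies from the diff above, the {{collect()}} completes for me, whereas the report below shows it throwing on 2.0.0.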

> Optimizer changes order of filter predicates involving UDFs, which changes semantics
> ------------------------------------------------------------------------------------
>
>                 Key: SPARK-16681
>                 URL: https://issues.apache.org/jira/browse/SPARK-16681
>             Project: Spark
>          Issue Type: Bug
>    Affects Versions: 2.0.0
>            Reporter: Stefan Fehrenbach
>
> For some queries, the optimizer changes the order in which filter predicates are executed. For UDFs that are not total, this changes the semantics.
> A simple example:
> Create an input dataset of one record, with one field "a" with value 1:
> {code:scala}
> val ds = sparkSession.createDataFrame(Seq(Row(1)).toList.asJava, StructType(Seq(StructField("a", IntegerType))))
> {code}
> Write a query like this:
> {code:scala}
> val e = ds.filter(column("a").equalTo(2))
>       .select(udf(throws, IntegerType)(column("a")).as("foo"))
>       .filter(column("foo").leq(2))
> {code}
> Where {{throws}} is a function which throws for any argument that is not 2, like this one:
> {code:scala}
>     val throws = (x: Integer) =>
>       if (x == 2) 2
>       else throw new Exception
> {code}
> The first filter in the query filters out any values that are not 2.
> Thus, when we execute the UDF in the select, we expect it to only be run on values that are 2.
> This is not the case; the execution throws:
> {noformat}
> 16/07/22 09:22:15 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
> java.lang.Exception
> 	at SparkBug$$anonfun$1.apply(SparkBug.scala:18)
> 	at SparkBug$$anonfun$1.apply(SparkBug.scala:16)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
> 	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> 	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$7$$anon$1.hasNext(WholeStageCodegenExec.scala:357)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:246)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:240)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$23.apply(RDD.scala:774)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$23.apply(RDD.scala:774)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:85)
> ...
> {noformat}
> This is what the analyzed logical plan says:
> {noformat}
> == Analyzed Logical Plan ==
> foo: int
> Filter (foo#3 <= 2)
> +- Project [UDF(a#0) AS foo#3]
>    +- Filter (a#0 = 2)
>       +- LocalRelation [a#0], [[1]]
> {noformat}
> Unfortunately, the optimized logical plan, and consequently the physical plan, disagree:
> {noformat}
> == Optimized Logical Plan ==
> Project [UDF(a#0) AS foo#3]
> +- Filter (((UDF(a#0) <= 2) && isnotnull(a#0)) && (a#0 = 2))
>    +- LocalRelation [a#0], [[1]]
> == Physical Plan ==
> WholeStageCodegen
> :  +- Project [UDF(a#0) AS foo#3]
> :     +- Filter (((UDF(a#0) <= 2) && isnotnull(a#0)) && (a#0 = 2))
> :        +- INPUT
> +- LocalTableScan [a#0], [[1]]
> {noformat}
> There is a seemingly related bug: https://issues.apache.org/jira/browse/SPARK-13773
> That bug mentions a PR which adds documentation to UDFs saying that they need to be deterministic.
> This is not enough! The UDF in the code above is perfectly deterministic. It throws every time it's called with illegal input.
> The failing test from above is in a GitHub repo: https://github.com/fehrenbach/spark-bug/blob/master/src/main/scala/SparkBug.scala
> Just clone it and run {{sbt run}}.
> I'd be happy to fix this myself, if someone wants to hold my hand doing it. I haven't looked at the Spark source code before and would not know where to start.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org