Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2017/10/08 15:24:00 UTC
[jira] [Resolved] (SPARK-16681) Optimizer changes order of filter
predicates involving UDFs, which changes semantics
[ https://issues.apache.org/jira/browse/SPARK-16681?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon resolved SPARK-16681.
----------------------------------
Resolution: Cannot Reproduce
With the diff:
{code}
diff --git a/build.sbt b/build.sbt
index 39c4e71..5a382bc 100644
--- a/build.sbt
+++ b/build.sbt
@@ -4,5 +4,5 @@ version := "1.0"
scalaVersion := "2.11.8"
-libraryDependencies += "org.apache.spark" %% "spark-core" % "2.0.0-preview"
-libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.0.0-preview"
+libraryDependencies += "org.apache.spark" %% "spark-core" % "2.2.0"
+libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.2.0"
{code}
I can't reproduce this on 2.2.0 as described in the JIRA. It looks like the query now passes, so this was fixed somewhere along the way. It would be nicer if someone could identify the JIRA that fixed it and backport the fix if applicable. I could not find that JIRA, so I am resolving this as {{Cannot Reproduce}} for now.
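For readers without a Spark build at hand, the reordering described in the report below can be imitated in plain Scala (a minimal analogy of what the optimizer effectively did, not Spark or Catalyst code; all names are illustrative):

```scala
object ReorderDemo {
  // Mirrors the partial UDF from the report: deterministic, but only
  // defined for the input 2.
  val throws: Int => Int =
    x => if (x == 2) 2 else throw new Exception("undefined for " + x)

  // One record with value 1, like the LocalRelation [[1]] in the plans.
  val data: List[Int] = List(1)

  // Intended order: the guard `_ == 2` removes 1 first, so `throws`
  // never runs and the result is simply empty.
  val safe: List[Int] = data.filter(_ == 2).map(throws).filter(_ <= 2)

  // Order analogous to the optimized plan: the UDF is evaluated in the
  // same conjunction as the guard, so it sees the value 1 and throws.
  def reordered: List[Int] = data.filter(x => throws(x) <= 2 && x == 2)
}
```

Evaluating `safe` returns an empty list, while `reordered` throws, which is exactly the semantic difference the report complains about.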
> Optimizer changes order of filter predicates involving UDFs, which changes semantics
> ------------------------------------------------------------------------------------
>
> Key: SPARK-16681
> URL: https://issues.apache.org/jira/browse/SPARK-16681
> Project: Spark
> Issue Type: Bug
> Affects Versions: 2.0.0
> Reporter: Stefan Fehrenbach
>
> For some queries, the optimizer changes the order in which filter predicates are executed. For UDFs that are not total, this changes the semantics.
> A simple example:
> Create an input dataset of one record, with one field "a" with value 1:
> {code:scala}
> import scala.collection.JavaConverters._
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
>
> val ds = sparkSession.createDataFrame(Seq(Row(1)).toList.asJava, StructType(Seq(StructField("a", IntegerType))))
> {code}
> Write a query like this:
> {code:scala}
> val e = ds.filter(column("a").equalTo(2))
>   .select(udf(throws, IntegerType)(column("a")).as("foo"))
>   .filter(column("foo").leq(2))
> {code}
> Where {{throws}} is a function which throws for any argument that is not 2, like this one:
> {code:scala}
> val throws = (x: Integer) =>
>   if (x == 2) 2
>   else throw new Exception
> {code}
> The first filter in the query filters out any values that are not 2.
> Thus, when we execute the UDF in the select, we expect it to only be run on values that are 2.
> This is not the case: the execution throws:
> {noformat}
> 16/07/22 09:22:15 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
> java.lang.Exception
>     at SparkBug$$anonfun$1.apply(SparkBug.scala:18)
>     at SparkBug$$anonfun$1.apply(SparkBug.scala:16)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
>     at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>     at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$7$$anon$1.hasNext(WholeStageCodegenExec.scala:357)
>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:246)
>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:240)
>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$23.apply(RDD.scala:774)
>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$23.apply(RDD.scala:774)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>     at org.apache.spark.scheduler.Task.run(Task.scala:85)
>     ...
> {noformat}
> That's what the logical query plan says:
> {noformat}
> == Analyzed Logical Plan ==
> foo: int
> Filter (foo#3 <= 2)
> +- Project [UDF(a#0) AS foo#3]
>    +- Filter (a#0 = 2)
>       +- LocalRelation [a#0], [[1]]
> {noformat}
> Unfortunately, the optimized logical plan and finally the physical plan disagree:
> {noformat}
> == Optimized Logical Plan ==
> Project [UDF(a#0) AS foo#3]
> +- Filter (((UDF(a#0) <= 2) && isnotnull(a#0)) && (a#0 = 2))
>    +- LocalRelation [a#0], [[1]]
> == Physical Plan ==
> WholeStageCodegen
> :  +- Project [UDF(a#0) AS foo#3]
> :     +- Filter (((UDF(a#0) <= 2) && isnotnull(a#0)) && (a#0 = 2))
> :        +- INPUT
> +- LocalTableScan [a#0], [[1]]
> {noformat}
> There is a seemingly related bug: https://issues.apache.org/jira/browse/SPARK-13773
> That bug mentions a PR which adds documentation to UDFs saying that they need to be deterministic.
> This is not enough! The UDF in the code above is perfectly deterministic. It throws every time it's called with illegal input.
> The failing test from above is in a GitHub repo: https://github.com/fehrenbach/spark-bug/blob/master/src/main/scala/SparkBug.scala
> Just clone and run {{sbt run}}.
> I'd be happy to fix this myself, if someone wants to hold my hand doing it. I haven't looked at the Spark source code before and would not know where to start.
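The determinism-versus-totality point above can be made concrete without Spark (a plain-Scala sketch; the function name is illustrative): a function can be perfectly deterministic yet still unsafe to evaluate on rows an earlier filter was meant to remove, so documenting "UDFs must be deterministic" does not address this bug.

```scala
object DeterministicButPartial {
  // Deterministic: the same input always produces the same outcome
  // (either the value 2 or the same exception).
  // Partial: only defined for the input 2; `require` throws an
  // IllegalArgumentException for anything else, so reordering it
  // ahead of a guarding predicate changes observable behaviour.
  val f: Int => Int = x => { require(x == 2, "not defined for " + x); 2 }
}
```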
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org