You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Stefan Fehrenbach (JIRA)" <ji...@apache.org> on 2016/07/22 13:49:20 UTC

[jira] [Created] (SPARK-16681) Optimizer changes order of filter predicates involving UDFs, which changes semantics

Stefan Fehrenbach created SPARK-16681:
-----------------------------------------

             Summary: Optimizer changes order of filter predicates involving UDFs, which changes semantics
                 Key: SPARK-16681
                 URL: https://issues.apache.org/jira/browse/SPARK-16681
             Project: Spark
          Issue Type: Bug
    Affects Versions: 2.0.0
            Reporter: Stefan Fehrenbach


For some queries, the optimizer changes the order in which filter predicates are executed. For UDFs that are not total, this changes the semantics.

A simple example:

Create an input dataset of one record, with one field "a" with value 2:
{code:scala}
val ds = sparkSession.createDataFrame(Seq(Row(1)).toList.asJava, StructType(Seq(StructField("a", IntegerType))))
{code}

Write a query like this:
{code:scala}
val e = ds.filter(column("a").equalTo(2))
      .select(udf(throws, IntegerType)(column("a")).as("foo"))
      .filter(column("foo").leq(2))
{code}

Where {{throws}} is a function which throws for any argument that is not 2, like this one:
{code:scala}
    val throws = (x: Integer) =>
      if (x == 2) 2
      else throw new Exception
{code}

The first filter in the query filters out any values that are not 2.
Thus, when we execute the UDF in the select, we expect it to only be run on values that are 2.
This is not the case, the execution throws:
{noformat}
16/07/22 09:22:15 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.Exception
	at SparkBug$$anonfun$1.apply(SparkBug.scala:18)
	at SparkBug$$anonfun$1.apply(SparkBug.scala:16)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$7$$anon$1.hasNext(WholeStageCodegenExec.scala:357)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:246)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:240)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$23.apply(RDD.scala:774)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$23.apply(RDD.scala:774)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
	at org.apache.spark.scheduler.Task.run(Task.scala:85)
...
{noformat}

That's what the logical query plan says:
{noformat}
== Analyzed Logical Plan ==
foo: int
Filter (foo#3 <= 2)
+- Project [UDF(a#0) AS foo#3]
   +- Filter (a#0 = 2)
      +- LocalRelation [a#0], [[1]]
{noformat}

Unfortunately, the optimized logical plan and finally the physical plan disagree:
{noformat}
== Optimized Logical Plan ==
Project [UDF(a#0) AS foo#3]
+- Filter (((UDF(a#0) <= 2) && isnotnull(a#0)) && (a#0 = 2))
   +- LocalRelation [a#0], [[1]]

== Physical Plan ==
WholeStageCodegen
:  +- Project [UDF(a#0) AS foo#3]
:     +- Filter (((UDF(a#0) <= 2) && isnotnull(a#0)) && (a#0 = 2))
:        +- INPUT
+- LocalTableScan [a#0], [[1]]
{noformat}

There is a seemingly related bug: https://issues.apache.org/jira/browse/SPARK-13773
That bug mentions a PR which adds documentation to UDFs saying that they need to be deterministic.
This is not enough! The UDF in the code above is perfectly deterministic. It throws every time it's called with illegal input.

The failing test from above is in a github repo: https://github.com/fehrenbach/spark-bug/blob/master/src/main/scala/SparkBug.scala
Just clone and run {{sbt run}}.

I'd be happy to fix this myself, if someone wants to hold my hand doing it. I haven't looked at the Spark source code before and would not know where to start.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org