Posted to issues@spark.apache.org by "Shardul Mahadik (Jira)" <ji...@apache.org> on 2022/08/29 23:25:00 UTC

[jira] [Created] (SPARK-40262) Expensive UDF evaluation pushed down past a join leads to performance issues

Shardul Mahadik created SPARK-40262:
---------------------------------------

             Summary: Expensive UDF evaluation pushed down past a join leads to performance issues 
                 Key: SPARK-40262
                 URL: https://issues.apache.org/jira/browse/SPARK-40262
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.4.0
            Reporter: Shardul Mahadik


Consider a Spark job with an expensive UDF that looks as follows:
{code:scala}
val expensive_udf = spark.udf.register("expensive_udf", (i: Int) => Option(i))

spark.range(10).write.format("orc").save("/tmp/orc")

val df = spark.read.format("orc").load("/tmp/orc").as("a")
    .join(spark.range(10).as("b"), "id")
    .withColumn("udf_op", expensive_udf($"a.id"))
    .join(spark.range(10).as("c"), $"udf_op" === $"c.id")
{code}
This creates a physical plan as follows:
{code:java}
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [cast(udf_op#338 as bigint)], [id#344L], Inner, BuildRight, false
   :- Project [id#330L, if (isnull(cast(id#330L as int))) null else expensive_udf(knownnotnull(cast(id#330L as int))) AS udf_op#338]
   :  +- BroadcastHashJoin [id#330L], [id#332L], Inner, BuildRight, false
   :     :- Filter ((isnotnull(id#330L) AND isnotnull(cast(id#330L as int))) AND isnotnull(expensive_udf(knownnotnull(cast(id#330L as int)))))
   :     :  +- FileScan orc [id#330L] Batched: true, DataFilters: [isnotnull(id#330L), isnotnull(cast(id#330L as int)), isnotnull(expensive_udf(knownnotnull(cast(i..., Format: ORC, Location: InMemoryFileIndex(1 paths)[file:/tmp/orc], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
   :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=416]
   :        +- Range (0, 10, step=1, splits=16)
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=420]
      +- Range (0, 10, step=1, splits=16)
{code}
In this case, the expensive UDF call appears three times in the plan: in the `Project`, in the `Filter`, and in the `DataFilters` of the `FileScan`. Since the UDF output is used as a key in a later join, `InferFiltersFromConstraints` adds an `IS NOT NULL` filter on the UDF output. The pushdown rules then duplicate the UDF call and push the resulting filter below the earlier join. The duplication behaviour [is documented|https://github.com/apache/spark/blob/c95ed826e23fdec6e1a779cfebde7b3364594fb5/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala#L196] and in itself is not a huge issue. But when that earlier join is highly selective, the UDF is evaluated on many orders of magnitude more rows than necessary, which slows down the job.
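
One rough way to observe the extra evaluations is to count UDF invocations with an accumulator. The following is a minimal sketch, assuming a spark-shell session (so that `spark` and the `$` syntax are in scope) and the `/tmp/orc` data written above. The names `calls`, `counting_udf` and `counted` are hypothetical and introduced only for this illustration. Accumulator values updated inside transformations can be over-counted on task retries, so the number is indicative only.
{code:scala}
// Hypothetical instrumentation: `calls` and `counting_udf` are not part of the original job.
val calls = spark.sparkContext.longAccumulator("udf_calls")

// Same body as expensive_udf, but each invocation bumps the accumulator.
val counting_udf = spark.udf.register("counting_udf", (i: Int) => { calls.add(1L); Option(i) })

// Same query shape as above, with the instrumented UDF swapped in.
val counted = spark.read.format("orc").load("/tmp/orc").as("a")
    .join(spark.range(10).as("b"), "id")
    .withColumn("udf_op", counting_udf($"a.id"))
    .join(spark.range(10).as("c"), $"udf_op" === $"c.id")

counted.explain()   // shows where counting_udf ends up in the physical plan
counted.collect()   // forces execution so the accumulator is updated
println(s"UDF invocations: ${calls.value}")
{code}
Comparing the printed count with the number of rows that survive the first join gives a rough idea of how much extra work the pushed-down filter causes.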

Can we avoid this duplication of UDF calls? In SPARK-37392, we made a [similar change|https://github.com/apache/spark/pull/34823/files] where we decided to only add inferred filters if the input is an attribute. Should we use a similar strategy for `InferFiltersFromConstraints`?
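
Until the optimizer behaviour changes, a possible user-side workaround is to mark the UDF as nondeterministic with `UserDefinedFunction.asNondeterministic()`. This is only a sketch and not the fix proposed in this issue. It assumes a spark-shell session, and it assumes that the optimizer neither infers constraints from nondeterministic expressions nor pushes filters through a projection that contains them. The names `expensive_udf_nd` and `dfNd` are hypothetical. Marking the UDF this way can also disable other optimizations that involve it.
{code:scala}
import org.apache.spark.sql.functions.udf

// Hypothetical variant of expensive_udf, registered as nondeterministic.
val expensive_udf_nd = spark.udf.register(
    "expensive_udf_nd",
    udf((i: Int) => Option(i)).asNondeterministic())

val dfNd = spark.read.format("orc").load("/tmp/orc").as("a")
    .join(spark.range(10).as("b"), "id")
    .withColumn("udf_op", expensive_udf_nd($"a.id"))
    .join(spark.range(10).as("c"), $"udf_op" === $"c.id")

// Inspect the plan to check whether the UDF still appears below the first join.
dfNd.explain()
{code}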


