You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Shardul Mahadik (Jira)" <ji...@apache.org> on 2022/08/29 23:25:00 UTC
[jira] [Created] (SPARK-40262) Expensive UDF evaluation pushed down past a join leads to performance issues
Shardul Mahadik created SPARK-40262:
---------------------------------------
Summary: Expensive UDF evaluation pushed down past a join leads to performance issues
Key: SPARK-40262
URL: https://issues.apache.org/jira/browse/SPARK-40262
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 3.4.0
Reporter: Shardul Mahadik
Consider a Spark job with an expensive UDF which looks like follows:
{code:scala}
val expensive_udf = spark.udf.register("expensive_udf", (i: Int) => Option(i))
spark.range(10).write.format("orc").save("/tmp/orc")
val df = spark.read.format("orc").load("/tmp/orc").as("a")
.join(spark.range(10).as("b"), "id")
.withColumn("udf_op", expensive_udf($"a.id"))
.join(spark.range(10).as("c"), $"udf_op" === $"c.id")
{code}
This creates a physical plan as follows:
{code:java}
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [cast(udf_op#338 as bigint)], [id#344L], Inner, BuildRight, false
:- Project [id#330L, if (isnull(cast(id#330L as int))) null else expensive_udf(knownnotnull(cast(id#330L as int))) AS udf_op#338]
: +- BroadcastHashJoin [id#330L], [id#332L], Inner, BuildRight, false
: :- Filter ((isnotnull(id#330L) AND isnotnull(cast(id#330L as int))) AND isnotnull(expensive_udf(knownnotnull(cast(id#330L as int)))))
: : +- FileScan orc [id#330L] Batched: true, DataFilters: [isnotnull(id#330L), isnotnull(cast(id#330L as int)), isnotnull(expensive_udf(knownnotnull(cast(i..., Format: ORC, Location: InMemoryFileIndex(1 paths)[file:/tmp/orc], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
: +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=416]
: +- Range (0, 10, step=1, splits=16)
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=420]
+- Range (0, 10, step=1, splits=16)
{code}
In this case, the expensive UDF call is duplicated thrice. Since the UDF output is used in a future join, `InferFiltersFromConstraints` adds an `IS NOT NULL` filter on the UDF output. But the pushdown rules duplicate this UDF call and push the UDF past a previous join. The duplication behaviour [is documented|https://github.com/apache/spark/blob/c95ed826e23fdec6e1a779cfebde7b3364594fb5/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala#L196] and in itself is not a huge issue. But given a highly restrictive join, the UDF gets evaluated on many orders of magnitude more rows than it should have slowing down the job.
Can we avoid this duplication of UDF calls? In SPARK-37392, we made a [similar change|https://github.com/apache/spark/pull/34823/files] where we decided to only add inferred filters if the input is an attribute. Should we use a similar strategy for `InferFiltersFromConstraints`?
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org