Posted to issues@spark.apache.org by "Apache Spark (Jira)" <ji...@apache.org> on 2022/01/17 19:28:00 UTC

[jira] [Commented] (SPARK-37290) Exponential planning time in case of non-deterministic function

    [ https://issues.apache.org/jira/browse/SPARK-37290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17477375#comment-17477375 ] 

Apache Spark commented on SPARK-37290:
--------------------------------------

User 'Stelyus' has created a pull request for this issue:
https://github.com/apache/spark/pull/35231

> Exponential planning time in case of non-deterministic function
> ---------------------------------------------------------------
>
>                 Key: SPARK-37290
>                 URL: https://issues.apache.org/jira/browse/SPARK-37290
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.1.2
>            Reporter: Kaya Kupferschmidt
>            Priority: Major
>
> We are experiencing exponential growth in planning time for some DataFrame queries that include non-deterministic functions. I was able to create a small example program that can be pasted into the Spark shell to reproduce the issue:
> {code:scala}
> val adselect_raw = spark.createDataFrame(Seq(("imp-1",1),("imp-2",2)))
>     .cache()
> val adselect = adselect_raw.select(
>         expr("uuid()").alias("userUuid"),
>         expr("_1").alias("impressionUuid"),
>         expr("_1").alias("accessDateTime"),
>         expr("_1").alias("publisher"),
>         expr("_1").alias("site"),
>         expr("_1").alias("placement"),
>         expr("_1").alias("advertiser"),
>         expr("_1").alias("campaign"),
>         expr("_1").alias("lineItem"),
>         expr("_1").alias("creative"),
>         expr("_1").alias("browserLanguage"),
>         expr("_1").alias("geoLocode"),
>         expr("_1").alias("osFamily"),
>         expr("_1").alias("osName"),
>         expr("_1").alias("browserName"),
>         expr("_1").alias("referrerDomain"),
>         expr("_1").alias("placementIabCategory"),
>         expr("_1").alias("placementDeviceGroup"),
>         expr("_1").alias("placementDevice"),
>         expr("_1").alias("placementVideoType"),
>         expr("_1").alias("placementSection"),
>         expr("_1").alias("placementPlayer"),
>         expr("_1").alias("demandType"),
>         expr("_1").alias("techCosts"),
>         expr("_1").alias("mediaCosts"),
>         expr("_1").alias("directSPrice"),
>         expr("_1").alias("network"),
>         expr("_1").alias("deviceSetting"),
>         expr("_1").alias("placementGroup"),
>         expr("_1").alias("postalCode"),
>         expr("_1").alias("householdId")
>     )
> val adcount_raw = spark.createDataFrame(Seq(("imp-1", 1), ("imp-2", 2)))
> val adcount = adcount_raw.select(
>         expr("_1").alias("impressionUuid"),
>         expr("_2").alias("accessDateTime")
>     )
> val result =  adselect.join(adcount, Seq("impressionUuid"))
> result.explain()
> {code}
> Reducing the program any further (for example, by removing the join or the cache) no longer reproduced the problem.
> The problem occurs during planning. Debugging led me to {{UnaryNode.getAllValidConstraints}}, where the local variable {{allConstraints}} grew by an apparently exponential number of entries for the non-deterministic function {{uuid()}} from the code example above. Each time a new column of the large select is processed by the {{foreach}} loop in {{UnaryNode.getAllValidConstraints}}, the number of entries for the {{uuid()}} column in the ExpressionSet appears to double:
> {code:scala}
> trait UnaryNode extends LogicalPlan with UnaryLike[LogicalPlan] {
>   override def getAllValidConstraints(projectList: Seq[NamedExpression]): ExpressionSet = {
>     var allConstraints = child.constraints
>     projectList.foreach {
>       case a @ Alias(l: Literal, _) =>
>         allConstraints += EqualNullSafe(a.toAttribute, l)
>       case a @ Alias(e, _) =>
>         // KK: ExpressionSet keeps each non-deterministic expression as a
>         // separate entry, so every existing "uuid()" entry in allConstraints
>         // is re-added on each iteration, doubling the set every time
>         allConstraints ++= allConstraints.map(_ transform {
>           case expr: Expression if expr.semanticEquals(e) =>
>             a.toAttribute
>         })
>         allConstraints += EqualNullSafe(e, a.toAttribute)
>       case _ => // Don't change.
>     }
>     allConstraints
>   }
> }
> {code}
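> The doubling can be illustrated outside of Spark with a minimal, hypothetical sketch (the {{NonDet}} case class below is an illustrative stand-in, not a Spark class): if a set keeps every non-deterministic expression as a distinct entry, the way {{ExpressionSet}} treats {{uuid()}}, then re-adding a mapped copy of the whole set on each of N iterations produces on the order of 2^N entries:
> {code:scala}
> // Hypothetical stand-in for a non-deterministic expression: every
> // instance is distinct, like uuid() entries in Spark's ExpressionSet.
> case class NonDet(id: Long)
> var constraints = Set(NonDet(0L))
> var nextId = 1L
> // Simulate the foreach loop over the projection list: each alias
> // re-adds a mapped copy of every existing constraint, so the set
> // doubles with every column that is processed.
> for (column <- 1 to 20) {
>   constraints ++= constraints.map { _ =>
>     val fresh = NonDet(nextId); nextId += 1; fresh
>   }
>   println(s"after column $column: ${constraints.size} entries")
> }
> // Prints 2, 4, 8, ..., 1048576 -- exponential in the column count.
> {code}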
> As a workaround, we moved the {{uuid()}} column to the end of the select list in our code, which resolved the issue: by the time the non-deterministic alias is processed, all other columns have already been handled by the {{foreach}} loop, so no further doubling can occur.
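> A sketch of the reordered select, using the column names from the example above (the remaining deterministic columns are elided here for brevity):
> {code:scala}
> val adselect = adselect_raw.select(
>         expr("_1").alias("impressionUuid"),
>         // ... the remaining deterministic columns from the example ...
>         expr("uuid()").alias("userUuid")  // non-deterministic column last
>     )
> {code}
> With {{uuid()}} processed last, the {{foreach}} loop never maps over its constraint entry again, so the set no longer doubles.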


