Posted to issues@spark.apache.org by "Kaya Kupferschmidt (Jira)" <ji...@apache.org> on 2021/11/11 11:31:00 UTC

[jira] [Updated] (SPARK-37290) Exponential planning time in case of non-deterministic function

     [ https://issues.apache.org/jira/browse/SPARK-37290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kaya Kupferschmidt updated SPARK-37290:
---------------------------------------
    Description: 
We are seeing planning time grow exponentially for some DataFrame queries that include non-deterministic functions. I was able to reduce the problem to a small example program that can be pasted into the Spark shell to reproduce the issue:
{code:scala}
val adselect_raw = spark.createDataFrame(Seq(("imp-1",1),("imp-2",2)))
    .cache()
val adselect = adselect_raw.select(
        expr("uuid()").alias("userUuid"),
        expr("_1").alias("impressionUuid"),
        expr("_1").alias("accessDateTime"),
        expr("_1").alias("publisher"),
        expr("_1").alias("site"),
        expr("_1").alias("placement"),
        expr("_1").alias("advertiser"),
        expr("_1").alias("campaign"),
        expr("_1").alias("lineItem"),
        expr("_1").alias("creative"),
        expr("_1").alias("browserLanguage"),
        expr("_1").alias("geoLocode"),
        expr("_1").alias("osFamily"),
        expr("_1").alias("osName"),
        expr("_1").alias("browserName"),
        expr("_1").alias("referrerDomain"),
        expr("_1").alias("placementIabCategory"),
        expr("_1").alias("placementDeviceGroup"),
        expr("_1").alias("placementDevice"),
        expr("_1").alias("placementVideoType"),
        expr("_1").alias("placementSection"),
        expr("_1").alias("placementPlayer"),
        expr("_1").alias("demandType"),
        expr("_1").alias("techCosts"),
        expr("_1").alias("mediaCosts"),
        expr("_1").alias("directSPrice"),
        expr("_1").alias("network"),
        expr("_1").alias("deviceSetting"),
        expr("_1").alias("placementGroup"),
        expr("_1").alias("postalCode"),
        expr("_1").alias("householdId")
    )

val adcount_raw = spark.createDataFrame(Seq(("imp-1", 1), ("imp-2", 2)))
val adcount = adcount_raw.select(
        expr("_1").alias("impressionUuid"),
        expr("_2").alias("accessDateTime")
    )

val result =  adselect.join(adcount, Seq("impressionUuid"))
result.explain()
{code}
Reducing the program any further (for example by removing the join or the cache) made the problem disappear.

The problem occurs at planning time. Debugging led me to the function {{UnaryNode.getAllValidConstraints}}, where the ExpressionSet {{allConstraints}} grew with an apparently exponential number of entries for the non-deterministic function {{uuid()}} in the code example above. Every time a new column from the large select is processed in the {{foreach}} loop in {{UnaryNode.getAllValidConstraints}}, the number of entries for the {{uuid()}} column in the ExpressionSet seems to double.
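
To give a feeling for the scale, here is a toy model in plain Scala. It does not touch Spark at all. It only assumes that the set starts with a single {{uuid()}} entry and that the count doubles cleanly for each further column, which is what the debugging session suggested but which I have not verified in detail. The function name {{entriesAfter}} is made up for this illustration.
{code:scala}
// Toy model only: assumes one initial entry and a clean doubling per processed
// column. This mirrors the observation above; it is not Spark's actual code.
def entriesAfter(columns: Int): BigInt = BigInt(2).pow(columns)

// Print the modelled entry count for a growing number of columns after uuid().
Seq(5, 10, 20, 30).foreach { n =>
  println(s"$n columns after uuid() -> ${entriesAfter(n)} entries")
}
{code}
Under that assumption, the 30 columns that follow {{uuid()}} in the example would lead to 2^30 entries, which is roughly a billion.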

As a workaround, we moved the {{uuid()}} column to the end of the list, which solved the issue (because by the time {{uuid()}} is reached, all other columns have already been processed in the {{foreach}} loop).
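
The reordering can be sketched in plain Scala as follows. The helper {{nonDeterministicLast}} is a hypothetical name and not part of Spark, and the column list is shortened to four entries for readability. The sketch only rearranges (expression, alias) pairs; applying the result to a DataFrame is shown as a comment because it needs a running Spark shell.
{code:scala}
// Hypothetical helper (not a Spark API): moves the given expressions to the end
// of a projection list and keeps the relative order of all other columns.
def nonDeterministicLast(
    columns: Seq[(String, String)],   // (SQL expression, alias)
    nonDeterministic: Set[String]     // expressions to push to the end
): Seq[(String, String)] = {
  val (late, early) = columns.partition { case (sql, _) => nonDeterministic.contains(sql) }
  early ++ late
}

// Shortened version of the projection from the example above.
val columns = Seq(
  "uuid()" -> "userUuid",
  "_1" -> "impressionUuid",
  "_1" -> "accessDateTime",
  "_1" -> "publisher"
)

val reordered = nonDeterministicLast(columns, Set("uuid()"))

// In the Spark shell the reordered list could then be applied like this:
//   adselect_raw.select(reordered.map { case (sql, name) => expr(sql).alias(name) }: _*)
{code}
Note that this changes the column order of the resulting DataFrame, so any code that relies on column positions would need to be adjusted.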


> Exponential planning time in case of non-deterministic function
> ---------------------------------------------------------------
>
>                 Key: SPARK-37290
>                 URL: https://issues.apache.org/jira/browse/SPARK-37290
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.1.2
>            Reporter: Kaya Kupferschmidt
>            Priority: Major


