You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Apache Spark (JIRA)" <ji...@apache.org> on 2018/11/07 12:35:00 UTC

[jira] [Assigned] (SPARK-25963) Optimize generate followed by window

     [ https://issues.apache.org/jira/browse/SPARK-25963?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-25963:
------------------------------------

    Assignee: Apache Spark

> Optimize generate followed by window
> ------------------------------------
>
>                 Key: SPARK-25963
>                 URL: https://issues.apache.org/jira/browse/SPARK-25963
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.3.0, 2.4.0
>            Reporter: Ohad Raviv
>            Assignee: Apache Spark
>            Priority: Minor
>
> We've noticed that for our use-cases when we have explode followed by a window function we can almost always optimize it by adding repartition by the windows' partition before the explode.
> for example:
> {code:java}
> import org.apache.spark.sql.functions._
> val N = 1 << 12
> spark.sql("set spark.sql.autoBroadcastJoinThreshold=0")
> val tokens = spark.range(N).selectExpr(
> "floor(id/4) as key", "'asd/asd/asd/asd/asd/asd' as url")
> // .repartition("cust_id")
> .selectExpr("*", "explode(split(url, '/')) as token")
> import org.apache.spark.sql.expressions._
> val w = Window.partitionBy("key", "token")
> val res = tokens.withColumn("cnt", count("token").over(w))
> res.explain(true)
> {code}
> {noformat}
> == Optimized Logical Plan ==
> Window [count(token#11) windowspecdefinition(key#6L, token#11, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS cnt#17L], [key#6L, token#11]
> +- Generate explode([asd,asd,asd,asd,asd,asd]), false, [token#11]
>    +- Project [FLOOR((cast(id#4L as double) / 4.0)) AS key#6L, asd/asd/asd/asd/asd/asd AS url#7]
>       +- Range (0, 4096, step=1, splits=Some(1))
> {noformat}
> currently all the data will be exploded in the first stage, then shuffled and then aggregated.
> we can achieve exactly the same computation if we first shuffle the data and in the second stage explode and aggregate.
> I have a PR that tries to resolve this. I'm just not sure I thought about all the cases.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org