Posted to issues@spark.apache.org by "Ohad Raviv (JIRA)" <ji...@apache.org> on 2019/05/14 15:10:00 UTC

[jira] [Commented] (SPARK-27707) Performance issue using explode

    [ https://issues.apache.org/jira/browse/SPARK-27707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16839544#comment-16839544 ] 

Ohad Raviv commented on SPARK-27707:
------------------------------------

[~cloud_fan] - any chance you can take a look?

> Performance issue using explode
> -------------------------------
>
>                 Key: SPARK-27707
>                 URL: https://issues.apache.org/jira/browse/SPARK-27707
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.0.0, 2.4.3
>            Reporter: Ohad Raviv
>            Priority: Major
>
> this is a corner case of SPARK-21657.
> we have a case where we want to explode an array inside a struct and also keep some other columns of the struct, and we again encounter a huge performance issue.
> reproduction code:
> {code}
> import spark.implicits._  // needed outside spark-shell for .toDF
>
> val M = 100000  // assumed array length; any sufficiently large value shows the slowdown
> val df = spark.sparkContext.parallelize(Seq(("1",
>   Array.fill(M)({
>     val i = math.random
>     (i.toString, (i + 1).toString, (i + 2).toString, (i + 3).toString)
>   })))).toDF("col", "arr")
>   .selectExpr("col", "struct(col, arr) as st")
>   .selectExpr("col", "st.col as col1", "explode(st.arr) as arr_col")
> df.write.mode("overwrite").save("/tmp/blah")
> {code}
> a workaround is to project the struct field into its own column before the explode:
> {code}
> val df = spark.sparkContext.parallelize(Seq(("1",
>           Array.fill(M)({
>             val i = math.random
>             (i.toString, (i + 1).toString, (i + 2).toString, (i + 3).toString)
>           })))).toDF("col", "arr")
>           .selectExpr("col", "struct(col, arr) as st")
>           .withColumn("col1", $"st.col")
>           .selectExpr("col", "col1", "explode(st.arr) as arr_col")
> df.write.mode("overwrite").save("/tmp/blah")
> {code}
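> the difference is visible in the optimized plans; a minimal sketch for comparing them, assuming the slow and the fast DataFrame above are bound to the hypothetical names `slowDf` and `fastDf`:
> {code}
> // Prints the parsed, analyzed, optimized and physical plans.
> // In the slow variant, Generate still carries the entire `st` struct
> // (including the large array) in every output row; in the workaround it is pruned.
> slowDf.explain(true)
> fastDf.explain(true)
> // or inspect just the optimized logical plan programmatically:
> println(slowDf.queryExecution.optimizedPlan)
> println(fastDf.queryExecution.optimizedPlan)
> {code}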
> in this case the optimization done in SPARK-21657:
> {code}
>     // prune unrequired references
>     case p @ Project(_, g: Generate) if p.references != g.outputSet =>
>       val requiredAttrs = p.references -- g.producedAttributes ++ g.generator.references
>       val newChild = prunedChild(g.child, requiredAttrs)
>       val unrequired = g.generator.references -- p.references
>       val unrequiredIndices = newChild.output.zipWithIndex.filter(t => unrequired.contains(t._1))
>         .map(_._2)
>       p.copy(child = g.copy(child = newChild, unrequiredChildIndex = unrequiredIndices))
> {code}
> doesn't work, because `p.references` contains the whole `st` struct and not just the projected field - `st.col` in the projection is a `GetStructField` over the `st` attribute, so its references are the entire struct attribute.
> as a result the whole struct, including the huge array field, gets duplicated once per array element.
> I know this is kind of a corner case, but it was really non-trivial to understand..
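> until the rule handles nested fields, the workaround generalizes to any query of this shape; a sketch of a reusable helper, under the caveat that the name `explodeKeepingFields` and the `_fromStruct` suffix are mine and not anything in Spark:
> {code}
> import org.apache.spark.sql.DataFrame
> import org.apache.spark.sql.functions.{col, explode}
>
> // Pulls the struct fields that must survive the explode into top-level
> // columns *before* the generator, so the Project above Generate no longer
> // references the struct and ColumnPruning can drop it (same trick as above).
> def explodeKeepingFields(df: DataFrame, structCol: String, arrayField: String,
>                          keepFields: Seq[String]): DataFrame = {
>   val withKept = keepFields.foldLeft(df) { (d, f) =>
>     d.withColumn(f + "_fromStruct", col(structCol + "." + f))  // suffix avoids name clashes
>   }
>   withKept
>     .withColumn(arrayField + "_col", explode(col(structCol + "." + arrayField)))
>     .drop(structCol)
> }
> {code}
> e.g. calling `explodeKeepingFields(withStruct, "st", "arr", Seq("col"))` on the intermediate DataFrame that still has the `st` column should yield the pruned plan, just like the manual workaround.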


