Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/06/13 03:20:00 UTC
[jira] [Commented] (SPARK-28016) Spark hangs when an execution plan
has many projections on nested structs
[ https://issues.apache.org/jira/browse/SPARK-28016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16862682#comment-16862682 ]
Hyukjin Kwon commented on SPARK-28016:
--------------------------------------
Can you narrow this down and provide a self-contained reproducer that does not depend on custom code such as {{DataSetWrapper}}?
> Spark hangs when an execution plan has many projections on nested structs
> -------------------------------------------------------------------------
>
> Key: SPARK-28016
> URL: https://issues.apache.org/jira/browse/SPARK-28016
> Project: Spark
> Issue Type: Bug
> Components: Optimizer
> Affects Versions: 2.4.3
> Environment: Tried in
> * Spark 2.2.1, Spark 2.4.3 in local mode on Linux, macOS and Windows
> * Spark 2.4.3 / Yarn on a Linux cluster
> Reporter: Ruslan Yushchenko
> Priority: Major
> Attachments: NestedOps.scala, SparkApp1Issue.scala, SparkApp2Workaround.scala, spark-app-nested.tgz
>
>
> Spark applications freeze at the execution plan optimization stage (Catalyst) when the logical execution plan contains many projections that operate on nested struct fields.
> Two Spark applications are attached: one demonstrates the issue, the other demonstrates a workaround. An archive is also attached in which these jobs are packaged as a Maven project.
> To reproduce the issue, the attached Spark app does the following:
> * A small dataframe is created from a JSON example.
> * A nested withColumn map transformation is used to apply a transformation on a struct field and create a new struct field. The code for this transformation is also attached.
> * Once more than 11 such transformations are applied, the Catalyst optimizer freezes while optimizing the execution plan.
> {code:scala}
> package za.co.absa.spark.app
> import org.apache.spark.sql._
> import org.apache.spark.sql.functions._
> object SparkApp1Issue {
> // Sample data for a DataFrame with nested structs
> val sample =
> """
> |{
> | "strings": {
> | "simple": "Culpa repellat nesciunt accusantium",
> | "all_random": "DESebo8d%fL9sX@AzVin",
> | "whitespaces": " q bb l "
> | },
> | "numerics": {
> | "small_positive": 722,
> | "small_negative": -660,
> | "big_positive": 669223368251997,
> | "big_negative": -161176863305841,
> | "zero": 0
> | }
> |}
> """.stripMargin ::
> """{
> | "strings": {
> | "simple": "Accusamus quia vel deleniti",
> | "all_random": "rY&n9UnVcD*KS]jPBpa[",
> | "whitespaces": " t e t rp z p"
> | },
> | "numerics": {
> | "small_positive": 268,
> | "small_negative": -134,
> | "big_positive": 768990048149640,
> | "big_negative": -684718954884696,
> | "zero": 0
> | }
> |}
> |""".stripMargin ::
> """{
> | "strings": {
> | "simple": "Quia numquam deserunt delectus rem est",
> | "all_random": "GmRdQlE4Avn1hSlVPAH",
> | "whitespaces": " c sa yv drf "
> | },
> | "numerics": {
> | "small_positive": 909,
> | "small_negative": -363,
> | "big_positive": 592517494751902,
> | "big_negative": -703224505589638,
> | "zero": 0
> | }
> |}
> |""".stripMargin :: Nil
> /**
> * This Spark Job demonstrates an issue of execution plan freezing when there are a lot of projections
> * involving nested structs in an execution plan.
> *
> * The example works as follows:
> * - A small dataframe is created from a JSON example above
> * - A nested withColumn map transformation is used to apply a transformation on a struct field and create
> * a new struct field.
> * - Once more than 11 such transformations are applied the Catalyst optimizer freezes on optimizing
> * the execution plan
> */
>   def main(args: Array[String]): Unit = {
>     val sparkBuilder = SparkSession.builder().appName("Nested Projections Issue")
>     val spark = sparkBuilder
>       .master("local[4]")
>       .getOrCreate()
>
>     import spark.implicits._
>     import za.co.absa.spark.utils.NestedOps.DataSetWrapper
>
>     val df = spark.read.json(sample.toDS)
>
>     // Apply several uppercase and negation transformations
>     val dfOutput = df
>       .nestedWithColumnMap("strings.simple", "strings.uppercase1", c => upper(c))
>       .nestedWithColumnMap("strings.all_random", "strings.uppercase2", c => upper(c))
>       .nestedWithColumnMap("strings.whitespaces", "strings.uppercase3", c => upper(c))
>       .nestedWithColumnMap("numerics.small_positive", "numerics.num1", c => -c)
>       .nestedWithColumnMap("numerics.small_negative", "numerics.num2", c => -c)
>       .nestedWithColumnMap("numerics.big_positive", "numerics.num3", c => -c)
>       .nestedWithColumnMap("numerics.big_negative", "numerics.num4", c => -c)
>       .nestedWithColumnMap("numerics.small_positive", "numerics.num5", c => -c)
>       .nestedWithColumnMap("numerics.small_negative", "numerics.num6", c => -c)
>       .nestedWithColumnMap("numerics.big_positive", "numerics.num7", c => -c)
>       .nestedWithColumnMap("numerics.big_negative", "numerics.num8", c => -c)
>       // Uncommenting the line below will cause Catalyst to freeze completely
>       //.nestedWithColumnMap("numerics.big_negative", "numerics.num9", c => -c)
>
>     dfOutput.printSchema()
>     dfOutput.explain(true)
>     dfOutput.show
>   }
> }
> {code}
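The steps in the quoted description can be approximated without the attached {{NestedOps}} helper. The sketch below is only an assumption about what such a self-contained reproducer might look like: {{addNestedField}} and {{NestedProjectionsSketch}} are hypothetical names, not the reporter's code, and it has not been confirmed that this variant triggers the same hang. Each call rebuilds a top-level struct from its existing fields plus one derived field, using only the public {{withColumn}}, {{struct}} and {{col}} functions.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, struct, upper}
import org.apache.spark.sql.types.StructType

object NestedProjectionsSketch {

  // Hypothetical helper (not the attached NestedOps code): replaces the top-level
  // struct column `parent` with a struct holding all of its current fields plus
  // `newField`, which is computed by applying `f` to `parent.srcField`.
  def addNestedField(df: DataFrame, parent: String, srcField: String,
                     newField: String, f: Column => Column): DataFrame = {
    val existing = df.schema(parent).dataType.asInstanceOf[StructType].fieldNames
      .map(name => col(s"$parent.$name").as(name))
    val added = f(col(s"$parent.$srcField")).as(newField)
    df.withColumn(parent, struct((existing :+ added): _*))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Nested Projections Sketch")
      .master("local[2]")
      .getOrCreate()
    import spark.implicits._

    // Reduced sample with the same nested shape as the original JSON
    val sample = Seq(
      """{"strings": {"simple": "abc"}, "numerics": {"small_positive": 722}}""")
    val df = spark.read.json(sample.toDS)

    // Number of negation steps; kept small by default so the job finishes.
    // Raise it via the first argument to probe the threshold from the report.
    val steps = if (args.nonEmpty) args(0).toInt else 4

    val withUpper = addNestedField(df, "strings", "simple", "uppercase1", c => upper(c))
    val out = (1 to steps).foldLeft(withUpper) { (acc, i) =>
      addNestedField(acc, "numerics", "small_positive", s"num$i", c => -c)
    }

    out.printSchema()
    out.explain(true)
    out.show(false)
    spark.stop()
  }
}
```

Running it with a larger step count (for example {{12}} as the first argument) would show whether the optimizer slowdown depends on the custom helper or on the repeated struct projections themselves.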