You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Ruslan Yushchenko (JIRA)" <ji...@apache.org> on 2019/06/12 07:32:01 UTC

[jira] [Created] (SPARK-28016) Spark hangs when an execution plan has many projections on nested structs

Ruslan Yushchenko created SPARK-28016:
-----------------------------------------

             Summary: Spark hangs when an execution plan has many projections on nested structs
                 Key: SPARK-28016
                 URL: https://issues.apache.org/jira/browse/SPARK-28016
             Project: Spark
          Issue Type: Bug
          Components: Optimizer
    Affects Versions: 2.4.3
         Environment: Tried in
 * Spark 2.2.1, Spark 2.4.3 in local mode on Linux, MasOS and Windows
 * Spark 2.4.3 / Yarn on a Linux cluster
            Reporter: Ruslan Yushchenko


Spark applications freeze on execution plan optimization stage (Catalyst) when a logical execution plan contains a lot of projections that operate on nested struct fields.

2 Spark Applications are attached. One demonstrates the issue, the other demonstrates a workaround. Also, an archive is attached where these jobs are packages as a Maven Project.

To reproduce the attached Spark App does the following:
 * A small dataframe is created from a JSON example.
 * A nested withColumn map transformation is used to apply a transformation on a struct field and create a new struct field. The code for this transformation is also attached. 
 * Once more than 11 such transformations are applied the Catalyst optimizer freezes on optimizing the execution plan

{code:scala}
package za.co.absa.spark.app

import org.apache.spark.sql._
import org.apache.spark.sql.functions._

object SparkApp1Issue {

  // A sample data for a dataframe with nested structs
  val sample =
    """
      |{
      |  "strings": {
      |    "simple": "Culpa repellat nesciunt accusantium",
      |    "all_random": "DESebo8d%fL9sX@AzVin",
      |    "whitespaces": "    q    bb    l    "
      |  },
      |  "numerics": {
      |    "small_positive": 722,
      |    "small_negative": -660,
      |    "big_positive": 669223368251997,
      |    "big_negative": -161176863305841,
      |    "zero": 0
      |  }
      |}
    """.stripMargin ::
      """{
        |  "strings": {
        |    "simple": "Accusamus quia vel deleniti",
        |    "all_random": "rY&n9UnVcD*KS]jPBpa[",
        |    "whitespaces": "  t e   t   rp   z p"
        |  },
        |  "numerics": {
        |    "small_positive": 268,
        |    "small_negative": -134,
        |    "big_positive": 768990048149640,
        |    "big_negative": -684718954884696,
        |    "zero": 0
        |  }
        |}
        |""".stripMargin ::
      """{
        |  "strings": {
        |    "simple": "Quia numquam deserunt delectus rem est",
        |    "all_random": "GmRdQlE4Avn1hSlVPAH",
        |    "whitespaces": "   c   sa    yv   drf "
        |  },
        |  "numerics": {
        |    "small_positive": 909,
        |    "small_negative": -363,
        |    "big_positive": 592517494751902,
        |    "big_negative": -703224505589638,
        |    "zero": 0
        |  }
        |}
        |""".stripMargin :: Nil

  /**
    * This Spark Job demonstrates an issue of execution plan freezing when there are a lot of projections
    * involving nested structs in an execution plan.
    *
    * The example works as follows:
    * - A small dataframe is created from a JSON example above
    * - A nested withColumn map transformation is used to apply a transformation on a struct field and create
    *   a new struct field.
    * - Once more than 11 such transformations are applied the Catalyst optimizer freezes on optimizing
    *   the execution plan
    */
  def main(args: Array[String]): Unit = {

    val sparkBuilder = SparkSession.builder().appName("Nested Projections Issue")
    val spark = sparkBuilder
      .master("local[4]")
      .getOrCreate()

    import spark.implicits._
    import za.co.absa.spark.utils.NestedOps.DataSetWrapper

    val df = spark.read.json(sample.toDS)

    // Apply several uppercase and negation transformations
    val dfOutput = df
      .nestedWithColumnMap("strings.simple", "strings.uppercase1", c => upper(c))
      .nestedWithColumnMap("strings.all_random", "strings.uppercase2", c => upper(c))
      .nestedWithColumnMap("strings.whitespaces", "strings.uppercase3", c => upper(c))
      .nestedWithColumnMap("numerics.small_positive", "numerics.num1", c => -c)
      .nestedWithColumnMap("numerics.small_negative", "numerics.num2", c => -c)
      .nestedWithColumnMap("numerics.big_positive", "numerics.num3", c => -c)
      .nestedWithColumnMap("numerics.big_negative", "numerics.num4", c => -c)
      .nestedWithColumnMap("numerics.small_positive", "numerics.num5", c => -c)
      .nestedWithColumnMap("numerics.small_negative", "numerics.num6", c => -c)
      .nestedWithColumnMap("numerics.big_positive", "numerics.num7", c => -c)
      .nestedWithColumnMap("numerics.big_negative", "numerics.num8", c => -c)
      // Uncommenting the line below will cause Catalyst to freeze completely
      //.nestedWithColumnMap("numerics.big_negative", "numerics.num9", c => -c)

    dfOutput.printSchema()
    dfOutput.explain(true)
    dfOutput.show
  }

}
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org