Posted to issues@spark.apache.org by "Ruslan Yushchenko (JIRA)" <ji...@apache.org> on 2019/06/18 06:57:00 UTC

[jira] [Created] (SPARK-28090) Spark hangs when an execution plan has many projections on nested structs

Ruslan Yushchenko created SPARK-28090:
-----------------------------------------

             Summary: Spark hangs when an execution plan has many projections on nested structs
                 Key: SPARK-28090
                 URL: https://issues.apache.org/jira/browse/SPARK-28090
             Project: Spark
          Issue Type: Bug
          Components: Optimizer
    Affects Versions: 2.4.3
         Environment: Tried in
 * Spark 2.2.1 and Spark 2.4.3 in local mode on Linux, macOS and Windows
 * Spark 2.4.3 / Yarn on a Linux cluster
            Reporter: Ruslan Yushchenko


This was already reported (SPARK-28016), but the example provided there didn't always reproduce the error.

Spark applications freeze at the execution plan optimization stage (Catalyst) when a logical execution plan contains many projections that operate on nested struct fields.

The code listed below demonstrates the issue.

To reproduce the issue, the Spark app below does the following:
 * A small dataframe is created from a JSON example.
 * Several nested transformations (negation of a number) are applied to struct fields, each time creating a new struct field.
 * Once more than 9 such transformations are applied, the Catalyst optimizer freezes while optimizing the execution plan.
 * You can control the freezing by choosing a different upper bound for the Range. E.g. it will work fine if the upper bound is 5, but will hang if the bound is 10 (see the sketch of a single step right after this list).
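
Each step rebuilds the whole struct in a new projection and appends the transformed field at the end. As an illustration, here is roughly what a single step selects. This is my own sketch, not part of the reproduction, and it assumes a dataframe df whose numerics struct has only two fields:

{code}
// Roughly what one call to transformInsideNestedStruct(df, "numerics.num1", "out_num1", c => -c)
// produces, for a schema numerics: struct<num1: bigint, num2: bigint>
val projected = df.select(
  struct(
    col("numerics").getField("num1").as("num1"),
    col("numerics").getField("num2").as("num2"),
    (-col("numerics").getField("num1")).as("out_num1")
  ).as("numerics")
)
{code}

Stacking a dozen such projections, each wrapping the previous one, is what the full example below does.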

{code}
package com.example

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructField, StructType}
import scala.collection.mutable.ListBuffer

object SparkApp1IssueSelfContained {

  // Sample data for a dataframe with nested structs
  val sample: List[String] =
    """ { "numerics": {"num1": 101, "num2": 102, "num3": 103, "num4": 104, "num5": 105, "num6": 106, "num7": 107, "num8": 108, "num9": 109, "num10": 110, "num11": 111, "num12": 112, "num13": 113, "num14": 114, "num15": 115} } """ ::
    """ { "numerics": {"num1": 201, "num2": 202, "num3": 203, "num4": 204, "num5": 205, "num6": 206, "num7": 207, "num8": 208, "num9": 209, "num10": 210, "num11": 211, "num12": 212, "num13": 213, "num14": 214, "num15": 215} } """ ::
    """ { "numerics": {"num1": 301, "num2": 302, "num3": 303, "num4": 304, "num5": 305, "num6": 306, "num7": 307, "num8": 308, "num9": 309, "num10": 310, "num11": 311, "num12": 312, "num13": 313, "num14": 314, "num15": 315} } """ ::
    Nil

  /**
    * Transforms a column inside a nested struct. The transformed value is put into a new field of that nested struct.
    *
    * The output column name can omit the full path, as the new field is created at the same level of nesting as the input column.
    *
    * @param df               The dataframe to transform.
    * @param inputColumnName  The column name to which the transformation is applied, e.g. `company.employee.firstName`.
    * @param outputColumnName The output column name. The path is optional, e.g. you can use `transformedName` instead of `company.employee.transformedName`.
    * @param expression       A function that applies a transformation to a column as a Spark expression.
    * @return A dataframe with a new field that contains transformed values.
    */
  def transformInsideNestedStruct(df: DataFrame,
                                  inputColumnName: String,
                                  outputColumnName: String,
                                  expression: Column => Column): DataFrame = {
    def mapStruct(schema: StructType, path: Seq[String], parentColumn: Option[Column] = None): Seq[Column] = {
      val mappedFields = new ListBuffer[Column]()

      def handleMatchedLeaf(field: StructField, curColumn: Column): Seq[Column] = {
        val newColumn = expression(curColumn).as(outputColumnName)
        mappedFields += newColumn
        Seq(curColumn)
      }

      def handleMatchedNonLeaf(field: StructField, curColumn: Column): Seq[Column] = {
        // Non-leaf columns need to be further processed recursively
        field.dataType match {
          case dt: StructType => Seq(struct(mapStruct(dt, path.tail, Some(curColumn)): _*).as(field.name))
          case _ => throw new IllegalArgumentException(s"Field '${field.name}' is not a struct type.")
        }
      }

      val fieldName = path.head
      // Only one path segment remaining means the current field is the leaf to transform
      val isLeaf = path.lengthCompare(2) < 0

      val newColumns = schema.fields.flatMap(field => {
        // This is the original column (struct field) we want to process
        val curColumn = parentColumn match {
          case None => col(field.name)
          case Some(col) => col.getField(field.name).as(field.name)
        }

        if (field.name.compareToIgnoreCase(fieldName) != 0) {
          // Copy unrelated fields as they were
          Seq(curColumn)
        } else {
          // We have found a match
          if (isLeaf) {
            handleMatchedLeaf(field, curColumn)
          } else {
            handleMatchedNonLeaf(field, curColumn)
          }
        }
      })

      // Return the original columns followed by the newly created ones
      newColumns ++ mappedFields
    }

    val schema = df.schema
    val path = inputColumnName.split('.')
    df.select(mapStruct(schema, path): _*)
  }

  /**
    * This Spark job demonstrates how execution plan optimization freezes when an execution plan
    * contains many projections involving nested structs.
    *
    * The example works as follows:
    * - A small dataframe is created from the JSON sample above.
    * - transformInsideNestedStruct() is used to apply a transformation to a struct field and create
    *   a new struct field.
    * - Once more than 9 such transformations are applied, the Catalyst optimizer freezes while
    *   optimizing the execution plan.
    */
  def main(args: Array[String]): Unit = {

    val sparkBuilder = SparkSession.builder().appName("Nested Projections Issue Example")
    val spark = sparkBuilder
      .master("local[4]")
      .getOrCreate()

    import spark.implicits._

    val dfInput = spark.read.json(sample.toDS)

    // Apply several negations. You can change the number of negations by adjusting the
    // exclusive upper bound of the Range; the sample has 15 numeric fields, so it can go up to 16.
    val dfOutput = Range(1, 12).foldLeft(dfInput) { (df, i) =>
      transformInsideNestedStruct(df, s"numerics.num$i", s"out_num$i", c => -c)
    }

    dfOutput.printSchema()
    dfOutput.explain(true)
    dfOutput.show(false)
  }

}
{code}
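
As a possible mitigation (my own sketch, not verified as part of this report), the accumulated logical plan can be truncated every few steps so that Catalyst never has to optimize all of the nested projections at once, for example by round-tripping through an RDD:

{code}
// Hypothetical workaround sketch: cut the lineage every few transformations.
// spark.createDataFrame(rdd, schema) starts a fresh logical plan while
// keeping the data and schema intact.
val dfOutput = Range(1, 12).foldLeft(dfInput) { (df, i) =>
  val transformed = transformInsideNestedStruct(df, s"numerics.num$i", s"out_num$i", c => -c)
  if (i % 5 == 0) spark.createDataFrame(transformed.rdd, transformed.schema)
  else transformed
}
{code}

This only sidesteps the symptom; the underlying problem remains the optimizer's handling of many stacked projections on nested structs.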


