Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/05/18 15:52:07 UTC

[GitHub] [spark] viirya opened a new pull request #24637: [SPARK-27707][SQL] Prune unnecessary nested fields from Generate

URL: https://github.com/apache/spark/pull/24637
 
 
   ## What changes were proposed in this pull request?
   
   A performance issue with `explode` was found: when a complex field contains a huge array, the whole field is duplicated once per exploded array element. For example:
   
   ```scala
   val df = spark.sparkContext.parallelize(Seq(("1",
     Array.fill(M)({
       val i = math.random
       (i.toString, (i + 1).toString, (i + 2).toString, (i + 3).toString)
     })))).toDF("col", "arr")
     .selectExpr("col", "struct(col, arr) as st")
     .selectExpr("col", "st.col as col1", "explode(st.arr) as arr_col")
   ```
   
   The `explode` causes `st` to be duplicated once for each exploded element.
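   
   To make the cost concrete, here is a rough back-of-the-envelope model in plain Scala. It is only a sketch: it does not use Spark or the `Generate` operator, and the names `GeneratePruningSketch`, `St`, `InputRow`, `leafCount`, `carriedUnpruned` and `carriedPruned` are made up for illustration. It counts how many leaf values the exploded rows carry for the `st`-derived column, with and without projecting `st.col` first.
   
   ```scala
   // Toy model only: plain Scala collections, not Spark's Generate operator.
   object GeneratePruningSketch {
     // Mirrors the shape of `st` in the example: a string plus an array of 4-tuples.
     final case class St(col: String, arr: Vector[(String, String, String, String)])
     final case class InputRow(col: String, st: St)
   
     // Leaf values in one `st`: 1 for `col` plus 4 per array element.
     def leafCount(st: St): Long = 1L + 4L * st.arr.size
   
     // Unpruned: each of the arr.size exploded rows carries the whole `st`.
     def carriedUnpruned(row: InputRow): Long =
       row.st.arr.size.toLong * leafCount(row.st)
   
     // Pruned: `st.col` is projected below the explode, so each exploded row
     // carries a single value in place of `st`.
     def carriedPruned(row: InputRow): Long =
       row.st.arr.size.toLong
   
     def main(args: Array[String]): Unit = {
       val m = 1000 // hypothetical array size, not the benchmark's M
       val arr = Vector.tabulate(m) { i =>
         (i.toString, (i + 1).toString, (i + 2).toString, (i + 3).toString)
       }
       val row = InputRow("1", St("1", arr))
       println(s"unpruned: ${carriedUnpruned(row)}")
       println(s"pruned:   ${carriedPruned(row)}")
     }
   }
   ```
   
   In this model the unpruned count grows quadratically with the array size while the pruned count grows linearly, which is the shape of the problem described above.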
   
   Benchmark results before the change:
   
   ```
   [info] Java HotSpot(TM) 64-Bit Server VM 1.8.0_202-b08 on Mac OS X 10.14.4
   [info] Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz
   [info] generate big nested struct array:         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
   [info] ------------------------------------------------------------------------------------------------------------------------
   [info] generate big nested struct array wholestage off          52668          53162         699          0.0      877803.4       1.0X
   [info] generate big nested struct array wholestage on          47261          49093        1125          0.0      787690.2       1.1X
   [info]
   ```
   
   The query plan before the change:
   ```
   == Physical Plan ==
    Project [col#508, st#512.col AS col1#515, arr_col#519]
    +- Generate explode(st#512.arr), [col#508, st#512], false, [arr_col#519]
       +- Project [_1#503 AS col#508, named_struct(col, _1#503, arr, _2#504) AS st#512]
          +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS _1#503, mapobjects(MapObjects_loopValue84, MapObjects_loopIsNull84, ObjectType(class scala.Tuple4), if (isnull(lambdavariable(MapObjects_loopValue84, MapObjects_loopIsNull84, ObjectType(class scala.Tuple4), true))) null else named_struct(_1, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(lambdavariable(MapObjects_loopValue84, MapObjects_loopIsNull84, ObjectType(class scala.Tuple4), true))._1, true, false), _2, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(lambdavariable(MapObjects_loopValue84, MapObjects_loopIsNull84, ObjectType(class scala.Tuple4), true))._2, true, false), _3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(lambdavariable(MapObjects_loopValue84, MapObjects_loopIsNull84, ObjectType(class scala.Tuple4), true))._3, true, false), _4, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(lambdavariable(MapObjects_loopValue84, MapObjects_loopIsNull84, ObjectType(class scala.Tuple4), true))._4, true, false)), knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, None) AS _2#504]
             +- Scan[obj#534]
   ```
   
   This patch takes the nested column pruning approach to prune unnecessary nested fields. It adds a projection of the needed nested fields, as aliases, on the child of `Generate`, and replaces references to those fields in the projection on top of `Generate` with the alias attributes.
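   
   For illustration, a similar effect can be approximated by hand in the query itself: project the needed nested fields before the `explode`, so the exploded rows no longer need `st`. This is a sketch of the idea, not the optimizer rule this patch adds. The object name `ManualPruneSketch`, the helpers `buildInput` and `manualPrune`, the local master setting and `m = 4` are assumptions made for the example.
   
   ```scala
   import org.apache.spark.sql.{DataFrame, SparkSession}
   
   object ManualPruneSketch {
     // Builds the same shape of input as the example above, with `m` array elements.
     def buildInput(spark: SparkSession, m: Int): DataFrame = {
       import spark.implicits._
       spark.sparkContext.parallelize(Seq(("1",
         Array.fill(m)({
           val i = math.random()
           (i.toString, (i + 1).toString, (i + 2).toString, (i + 3).toString)
         })))).toDF("col", "arr")
         .selectExpr("col", "struct(col, arr) as st")
     }
   
     // Hand-written equivalent of the query: pull `st.col` and `st.arr` out of
     // the struct first, then explode the plain array column.
     def manualPrune(df: DataFrame): DataFrame =
       df.selectExpr("col", "st.col as col1", "st.arr as arr")
         .selectExpr("col", "col1", "explode(arr) as arr_col")
   
     def main(args: Array[String]): Unit = {
       val spark = SparkSession.builder()
         .master("local[*]") // assumption: local run for demonstration
         .appName("manual-prune-sketch")
         .getOrCreate()
   
       val result = manualPrune(buildInput(spark, m = 4))
       result.explain() // inspect which columns Generate has to carry
       result.show(truncate = false)
       spark.stop()
     }
   }
   ```
   
   The patch aims to make this kind of rewrite unnecessary by doing the equivalent projection in the optimizer.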
   
   Benchmark results after the change:
   ```
    [info] Java HotSpot(TM) 64-Bit Server VM 1.8.0_202-b08 on Mac OS X 10.14.4
    [info] Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz
    [info] generate big nested struct array:         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
    [info] ------------------------------------------------------------------------------------------------------------------------
    [info] generate big nested struct array wholestage off            311            331          28          0.2        5188.6       1.0X
    [info] generate big nested struct array wholestage on            297            312          15          0.2        4947.3       1.0X
    [info]
   ```
   
   The query plan after the change:
   ```
   == Physical Plan ==
    Project [col#592, _gen_alias_608#608 AS col1#599, arr_col#603]
    +- Generate explode(st#596.arr), [col#592, _gen_alias_608#608], false, [arr_col#603]
       +- Project [_1#587 AS col#592, named_struct(col, _1#587, arr, _2#588) AS st#596, _1#587 AS _gen_alias_608#608]
          +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS _1#587, mapobjects(MapObjects_loopValue102, MapObjects_loopIsNull102, ObjectType(class scala.Tuple4), if (isnull(lambdavariable(MapObjects_loopValue102, MapObjects_loopIsNull102, ObjectType(class scala.Tuple4), true))) null else named_struct(_1, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(lambdavariable(MapObjects_loopValue102, MapObjects_loopIsNull102, ObjectType(class scala.Tuple4), true))._1, true, false), _2, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(lambdavariable(MapObjects_loopValue102, MapObjects_loopIsNull102, ObjectType(class scala.Tuple4), true))._2, true, false), _3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(lambdavariable(MapObjects_loopValue102, MapObjects_loopIsNull102, ObjectType(class scala.Tuple4), true))._3, true, false), _4, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(lambdavariable(MapObjects_loopValue102, MapObjects_loopIsNull102, ObjectType(class scala.Tuple4), true))._4, true, false)), knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, None) AS _2#588]
             +- Scan[obj#586]
   ```
   
   ## How was this patch tested?
   
   Added benchmark.
