Posted to issues@spark.apache.org by "Herman van Hovell (JIRA)" <ji...@apache.org> on 2018/01/29 17:04:00 UTC

[jira] [Resolved] (SPARK-17006) WithColumn Performance Degrades with Number of Invocations

     [ https://issues.apache.org/jira/browse/SPARK-17006?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Herman van Hovell resolved SPARK-17006.
---------------------------------------
       Resolution: Fixed
         Assignee: Herman van Hovell
    Fix Version/s: 2.3.0

> WithColumn Performance Degrades with Number of Invocations
> ----------------------------------------------------------
>
>                 Key: SPARK-17006
>                 URL: https://issues.apache.org/jira/browse/SPARK-17006
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.0.0
>            Reporter: Hamel Ajay Kothari
>            Assignee: Herman van Hovell
>            Priority: Major
>             Fix For: 2.3.0
>
>
> Consider the following test case. We build a DataFrame by applying 100 withColumn calls, then 100 more, then 100 more, then 100 more. Each successive batch of calls is drastically slower than the one before it. If we substitute the optimized plan for the logical plan, performance improves dramatically.
> The code is as follows:
> {code}
> val raw = sc.parallelize(Range(1, 100)).toDF
> val s1 = System.nanoTime()
> val mapped = Range(1, 100).foldLeft(raw) { (df, i) =>
>     df.withColumn(s"value${i}", df("value") + i)
> }
> val s2 = System.nanoTime()
> val mapped2 = Range(1, 100).foldLeft(mapped) { (df, i) =>
>     df.withColumn(s"value${i}_2", df("value") + i)
> }
> val s3 = System.nanoTime()
> val mapped3 = Range(1, 100).foldLeft(mapped2) { (df, i) =>
>     df.withColumn(s"value${i}_3", df("value") + i)
> }
> val s4 = System.nanoTime()
> val mapped4 = Range(1, 100).foldLeft(mapped3) { (df, i) =>
>     df.withColumn(s"value${i}_4", df("value") + i)
> }
> val s5 = System.nanoTime()
> val optimizedPlan = mapped3.queryExecution.optimizedPlan
> val optimizedMapped3 = new org.apache.spark.sql.DataFrame(spark, optimizedPlan, org.apache.spark.sql.catalyst.encoders.RowEncoder(mapped3.schema))
> val s6 = System.nanoTime()
> val mapped5 = Range(1, 100).foldLeft(optimizedMapped3) { (df, i) =>
>     df.withColumn(s"value${i}_4", df("value") + i)
> }
> val s7 = System.nanoTime()
> val mapped6 = Range(1, 100).foldLeft(mapped3) { (df, i) =>
>     df.withColumn(s"value${i}_4", df("value") + i)
> }
> val s8 = System.nanoTime()
> val analyzedPlan = mapped3.queryExecution.analyzed
> val analyzedMapped4 = new org.apache.spark.sql.DataFrame(spark, analyzedPlan, org.apache.spark.sql.catalyst.encoders.RowEncoder(mapped3.schema))
> val mapped7 = Range(1, 100).foldLeft(analyzedMapped4) { (df, i) =>
>     df.withColumn(s"value${i}_4", df("value") + i)
> }
> val s9 = System.nanoTime()
> val secondsToNanos = 1000*1000*1000.0
> val stage1 = (s2-s1)/secondsToNanos
> val stage2 = (s3-s2)/secondsToNanos
> val stage3 = (s4-s3)/secondsToNanos
> val stage4 = (s5-s4)/secondsToNanos
> val stage5 = (s6-s5)/secondsToNanos
> val stage6 = (s7-s6)/secondsToNanos
> val stage7 = (s8-s7)/secondsToNanos
> val stage8 = (s9-s8)/secondsToNanos
> println(s"First 100: ${stage1}")
> println(s"Second 100: ${stage2}")
> println(s"Third 100: ${stage3}")
> println(s"Fourth 100: ${stage4}")
> println(s"Fourth 100 Optimization time: ${stage5}")
> println(s"Fourth 100 Optimized: ${stage6}")
> println(s"Fourth Unoptimized (to make sure no caching/etc takes place, reusing analyzed etc): ${stage7}")
> println(s"Fourth 100 using analyzed plan: ${stage8}")
> {code}
> This results in the following performance:
> {code}
> First 100: 4.873489454 seconds
> Second 100: 14.982028303 seconds
> Third 100: 38.775467952 seconds
> Fourth 100: 73.429119675 seconds
> Fourth 100 Optimization time: 1.777374175 seconds
> Fourth 100 Optimized: 22.514489934 seconds
> Fourth Unoptimized (to make sure no caching/etc takes place, reusing analyzed etc): 69.616112734 seconds
> Fourth 100 using analyzed plan: 67.641982709 seconds
> {code}
> Now, I suspect we can't simply substitute the optimized plan for the logical plan, because we would lose information that may be useful for later optimization. But I suspect there is something useful we can do, at least in the case of Projects.
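
As an aside, the cost shown above comes from re-analyzing an ever-growing plan on every withColumn call, which can also be avoided on the caller side. A minimal sketch of that idea (not part of the report above; it assumes the same raw DataFrame defined in it): build the derived columns up front and add them with a single select, so the plan is analyzed once rather than once per withColumn call.
{code}
import org.apache.spark.sql.functions.col
// Build the 99 derived columns first, then add them all in one projection.
val newCols = Range(1, 100).map(i => (col("value") + i).as(s"value${i}"))
val mappedOnce = raw.select(col("*") +: newCols: _*)
{code}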



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org