Posted to issues@spark.apache.org by "Xiao Li (JIRA)" <ji...@apache.org> on 2018/01/26 00:19:00 UTC

[jira] [Resolved] (SPARK-23032) Add a per-query codegenStageId to WholeStageCodegenExec

     [ https://issues.apache.org/jira/browse/SPARK-23032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xiao Li resolved SPARK-23032.
-----------------------------
       Resolution: Fixed
    Fix Version/s: 2.3.0

> Add a per-query codegenStageId to WholeStageCodegenExec
> -------------------------------------------------------
>
>                 Key: SPARK-23032
>                 URL: https://issues.apache.org/jira/browse/SPARK-23032
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.3.0
>            Reporter: Kris Mok
>            Priority: Major
>             Fix For: 2.3.0
>
>
> Proposing to add a per-query ID to the codegen stages as represented by {{WholeStageCodegenExec}} operators. This ID will be used in
> * the explain output of the physical plan, and in
> * the generated class name.
> Specifically, this ID will be stable within a query, counting up from 1 in depth-first post-order over all the {{WholeStageCodegenExec}} operators inserted into a plan.
> The ID value 0 is reserved for "free-floating" {{WholeStageCodegenExec}} objects, which may be created for one-off purposes, e.g. for fallback handling of a codegen stage that failed to codegen the whole stage and instead codegens only a subset of the child operators.
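> As a minimal sketch of this numbering scheme (a hypothetical, simplified plan tree, not the actual Spark implementation):
> {code:none}
> // Hypothetical simplified plan tree, just to illustrate depth-first
> // post-order numbering of codegen stages.
> sealed trait Plan { def children: Seq[Plan] }
> case class WholeStageCodegen(children: Seq[Plan], var stageId: Int = 0) extends Plan
> case class Other(children: Seq[Plan]) extends Plan
>
> def assignStageIds(root: Plan): Unit = {
>   var nextId = 0                 // 0 stays reserved for free-floating stages
>   def visit(p: Plan): Unit = {
>     p.children.foreach(visit)    // recurse into children first: post-order
>     p match {
>       case w: WholeStageCodegen =>
>         nextId += 1              // IDs count up from 1
>         w.stageId = nextId
>       case _ =>
>     }
>   }
>   visit(root)
> }
> {code}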
> Example: for the following query:
> {code:none}
> scala> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1)
> scala> val df1 = spark.range(10).select('id as 'x, 'id + 1 as 'y).orderBy('x).select('x + 1 as 'z, 'y)
> df1: org.apache.spark.sql.DataFrame = [z: bigint, y: bigint]
> scala> val df2 = spark.range(5)
> df2: org.apache.spark.sql.Dataset[Long] = [id: bigint]
> scala> val query = df1.join(df2, 'z === 'id)
> query: org.apache.spark.sql.DataFrame = [z: bigint, y: bigint ... 1 more field]
> {code}
> The explain output before the change is:
> {code:none}
> scala> query.explain
> == Physical Plan ==
> *SortMergeJoin [z#9L], [id#13L], Inner
> :- *Sort [z#9L ASC NULLS FIRST], false, 0
> :  +- Exchange hashpartitioning(z#9L, 200)
> :     +- *Project [(x#3L + 1) AS z#9L, y#4L]
> :        +- *Sort [x#3L ASC NULLS FIRST], true, 0
> :           +- Exchange rangepartitioning(x#3L ASC NULLS FIRST, 200)
> :              +- *Project [id#0L AS x#3L, (id#0L + 1) AS y#4L]
> :                 +- *Range (0, 10, step=1, splits=8)
> +- *Sort [id#13L ASC NULLS FIRST], false, 0
>    +- Exchange hashpartitioning(id#13L, 200)
>       +- *Range (0, 5, step=1, splits=8)
> {code}
> Note how codegen'd operators are annotated with the prefix {{"*"}}.
> After this change, the output becomes:
> {code:none}
> scala> query.explain
> == Physical Plan ==
> *(6) SortMergeJoin [z#9L], [id#13L], Inner
> :- *(3) Sort [z#9L ASC NULLS FIRST], false, 0
> :  +- Exchange hashpartitioning(z#9L, 200)
> :     +- *(2) Project [(x#3L + 1) AS z#9L, y#4L]
> :        +- *(2) Sort [x#3L ASC NULLS FIRST], true, 0
> :           +- Exchange rangepartitioning(x#3L ASC NULLS FIRST, 200)
> :              +- *(1) Project [id#0L AS x#3L, (id#0L + 1) AS y#4L]
> :                 +- *(1) Range (0, 10, step=1, splits=8)
> +- *(5) Sort [id#13L ASC NULLS FIRST], false, 0
>    +- Exchange hashpartitioning(id#13L, 200)
>       +- *(4) Range (0, 5, step=1, splits=8)
> {code}
> Note that the annotation prefix becomes {{"*(id) "}}, where {{id}} is the codegen stage ID.
> It'll also show up in the name of the generated class, as a suffix in the format of
> {code:none}
> GeneratedClass$GeneratedIteratorForCodegenStage$id
> {code}
> for example, note how {{GeneratedClass$GeneratedIteratorForCodegenStage3}} and {{GeneratedClass$GeneratedIteratorForCodegenStage6}} in the following stack trace correspond to the IDs shown in the explain output above:
> {code:none}
> "Executor task launch worker for task 424@12957" daemon prio=5 tid=0x58 nid=NA runnable
>   java.lang.Thread.State: RUNNABLE
> 	  at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:109)
> 	  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.sort_addToSorter$(generated.java:32)
> 	  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(generated.java:41)
> 	  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> 	  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$9$$anon$1.hasNext(WholeStageCodegenExec.scala:494)
> 	  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.findNextInnerJoinRows$(generated.java:42)
> 	  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.processNext(generated.java:101)
> 	  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> 	  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$2.hasNext(WholeStageCodegenExec.scala:513)
> 	  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
> 	  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
> 	  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:828)
> 	  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:828)
> 	  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> 	  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> 	  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> 	  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> 	  at org.apache.spark.scheduler.Task.run(Task.scala:109)
> 	  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
> 	  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	  at java.lang.Thread.run(Thread.java:748)
> {code}
> Rationale:
> Right now, the code generated by Spark SQL lacks the means to make a couple of distinctions:
> 1. It's hard to tell which physical operators are in the same WholeStageCodegen stage. Note that this "stage" is a separate notion from Spark's RDD execution stages; it only delineates codegen units.
> There can be adjacent physical operators that are both codegen'd but sit in separate codegen stages. Some of this is due to hacky implementation details, such as the case with SortMergeJoin and its Sort inputs -- they're hard-coded to be split into separate stages even though both are codegen'd.
> When printing the explain output of the physical plan, you'd only see the codegen'd physical operators annotated with a preceding star ('*'), with no way to figure out whether they're in the same stage.
> 2. Performance/error diagnosis
> The generated code has class/method names that are hard to tell apart between queries, or even between codegen stages within the same query. If we use a Java-level profiler to collect profiles, or if we encounter a Java-level exception with a stack trace in it, it's really hard to tell which part of a query it corresponds to.
> By introducing a per-query codegen stage ID, we'd at least be able to tell which codegen stage (and, in turn, which group of physical operators) a profile tick or an exception came from.
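> For example, a frame from such a stack trace or profile can be mapped back to the {{*(id)}} annotation in the explain output with a simple pattern match (a hypothetical helper, not part of Spark):
> {code:none}
> // Hypothetical helper: extract the codegen stage ID from a generated
> // class name seen in a stack trace or profiler output.
> val StageClass = """.*GeneratedIteratorForCodegenStage(\d+).*""".r
>
> def codegenStageId(frameClassName: String): Option[Int] = frameClassName match {
>   case StageClass(id) => Some(id.toInt)
>   case _              => None
> }
>
> // codegenStageId("...GeneratedClass$GeneratedIteratorForCodegenStage3")
> // => Some(3), matching the *(3) annotation in the explain output above
> {code}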
> This proposal uses a per-query ID because it's stable within a query: multiple runs of the same query will see the same resulting IDs. This benefits understandability for users, and it also plays well with the codegen cache in Spark SQL, which uses the generated source code as its key.
> The downside of per-query IDs, as opposed to a per-session or globally incrementing ID, is of course that we can't tell different query runs apart with this ID alone. But for now I believe this is a good-enough tradeoff.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org