Posted to issues@spark.apache.org by "Srinivas Rishindra Pothireddi (Jira)" <ji...@apache.org> on 2021/02/04 19:31:00 UTC

[jira] [Commented] (SPARK-34369) Track number of pairs processed out of Join

    [ https://issues.apache.org/jira/browse/SPARK-34369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17279098#comment-17279098 ] 

Srinivas Rishindra Pothireddi commented on SPARK-34369:
-------------------------------------------------------

I am working on this.

> Track number of pairs processed out of Join
> -------------------------------------------
>
>                 Key: SPARK-34369
>                 URL: https://issues.apache.org/jira/browse/SPARK-34369
>             Project: Spark
>          Issue Type: New Feature
>          Components: Web UI
>    Affects Versions: 3.2.0
>            Reporter: Srinivas Rishindra Pothireddi
>            Priority: Major
>
> Users often hit a scenario where even a modest skew in a join makes tasks appear to be stuck, because a join is O(n^2) in the number of rows that share a key: it considers every pair of rows with matching keys. When this happens, users think that Spark has deadlocked. If the join has a bound condition (an extra predicate beyond the key equality, such as f.c > p.c in the queries below), the "number of output rows" metric may look typical. Other metrics, such as shuffle read, may look very modest. In those cases it is very hard to understand what the problem is. There is no conclusive proof without taking a heap dump and inspecting internal data structures.
> It would be much better if Spark had a metric (which we propose be titled "number of matched pairs", as a companion to "number of output rows") showing how many pairs the join is processing. It would be updated in the live UI when metrics are collected during heartbeats, so the user could easily see what is going on.
> The metric would help even when a stuck executor has some other cause (e.g. network issues), because it would rule out join skew. For example, you may have 100k records with the same key on each side of a join. That probably won't show up as extreme skew in task input data, but it becomes 10 billion (100k x 100k) join pairs that Spark works through in one task.
>  
> To further demonstrate the usefulness of this metric, follow the steps below.
>  
>     val df1 = spark.range(0, 200000).map { x => (x % 20, 20) }.toDF("b", "c")
>     val df2 = spark.range(0, 300000).map { x => (77, 20) }.toDF("b", "c")
>  
>     val df3 = spark.range(0, 200000).map(x => (x + 1, x + 2)).toDF("b", "c")
>     val df4 = spark.range(0, 300000).map(x => (77, x + 2)).toDF("b", "c")
>  
>     val df5 = df1.union(df2)
>     val df6 = df3.union(df4)
>  
>     df5.createOrReplaceTempView("table1")
>     df6.createOrReplaceTempView("table2")
> h3. InnerJoin
>     spark.sql("select p.*, f.* from table2 p join table1 f on f.b = p.b and f.c > p.c").count
> number of output rows: 5,580,000
> number of matched pairs: 90,000,490,000
> h3. FullOuterJoin
>     spark.sql("select p.*, f.* from table2 p full outer join table1 f on f.b = p.b and f.c > p.c").count
> number of output rows: 6,099,964
> number of matched pairs: 90,000,490,000
> h3. LeftOuterJoin
>     spark.sql("select p.*, f.* from table2 p left outer join table1 f on f.b = p.b and f.c > p.c").count
> number of output rows: 6,079,964
> number of matched pairs: 90,000,490,000
> h3. RightOuterJoin
>     spark.sql("select p.*, f.* from table2 p right outer join table1 f on f.b = p.b and f.c > p.c").count
> number of output rows: 5,600,000
> number of matched pairs: 90,000,490,000
> h3. LeftSemiJoin
>     spark.sql("select * from table2 p left semi join table1 f on f.b = p.b and f.c > p.c").count
> number of output rows: 36
> number of matched pairs: 89,994,910,036
> h3. CrossJoin
>     spark.sql("select p.*, f.* from table2 p cross join table1 f on f.b = p.b and f.c > p.c").count
> number of output rows: 5,580,000
> number of matched pairs: 90,000,490,000
> h3. LeftAntiJoin
>     spark.sql("select * from table2 p anti join table1 f on f.b = p.b and f.c > p.c").count
> number of output rows: 499,964
> number of matched pairs: 89,994,910,036
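
The 90,000,490,000 figure reported for the inner, outer and cross joins above can be reproduced by hand from the per-key row counts of the two tables. The following is a minimal sketch in plain Scala (no Spark required), assuming "matched pairs" means the sum over shared keys of (rows in table1) x (rows in table2), counted before the f.c > p.c condition is applied. The object name MatchedPairs and the helper matchedPairs are hypothetical and are not part of Spark or of the proposed change.

```scala
// Illustrative sketch only: MatchedPairs and matchedPairs are hypothetical
// names, not Spark APIs. Assumption: a "matched pair" is any pair of rows
// whose join keys are equal, regardless of the extra f.c > p.c condition.
object MatchedPairs {
  /** Sum over keys present on both sides of leftRows(key) * rightRows(key). */
  def matchedPairs(left: Map[Long, Long], right: Map[Long, Long]): Long =
    left.iterator.map { case (key, n) => n * right.getOrElse(key, 0L) }.sum

  // table1 = df1 union df2:
  // keys 0..19 with 10,000 rows each, plus 300,000 rows with key 77.
  val table1Counts: Map[Long, Long] =
    (0L until 20L).map(k => k -> 10000L).toMap + (77L -> 300000L)

  // table2 = df3 union df4:
  // keys 1..200000 once each (df3), plus 300,000 rows with key 77 (df4),
  // so key 77 appears 300,001 times in total.
  val table2Counts: Map[Long, Long] =
    (1L to 200000L).map(k => k -> 1L).toMap + (77L -> 300001L)

  def main(args: Array[String]): Unit = {
    // 19 * 10,000 + 300,000 * 300,001
    println(matchedPairs(table1Counts, table2Counts)) // 90000490000
    // The 100k x 100k single-key example from the description.
    println(matchedPairs(Map(1L -> 100000L), Map(1L -> 100000L))) // 10000000000
  }
}
```

Almost all of the work comes from the single key 77 (300,000 x 300,001 pairs), while that key accounts for only 5,400,000 of the 5,580,000 inner-join output rows. The sketch does not attempt to reproduce the slightly smaller counts listed for the left semi and left anti joins.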



--
This message was sent by Atlassian Jira
(v8.3.4#803005)