You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Srinivas Rishindra Pothireddi (Jira)" <ji...@apache.org> on 2021/02/04 19:30:01 UTC
[jira] [Created] (SPARK-34369) Track number of pairs processed out
of Join
Srinivas Rishindra Pothireddi created SPARK-34369:
-----------------------------------------------------
Summary: Track number of pairs processed out of Join
Key: SPARK-34369
URL: https://issues.apache.org/jira/browse/SPARK-34369
Project: Spark
Issue Type: New Feature
Components: Web UI
Affects Versions: 3.2.0
Reporter: Srinivas Rishindra Pothireddi
Often users face a scenario where even a modest skew in a join can lead to tasks appearing to be stuck, due to the O(n^2) nature of a join considering all pairs of rows with matching keys. When this happens users think that spark has gotten deadlocked. If there is a bound condition, the "number of output rows" metric may look typical. Other metrics may look very modest (eg: shuffle read). In those cases, it is very hard to understand what the problem is. There is no conclusive proof without getting a heap dump and looking at some internal data structures.
It would be much better if spark had a metric(which we propose be titled “number of matched pairs” as a companion to “number of output rows”) which showed the user how many pairs were being processed in the join. This would get updated in the live UI (when metrics get collected during heartbeats), so the user could easily see what was going on.
This would even help in cases where there was some other cause of a stuck executor (eg. network issues) just to disprove this theory. For example, you may have 100k records with the same key on each side of a join. That probably won't really show up as extreme skew in task input data. But it'll become 10B join pairs that spark works through, in one task.
To further demonstrate the usefulness of this metric please follow the steps below.
_val df1 = spark.range(0, 200000).map \{ x => (x % 20, 20) }.toDF("b", "c")_
_val df2 = spark.range(0, 300000).map \{ x => (77, 20) }.toDF("b", "c")_
_val df3 = spark.range(0, 200000).map(x => (x + 1, x + 2)).toDF("b", "c")_
_val df4 = spark.range(0, 300000).map(x => (77, x + 2)).toDF("b", "c")_
_val df5 = df1.union(df2)_
_val df6 = df3.union(df4)_
_df5.createOrReplaceTempView("table1")_
_df6.createOrReplaceTempView("table2")_
h3. InnerJoin
_sql("select p.*, f.* from table2 p join table1 f on f.b = p.b and f.c > p.c").count_
_number of output rows: 5,580,000_
_number of matched pairs: 90,000,490,000_
h3. FullOuterJoin
_spark.sql("select p.*, f.* from table2 p full outer join table1 f on f.b = p.b and f.c > p.c").count_
_number of output rows: 6,099,964_
_number of matched pairs: 90,000,490,000_
h3. LeftOuterJoin
_sql("select p.*, f.* from table2 p left outer join table1 f on f.b = p.b and f.c > p.c").count_
_number of output rows: 6,079,964_
_number of matched pairs: 90,000,490,000_
h3. RightOuterJoin
_spark.sql("select p.*, f.* from table2 p right outer join table1 f on f.b = p.b and f.c > p.c").count_
_number of output rows: 5,600,000_
_number of matched pairs: 90,000,490,000_
h3. LeftSemiJoin
_spark.sql("select * from table2 p left semi join table1 f on f.b = p.b and f.c > p.c").count_
_number of output rows: 36_
_number of matched pairs: 89,994,910,036_
h3. CrossJoin
_spark.sql("select p.*, f.* from table2 p cross join table1 f on f.b = p.b and f.c > p.c").count_
_number of output rows: 5,580,000_
_number of matched pairs: 90,000,490,000_
h3. LeftAntiJoin
_spark.sql("select * from table2 p anti join table1 f on f.b = p.b and f.c > p.c").count_
number of output rows: 499,964
number of matched pairs: 89,994,910,036
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org