Posted to issues@spark.apache.org by "Apache Spark (Jira)" <ji...@apache.org> on 2021/07/12 09:33:00 UTC

[jira] [Assigned] (SPARK-36087) An Impl of skew key detection and data inflation optimization

     [ https://issues.apache.org/jira/browse/SPARK-36087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-36087:
------------------------------------

    Assignee:     (was: Apache Spark)

> An Impl of skew key detection and data inflation optimization
> -------------------------------------------------------------
>
>                 Key: SPARK-36087
>                 URL: https://issues.apache.org/jira/browse/SPARK-36087
>             Project: Spark
>          Issue Type: New Feature
>          Components: SQL
>    Affects Versions: 3.3.0
>            Reporter: zhengruifeng
>            Priority: Major
>
> Design doc: [https://docs.google.com/document/d/1jYGlipQdirhuRR3_x1PflAYdMow7StOHOLUjSw6Vk9s/edit?usp=sharing]
> 1, introduce {{ShuffleExecAccumulator}} in {{ShuffleExchangeExec}} to support arbitrary statistics;
> 2, implement a key-sampling {{ShuffleExecAccumulator}} to detect skewed keys and show debug info in the Spark UI;
> 3, in {{OptimizeSkewedJoin}}, estimate the joined size of each partition from the sampled keys, and split a partition if it has not been split yet and its estimated joined size is too large.
>  
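> Data inflation case:
> {code:scala}
> // Already in scope in spark-shell; needed when run elsewhere.
> import org.apache.spark.sql.functions.{col, hash, lit, when}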
> spark.conf.set("spark.sql.adaptive.enabled", true)
> spark.conf.set("spark.sql.adaptive.skewJoin.enabled", true)
> spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "80")
> spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "80")
> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
> spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionNum", "1")
> spark.conf.set("spark.sql.shuffle.partitions", 10)
> sc.setLogLevel("INFO")
> val df1 = spark.range(0, 1000000, 1, 9).select(col("id").as("key1"), col("id").as("value1"), hash(col("id")).mod(100).as("hash1")).withColumn("key1", when(col("hash1") === lit(0), lit(0)).otherwise(col("key1")))
> val df2 = spark.range(0, 500000, 1, 7).select(col("id").as("key2"), col("id").as("value2"), hash(col("id")).mod(100).as("hash2")).withColumn("key2", when(col("hash2") === lit(0), lit(0)).otherwise(col("key2")))
> df1.join(df2, col("key1") === col("key2")).write.mode("overwrite").parquet("/tmp/result_0")
> spark.conf.set("spark.sql.adaptive.skewJoin.inflation.enabled", true)
> spark.conf.set("spark.sql.adaptive.skewJoin.inflation.factor", 10)
> spark.conf.set("spark.sql.adaptive.shuffle.sampleSizePerPartition", 100)
> spark.conf.set("spark.sql.adaptive.shuffle.detectSkewness", true)
> df1.join(df2, col("key1") === col("key2")).write.mode("overwrite").parquet("/tmp/result_1"){code}
>  
> It also partially resolves https://issues.apache.org/jira/browse/SPARK-35596
>  
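
Step 3 of the description (estimating the joined size of a partition from sampled keys) can be illustrated with a minimal sketch. This is not the code from the design doc or the patch: the object {{JoinInflationSketch}}, the case class {{KeySample}}, and both methods are hypothetical names, and the inflation rule used here (estimated joined rows exceeding factor times the larger input) is only an assumed reading of {{spark.sql.adaptive.skewJoin.inflation.factor}}.

```scala
// Hypothetical sketch; none of these names come from SPARK-36087.
object JoinInflationSketch {

  /** Key sample of one shuffle partition.
    * counts: sampled key -> occurrences in the sample
    * sampledRows: number of rows sampled; totalRows: rows in the partition
    */
  final case class KeySample(counts: Map[Long, Long], sampledRows: Long, totalRows: Long) {
    // Factor that scales a sample count up to the whole partition.
    def scale: Double =
      if (sampledRows == 0L) 0.0 else totalRows.toDouble / sampledRows
  }

  /** Estimated output rows of an inner equi-join of two partitions:
    * for each key seen on both sides, multiply the scaled-up counts.
    */
  def estimateJoinedRows(left: KeySample, right: KeySample): Double =
    left.counts.iterator.map { case (key, leftCount) =>
      val rightCount = right.counts.getOrElse(key, 0L)
      (leftCount * left.scale) * (rightCount * right.scale)
    }.sum

  /** Assumed rule: the partition is "inflated" when the estimate
    * exceeds factor times the larger of the two input row counts.
    */
  def isInflated(left: KeySample, right: KeySample, factor: Double): Boolean = {
    val inputRows = math.max(left.totalRows, right.totalRows).toDouble
    estimateJoinedRows(left, right) > factor * inputRows
  }
}
```

For example, with a left sample of 100 out of 1000 rows in which key 0 appears 10 times, and a right sample of 100 out of 500 rows in which key 0 appears 5 times, the estimate is (10 * 10) * (5 * 5) = 2500 joined rows. Keys that are missing from either sample contribute nothing, so an estimate of this kind is only a rough lower bound for rare keys.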


