You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2020/03/26 13:59:20 UTC

[spark] branch branch-3.0 updated: [SPARK-31201][SQL] Add an individual config for skewed partition threshold

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 8d0800a  [SPARK-31201][SQL] Add an individual config for skewed partition threshold
8d0800a is described below

commit 8d0800a0803d3c47938bddefa15328d654739bc5
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Thu Mar 26 22:57:01 2020 +0900

    [SPARK-31201][SQL] Add an individual config for skewed partition threshold
    
    Skew join handling comes with an overhead: we need to read some data repeatedly. We should treat a partition as skewed if it's large enough so that it's beneficial to do so.
    
    Currently the size threshold is the advisory partition size, which is 64 MB by default. This is not large enough for the skewed partition size threshold.
    
    This PR adds a new config for the threshold and sets its default value to 256 MB.
    
    Avoid skew join handling that may introduce a perf regression.
    
    no
    
    existing tests
    
    Closes #27967 from cloud-fan/aqe.
    
    Authored-by: Wenchen Fan <we...@databricks.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
    (cherry picked from commit 05498af72e19b058b210815e1053f3fa9b0157d9)
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 docs/sql-performance-tuning.md                                |  9 ++++++++-
 .../main/scala/org/apache/spark/sql/internal/SQLConf.scala    | 11 ++++++++++-
 .../spark/sql/execution/adaptive/OptimizeSkewedJoin.scala     |  2 +-
 .../spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala |  2 +-
 4 files changed, 20 insertions(+), 4 deletions(-)

diff --git a/docs/sql-performance-tuning.md b/docs/sql-performance-tuning.md
index 489575d..9a1cc89 100644
--- a/docs/sql-performance-tuning.md
+++ b/docs/sql-performance-tuning.md
@@ -242,7 +242,14 @@ Data skew can severely downgrade the performance of join queries. This feature d
        <td><code>spark.sql.adaptive.skewJoin.skewedPartitionFactor</code></td>
        <td>10</td>
        <td>
-         A partition is considered as skewed if its size is larger than this factor multiplying the median partition size and also larger than <code>spark.sql.adaptive.advisoryPartitionSizeInBytes</code>.
+         A partition is considered as skewed if its size is larger than this factor multiplying the median partition size and also larger than <code>spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes</code>.
+       </td>
+     </tr>
+     <tr>
+       <td><code>spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes</code></td>
+       <td>256MB</td>
+       <td>
+         A partition is considered as skewed if its size in bytes is larger than this threshold and also larger than <code>spark.sql.adaptive.skewJoin.skewedPartitionFactor</code> multiplying the median partition size. Ideally this config should be set larger than <code>spark.sql.adaptive.advisoryPartitionSizeInBytes</code>.
        </td>
      </tr>
    </table>
\ No newline at end of file
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 1b00bed..c61a57e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -455,11 +455,20 @@ object SQLConf {
     buildConf("spark.sql.adaptive.skewJoin.skewedPartitionFactor")
       .doc("A partition is considered as skewed if its size is larger than this factor " +
         "multiplying the median partition size and also larger than " +
-        s"'${ADVISORY_PARTITION_SIZE_IN_BYTES.key}'")
+        "'spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes'")
       .intConf
       .checkValue(_ > 0, "The skew factor must be positive.")
       .createWithDefault(10)
 
+  val SKEW_JOIN_SKEWED_PARTITION_THRESHOLD =
+    buildConf("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes")
+      .doc("A partition is considered as skewed if its size in bytes is larger than this " +
+        s"threshold and also larger than '${SKEW_JOIN_SKEWED_PARTITION_FACTOR.key}' " +
+        "multiplying the median partition size. Ideally this config should be set larger " +
+        s"than '${ADVISORY_PARTITION_SIZE_IN_BYTES.key}'.")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefaultString("256MB")
+
   val NON_EMPTY_PARTITION_RATIO_FOR_BROADCAST_JOIN =
     buildConf("spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin")
       .internal()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
index e02b9af..b09e563 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
@@ -67,7 +67,7 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
    */
   private def isSkewed(size: Long, medianSize: Long): Boolean = {
     size > medianSize * conf.getConf(SQLConf.SKEW_JOIN_SKEWED_PARTITION_FACTOR) &&
-      size > conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
+      size > conf.getConf(SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD)
   }
 
   private def medianSize(stats: MapOutputStatistics): Long = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index fcca23d..d4c5b0d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -615,6 +615,7 @@ class AdaptiveQueryExecSuite
     withSQLConf(
       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+      SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "2000",
       SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "2000") {
       withTempView("skewData1", "skewData2") {
         spark
@@ -781,4 +782,3 @@ class AdaptiveQueryExecSuite
     }
   }
 }
-


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org