Posted to commits@spark.apache.org by ya...@apache.org on 2021/04/08 02:04:31 UTC

[spark] branch branch-3.0 updated: [SPARK-34922][SQL][3.0] Use a relative cost comparison function in the CBO

This is an automated email from the ASF dual-hosted git repository.

yamamuro pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new b9ee41f  [SPARK-34922][SQL][3.0] Use a relative cost comparison function in the CBO
b9ee41f is described below

commit b9ee41fa9957631ca0f859ee928358c108fbd9a9
Author: Tanel Kiis <ta...@gmail.com>
AuthorDate: Thu Apr 8 11:03:59 2021 +0900

    [SPARK-34922][SQL][3.0] Use a relative cost comparison function in the CBO
    
    ### What changes were proposed in this pull request?
    
    Changed the cost comparison function of the CBO to use the ratios of row counts and sizes in bytes.
    
    ### Why are the changes needed?
    
    In #30965 we changed the CBO cost comparison function so it would be "symmetric": `A.betterThan(B)` now implies that `!B.betterThan(A)`.
    With that we caused performance regressions in some queries - TPCDS q19, for example.
    
    The original cost comparison function used the ratios `relativeRows = A.rowCount / B.rowCount` and `relativeSize = A.size / B.size`. The changed function compared "absolute" cost values `costA = w*A.rowCount + (1-w)*A.size` and `costB = w*B.rowCount + (1-w)*B.size`.
    
    Given the input from wzhfy, we decided to go back to the relative values, because otherwise one (size) may overwhelm the other (rowCount). But this time we avoid adding up the ratios.
    
    Originally `A.betterThan(B) => w*relativeRows + (1-w)*relativeSize < 1` was used. Besides being "non-symmetric", this can also let one ratio overwhelm the other.
    For `w=0.5`, if the size (in bytes) of `A` is at least 2x that of `B`, then no matter how many times more rows the `B` plan has, `B` will always be considered better - `0.5*2 + 0.5*0.00000000000001 > 1`.
    
    When working with ratios, it is better to multiply them.
    The proposed cost comparison function is: `A.betterThan(B) => relativeRows^w * relativeSize^(1-w) < 1`.
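    
    To make the difference concrete, below is a minimal, self-contained sketch of the old additive and the proposed multiplicative comparison (an illustration only, not the Spark implementation; the object, class and method names are made up):
    
    ```scala
    object CostComparisonSketch {
      // Hypothetical plan cost: cardinality (row count) and size in bytes.
      final case class Cost(rows: Double, size: Double)
    
      // Old additive form: w*relativeRows + (1-w)*relativeSize < 1.
      def betterThanAdditive(a: Cost, b: Cost, w: Double): Boolean =
        w * (a.rows / b.rows) + (1 - w) * (a.size / b.size) < 1
    
      // Proposed multiplicative form: relativeRows^w * relativeSize^(1-w) < 1.
      def betterThanMultiplicative(a: Cost, b: Cost, w: Double): Boolean =
        math.pow(a.rows / b.rows, w) * math.pow(a.size / b.size, 1 - w) < 1
    
      def main(args: Array[String]): Unit = {
        val w = 0.5
        // Plan A is 2x larger in bytes but produces a million times fewer rows than B.
        val a = Cost(rows = 10, size = 2000)
        val b = Cost(rows = 1e7, size = 1000)
        // Additive: 0.5*1e-6 + 0.5*2 > 1, so A is never preferred over B.
        println(betterThanAdditive(a, b, w))       // false
        // Multiplicative: sqrt(1e-6) * sqrt(2) ~= 0.0014 < 1, so A is preferred.
        println(betterThanMultiplicative(a, b, w)) // true
      }
    }
    ```
    
    The multiplicative form is also symmetric: swapping `A` and `B` turns the product into its reciprocal, so (away from the exact value 1) `A.betterThan(B)` and `B.betterThan(A)` cannot both hold.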
    
    ### Does this PR introduce _any_ user-facing change?
    
    Comparison of the changed TPCDS v1.4 query execution times at sf=10:
    
    query | absolute | multiplicative | change | additive | change
    -- | -- | -- | -- | -- | --
    q12 | 145 | 137 | -5.52% | 141 | -2.76%
    q13 | 264 | 271 | 2.65% | 271 | 2.65%
    q17 | 4521 | 4243 | -6.15% | 4348 | -3.83%
    q18 | 758 | 466 | -38.52% | 480 | -36.68%
    q19 | 38503 | 2167 | -94.37% | 2176 | -94.35%
    q20 | 119 | 120 | 0.84% | 126 | 5.88%
    q24a | 16429 | 16838 | 2.49% | 17103 | 4.10%
    q24b | 16592 | 16999 | 2.45% | 17268 | 4.07%
    q25 | 3558 | 3556 | -0.06% | 3675 | 3.29%
    q33 | 362 | 361 | -0.28% | 380 | 4.97%
    q52 | 1020 | 1032 | 1.18% | 1052 | 3.14%
    q55 | 927 | 938 | 1.19% | 961 | 3.67%
    q72 | 24169 | 13377 | -44.65% | 24306 | 0.57%
    q81 | 1285 | 1185 | -7.78% | 1168 | -9.11%
    q91 | 324 | 336 | 3.70% | 337 | 4.01%
    q98 | 126 | 129 | 2.38% | 131 | 3.97%
    
    All times are in ms; the change is relative to the master branch (absolute).
    The proposed cost function (multiplicative) significantly improves the performance on q18, q19 and q72. The original cost function (additive) has similar improvements on q18 and q19. All other changes are within the error bars and I would ignore them - perhaps q81 has also improved.
    
    ### How was this patch tested?
    
    PlanStabilitySuite
    
    Closes #32076 from tanelk/SPARK-34922_cbo_better_cost_function_3.0.
    
    Lead-authored-by: Tanel Kiis <ta...@gmail.com>
    Co-authored-by: tanel.kiis@gmail.com <ta...@gmail.com>
    Signed-off-by: Takeshi Yamamuro <ya...@apache.org>
---
 .../catalyst/optimizer/CostBasedJoinReorder.scala  | 28 ++++++++++++++++++----
 .../org/apache/spark/sql/internal/SQLConf.scala    |  6 +++--
 .../sql/catalyst/optimizer/JoinReorderSuite.scala  |  3 ---
 .../optimizer/StarJoinCostBasedReorderSuite.scala  |  9 +++----
 4 files changed, 32 insertions(+), 14 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
index 93c608dc..ed7d92e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
@@ -343,12 +343,30 @@ object JoinReorderDP extends PredicateHelper with Logging {
       }
     }
 
+    /**
+     * To identify the plan with smaller computational cost,
+     * we use the weighted geometric mean of the ratio of rows and the ratio of sizes in bytes.
+     *
+     * There are other ways to combine these values as a cost comparison function.
+     * Some alternatives that we experimented with, but that gave worse results
+     * than the current one:
+     * 1) Weighted arithmetic mean of these two ratios - adding up fractions puts
+     * less emphasis on ratios between 0 and 1. Ratios 10 and 0.1 should be considered
+     * to be equally strong evidence in opposite directions. The arithmetic mean of these
+     * would be heavily biased towards the 10.
+     * 2) Absolute cost (cost = weight * rowCount + (1 - weight) * size) - when adding up
+     * two numeric measurements that have different units we can easily end up with one
+     * overwhelming the other.
+     */
     def betterThan(other: JoinPlan, conf: SQLConf): Boolean = {
-      val thisCost = BigDecimal(this.planCost.card) * conf.joinReorderCardWeight +
-        BigDecimal(this.planCost.size) * (1 - conf.joinReorderCardWeight)
-      val otherCost = BigDecimal(other.planCost.card) * conf.joinReorderCardWeight +
-        BigDecimal(other.planCost.size) * (1 - conf.joinReorderCardWeight)
-      thisCost < otherCost
+      if (other.planCost.card == 0 || other.planCost.size == 0) {
+        false
+      } else {
+        val relativeRows = BigDecimal(this.planCost.card) / BigDecimal(other.planCost.card)
+        val relativeSize = BigDecimal(this.planCost.size) / BigDecimal(other.planCost.size)
+        Math.pow(relativeRows.doubleValue, conf.joinReorderCardWeight) *
+          Math.pow(relativeSize.doubleValue, 1 - conf.joinReorderCardWeight) < 1
+      }
     }
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 85ac7b8..ce0459e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1664,8 +1664,10 @@ object SQLConf {
   val JOIN_REORDER_CARD_WEIGHT =
     buildConf("spark.sql.cbo.joinReorder.card.weight")
       .internal()
-      .doc("The weight of cardinality (number of rows) for plan cost comparison in join reorder: " +
-        "rows * weight + size * (1 - weight).")
+      .doc("The weight of the ratio of cardinalities (number of rows) " +
+        "in the cost comparison function. The ratio of sizes in bytes has weight " +
+        "1 - this value. The weighted geometric mean of these ratios is used to decide " +
+        "which of the candidate plans will be chosen by the CBO.")
       .version("2.2.0")
       .doubleConf
       .checkValue(weight => weight >= 0 && weight <= 1, "The weight value must be in [0, 1].")
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala
index 75fe3dd..5dca7de 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala
@@ -391,9 +391,6 @@ class JoinReorderSuite extends PlanTest with StatsEstimationTestBase {
     val plan1 = JoinPlan(null, null, null, Cost(300, 80))
     val plan2 = JoinPlan(null, null, null, Cost(500, 30))
 
-    // cost1 = 300*0.7 + 80*0.3 = 234
-    // cost2 = 500*0.7 + 30*0.3 = 359
-
     assert(!plan1.betterThan(plan1, conf))
     assert(!plan2.betterThan(plan2, conf))
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
index b7cf383..391f67e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
@@ -293,12 +293,13 @@ class StarJoinCostBasedReorderSuite extends PlanTest with StatsEstimationTestBas
           (nameToAttr("f1_fk2") === nameToAttr("d2_pk")))
 
     val expected =
-      t3.join(t4, Inner, Some(nameToAttr("t3_c1") === nameToAttr("t4_c1")))
+      f1
+        .join(d2, Inner, Some(nameToAttr("f1_fk2") === nameToAttr("d2_pk")))
+        .join(d1, Inner, Some(nameToAttr("f1_fk1") === nameToAttr("d1_pk")))
+        .join(t3.join(t4, Inner, Some(nameToAttr("t3_c1") === nameToAttr("t4_c1"))), Inner,
+          Some(nameToAttr("d1_c2") === nameToAttr("t4_c1")))
         .join(t1.join(t2, Inner, Some(nameToAttr("t1_c1") === nameToAttr("t2_c1"))), Inner,
           Some(nameToAttr("t1_c2") === nameToAttr("t4_c2")))
-        .join(f1
-          .join(d2, Inner, Some(nameToAttr("f1_fk2") === nameToAttr("d2_pk")))
-          .join(d1, Inner, Some(nameToAttr("f1_fk1") === nameToAttr("d1_pk"))))
         .select(outputsOf(d1, t1, t2, t3, t4, f1, d2): _*)
 
     assertEqualPlans(query, expected)
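
A hypothetical usage sketch (not part of this commit): enabling cost-based join reordering and setting the `spark.sql.cbo.joinReorder.card.weight` value whose documentation is updated above. The application name, master, weight value and table name are assumptions for illustration only:

```scala
import org.apache.spark.sql.SparkSession

object JoinReorderWeightExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("cbo-join-reorder-weight")
      .master("local[*]")
      // Both the CBO and join reordering must be enabled for betterThan to be used.
      .config("spark.sql.cbo.enabled", "true")
      .config("spark.sql.cbo.joinReorder.enabled", "true")
      // Weight of the row-count ratio; the size-in-bytes ratio gets weight 1 - 0.7 = 0.3.
      .config("spark.sql.cbo.joinReorder.card.weight", "0.7")
      .getOrCreate()

    // The CBO relies on table-level statistics, e.g. (hypothetical table):
    // spark.sql("ANALYZE TABLE sales COMPUTE STATISTICS")
    spark.stop()
  }
}
```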

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org