Posted to reviews@spark.apache.org by wzhfy <gi...@git.apache.org> on 2017/10/19 03:07:59 UTC

[GitHub] spark pull request #19531: [SPARK-22310] [SQL] Refactor join estimation to i...

GitHub user wzhfy opened a pull request:

    https://github.com/apache/spark/pull/19531

    [SPARK-22310] [SQL] Refactor join estimation to incorporate estimation logic for different kinds of statistics

    ## What changes were proposed in this pull request?
    
    The current join estimation logic is based only on basic column statistics (such as ndv). If we want to add estimation for other kinds of statistics (such as histograms), it is not easy to incorporate them into the current algorithm:
    1. When we have multiple pairs of join keys, the current algorithm computes cardinality in a single formula. But if different join keys have different kinds of stats, the computation logic for each pair of join keys becomes different, so the single formula no longer applies.
    2. Currently it computes cardinality and updates the join keys' column stats in separate steps. It is better to do these two steps together, since both the computation and the update logic differ for different kinds of stats (see the sketch after this list).
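
    As a rough illustration of the intended structure (a minimal, self-contained sketch, not the actual patch; `KeyStats`, `estimateByNdv`, and the other names here are hypothetical simplifications of the real `JoinEstimation` code):

        object JoinEstimationSketch {
          // Hypothetical stand-in for catalyst's ColumnStat, reduced to what
          // this sketch needs. Assumes ndv > 0 for keys that have stats.
          case class KeyStats(ndv: BigInt)

          // One estimator per kind of statistics: it returns the estimated
          // cardinality for its key pair *together with* the updated key
          // stats, instead of computing the two in separate passes.
          def estimateByNdv(rowsA: BigInt, rowsB: BigInt,
              left: KeyStats, right: KeyStats): (BigInt, KeyStats) = {
            val maxNdv = left.ndv.max(right.ndv)
            val card = rowsA * rowsB / maxNdv // basic formula: T(A)*T(B)/max(V)
            (card, KeyStats(left.ndv.min(right.ndv)))
          }

          // The join-level estimate is the minimum over all key pairs, i.e.
          // the most selective pair wins, no matter which kind of statistics
          // produced each per-pair estimate (ndv here, histograms later).
          def joinCardinality(rowsA: BigInt, rowsB: BigInt,
              pairs: Seq[(KeyStats, KeyStats)]): BigInt =
            pairs.foldLeft(rowsA * rowsB) { case (minCard, (l, r)) =>
              minCard.min(estimateByNdv(rowsA, rowsB, l, r)._1)
            }
        }

    With this shape, a histogram-based estimator just becomes another function returning (cardinality, updated stats) that plugs into the same min-taking loop.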
    
    ## How was this patch tested?
    
    Only refactor, covered by existing tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/wzhfy/spark join_est_refactor

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19531.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #19531
    
----
commit b30de470a11ca3f360260a8a36bc1e5eb4f355e8
Author: Zhenhua Wang <wa...@huawei.com>
Date:   2017-10-19T02:45:53Z

    refactor

----


---



[GitHub] spark pull request #19531: [SPARK-22310] [SQL] Refactor join estimation to i...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19531#discussion_r147529424
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala ---
    @@ -157,64 +154,100 @@ case class InnerOuterEstimation(join: Join) extends Logging {
       // scalastyle:off
       /**
        * The number of rows of A inner join B on A.k1 = B.k1 is estimated by this basic formula:
    -   * T(A IJ B) = T(A) * T(B) / max(V(A.k1), V(B.k1)), where V is the number of distinct values of
    -   * that column. The underlying assumption for this formula is: each value of the smaller domain
    -   * is included in the larger domain.
    -   * Generally, inner join with multiple join keys can also be estimated based on the above
    -   * formula:
    +   * T(A IJ B) = T(A) * T(B) / max(V(A.k1), V(B.k1)),
    +   * where V is the number of distinct values (ndv) of that column. The underlying assumption for
    +   * this formula is: each value of the smaller domain is included in the larger domain.
    +   *
    +   * Generally, inner join with multiple join keys can be estimated based on the above formula:
        * T(A IJ B) = T(A) * T(B) / (max(V(A.k1), V(B.k1)) * max(V(A.k2), V(B.k2)) * ... * max(V(A.kn), V(B.kn)))
        * However, the denominator can become very large and excessively reduce the result, so we use a
        * conservative strategy to take only the largest max(V(A.ki), V(B.ki)) as the denominator.
    +   *
    +   * That is, join estimation is based on the most selective join keys. We follow this strategy
    +   * when different types of column statistics are available. E.g., if card1 is the cardinality
    +   * estimated by ndv of join key A.k1 and B.k1, card2 is the cardinality estimated by histograms
    +   * of join key A.k2 and B.k2, then the result cardinality would be min(card1, card2).
        */
       // scalastyle:on
    -  def joinSelectivity(joinKeyPairs: Seq[(AttributeReference, AttributeReference)]): BigDecimal = {
    -    var ndvDenom: BigInt = -1
    +  private def joinCardinality(joinKeyPairs: Seq[(AttributeReference, AttributeReference)])
    +    : BigInt = {
    +    // If there's no column stats available for join keys, estimate as cartesian product.
    +    var minCard: BigInt = leftStats.rowCount.get * rightStats.rowCount.get
         var i = 0
    -    while(i < joinKeyPairs.length && ndvDenom != 0) {
    +    while(i < joinKeyPairs.length && minCard != 0) {
           val (leftKey, rightKey) = joinKeyPairs(i)
           // Check if the two sides are disjoint
    -      val leftKeyStats = leftStats.attributeStats(leftKey)
    -      val rightKeyStats = rightStats.attributeStats(rightKey)
    -      val lInterval = ValueInterval(leftKeyStats.min, leftKeyStats.max, leftKey.dataType)
    -      val rInterval = ValueInterval(rightKeyStats.min, rightKeyStats.max, rightKey.dataType)
    +      val leftKeyStat = leftStats.attributeStats(leftKey)
    +      val rightKeyStat = rightStats.attributeStats(rightKey)
    +      val lInterval = ValueInterval(leftKeyStat.min, leftKeyStat.max, leftKey.dataType)
    +      val rInterval = ValueInterval(rightKeyStat.min, rightKeyStat.max, rightKey.dataType)
           if (ValueInterval.isIntersected(lInterval, rInterval)) {
    -        // Get the largest ndv among pairs of join keys
    -        val maxNdv = leftKeyStats.distinctCount.max(rightKeyStats.distinctCount)
    -        if (maxNdv > ndvDenom) ndvDenom = maxNdv
    +        val (newMin, newMax) = ValueInterval.intersect(lInterval, rInterval, leftKey.dataType)
    +        val card = joinCardByNdv(leftKey, rightKey, newMin, newMax)
    +        // Return cardinality estimated from the most selective join keys.
    +        if (card < minCard) minCard = card
           } else {
    -        // Set ndvDenom to zero to indicate that this join should have no output
    -        ndvDenom = 0
    +        // One of the join key pairs is disjoint, thus the two sides of join is disjoint.
    +        minCard = 0
           }
           i += 1
         }
    +    minCard
    +  }
     
    -    if (ndvDenom < 0) {
    -      // We can't find any join key pairs with column stats, estimate it as cartesian join.
    -      1
    -    } else if (ndvDenom == 0) {
    -      // One of the join key pairs is disjoint, thus the two sides of join is disjoint.
    -      0
    -    } else {
    -      1 / BigDecimal(ndvDenom)
    +  /** Compute join cardinality using the basic formula, and update column stats for join keys. */
    +  private def joinCardByNdv(
    +      leftKey: AttributeReference,
    +      rightKey: AttributeReference,
    +      newMin: Option[Any],
    +      newMax: Option[Any]): BigInt = {
    +    val leftKeyStat = leftStats.attributeStats(leftKey)
    +    val rightKeyStat = rightStats.attributeStats(rightKey)
    +    val maxNdv = leftKeyStat.distinctCount.max(rightKeyStat.distinctCount)
    +    // Compute cardinality by the basic formula.
    +    val card = BigDecimal(leftStats.rowCount.get * rightStats.rowCount.get) / BigDecimal(maxNdv)
    +
    +    // Update intersected column stats.
    +    val newNdv = leftKeyStat.distinctCount.min(rightKeyStat.distinctCount)
    +    val newMaxLen = math.min(leftKeyStat.maxLen, rightKeyStat.maxLen)
    +    val newAvgLen = (leftKeyStat.avgLen + rightKeyStat.avgLen) / 2
    +
    +    join.joinType match {
    +      case LeftOuter =>
    +        keyStatsAfterJoin.put(leftKey, leftKeyStat)
    +        keyStatsAfterJoin.put(rightKey,
    +          ColumnStat(newNdv, newMin, newMax, rightKeyStat.nullCount, newAvgLen, newMaxLen))
    +      case RightOuter =>
    +        keyStatsAfterJoin.put(leftKey,
    +          ColumnStat(newNdv, newMin, newMax, leftKeyStat.nullCount, newAvgLen, newMaxLen))
    +        keyStatsAfterJoin.put(rightKey, rightKeyStat)
    +      case FullOuter =>
    +        keyStatsAfterJoin.put(leftKey, leftKeyStat)
    +        keyStatsAfterJoin.put(rightKey, rightKeyStat)
    +      case _ =>
    +        val newStats = ColumnStat(newNdv, newMin, newMax, 0, newAvgLen, newMaxLen)
    +        keyStatsAfterJoin.put(leftKey, newStats)
    +        keyStatsAfterJoin.put(rightKey, newStats)
    --- End diff --
    
    The above code changes are new, right?
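
    (To make the basic formula in the quoted scaladoc concrete, with hypothetical numbers: if T(A) = 1,000, T(B) = 500, V(A.k1) = 100 and V(B.k1) = 50, then T(A IJ B) = 1,000 * 500 / max(100, 50) = 5,000 rows. If a second key pair's histogram-based estimate were, say, 3,000 rows, the refactored code would report min(5,000, 3,000) = 3,000.)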


---



[GitHub] spark pull request #19531: [SPARK-22310] [SQL] Refactor join estimation to i...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19531#discussion_r147467147
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala ---
    @@ -157,64 +154,100 @@ case class InnerOuterEstimation(join: Join) extends Logging {
       // scalastyle:off
       /**
        * The number of rows of A inner join B on A.k1 = B.k1 is estimated by this basic formula:
    -   * T(A IJ B) = T(A) * T(B) / max(V(A.k1), V(B.k1)), where V is the number of distinct values of
    -   * that column. The underlying assumption for this formula is: each value of the smaller domain
    -   * is included in the larger domain.
    -   * Generally, inner join with multiple join keys can also be estimated based on the above
    -   * formula:
    +   * T(A IJ B) = T(A) * T(B) / max(V(A.k1), V(B.k1)),
    +   * where V is the number of distinct values (ndv) of that column. The underlying assumption for
    +   * this formula is: each value of the smaller domain is included in the larger domain.
    +   *
    +   * Generally, inner join with multiple join keys can be estimated based on the above formula:
        * T(A IJ B) = T(A) * T(B) / (max(V(A.k1), V(B.k1)) * max(V(A.k2), V(B.k2)) * ... * max(V(A.kn), V(B.kn)))
        * However, the denominator can become very large and excessively reduce the result, so we use a
        * conservative strategy to take only the largest max(V(A.ki), V(B.ki)) as the denominator.
    +   *
    +   * That is, join estimation is based on the most selective join keys. We follow this strategy
    +   * when different types of column statistics are available. E.g., if card1 is the cardinality
    +   * estimated by ndv of join key A.k1 and B.k1, card2 is the cardinality estimated by histograms
    +   * of join key A.k2 and B.k2, then the result cardinality would be min(card1, card2).
        */
       // scalastyle:on
    -  def joinSelectivity(joinKeyPairs: Seq[(AttributeReference, AttributeReference)]): BigDecimal = {
    -    var ndvDenom: BigInt = -1
    +  private def joinCardinality(joinKeyPairs: Seq[(AttributeReference, AttributeReference)])
    +    : BigInt = {
    +    // If there's no column stats available for join keys, estimate as cartesian product.
    +    var minCard: BigInt = leftStats.rowCount.get * rightStats.rowCount.get
    --- End diff --
    
    `minCard ` -> `cardinality`


---



[GitHub] spark pull request #19531: [SPARK-22310] [SQL] Refactor join estimation to i...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19531#discussion_r147667111
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala ---
    @@ -157,64 +157,90 @@ case class InnerOuterEstimation(join: Join) extends Logging {
       // scalastyle:off
       /**
        * The number of rows of A inner join B on A.k1 = B.k1 is estimated by this basic formula:
    -   * T(A IJ B) = T(A) * T(B) / max(V(A.k1), V(B.k1)), where V is the number of distinct values of
    -   * that column. The underlying assumption for this formula is: each value of the smaller domain
    -   * is included in the larger domain.
    -   * Generally, inner join with multiple join keys can also be estimated based on the above
    -   * formula:
    +   * T(A IJ B) = T(A) * T(B) / max(V(A.k1), V(B.k1)),
    +   * where V is the number of distinct values (ndv) of that column. The underlying assumption for
    +   * this formula is: each value of the smaller domain is included in the larger domain.
    +   *
    +   * Generally, inner join with multiple join keys can be estimated based on the above formula:
        * T(A IJ B) = T(A) * T(B) / (max(V(A.k1), V(B.k1)) * max(V(A.k2), V(B.k2)) * ... * max(V(A.kn), V(B.kn)))
        * However, the denominator can become very large and excessively reduce the result, so we use a
        * conservative strategy to take only the largest max(V(A.ki), V(B.ki)) as the denominator.
    +   *
    +   * That is, join estimation is based on the most selective join keys. We follow this strategy
    +   * when different types of column statistics are available. E.g., if card1 is the cardinality
    +   * estimated by ndv of join key A.k1 and B.k1, card2 is the cardinality estimated by histograms
    +   * of join key A.k2 and B.k2, then the result cardinality would be min(card1, card2).
    +   *
    +   * @param keyPairs pairs of join keys
    +   *
    +   * @return join cardinality, and column stats for join keys after the join
        */
       // scalastyle:on
    -  def joinSelectivity(joinKeyPairs: Seq[(AttributeReference, AttributeReference)]): BigDecimal = {
    -    var ndvDenom: BigInt = -1
    +  private def computeCardinalityAndStats(keyPairs: Seq[(AttributeReference, AttributeReference)])
    +    : (BigInt, Map[Attribute, ColumnStat]) = {
    +    // If there's no column stats available for join keys, estimate as cartesian product.
    +    var cardJoin: BigInt = leftStats.rowCount.get * rightStats.rowCount.get
    +    val keyStatsAfterJoin = new mutable.HashMap[Attribute, ColumnStat]()
         var i = 0
    -    while(i < joinKeyPairs.length && ndvDenom != 0) {
    -      val (leftKey, rightKey) = joinKeyPairs(i)
    +    while(i < keyPairs.length && cardJoin != 0) {
    +      val (leftKey, rightKey) = keyPairs(i)
           // Check if the two sides are disjoint
    -      val leftKeyStats = leftStats.attributeStats(leftKey)
    -      val rightKeyStats = rightStats.attributeStats(rightKey)
    -      val lInterval = ValueInterval(leftKeyStats.min, leftKeyStats.max, leftKey.dataType)
    -      val rInterval = ValueInterval(rightKeyStats.min, rightKeyStats.max, rightKey.dataType)
    +      val leftKeyStat = leftStats.attributeStats(leftKey)
    +      val rightKeyStat = rightStats.attributeStats(rightKey)
    +      val lInterval = ValueInterval(leftKeyStat.min, leftKeyStat.max, leftKey.dataType)
    +      val rInterval = ValueInterval(rightKeyStat.min, rightKeyStat.max, rightKey.dataType)
           if (ValueInterval.isIntersected(lInterval, rInterval)) {
    -        // Get the largest ndv among pairs of join keys
    -        val maxNdv = leftKeyStats.distinctCount.max(rightKeyStats.distinctCount)
    -        if (maxNdv > ndvDenom) ndvDenom = maxNdv
    +        val (newMin, newMax) = ValueInterval.intersect(lInterval, rInterval, leftKey.dataType)
    +        val (cardKeyPair, joinStatsKeyPair) = computeByNdv(leftKey, rightKey, newMin, newMax)
    +        keyStatsAfterJoin ++= joinStatsKeyPair
    +        // Return cardinality estimated from the most selective join keys.
    +        if (cardKeyPair < cardJoin) cardJoin = cardKeyPair
           } else {
    -        // Set ndvDenom to zero to indicate that this join should have no output
    -        ndvDenom = 0
    +        // One of the join key pairs is disjoint, thus the two sides of join is disjoint.
    +        cardJoin = 0
           }
           i += 1
         }
    +    (cardJoin, keyStatsAfterJoin.toMap)
    +  }
     
    -    if (ndvDenom < 0) {
    -      // We can't find any join key pairs with column stats, estimate it as cartesian join.
    -      1
    -    } else if (ndvDenom == 0) {
    -      // One of the join key pairs is disjoint, thus the two sides of join is disjoint.
    -      0
    -    } else {
    -      1 / BigDecimal(ndvDenom)
    -    }
    +  /** Compute join cardinality using the basic formula, and update column stats for join keys. */
    +  private def computeByNdv(
    +      leftKey: AttributeReference,
    +      rightKey: AttributeReference,
    +      newMin: Option[Any],
    +      newMax: Option[Any]): (BigInt, Map[Attribute, ColumnStat]) = {
    +    val leftKeyStat = leftStats.attributeStats(leftKey)
    +    val rightKeyStat = rightStats.attributeStats(rightKey)
    +    val maxNdv = leftKeyStat.distinctCount.max(rightKeyStat.distinctCount)
    +    // Compute cardinality by the basic formula.
    +    val card = BigDecimal(leftStats.rowCount.get * rightStats.rowCount.get) / BigDecimal(maxNdv)
    +
    +    // Update intersected column stats.
    +    val newNdv = leftKeyStat.distinctCount.min(rightKeyStat.distinctCount)
    +    val newMaxLen = math.min(leftKeyStat.maxLen, rightKeyStat.maxLen)
    +    val newAvgLen = (leftKeyStat.avgLen + rightKeyStat.avgLen) / 2
    +    val newStats = ColumnStat(newNdv, newMin, newMax, 0, newAvgLen, newMaxLen)
    +
    +    (ceil(card), Map(leftKey -> newStats, rightKey -> newStats))
       }
     
       /**
        * Propagate or update column stats for output attributes.
        */
    -  private def updateAttrStats(
    +  private def updateOutputStats(
           outputRows: BigInt,
    -      attributes: Seq[Attribute],
    +      output: Seq[Attribute],
           oldAttrStats: AttributeMap[ColumnStat],
    -      joinKeyStats: AttributeMap[ColumnStat]): Seq[(Attribute, ColumnStat)] = {
    +      keyStatsAfterJoin: Map[Attribute, ColumnStat]): Seq[(Attribute, ColumnStat)] = {
    --- End diff --
    
    we should still use an `AttributeMap` here
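
    A hedged sketch of the suggested change; it assumes catalyst's `AttributeMap.apply(Seq[(Attribute, A)])` constructor, which should be verified against the Spark version in use:

        // Convert the accumulated HashMap into an AttributeMap so that
        // lookups key on exprId rather than plain object equality:
        import org.apache.spark.sql.catalyst.expressions.AttributeMap
        val keyStats: AttributeMap[ColumnStat] = AttributeMap(keyStatsAfterJoin.toSeq)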


---



[GitHub] spark issue #19531: [SPARK-22310] [SQL] Refactor join estimation to incorpor...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19531
  
    **[Test build #83159 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83159/testReport)** for PR 19531 at commit [`18cb42f`](https://github.com/apache/spark/commit/18cb42f84736c00f1ae3b7453ae5ff2f0c823484).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #19531: [SPARK-22310] [SQL] Refactor join estimation to i...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19531#discussion_r147467535
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala ---
    @@ -157,64 +154,100 @@ case class InnerOuterEstimation(join: Join) extends Logging {
       // scalastyle:off
       /**
        * The number of rows of A inner join B on A.k1 = B.k1 is estimated by this basic formula:
    -   * T(A IJ B) = T(A) * T(B) / max(V(A.k1), V(B.k1)), where V is the number of distinct values of
    -   * that column. The underlying assumption for this formula is: each value of the smaller domain
    -   * is included in the larger domain.
    -   * Generally, inner join with multiple join keys can also be estimated based on the above
    -   * formula:
    +   * T(A IJ B) = T(A) * T(B) / max(V(A.k1), V(B.k1)),
    +   * where V is the number of distinct values (ndv) of that column. The underlying assumption for
    +   * this formula is: each value of the smaller domain is included in the larger domain.
    +   *
    +   * Generally, inner join with multiple join keys can be estimated based on the above formula:
        * T(A IJ B) = T(A) * T(B) / (max(V(A.k1), V(B.k1)) * max(V(A.k2), V(B.k2)) * ... * max(V(A.kn), V(B.kn)))
        * However, the denominator can become very large and excessively reduce the result, so we use a
        * conservative strategy to take only the largest max(V(A.ki), V(B.ki)) as the denominator.
    +   *
    +   * That is, join estimation is based on the most selective join keys. We follow this strategy
    +   * when different types of column statistics are available. E.g., if card1 is the cardinality
    +   * estimated by ndv of join key A.k1 and B.k1, card2 is the cardinality estimated by histograms
    +   * of join key A.k2 and B.k2, then the result cardinality would be min(card1, card2).
        */
       // scalastyle:on
    -  def joinSelectivity(joinKeyPairs: Seq[(AttributeReference, AttributeReference)]): BigDecimal = {
    -    var ndvDenom: BigInt = -1
    +  private def joinCardinality(joinKeyPairs: Seq[(AttributeReference, AttributeReference)])
    +    : BigInt = {
    +    // If there's no column stats available for join keys, estimate as cartesian product.
    +    var minCard: BigInt = leftStats.rowCount.get * rightStats.rowCount.get
         var i = 0
    -    while(i < joinKeyPairs.length && ndvDenom != 0) {
    +    while(i < joinKeyPairs.length && minCard != 0) {
           val (leftKey, rightKey) = joinKeyPairs(i)
           // Check if the two sides are disjoint
    -      val leftKeyStats = leftStats.attributeStats(leftKey)
    -      val rightKeyStats = rightStats.attributeStats(rightKey)
    -      val lInterval = ValueInterval(leftKeyStats.min, leftKeyStats.max, leftKey.dataType)
    -      val rInterval = ValueInterval(rightKeyStats.min, rightKeyStats.max, rightKey.dataType)
    +      val leftKeyStat = leftStats.attributeStats(leftKey)
    +      val rightKeyStat = rightStats.attributeStats(rightKey)
    +      val lInterval = ValueInterval(leftKeyStat.min, leftKeyStat.max, leftKey.dataType)
    +      val rInterval = ValueInterval(rightKeyStat.min, rightKeyStat.max, rightKey.dataType)
           if (ValueInterval.isIntersected(lInterval, rInterval)) {
    -        // Get the largest ndv among pairs of join keys
    -        val maxNdv = leftKeyStats.distinctCount.max(rightKeyStats.distinctCount)
    -        if (maxNdv > ndvDenom) ndvDenom = maxNdv
    +        val (newMin, newMax) = ValueInterval.intersect(lInterval, rInterval, leftKey.dataType)
    +        val card = joinCardByNdv(leftKey, rightKey, newMin, newMax)
    --- End diff --
    
    cardKeyPair


---



[GitHub] spark issue #19531: [SPARK-22310] [SQL] Refactor join estimation to incorpor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19531
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #19531: [SPARK-22310] [SQL] Refactor join estimation to incorpor...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19531
  
    **[Test build #82899 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82899/testReport)** for PR 19531 at commit [`b30de47`](https://github.com/apache/spark/commit/b30de470a11ca3f360260a8a36bc1e5eb4f355e8).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `case class JoinEstimation(join: Join) extends Logging `


---



[GitHub] spark issue #19531: [SPARK-22310] [SQL] Refactor join estimation to incorpor...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19531
  
    **[Test build #83252 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83252/testReport)** for PR 19531 at commit [`a2dbb8e`](https://github.com/apache/spark/commit/a2dbb8eefacf560ff5e7090baa3796b2353daed2).


---



[GitHub] spark pull request #19531: [SPARK-22310] [SQL] Refactor join estimation to i...

Posted by wzhfy <gi...@git.apache.org>.
Github user wzhfy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19531#discussion_r147547985
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala ---
    @@ -28,45 +28,43 @@ import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Join, Statistics
     import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils._
     
     
    -object JoinEstimation extends Logging {
    +case class JoinEstimation(join: Join) extends Logging {
    +
    +  private val leftStats = join.left.stats
    +  private val rightStats = join.right.stats
    +  private val keyStatsAfterJoin = new mutable.HashMap[Attribute, ColumnStat]()
    +
       /**
        * Estimate statistics after join. Return `None` if the join type is not supported, or we don't
        * have enough statistics for estimation.
        */
    -  def estimate(join: Join): Option[Statistics] = {
    +  def estimate: Option[Statistics] = {
         join.joinType match {
           case Inner | Cross | LeftOuter | RightOuter | FullOuter =>
    -        InnerOuterEstimation(join).doEstimate()
    +        estimateInnerOuterJoin()
           case LeftSemi | LeftAnti =>
    -        LeftSemiAntiEstimation(join).doEstimate()
    +        estimateLeftSemiAntiJoin()
           case _ =>
             logDebug(s"[CBO] Unsupported join type: ${join.joinType}")
             None
         }
       }
    -}
    -
    -case class InnerOuterEstimation(join: Join) extends Logging {
    -
    -  private val leftStats = join.left.stats
    -  private val rightStats = join.right.stats
     
       /**
        * Estimate output size and number of rows after a join operator, and update output column stats.
        */
    -  def doEstimate(): Option[Statistics] = join match {
    +  private def estimateInnerOuterJoin(): Option[Statistics] = join match {
         case _ if !rowCountsExist(join.left, join.right) =>
           None
     
         case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, _, _, _) =>
           // 1. Compute join selectivity
           val joinKeyPairs = extractJoinKeysWithColStats(leftKeys, rightKeys)
    -      val selectivity = joinSelectivity(joinKeyPairs)
    +      val innerJoinedRows = joinCardinality(joinKeyPairs)
    --- End diff --
    
    Thanks, I'll make the changes.


---



[GitHub] spark pull request #19531: [SPARK-22310] [SQL] Refactor join estimation to i...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19531#discussion_r147466493
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala ---
    @@ -28,45 +28,43 @@ import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Join, Statistics
     import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils._
     
     
    -object JoinEstimation extends Logging {
    +case class JoinEstimation(join: Join) extends Logging {
    +
    +  private val leftStats = join.left.stats
    +  private val rightStats = join.right.stats
    +  private val keyStatsAfterJoin = new mutable.HashMap[Attribute, ColumnStat]()
    +
       /**
        * Estimate statistics after join. Return `None` if the join type is not supported, or we don't
        * have enough statistics for estimation.
        */
    -  def estimate(join: Join): Option[Statistics] = {
    +  def estimate: Option[Statistics] = {
         join.joinType match {
           case Inner | Cross | LeftOuter | RightOuter | FullOuter =>
    -        InnerOuterEstimation(join).doEstimate()
    +        estimateInnerOuterJoin()
           case LeftSemi | LeftAnti =>
    -        LeftSemiAntiEstimation(join).doEstimate()
    +        estimateLeftSemiAntiJoin()
           case _ =>
             logDebug(s"[CBO] Unsupported join type: ${join.joinType}")
             None
         }
       }
    -}
    -
    -case class InnerOuterEstimation(join: Join) extends Logging {
    -
    -  private val leftStats = join.left.stats
    -  private val rightStats = join.right.stats
     
       /**
        * Estimate output size and number of rows after a join operator, and update output column stats.
        */
    -  def doEstimate(): Option[Statistics] = join match {
    +  private def estimateInnerOuterJoin(): Option[Statistics] = join match {
         case _ if !rowCountsExist(join.left, join.right) =>
           None
     
         case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, _, _, _) =>
           // 1. Compute join selectivity
           val joinKeyPairs = extractJoinKeysWithColStats(leftKeys, rightKeys)
    -      val selectivity = joinSelectivity(joinKeyPairs)
    +      val innerJoinedRows = joinCardinality(joinKeyPairs)
    --- End diff --
    
    `innerJoinedRows` -> `numInnerJoinedRows`


---



[GitHub] spark pull request #19531: [SPARK-22310] [SQL] Refactor join estimation to i...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/19531


---



[GitHub] spark issue #19531: [SPARK-22310] [SQL] Refactor join estimation to incorpor...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19531
  
    **[Test build #83252 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83252/testReport)** for PR 19531 at commit [`a2dbb8e`](https://github.com/apache/spark/commit/a2dbb8eefacf560ff5e7090baa3796b2353daed2).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #19531: [SPARK-22310] [SQL] Refactor join estimation to incorpor...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/19531
  
    LGTM, merging to master!


---



[GitHub] spark issue #19531: [SPARK-22310] [SQL] Refactor join estimation to incorpor...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19531
  
    **[Test build #83159 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83159/testReport)** for PR 19531 at commit [`18cb42f`](https://github.com/apache/spark/commit/18cb42f84736c00f1ae3b7453ae5ff2f0c823484).


---



[GitHub] spark pull request #19531: [SPARK-22310] [SQL] Refactor join estimation to i...

Posted by wzhfy <gi...@git.apache.org>.
Github user wzhfy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19531#discussion_r147544011
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala ---
    @@ -157,64 +154,100 @@ case class InnerOuterEstimation(join: Join) extends Logging {
       // scalastyle:off
       /**
        * The number of rows of A inner join B on A.k1 = B.k1 is estimated by this basic formula:
    -   * T(A IJ B) = T(A) * T(B) / max(V(A.k1), V(B.k1)), where V is the number of distinct values of
    -   * that column. The underlying assumption for this formula is: each value of the smaller domain
    -   * is included in the larger domain.
    -   * Generally, inner join with multiple join keys can also be estimated based on the above
    -   * formula:
    +   * T(A IJ B) = T(A) * T(B) / max(V(A.k1), V(B.k1)),
    +   * where V is the number of distinct values (ndv) of that column. The underlying assumption for
    +   * this formula is: each value of the smaller domain is included in the larger domain.
    +   *
    +   * Generally, inner join with multiple join keys can be estimated based on the above formula:
        * T(A IJ B) = T(A) * T(B) / (max(V(A.k1), V(B.k1)) * max(V(A.k2), V(B.k2)) * ... * max(V(A.kn), V(B.kn)))
        * However, the denominator can become very large and excessively reduce the result, so we use a
        * conservative strategy to take only the largest max(V(A.ki), V(B.ki)) as the denominator.
    +   *
    +   * That is, join estimation is based on the most selective join keys. We follow this strategy
    +   * when different types of column statistics are available. E.g., if card1 is the cardinality
    +   * estimated by ndv of join key A.k1 and B.k1, card2 is the cardinality estimated by histograms
    +   * of join key A.k2 and B.k2, then the result cardinality would be min(card1, card2).
        */
       // scalastyle:on
    -  def joinSelectivity(joinKeyPairs: Seq[(AttributeReference, AttributeReference)]): BigDecimal = {
    -    var ndvDenom: BigInt = -1
    +  private def joinCardinality(joinKeyPairs: Seq[(AttributeReference, AttributeReference)])
    +    : BigInt = {
    +    // If there's no column stats available for join keys, estimate as cartesian product.
    +    var minCard: BigInt = leftStats.rowCount.get * rightStats.rowCount.get
         var i = 0
    -    while(i < joinKeyPairs.length && ndvDenom != 0) {
    +    while(i < joinKeyPairs.length && minCard != 0) {
           val (leftKey, rightKey) = joinKeyPairs(i)
           // Check if the two sides are disjoint
    -      val leftKeyStats = leftStats.attributeStats(leftKey)
    -      val rightKeyStats = rightStats.attributeStats(rightKey)
    -      val lInterval = ValueInterval(leftKeyStats.min, leftKeyStats.max, leftKey.dataType)
    -      val rInterval = ValueInterval(rightKeyStats.min, rightKeyStats.max, rightKey.dataType)
    +      val leftKeyStat = leftStats.attributeStats(leftKey)
    +      val rightKeyStat = rightStats.attributeStats(rightKey)
    +      val lInterval = ValueInterval(leftKeyStat.min, leftKeyStat.max, leftKey.dataType)
    +      val rInterval = ValueInterval(rightKeyStat.min, rightKeyStat.max, rightKey.dataType)
           if (ValueInterval.isIntersected(lInterval, rInterval)) {
    -        // Get the largest ndv among pairs of join keys
    -        val maxNdv = leftKeyStats.distinctCount.max(rightKeyStats.distinctCount)
    -        if (maxNdv > ndvDenom) ndvDenom = maxNdv
    +        val (newMin, newMax) = ValueInterval.intersect(lInterval, rInterval, leftKey.dataType)
    +        val card = joinCardByNdv(leftKey, rightKey, newMin, newMax)
    +        // Return cardinality estimated from the most selective join keys.
    +        if (card < minCard) minCard = card
           } else {
    -        // Set ndvDenom to zero to indicate that this join should have no output
    -        ndvDenom = 0
    +        // One of the join key pairs is disjoint, thus the two sides of join is disjoint.
    +        minCard = 0
           }
           i += 1
         }
    +    minCard
    +  }
     
    -    if (ndvDenom < 0) {
    -      // We can't find any join key pairs with column stats, estimate it as cartesian join.
    -      1
    -    } else if (ndvDenom == 0) {
    -      // One of the join key pairs is disjoint, thus the two sides of join is disjoint.
    -      0
    -    } else {
    -      1 / BigDecimal(ndvDenom)
    +  /** Compute join cardinality using the basic formula, and update column stats for join keys. */
    +  private def joinCardByNdv(
    +      leftKey: AttributeReference,
    +      rightKey: AttributeReference,
    +      newMin: Option[Any],
    +      newMax: Option[Any]): BigInt = {
    +    val leftKeyStat = leftStats.attributeStats(leftKey)
    +    val rightKeyStat = rightStats.attributeStats(rightKey)
    +    val maxNdv = leftKeyStat.distinctCount.max(rightKeyStat.distinctCount)
    +    // Compute cardinality by the basic formula.
    +    val card = BigDecimal(leftStats.rowCount.get * rightStats.rowCount.get) / BigDecimal(maxNdv)
    +
    +    // Update intersected column stats.
    +    val newNdv = leftKeyStat.distinctCount.min(rightKeyStat.distinctCount)
    +    val newMaxLen = math.min(leftKeyStat.maxLen, rightKeyStat.maxLen)
    +    val newAvgLen = (leftKeyStat.avgLen + rightKeyStat.avgLen) / 2
    +
    +    join.joinType match {
    +      case LeftOuter =>
    +        keyStatsAfterJoin.put(leftKey, leftKeyStat)
    +        keyStatsAfterJoin.put(rightKey,
    +          ColumnStat(newNdv, newMin, newMax, rightKeyStat.nullCount, newAvgLen, newMaxLen))
    +      case RightOuter =>
    +        keyStatsAfterJoin.put(leftKey,
    +          ColumnStat(newNdv, newMin, newMax, leftKeyStat.nullCount, newAvgLen, newMaxLen))
    +        keyStatsAfterJoin.put(rightKey, rightKeyStat)
    +      case FullOuter =>
    +        keyStatsAfterJoin.put(leftKey, leftKeyStat)
    +        keyStatsAfterJoin.put(rightKey, rightKeyStat)
    +      case _ =>
    +        val newStats = ColumnStat(newNdv, newMin, newMax, 0, newAvgLen, newMaxLen)
    +        keyStatsAfterJoin.put(leftKey, newStats)
    +        keyStatsAfterJoin.put(rightKey, newStats)
    --- End diff --
    
    They are part of the old method [getIntersectedStats](https://github.com/apache/spark/pull/19531/files#diff-6387e7aaeb7d8e0cb1457b9d0fe5cd00L235). I need to revert those lines for the outer join cases; thanks for pointing it out.


---



[GitHub] spark issue #19531: [SPARK-22310] [SQL] Refactor join estimation to incorpor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19531
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83159/
    Test PASSed.


---



[GitHub] spark pull request #19531: [SPARK-22310] [SQL] Refactor join estimation to i...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19531#discussion_r146993530
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala ---
    @@ -157,64 +154,100 @@ case class InnerOuterEstimation(join: Join) extends Logging {
       // scalastyle:off
       /**
        * The number of rows of A inner join B on A.k1 = B.k1 is estimated by this basic formula:
    -   * T(A IJ B) = T(A) * T(B) / max(V(A.k1), V(B.k1)), where V is the number of distinct values of
    -   * that column. The underlying assumption for this formula is: each value of the smaller domain
    -   * is included in the larger domain.
    -   * Generally, inner join with multiple join keys can also be estimated based on the above
    -   * formula:
    +   * T(A IJ B) = T(A) * T(B) / max(V(A.k1), V(B.k1)),
    +   * where V is the number of distinct values (ndv) of that column. The underlying assumption for
    +   * this formula is: each value of the smaller domain is included in the larger domain.
    +   *
    +   * Generally, inner join with multiple join keys can be estimated based on the above formula:
        * T(A IJ B) = T(A) * T(B) / (max(V(A.k1), V(B.k1)) * max(V(A.k2), V(B.k2)) * ... * max(V(A.kn), V(B.kn)))
        * However, the denominator can become very large and excessively reduce the result, so we use a
        * conservative strategy to take only the largest max(V(A.ki), V(B.ki)) as the denominator.
    +   *
    +   * That is, join estimation is based on the most selective join keys. We follow this strategy
    +   * when different types of column statistics are available. E.g., if card1 is the cardinality
    +   * estimated by ndv of join key A.k1 and B.k1, card2 is the cardinality estimated by histograms
    +   * of join key A.k2 and B.k2, then the result cardinality would be min(card1, card2).
        */
       // scalastyle:on
    -  def joinSelectivity(joinKeyPairs: Seq[(AttributeReference, AttributeReference)]): BigDecimal = {
    -    var ndvDenom: BigInt = -1
    +  private def joinCardinality(joinKeyPairs: Seq[(AttributeReference, AttributeReference)])
    +    : BigInt = {
    +    // If there's no column stats available for join keys, estimate as cartesian product.
    +    var minCard: BigInt = leftStats.rowCount.get * rightStats.rowCount.get
    --- End diff --
    
    This is different from the previous logic, right?


---



[GitHub] spark issue #19531: [SPARK-22310] [SQL] Refactor join estimation to incorpor...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19531
  
    **[Test build #83153 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83153/testReport)** for PR 19531 at commit [`18edc14`](https://github.com/apache/spark/commit/18edc1471d9bcfe6bb500afa77c6d9c1a4bd23dc).
     * This patch **fails to build**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #19531: [SPARK-22310] [SQL] Refactor join estimation to incorpor...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19531
  
    **[Test build #82899 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82899/testReport)** for PR 19531 at commit [`b30de47`](https://github.com/apache/spark/commit/b30de470a11ca3f360260a8a36bc1e5eb4f355e8).


---



[GitHub] spark pull request #19531: [SPARK-22310] [SQL] Refactor join estimation to i...

Posted by wzhfy <gi...@git.apache.org>.
Github user wzhfy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19531#discussion_r147027199
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala ---
    @@ -157,64 +154,100 @@ case class InnerOuterEstimation(join: Join) extends Logging {
       // scalastyle:off
       /**
        * The number of rows of A inner join B on A.k1 = B.k1 is estimated by this basic formula:
    -   * T(A IJ B) = T(A) * T(B) / max(V(A.k1), V(B.k1)), where V is the number of distinct values of
    -   * that column. The underlying assumption for this formula is: each value of the smaller domain
    -   * is included in the larger domain.
    -   * Generally, inner join with multiple join keys can also be estimated based on the above
    -   * formula:
    +   * T(A IJ B) = T(A) * T(B) / max(V(A.k1), V(B.k1)),
    +   * where V is the number of distinct values (ndv) of that column. The underlying assumption for
    +   * this formula is: each value of the smaller domain is included in the larger domain.
    +   *
    +   * Generally, inner join with multiple join keys can be estimated based on the above formula:
        * T(A IJ B) = T(A) * T(B) / (max(V(A.k1), V(B.k1)) * max(V(A.k2), V(B.k2)) * ... * max(V(A.kn), V(B.kn)))
        * However, the denominator can become very large and excessively reduce the result, so we use a
        * conservative strategy to take only the largest max(V(A.ki), V(B.ki)) as the denominator.
    +   *
    +   * That is, join estimation is based on the most selective join keys. We follow this strategy
    +   * when different types of column statistics are available. E.g., if card1 is the cardinality
    +   * estimated by ndv of join key A.k1 and B.k1, card2 is the cardinality estimated by histograms
    +   * of join key A.k2 and B.k2, then the result cardinality would be min(card1, card2).
        */
       // scalastyle:on
    -  def joinSelectivity(joinKeyPairs: Seq[(AttributeReference, AttributeReference)]): BigDecimal = {
    -    var ndvDenom: BigInt = -1
    +  private def joinCardinality(joinKeyPairs: Seq[(AttributeReference, AttributeReference)])
    +    : BigInt = {
    +    // If there's no column stats available for join keys, estimate as cartesian product.
    +    var minCard: BigInt = leftStats.rowCount.get * rightStats.rowCount.get
    --- End diff --
    
    It's equivalent. Previously, if `ndvDenom` was -1 (no qualified join keys), the method returned 1 as the selectivity, so the join cardinality computed outside the method was also `leftStats.rowCount.get * rightStats.rowCount.get`. Here I changed the method's return value from selectivity to cardinality directly.
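
    Illustrative restatement of the equivalence, using names from the quoted diff:

        // Old API: selectivity-based; ndvDenom == -1 meant "no stats found".
        //   joinSelectivity(...) == 1                 // returned to caller
        //   card = 1 * T(A) * T(B)                    // computed by caller
        // New API: cardinality-based; same result, one step fewer.
        //   joinCardinality(...) == T(A) * T(B)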


---



[GitHub] spark pull request #19531: [SPARK-22310] [SQL] Refactor join estimation to i...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19531#discussion_r147530024
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala ---
    @@ -157,64 +154,100 @@ case class InnerOuterEstimation(join: Join) extends Logging {
       // scalastyle:off
       /**
        * The number of rows of A inner join B on A.k1 = B.k1 is estimated by this basic formula:
    -   * T(A IJ B) = T(A) * T(B) / max(V(A.k1), V(B.k1)), where V is the number of distinct values of
    -   * that column. The underlying assumption for this formula is: each value of the smaller domain
    -   * is included in the larger domain.
    -   * Generally, inner join with multiple join keys can also be estimated based on the above
    -   * formula:
    +   * T(A IJ B) = T(A) * T(B) / max(V(A.k1), V(B.k1)),
    +   * where V is the number of distinct values (ndv) of that column. The underlying assumption for
    +   * this formula is: each value of the smaller domain is included in the larger domain.
    +   *
    +   * Generally, inner join with multiple join keys can be estimated based on the above formula:
        * T(A IJ B) = T(A) * T(B) / (max(V(A.k1), V(B.k1)) * max(V(A.k2), V(B.k2)) * ... * max(V(A.kn), V(B.kn)))
        * However, the denominator can become very large and excessively reduce the result, so we use a
        * conservative strategy to take only the largest max(V(A.ki), V(B.ki)) as the denominator.
    +   *
    +   * That is, join estimation is based on the most selective join keys. We follow this strategy
    +   * when different types of column statistics are available. E.g., if card1 is the cardinality
    +   * estimated by ndv of join key A.k1 and B.k1, card2 is the cardinality estimated by histograms
    +   * of join key A.k2 and B.k2, then the result cardinality would be min(card1, card2).
        */
       // scalastyle:on
    -  def joinSelectivity(joinKeyPairs: Seq[(AttributeReference, AttributeReference)]): BigDecimal = {
    -    var ndvDenom: BigInt = -1
    +  private def joinCardinality(joinKeyPairs: Seq[(AttributeReference, AttributeReference)])
    +    : BigInt = {
    +    // If there's no column stats available for join keys, estimate as cartesian product.
    +    var minCard: BigInt = leftStats.rowCount.get * rightStats.rowCount.get
         var i = 0
    -    while(i < joinKeyPairs.length && ndvDenom != 0) {
    +    while(i < joinKeyPairs.length && minCard != 0) {
           val (leftKey, rightKey) = joinKeyPairs(i)
           // Check if the two sides are disjoint
    -      val leftKeyStats = leftStats.attributeStats(leftKey)
    -      val rightKeyStats = rightStats.attributeStats(rightKey)
    -      val lInterval = ValueInterval(leftKeyStats.min, leftKeyStats.max, leftKey.dataType)
    -      val rInterval = ValueInterval(rightKeyStats.min, rightKeyStats.max, rightKey.dataType)
    +      val leftKeyStat = leftStats.attributeStats(leftKey)
    +      val rightKeyStat = rightStats.attributeStats(rightKey)
    +      val lInterval = ValueInterval(leftKeyStat.min, leftKeyStat.max, leftKey.dataType)
    +      val rInterval = ValueInterval(rightKeyStat.min, rightKeyStat.max, rightKey.dataType)
           if (ValueInterval.isIntersected(lInterval, rInterval)) {
    -        // Get the largest ndv among pairs of join keys
    -        val maxNdv = leftKeyStats.distinctCount.max(rightKeyStats.distinctCount)
    -        if (maxNdv > ndvDenom) ndvDenom = maxNdv
    +        val (newMin, newMax) = ValueInterval.intersect(lInterval, rInterval, leftKey.dataType)
    +        val card = joinCardByNdv(leftKey, rightKey, newMin, newMax)
    +        // Return cardinality estimated from the most selective join keys.
    +        if (card < minCard) minCard = card
           } else {
    -        // Set ndvDenom to zero to indicate that this join should have no output
    -        ndvDenom = 0
    +        // One of the join key pairs is disjoint, thus the two sides of join is disjoint.
    +        minCard = 0
           }
           i += 1
         }
    +    minCard
    +  }
     
    -    if (ndvDenom < 0) {
    -      // We can't find any join key pairs with column stats, estimate it as cartesian join.
    -      1
    -    } else if (ndvDenom == 0) {
    -      // One of the join key pairs is disjoint, thus the two sides of join is disjoint.
    -      0
    -    } else {
    -      1 / BigDecimal(ndvDenom)
    +  /** Compute join cardinality using the basic formula, and update column stats for join keys. */
    +  private def joinCardByNdv(
    +      leftKey: AttributeReference,
    +      rightKey: AttributeReference,
    +      newMin: Option[Any],
    +      newMax: Option[Any]): BigInt = {
    +    val leftKeyStat = leftStats.attributeStats(leftKey)
    +    val rightKeyStat = rightStats.attributeStats(rightKey)
    +    val maxNdv = leftKeyStat.distinctCount.max(rightKeyStat.distinctCount)
    +    // Compute cardinality by the basic formula.
    +    val card = BigDecimal(leftStats.rowCount.get * rightStats.rowCount.get) / BigDecimal(maxNdv)
    +
    +    // Update intersected column stats.
    +    val newNdv = leftKeyStat.distinctCount.min(rightKeyStat.distinctCount)
    +    val newMaxLen = math.min(leftKeyStat.maxLen, rightKeyStat.maxLen)
    +    val newAvgLen = (leftKeyStat.avgLen + rightKeyStat.avgLen) / 2
    +
    +    join.joinType match {
    +      case LeftOuter =>
    +        keyStatsAfterJoin.put(leftKey, leftKeyStat)
    +        keyStatsAfterJoin.put(rightKey,
    +          ColumnStat(newNdv, newMin, newMax, rightKeyStat.nullCount, newAvgLen, newMaxLen))
    +      case RightOuter =>
    +        keyStatsAfterJoin.put(leftKey,
    +          ColumnStat(newNdv, newMin, newMax, leftKeyStat.nullCount, newAvgLen, newMaxLen))
    +        keyStatsAfterJoin.put(rightKey, rightKeyStat)
    +      case FullOuter =>
    +        keyStatsAfterJoin.put(leftKey, leftKeyStat)
    +        keyStatsAfterJoin.put(rightKey, rightKeyStat)
    +      case _ =>
    --- End diff --
    
    It would be better to explicitly specify all the join types here if we want to update the key stats.
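
    A sketch of what that could look like, assuming the inner-like types handled by `estimateInnerOuterJoin` are exactly `Inner` and `Cross` (to be confirmed against the surrounding match):

        case Inner | Cross =>
          val newStats = ColumnStat(newNdv, newMin, newMax, 0, newAvgLen, newMaxLen)
          keyStatsAfterJoin.put(leftKey, newStats)
          keyStatsAfterJoin.put(rightKey, newStats)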


---



[GitHub] spark issue #19531: [SPARK-22310] [SQL] Refactor join estimation to incorpor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19531
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #19531: [SPARK-22310] [SQL] Refactor join estimation to i...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19531#discussion_r147665380
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala ---
    @@ -157,64 +157,90 @@ case class InnerOuterEstimation(join: Join) extends Logging {
       // scalastyle:off
       /**
        * The number of rows of A inner join B on A.k1 = B.k1 is estimated by this basic formula:
    -   * T(A IJ B) = T(A) * T(B) / max(V(A.k1), V(B.k1)), where V is the number of distinct values of
    -   * that column. The underlying assumption for this formula is: each value of the smaller domain
    -   * is included in the larger domain.
    -   * Generally, inner join with multiple join keys can also be estimated based on the above
    -   * formula:
    +   * T(A IJ B) = T(A) * T(B) / max(V(A.k1), V(B.k1)),
    +   * where V is the number of distinct values (ndv) of that column. The underlying assumption for
    +   * this formula is: each value of the smaller domain is included in the larger domain.
    +   *
    +   * Generally, inner join with multiple join keys can be estimated based on the above formula:
        * T(A IJ B) = T(A) * T(B) / (max(V(A.k1), V(B.k1)) * max(V(A.k2), V(B.k2)) * ... * max(V(A.kn), V(B.kn)))
        * However, the denominator can become very large and excessively reduce the result, so we use a
        * conservative strategy to take only the largest max(V(A.ki), V(B.ki)) as the denominator.
    +   *
    +   * That is, join estimation is based on the most selective join keys. We follow this strategy
    +   * when different types of column statistics are available. E.g., if card1 is the cardinality
    +   * estimated by ndv of join key A.k1 and B.k1, card2 is the cardinality estimated by histograms
    +   * of join key A.k2 and B.k2, then the result cardinality would be min(card1, card2).
    +   *
    +   * @param keyPairs pairs of join keys
    +   *
    +   * @return join cardinality, and column stats for join keys after the join
        */
       // scalastyle:on
    -  def joinSelectivity(joinKeyPairs: Seq[(AttributeReference, AttributeReference)]): BigDecimal = {
    -    var ndvDenom: BigInt = -1
    +  private def computeCardinalityAndStats(keyPairs: Seq[(AttributeReference, AttributeReference)])
    +    : (BigInt, Map[Attribute, ColumnStat]) = {
    +    // If there's no column stats available for join keys, estimate as cartesian product.
    +    var cardJoin: BigInt = leftStats.rowCount.get * rightStats.rowCount.get
    --- End diff --
    
    nit: `cardJoin` -> `joinCard`?


---



[GitHub] spark issue #19531: [SPARK-22310] [SQL] Refactor join estimation to incorpor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19531
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82899/
    Test PASSed.


---



[GitHub] spark issue #19531: [SPARK-22310] [SQL] Refactor join estimation to incorpor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19531
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #19531: [SPARK-22310] [SQL] Refactor join estimation to i...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19531#discussion_r147666360
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala ---
    @@ -157,64 +157,90 @@ case class InnerOuterEstimation(join: Join) extends Logging {
       // scalastyle:off
       /**
        * The number of rows of A inner join B on A.k1 = B.k1 is estimated by this basic formula:
    -   * T(A IJ B) = T(A) * T(B) / max(V(A.k1), V(B.k1)), where V is the number of distinct values of
    -   * that column. The underlying assumption for this formula is: each value of the smaller domain
    -   * is included in the larger domain.
    -   * Generally, inner join with multiple join keys can also be estimated based on the above
    -   * formula:
    +   * T(A IJ B) = T(A) * T(B) / max(V(A.k1), V(B.k1)),
    +   * where V is the number of distinct values (ndv) of that column. The underlying assumption for
    +   * this formula is: each value of the smaller domain is included in the larger domain.
    +   *
    +   * Generally, inner join with multiple join keys can be estimated based on the above formula:
        * T(A IJ B) = T(A) * T(B) / (max(V(A.k1), V(B.k1)) * max(V(A.k2), V(B.k2)) * ... * max(V(A.kn), V(B.kn)))
        * However, the denominator can become very large and excessively reduce the result, so we use a
        * conservative strategy to take only the largest max(V(A.ki), V(B.ki)) as the denominator.
    +   *
    +   * That is, join estimation is based on the most selective join keys. We follow this strategy
    +   * when different types of column statistics are available. E.g., if card1 is the cardinality
    +   * estimated by ndv of join key A.k1 and B.k1, card2 is the cardinality estimated by histograms
    +   * of join key A.k2 and B.k2, then the result cardinality would be min(card1, card2).
    +   *
    +   * @param keyPairs pairs of join keys
    +   *
    +   * @return join cardinality, and column stats for join keys after the join
        */
       // scalastyle:on
    -  def joinSelectivity(joinKeyPairs: Seq[(AttributeReference, AttributeReference)]): BigDecimal = {
    -    var ndvDenom: BigInt = -1
    +  private def computeCardinalityAndStats(keyPairs: Seq[(AttributeReference, AttributeReference)])
    +    : (BigInt, Map[Attribute, ColumnStat]) = {
    +    // If there's no column stats available for join keys, estimate as cartesian product.
    +    var cardJoin: BigInt = leftStats.rowCount.get * rightStats.rowCount.get
    +    val keyStatsAfterJoin = new mutable.HashMap[Attribute, ColumnStat]()
         var i = 0
    -    while(i < joinKeyPairs.length && ndvDenom != 0) {
    -      val (leftKey, rightKey) = joinKeyPairs(i)
    +    while(i < keyPairs.length && cardJoin != 0) {
    +      val (leftKey, rightKey) = keyPairs(i)
           // Check if the two sides are disjoint
    -      val leftKeyStats = leftStats.attributeStats(leftKey)
    -      val rightKeyStats = rightStats.attributeStats(rightKey)
    -      val lInterval = ValueInterval(leftKeyStats.min, leftKeyStats.max, leftKey.dataType)
    -      val rInterval = ValueInterval(rightKeyStats.min, rightKeyStats.max, rightKey.dataType)
    +      val leftKeyStat = leftStats.attributeStats(leftKey)
    +      val rightKeyStat = rightStats.attributeStats(rightKey)
    +      val lInterval = ValueInterval(leftKeyStat.min, leftKeyStat.max, leftKey.dataType)
    +      val rInterval = ValueInterval(rightKeyStat.min, rightKeyStat.max, rightKey.dataType)
           if (ValueInterval.isIntersected(lInterval, rInterval)) {
    -        // Get the largest ndv among pairs of join keys
    -        val maxNdv = leftKeyStats.distinctCount.max(rightKeyStats.distinctCount)
    -        if (maxNdv > ndvDenom) ndvDenom = maxNdv
    +        val (newMin, newMax) = ValueInterval.intersect(lInterval, rInterval, leftKey.dataType)
    +        val (cardKeyPair, joinStatsKeyPair) = computeByNdv(leftKey, rightKey, newMin, newMax)
    +        keyStatsAfterJoin ++= joinStatsKeyPair
    +        // Return cardinality estimated from the most selective join keys.
    +        if (cardKeyPair < cardJoin) cardJoin = cardKeyPair
           } else {
    -        // Set ndvDenom to zero to indicate that this join should have no output
    -        ndvDenom = 0
    +        // One of the join key pairs is disjoint, thus the two sides of join is disjoint.
    +        cardJoin = 0
           }
           i += 1
         }
    +    (cardJoin, keyStatsAfterJoin.toMap)
    +  }
     
    -    if (ndvDenom < 0) {
    -      // We can't find any join key pairs with column stats, estimate it as cartesian join.
    -      1
    -    } else if (ndvDenom == 0) {
    -      // One of the join key pairs is disjoint, thus the two sides of join is disjoint.
    -      0
    -    } else {
    -      1 / BigDecimal(ndvDenom)
    -    }
    +  /** Compute join cardinality using the basic formula, and update column stats for join keys. */
    +  private def computeByNdv(
    +      leftKey: AttributeReference,
    +      rightKey: AttributeReference,
    +      newMin: Option[Any],
    +      newMax: Option[Any]): (BigInt, Map[Attribute, ColumnStat]) = {
    --- End diff --
    
    I think we should return `(BigInt, ColumnStat)`, i.e. the column stats of the join key. The left and right keys must have the same stats after the join.
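
    Roughly this shape, with a hypothetical, simplified `ColumnStat` standing
    in for Spark's catalyst class:

        // Hypothetical, simplified ColumnStat for illustration.
        case class ColumnStat(
            distinctCount: BigInt,
            min: Option[Any],
            max: Option[Any],
            nullCount: BigInt,
            avgLen: Long,
            maxLen: Long)

        /** Returns (cardinality, column stat shared by both join keys). */
        def computeByNdv(
            leftKeyStat: ColumnStat,
            rightKeyStat: ColumnStat,
            leftRows: BigInt,
            rightRows: BigInt,
            newMin: Option[Any],
            newMax: Option[Any]): (BigInt, ColumnStat) = {
          val maxNdv = leftKeyStat.distinctCount.max(rightKeyStat.distinctCount)
          // Basic formula: T(A) * T(B) / max(V(A.k), V(B.k)), rounded up.
          val card = BigDecimal(leftRows * rightRows) / BigDecimal(maxNdv)
          val sharedStat = ColumnStat(
            distinctCount = leftKeyStat.distinctCount.min(rightKeyStat.distinctCount),
            min = newMin,
            max = newMax,
            nullCount = 0, // simplified; per-side null counts matter for outer joins
            avgLen = (leftKeyStat.avgLen + rightKeyStat.avgLen) / 2,
            maxLen = math.min(leftKeyStat.maxLen, rightKeyStat.maxLen))
          (card.setScale(0, BigDecimal.RoundingMode.CEILING).toBigInt, sharedStat)
        }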


---



[GitHub] spark issue #19531: [SPARK-22310] [SQL] Refactor join estimation to incorpor...

Posted by wzhfy <gi...@git.apache.org>.
Github user wzhfy commented on the issue:

    https://github.com/apache/spark/pull/19531
  
    cc @cloud-fan @gatorsmile @ron8hu 


---



[GitHub] spark issue #19531: [SPARK-22310] [SQL] Refactor join estimation to incorpor...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19531
  
    **[Test build #83153 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83153/testReport)** for PR 19531 at commit [`18edc14`](https://github.com/apache/spark/commit/18edc1471d9bcfe6bb500afa77c6d9c1a4bd23dc).


---



[GitHub] spark issue #19531: [SPARK-22310] [SQL] Refactor join estimation to incorpor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19531
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83252/
    Test PASSed.


---



[GitHub] spark pull request #19531: [SPARK-22310] [SQL] Refactor join estimation to i...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19531#discussion_r147529228
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala ---
    @@ -28,45 +28,43 @@ import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Join, Statistics
     import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils._
     
     
    -object JoinEstimation extends Logging {
    +case class JoinEstimation(join: Join) extends Logging {
    +
    +  private val leftStats = join.left.stats
    +  private val rightStats = join.right.stats
    +  private val keyStatsAfterJoin = new mutable.HashMap[Attribute, ColumnStat]()
    +
       /**
        * Estimate statistics after join. Return `None` if the join type is not supported, or we don't
        * have enough statistics for estimation.
        */
    -  def estimate(join: Join): Option[Statistics] = {
    +  def estimate: Option[Statistics] = {
         join.joinType match {
           case Inner | Cross | LeftOuter | RightOuter | FullOuter =>
    -        InnerOuterEstimation(join).doEstimate()
    +        estimateInnerOuterJoin()
           case LeftSemi | LeftAnti =>
    -        LeftSemiAntiEstimation(join).doEstimate()
    +        estimateLeftSemiAntiJoin()
           case _ =>
             logDebug(s"[CBO] Unsupported join type: ${join.joinType}")
             None
         }
       }
    -}
    -
    -case class InnerOuterEstimation(join: Join) extends Logging {
    -
    -  private val leftStats = join.left.stats
    -  private val rightStats = join.right.stats
     
       /**
        * Estimate output size and number of rows after a join operator, and update output column stats.
        */
    -  def doEstimate(): Option[Statistics] = join match {
    +  private def estimateInnerOuterJoin(): Option[Statistics] = join match {
         case _ if !rowCountsExist(join.left, join.right) =>
           None
     
         case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, _, _, _) =>
           // 1. Compute join selectivity
           val joinKeyPairs = extractJoinKeysWithColStats(leftKeys, rightKeys)
    -      val selectivity = joinSelectivity(joinKeyPairs)
    +      val innerJoinedRows = joinCardinality(joinKeyPairs)
    --- End diff --
    
    Also return `keyStatsAfterJoin`? If we make it global, we might introduce bugs if we use `keyStatsAfterJoin` before calling `joinCardinality`. BTW, `joinCardinality` is a confusing name: readers might not realize that `keyStatsAfterJoin` is updated inside `joinCardinality`.
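
    Something like this shape, threading the stats through the return value
    (the names and the `estimatePair` parameter are made up for the sketch):

        import scala.collection.mutable

        // Simplified stand-in for Spark's ColumnStat.
        case class KeyStat(distinctCount: BigInt)

        def computeCardinalityAndStats(
            keyPairs: Seq[(String, String)],
            cartesianCard: BigInt,
            estimatePair: (String, String) => (BigInt, KeyStat))
          : (BigInt, Map[String, KeyStat]) = {
          val keyStatsAfterJoin = mutable.HashMap.empty[String, KeyStat]
          var joinCard = cartesianCard // fallback: cartesian product
          var i = 0
          while (i < keyPairs.length && joinCard != 0) {
            val (leftKey, rightKey) = keyPairs(i)
            val (pairCard, stat) = estimatePair(leftKey, rightKey)
            keyStatsAfterJoin.put(leftKey, stat)
            keyStatsAfterJoin.put(rightKey, stat)
            // Keep the estimate from the most selective key pair.
            if (pairCard < joinCard) joinCard = pairCard
            i += 1
          }
          // Callers get the stats together with the cardinality, so the map
          // can never be read before it is populated.
          (joinCard, keyStatsAfterJoin.toMap)
        }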


---



[GitHub] spark pull request #19531: [SPARK-22310] [SQL] Refactor join estimation to i...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19531#discussion_r147466305
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala ---
    @@ -157,64 +154,100 @@ case class InnerOuterEstimation(join: Join) extends Logging {
       // scalastyle:off
       /**
        * The number of rows of A inner join B on A.k1 = B.k1 is estimated by this basic formula:
    -   * T(A IJ B) = T(A) * T(B) / max(V(A.k1), V(B.k1)), where V is the number of distinct values of
    -   * that column. The underlying assumption for this formula is: each value of the smaller domain
    -   * is included in the larger domain.
    -   * Generally, inner join with multiple join keys can also be estimated based on the above
    -   * formula:
    +   * T(A IJ B) = T(A) * T(B) / max(V(A.k1), V(B.k1)),
    +   * where V is the number of distinct values (ndv) of that column. The underlying assumption for
    +   * this formula is: each value of the smaller domain is included in the larger domain.
    +   *
    +   * Generally, inner join with multiple join keys can be estimated based on the above formula:
        * T(A IJ B) = T(A) * T(B) / (max(V(A.k1), V(B.k1)) * max(V(A.k2), V(B.k2)) * ... * max(V(A.kn), V(B.kn)))
        * However, the denominator can become very large and excessively reduce the result, so we use a
        * conservative strategy to take only the largest max(V(A.ki), V(B.ki)) as the denominator.
    +   *
    +   * That is, join estimation is based on the most selective join keys. We follow this strategy
    +   * when different types of column statistics are available. E.g., if card1 is the cardinality
    +   * estimated by ndv of join key A.k1 and B.k1, card2 is the cardinality estimated by histograms
    +   * of join key A.k2 and B.k2, then the result cardinality would be min(card1, card2).
        */
       // scalastyle:on
    -  def joinSelectivity(joinKeyPairs: Seq[(AttributeReference, AttributeReference)]): BigDecimal = {
    -    var ndvDenom: BigInt = -1
    +  private def joinCardinality(joinKeyPairs: Seq[(AttributeReference, AttributeReference)])
    +    : BigInt = {
    +    // If there's no column stats available for join keys, estimate as cartesian product.
    +    var minCard: BigInt = leftStats.rowCount.get * rightStats.rowCount.get
    --- End diff --
    
    I see. 
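
    To make the fallback concrete, a toy calculation (numbers invented purely
    for illustration):

        val leftRows  = BigInt(1000)          // T(A)
        val rightRows = BigInt(2000)          // T(B)
        // No column stats on any join key: fall back to the cartesian product.
        val cartesian = leftRows * rightRows  // 2,000,000

        // With ndv stats for one key pair, the basic formula takes over:
        val leftNdv  = BigInt(100)            // V(A.k1)
        val rightNdv = BigInt(400)            // V(B.k1)
        val estimate = cartesian / leftNdv.max(rightNdv) // 2,000,000 / 400 = 5,000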


---



[GitHub] spark issue #19531: [SPARK-22310] [SQL] Refactor join estimation to incorpor...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the issue:

    https://github.com/apache/spark/pull/19531
  
    Is this PR only for refactoring? If not, please add test cases.
    
    cc @bogdanrdc @cloud-fan @juliuszsompolski 


---



[GitHub] spark issue #19531: [SPARK-22310] [SQL] Refactor join estimation to incorpor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19531
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #19531: [SPARK-22310] [SQL] Refactor join estimation to i...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19531#discussion_r147666084
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala ---
    @@ -157,64 +157,90 @@ case class InnerOuterEstimation(join: Join) extends Logging {
       // scalastyle:off
       /**
        * The number of rows of A inner join B on A.k1 = B.k1 is estimated by this basic formula:
    -   * T(A IJ B) = T(A) * T(B) / max(V(A.k1), V(B.k1)), where V is the number of distinct values of
    -   * that column. The underlying assumption for this formula is: each value of the smaller domain
    -   * is included in the larger domain.
    -   * Generally, inner join with multiple join keys can also be estimated based on the above
    -   * formula:
    +   * T(A IJ B) = T(A) * T(B) / max(V(A.k1), V(B.k1)),
    +   * where V is the number of distinct values (ndv) of that column. The underlying assumption for
    +   * this formula is: each value of the smaller domain is included in the larger domain.
    +   *
    +   * Generally, inner join with multiple join keys can be estimated based on the above formula:
        * T(A IJ B) = T(A) * T(B) / (max(V(A.k1), V(B.k1)) * max(V(A.k2), V(B.k2)) * ... * max(V(A.kn), V(B.kn)))
        * However, the denominator can become very large and excessively reduce the result, so we use a
        * conservative strategy to take only the largest max(V(A.ki), V(B.ki)) as the denominator.
    +   *
    +   * That is, join estimation is based on the most selective join keys. We follow this strategy
    +   * when different types of column statistics are available. E.g., if card1 is the cardinality
    +   * estimated by ndv of join key A.k1 and B.k1, card2 is the cardinality estimated by histograms
    +   * of join key A.k2 and B.k2, then the result cardinality would be min(card1, card2).
    +   *
    +   * @param keyPairs pairs of join keys
    +   *
    +   * @return join cardinality, and column stats for join keys after the join
        */
       // scalastyle:on
    -  def joinSelectivity(joinKeyPairs: Seq[(AttributeReference, AttributeReference)]): BigDecimal = {
    -    var ndvDenom: BigInt = -1
    +  private def computeCardinalityAndStats(keyPairs: Seq[(AttributeReference, AttributeReference)])
    +    : (BigInt, Map[Attribute, ColumnStat]) = {
    +    // If there's no column stats available for join keys, estimate as cartesian product.
    +    var cardJoin: BigInt = leftStats.rowCount.get * rightStats.rowCount.get
    +    val keyStatsAfterJoin = new mutable.HashMap[Attribute, ColumnStat]()
         var i = 0
    -    while(i < joinKeyPairs.length && ndvDenom != 0) {
    -      val (leftKey, rightKey) = joinKeyPairs(i)
    +    while(i < keyPairs.length && cardJoin != 0) {
    +      val (leftKey, rightKey) = keyPairs(i)
           // Check if the two sides are disjoint
    -      val leftKeyStats = leftStats.attributeStats(leftKey)
    -      val rightKeyStats = rightStats.attributeStats(rightKey)
    -      val lInterval = ValueInterval(leftKeyStats.min, leftKeyStats.max, leftKey.dataType)
    -      val rInterval = ValueInterval(rightKeyStats.min, rightKeyStats.max, rightKey.dataType)
    +      val leftKeyStat = leftStats.attributeStats(leftKey)
    +      val rightKeyStat = rightStats.attributeStats(rightKey)
    +      val lInterval = ValueInterval(leftKeyStat.min, leftKeyStat.max, leftKey.dataType)
    +      val rInterval = ValueInterval(rightKeyStat.min, rightKeyStat.max, rightKey.dataType)
           if (ValueInterval.isIntersected(lInterval, rInterval)) {
    -        // Get the largest ndv among pairs of join keys
    -        val maxNdv = leftKeyStats.distinctCount.max(rightKeyStats.distinctCount)
    -        if (maxNdv > ndvDenom) ndvDenom = maxNdv
    +        val (newMin, newMax) = ValueInterval.intersect(lInterval, rInterval, leftKey.dataType)
    +        val (cardKeyPair, joinStatsKeyPair) = computeByNdv(leftKey, rightKey, newMin, newMax)
    +        keyStatsAfterJoin ++= joinStatsKeyPair
    +        // Return cardinality estimated from the most selective join keys.
    +        if (cardKeyPair < cardJoin) cardJoin = cardKeyPair
           } else {
    -        // Set ndvDenom to zero to indicate that this join should have no output
    -        ndvDenom = 0
    +        // One of the join key pairs is disjoint, thus the two sides of join is disjoint.
    +        cardJoin = 0
           }
           i += 1
         }
    +    (cardJoin, keyStatsAfterJoin.toMap)
    +  }
     
    -    if (ndvDenom < 0) {
    -      // We can't find any join key pairs with column stats, estimate it as cartesian join.
    -      1
    -    } else if (ndvDenom == 0) {
    -      // One of the join key pairs is disjoint, thus the two sides of join is disjoint.
    -      0
    -    } else {
    -      1 / BigDecimal(ndvDenom)
    -    }
    +  /** Compute join cardinality using the basic formula, and update column stats for join keys. */
    --- End diff --
    
    it doesn't update the column stats, but returns the column stats for the join key.
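
    A possible rewording of that scaladoc:

        /** Computes join cardinality using the basic formula, and returns the
         *  column stats for the join keys (nothing is updated in place). */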


---



[GitHub] spark pull request #19531: [SPARK-22310] [SQL] Refactor join estimation to i...

Posted by wzhfy <gi...@git.apache.org>.
Github user wzhfy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19531#discussion_r147548424
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala ---
    @@ -157,64 +154,100 @@ case class InnerOuterEstimation(join: Join) extends Logging {
       // scalastyle:off
       /**
        * The number of rows of A inner join B on A.k1 = B.k1 is estimated by this basic formula:
    -   * T(A IJ B) = T(A) * T(B) / max(V(A.k1), V(B.k1)), where V is the number of distinct values of
    -   * that column. The underlying assumption for this formula is: each value of the smaller domain
    -   * is included in the larger domain.
    -   * Generally, inner join with multiple join keys can also be estimated based on the above
    -   * formula:
    +   * T(A IJ B) = T(A) * T(B) / max(V(A.k1), V(B.k1)),
    +   * where V is the number of distinct values (ndv) of that column. The underlying assumption for
    +   * this formula is: each value of the smaller domain is included in the larger domain.
    +   *
    +   * Generally, inner join with multiple join keys can be estimated based on the above formula:
        * T(A IJ B) = T(A) * T(B) / (max(V(A.k1), V(B.k1)) * max(V(A.k2), V(B.k2)) * ... * max(V(A.kn), V(B.kn)))
        * However, the denominator can become very large and excessively reduce the result, so we use a
        * conservative strategy to take only the largest max(V(A.ki), V(B.ki)) as the denominator.
    +   *
    +   * That is, join estimation is based on the most selective join keys. We follow this strategy
    +   * when different types of column statistics are available. E.g., if card1 is the cardinality
    +   * estimated by ndv of join key A.k1 and B.k1, card2 is the cardinality estimated by histograms
    +   * of join key A.k2 and B.k2, then the result cardinality would be min(card1, card2).
        */
       // scalastyle:on
    -  def joinSelectivity(joinKeyPairs: Seq[(AttributeReference, AttributeReference)]): BigDecimal = {
    -    var ndvDenom: BigInt = -1
    +  private def joinCardinality(joinKeyPairs: Seq[(AttributeReference, AttributeReference)])
    +    : BigInt = {
    +    // If there's no column stats available for join keys, estimate as cartesian product.
    +    var minCard: BigInt = leftStats.rowCount.get * rightStats.rowCount.get
         var i = 0
    -    while(i < joinKeyPairs.length && ndvDenom != 0) {
    +    while(i < joinKeyPairs.length && minCard != 0) {
           val (leftKey, rightKey) = joinKeyPairs(i)
           // Check if the two sides are disjoint
    -      val leftKeyStats = leftStats.attributeStats(leftKey)
    -      val rightKeyStats = rightStats.attributeStats(rightKey)
    -      val lInterval = ValueInterval(leftKeyStats.min, leftKeyStats.max, leftKey.dataType)
    -      val rInterval = ValueInterval(rightKeyStats.min, rightKeyStats.max, rightKey.dataType)
    +      val leftKeyStat = leftStats.attributeStats(leftKey)
    +      val rightKeyStat = rightStats.attributeStats(rightKey)
    +      val lInterval = ValueInterval(leftKeyStat.min, leftKeyStat.max, leftKey.dataType)
    +      val rInterval = ValueInterval(rightKeyStat.min, rightKeyStat.max, rightKey.dataType)
           if (ValueInterval.isIntersected(lInterval, rInterval)) {
    -        // Get the largest ndv among pairs of join keys
    -        val maxNdv = leftKeyStats.distinctCount.max(rightKeyStats.distinctCount)
    -        if (maxNdv > ndvDenom) ndvDenom = maxNdv
    +        val (newMin, newMax) = ValueInterval.intersect(lInterval, rInterval, leftKey.dataType)
    +        val card = joinCardByNdv(leftKey, rightKey, newMin, newMax)
    +        // Return cardinality estimated from the most selective join keys.
    +        if (card < minCard) minCard = card
           } else {
    -        // Set ndvDenom to zero to indicate that this join should have no output
    -        ndvDenom = 0
    +        // One of the join key pairs is disjoint, thus the two sides of join is disjoint.
    +        minCard = 0
           }
           i += 1
         }
    +    minCard
    +  }
     
    -    if (ndvDenom < 0) {
    -      // We can't find any join key pairs with column stats, estimate it as cartesian join.
    -      1
    -    } else if (ndvDenom == 0) {
    -      // One of the join key pairs is disjoint, thus the two sides of join is disjoint.
    -      0
    -    } else {
    -      1 / BigDecimal(ndvDenom)
    +  /** Compute join cardinality using the basic formula, and update column stats for join keys. */
    +  private def joinCardByNdv(
    +      leftKey: AttributeReference,
    +      rightKey: AttributeReference,
    +      newMin: Option[Any],
    +      newMax: Option[Any]): BigInt = {
    +    val leftKeyStat = leftStats.attributeStats(leftKey)
    +    val rightKeyStat = rightStats.attributeStats(rightKey)
    +    val maxNdv = leftKeyStat.distinctCount.max(rightKeyStat.distinctCount)
    +    // Compute cardinality by the basic formula.
    +    val card = BigDecimal(leftStats.rowCount.get * rightStats.rowCount.get) / BigDecimal(maxNdv)
    +
    +    // Update intersected column stats.
    +    val newNdv = leftKeyStat.distinctCount.min(rightKeyStat.distinctCount)
    +    val newMaxLen = math.min(leftKeyStat.maxLen, rightKeyStat.maxLen)
    +    val newAvgLen = (leftKeyStat.avgLen + rightKeyStat.avgLen) / 2
    +
    +    join.joinType match {
    +      case LeftOuter =>
    +        keyStatsAfterJoin.put(leftKey, leftKeyStat)
    +        keyStatsAfterJoin.put(rightKey,
    +          ColumnStat(newNdv, newMin, newMax, rightKeyStat.nullCount, newAvgLen, newMaxLen))
    +      case RightOuter =>
    +        keyStatsAfterJoin.put(leftKey,
    +          ColumnStat(newNdv, newMin, newMax, leftKeyStat.nullCount, newAvgLen, newMaxLen))
    +        keyStatsAfterJoin.put(rightKey, rightKeyStat)
    +      case FullOuter =>
    +        keyStatsAfterJoin.put(leftKey, leftKeyStat)
    +        keyStatsAfterJoin.put(rightKey, rightKeyStat)
    +      case _ =>
    --- End diff --
    
    The join type is already limited at the entry point `def estimate`, so I added an assert here for clarity.
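
    For reference, a minimal sketch of that assert (hypothetical shape; the
    real code matches on Spark's `JoinType` values):

        // Simplified: a string stands in for Spark's JoinType here.
        def updateInnerLikeKeyStats(joinType: String): Unit = {
          // Join types are already restricted at the entry point `estimate`;
          // this assert just makes the assumption explicit.
          assert(joinType == "Inner" || joinType == "Cross",
            s"Unexpected join type when updating key stats: $joinType")
          // ... put the intersected column stats for both join keys here ...
        }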


---



[GitHub] spark pull request #19531: [SPARK-22310] [SQL] Refactor join estimation to i...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19531#discussion_r147468335
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala ---
    @@ -157,64 +154,100 @@ case class InnerOuterEstimation(join: Join) extends Logging {
       // scalastyle:off
       /**
        * The number of rows of A inner join B on A.k1 = B.k1 is estimated by this basic formula:
    -   * T(A IJ B) = T(A) * T(B) / max(V(A.k1), V(B.k1)), where V is the number of distinct values of
    -   * that column. The underlying assumption for this formula is: each value of the smaller domain
    -   * is included in the larger domain.
    -   * Generally, inner join with multiple join keys can also be estimated based on the above
    -   * formula:
    +   * T(A IJ B) = T(A) * T(B) / max(V(A.k1), V(B.k1)),
    +   * where V is the number of distinct values (ndv) of that column. The underlying assumption for
    +   * this formula is: each value of the smaller domain is included in the larger domain.
    +   *
    +   * Generally, inner join with multiple join keys can be estimated based on the above formula:
        * T(A IJ B) = T(A) * T(B) / (max(V(A.k1), V(B.k1)) * max(V(A.k2), V(B.k2)) * ... * max(V(A.kn), V(B.kn)))
        * However, the denominator can become very large and excessively reduce the result, so we use a
        * conservative strategy to take only the largest max(V(A.ki), V(B.ki)) as the denominator.
    +   *
    +   * That is, join estimation is based on the most selective join keys. We follow this strategy
    +   * when different types of column statistics are available. E.g., if card1 is the cardinality
    +   * estimated by ndv of join key A.k1 and B.k1, card2 is the cardinality estimated by histograms
    +   * of join key A.k2 and B.k2, then the result cardinality would be min(card1, card2).
        */
       // scalastyle:on
    -  def joinSelectivity(joinKeyPairs: Seq[(AttributeReference, AttributeReference)]): BigDecimal = {
    -    var ndvDenom: BigInt = -1
    +  private def joinCardinality(joinKeyPairs: Seq[(AttributeReference, AttributeReference)])
    +    : BigInt = {
    +    // If there's no column stats available for join keys, estimate as cartesian product.
    +    var minCard: BigInt = leftStats.rowCount.get * rightStats.rowCount.get
         var i = 0
    -    while(i < joinKeyPairs.length && ndvDenom != 0) {
    +    while(i < joinKeyPairs.length && minCard != 0) {
           val (leftKey, rightKey) = joinKeyPairs(i)
           // Check if the two sides are disjoint
    -      val leftKeyStats = leftStats.attributeStats(leftKey)
    -      val rightKeyStats = rightStats.attributeStats(rightKey)
    -      val lInterval = ValueInterval(leftKeyStats.min, leftKeyStats.max, leftKey.dataType)
    -      val rInterval = ValueInterval(rightKeyStats.min, rightKeyStats.max, rightKey.dataType)
    --- End diff --
    
    revert the changes in the above four lines?


---



[GitHub] spark issue #19531: [SPARK-22310] [SQL] Refactor join estimation to incorpor...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19531
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83153/
    Test FAILed.


---



[GitHub] spark pull request #19531: [SPARK-22310] [SQL] Refactor join estimation to i...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19531#discussion_r147666502
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala ---
    @@ -157,64 +157,90 @@ case class InnerOuterEstimation(join: Join) extends Logging {
       // scalastyle:off
       /**
        * The number of rows of A inner join B on A.k1 = B.k1 is estimated by this basic formula:
    -   * T(A IJ B) = T(A) * T(B) / max(V(A.k1), V(B.k1)), where V is the number of distinct values of
    -   * that column. The underlying assumption for this formula is: each value of the smaller domain
    -   * is included in the larger domain.
    -   * Generally, inner join with multiple join keys can also be estimated based on the above
    -   * formula:
    +   * T(A IJ B) = T(A) * T(B) / max(V(A.k1), V(B.k1)),
    +   * where V is the number of distinct values (ndv) of that column. The underlying assumption for
    +   * this formula is: each value of the smaller domain is included in the larger domain.
    +   *
    +   * Generally, inner join with multiple join keys can be estimated based on the above formula:
        * T(A IJ B) = T(A) * T(B) / (max(V(A.k1), V(B.k1)) * max(V(A.k2), V(B.k2)) * ... * max(V(A.kn), V(B.kn)))
        * However, the denominator can become very large and excessively reduce the result, so we use a
        * conservative strategy to take only the largest max(V(A.ki), V(B.ki)) as the denominator.
    +   *
    +   * That is, join estimation is based on the most selective join keys. We follow this strategy
    +   * when different types of column statistics are available. E.g., if card1 is the cardinality
    +   * estimated by ndv of join key A.k1 and B.k1, card2 is the cardinality estimated by histograms
    +   * of join key A.k2 and B.k2, then the result cardinality would be min(card1, card2).
    +   *
    +   * @param keyPairs pairs of join keys
    +   *
    +   * @return join cardinality, and column stats for join keys after the join
        */
       // scalastyle:on
    -  def joinSelectivity(joinKeyPairs: Seq[(AttributeReference, AttributeReference)]): BigDecimal = {
    -    var ndvDenom: BigInt = -1
    +  private def computeCardinalityAndStats(keyPairs: Seq[(AttributeReference, AttributeReference)])
    +    : (BigInt, Map[Attribute, ColumnStat]) = {
    +    // If there's no column stats available for join keys, estimate as cartesian product.
    +    var cardJoin: BigInt = leftStats.rowCount.get * rightStats.rowCount.get
    +    val keyStatsAfterJoin = new mutable.HashMap[Attribute, ColumnStat]()
         var i = 0
    -    while(i < joinKeyPairs.length && ndvDenom != 0) {
    -      val (leftKey, rightKey) = joinKeyPairs(i)
    +    while(i < keyPairs.length && cardJoin != 0) {
    +      val (leftKey, rightKey) = keyPairs(i)
           // Check if the two sides are disjoint
    -      val leftKeyStats = leftStats.attributeStats(leftKey)
    -      val rightKeyStats = rightStats.attributeStats(rightKey)
    -      val lInterval = ValueInterval(leftKeyStats.min, leftKeyStats.max, leftKey.dataType)
    -      val rInterval = ValueInterval(rightKeyStats.min, rightKeyStats.max, rightKey.dataType)
    +      val leftKeyStat = leftStats.attributeStats(leftKey)
    +      val rightKeyStat = rightStats.attributeStats(rightKey)
    +      val lInterval = ValueInterval(leftKeyStat.min, leftKeyStat.max, leftKey.dataType)
    +      val rInterval = ValueInterval(rightKeyStat.min, rightKeyStat.max, rightKey.dataType)
           if (ValueInterval.isIntersected(lInterval, rInterval)) {
    -        // Get the largest ndv among pairs of join keys
    -        val maxNdv = leftKeyStats.distinctCount.max(rightKeyStats.distinctCount)
    -        if (maxNdv > ndvDenom) ndvDenom = maxNdv
    +        val (newMin, newMax) = ValueInterval.intersect(lInterval, rInterval, leftKey.dataType)
    +        val (cardKeyPair, joinStatsKeyPair) = computeByNdv(leftKey, rightKey, newMin, newMax)
    --- End diff --
    
    `cardKeyPair` is a weird name.


---
