Posted to reviews@spark.apache.org by wzhfy <gi...@git.apache.org> on 2017/10/28 01:54:05 UTC

[GitHub] spark pull request #19594: [WIP] [SPARK-21984] Join estimation based on equi...

GitHub user wzhfy opened a pull request:

    https://github.com/apache/spark/pull/19594

    [WIP] [SPARK-21984] Join estimation based on equi-height histogram

    ## What changes were proposed in this pull request?
    
    This PR depends on two other PRs: [SPARK-17074](https://github.com/apache/spark/pull/19531) and [SPARK-22310](https://github.com/apache/spark/pull/19479). I'll update this one after those two are resolved.
    
    ## How was this patch tested?
    Added new test cases.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/wzhfy/spark join_estimation_histogram

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19594.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #19594
    
----
commit aaa0f77267622757a31bf833e3d3943b7a4faf39
Author: Zhenhua Wang <wa...@huawei.com>
Date:   2017-10-23T02:57:58Z

    tmp_generate_histogram

commit 3521cd8c8a5e46a1f5ec9a69a6dad61355b4181b
Author: Zhenhua Wang <wa...@huawei.com>
Date:   2017-10-23T02:59:02Z

    tmp_join_est_refactor

commit 67bd65153bd0afcdddd30c6ef4799caa02a05a19
Author: Zhenhua Wang <wa...@huawei.com>
Date:   2017-10-28T01:30:57Z

    estimation using equi-height histogram

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

Posted by wzhfy <gi...@git.apache.org>.
Github user wzhfy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19594#discussion_r157699245
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala ---
    @@ -191,8 +191,19 @@ case class JoinEstimation(join: Join) extends Logging {
           val rInterval = ValueInterval(rightKeyStat.min, rightKeyStat.max, rightKey.dataType)
           if (ValueInterval.isIntersected(lInterval, rInterval)) {
             val (newMin, newMax) = ValueInterval.intersect(lInterval, rInterval, leftKey.dataType)
    -        val (card, joinStat) = computeByNdv(leftKey, rightKey, newMin, newMax)
    -        keyStatsAfterJoin += (leftKey -> joinStat, rightKey -> joinStat)
    +        val (card, joinStat) = (leftKeyStat.histogram, rightKeyStat.histogram) match {
    +          case (Some(l: Histogram), Some(r: Histogram)) =>
    +            computeByEquiHeightHistogram(leftKey, rightKey, l, r, newMin, newMax)
    +          case _ =>
    +            computeByNdv(leftKey, rightKey, newMin, newMax)
    +        }
    +        keyStatsAfterJoin += (
    +          // Histograms are propagated as unchanged. During future estimation, they should be
    +          // truncated by the updated max/min. In this way, only pointers of the histograms are
    +          // propagated and thus reduce memory consumption.
    +          leftKey -> joinStat.copy(histogram = leftKeyStat.histogram),
    +          rightKey -> joinStat.copy(histogram = rightKeyStat.histogram)
    --- End diff --
    
    I put it here because `computeByEquiHeightHistogram` returns a single stat, whereas here we keep the histograms for leftKey and rightKey separately.
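The propagation described in this comment can be sketched in isolation. This is a minimal illustration, not the code from the PR: `HistogramBin`, `Histogram` and `ColumnStat` below are simplified stand-ins for Spark's classes, and `statsAfterJoin` is a hypothetical helper name.

```scala
object HistogramPropagationSketch {
  // Simplified stand-ins for Spark's classes (assumption: only the fields used here).
  final case class HistogramBin(lo: Double, hi: Double, ndv: Long)
  final case class Histogram(height: Double, bins: Array[HistogramBin])
  final case class ColumnStat(
      distinctCount: Long,
      min: Option[Double],
      max: Option[Double],
      histogram: Option[Histogram] = None)

  /**
   * Hypothetical helper: both join keys share the post-join stat (ndv, min, max),
   * but each key keeps a reference to its own original histogram.
   */
  def statsAfterJoin(
      joinStat: ColumnStat,
      leftKeyStat: ColumnStat,
      rightKeyStat: ColumnStat): (ColumnStat, ColumnStat) = {
    // copy() reuses the existing Option[Histogram] reference, so the bins are not duplicated.
    (joinStat.copy(histogram = leftKeyStat.histogram),
      joinStat.copy(histogram = rightKeyStat.histogram))
  }
}
```

Because only references are copied, the two resulting stats differ solely in which histogram they point to; later estimation is expected to trim each histogram by the updated min/max.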


---



[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19594#discussion_r157514899
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala ---
    @@ -212,4 +213,186 @@ object EstimationUtils {
         }
       }
     
    +  /**
    +   * Returns overlapped ranges between two histograms, in the given value range
    +   * [lowerBound, upperBound].
    +   */
    +  def getOverlappedRanges(
    +    leftHistogram: Histogram,
    +    rightHistogram: Histogram,
    +    lowerBound: Double,
    +    upperBound: Double): Seq[OverlappedRange] = {
    +    val overlappedRanges = new ArrayBuffer[OverlappedRange]()
    +    // Only bins whose range intersect [lowerBound, upperBound] have join possibility.
    +    val leftBins = leftHistogram.bins
    +      .filter(b => b.lo <= upperBound && b.hi >= lowerBound)
    +    val rightBins = rightHistogram.bins
    +      .filter(b => b.lo <= upperBound && b.hi >= lowerBound)
    +
    +    leftBins.foreach { lb =>
    +      rightBins.foreach { rb =>
    +        val (left, leftHeight) = trimBin(lb, leftHistogram.height, lowerBound, upperBound)
    +        val (right, rightHeight) = trimBin(rb, rightHistogram.height, lowerBound, upperBound)
    +        // Only collect overlapped ranges.
    +        if (left.lo <= right.hi && left.hi >= right.lo) {
    +          // Collect overlapped ranges.
    +          val range = if (left.lo == left.hi) {
    +            // Case1: the left bin has only one value
    +            OverlappedRange(
    +              lo = left.lo,
    +              hi = left.lo,
    +              leftNdv = 1,
    +              rightNdv = 1,
    +              leftNumRows = leftHeight,
    +              rightNumRows = rightHeight / right.ndv
    +            )
    +          } else if (right.lo == right.hi) {
    +            // Case2: the right bin has only one value
    +            OverlappedRange(
    +              lo = right.lo,
    +              hi = right.lo,
    +              leftNdv = 1,
    +              rightNdv = 1,
    +              leftNumRows = leftHeight / left.ndv,
    +              rightNumRows = rightHeight
    +            )
    +          } else if (right.lo >= left.lo && right.hi >= left.hi) {
    +            // Case3: the left bin is "smaller" than the right bin
    +            //      left.lo            right.lo     left.hi          right.hi
    +            // --------+------------------+------------+----------------+------->
    +            if (left.hi == right.lo) {
    +              // The overlapped range has only one value.
    +              OverlappedRange(
    +                lo = right.lo,
    +                hi = right.lo,
    +                leftNdv = 1,
    +                rightNdv = 1,
    +                leftNumRows = leftHeight / left.ndv,
    +                rightNumRows = rightHeight / right.ndv
    +              )
    +            } else {
    +              val leftRatio = (left.hi - right.lo) / (left.hi - left.lo)
    +              val rightRatio = (left.hi - right.lo) / (right.hi - right.lo)
    +              OverlappedRange(
    +                lo = right.lo,
    +                hi = left.hi,
    +                leftNdv = left.ndv * leftRatio,
    +                rightNdv = right.ndv * rightRatio,
    +                leftNumRows = leftHeight * leftRatio,
    +                rightNumRows = rightHeight * rightRatio
    +              )
    +            }
    +          } else if (right.lo <= left.lo && right.hi <= left.hi) {
    +            // Case4: the left bin is "larger" than the right bin
    +            //      right.lo           left.lo      right.hi         left.hi
    +            // --------+------------------+------------+----------------+------->
    +            if (right.hi == left.lo) {
    +              // The overlapped range has only one value.
    +              OverlappedRange(
    +                lo = right.hi,
    +                hi = right.hi,
    +                leftNdv = 1,
    +                rightNdv = 1,
    +                leftNumRows = leftHeight / left.ndv,
    +                rightNumRows = rightHeight / right.ndv
    +              )
    +            } else {
    +              val leftRatio = (right.hi - left.lo) / (left.hi - left.lo)
    +              val rightRatio = (right.hi - left.lo) / (right.hi - right.lo)
    +              OverlappedRange(
    +                lo = left.lo,
    +                hi = right.hi,
    +                leftNdv = left.ndv * leftRatio,
    +                rightNdv = right.ndv * rightRatio,
    +                leftNumRows = leftHeight * leftRatio,
    +                rightNumRows = rightHeight * rightRatio
    +              )
    +            }
    +          } else if (right.lo >= left.lo && right.hi <= left.hi) {
    +            // Case5: the left bin contains the right bin
    +            //      left.lo            right.lo     right.hi         left.hi
    +            // --------+------------------+------------+----------------+------->
    +            val leftRatio = (right.hi - right.lo) / (left.hi - left.lo)
    +            OverlappedRange(
    +              lo = right.lo,
    +              hi = right.hi,
    +              leftNdv = left.ndv * leftRatio,
    +              rightNdv = right.ndv,
    +              leftNumRows = leftHeight * leftRatio,
    +              rightNumRows = rightHeight
    +            )
    +          } else {
    +            assert(right.lo <= left.lo && right.hi >= left.hi)
    +            // Case6: the right bin contains the left bin
    +            //      right.lo           left.lo      left.hi          right.hi
    +            // --------+------------------+------------+----------------+------->
    +            val rightRatio = (left.hi - left.lo) / (right.hi - right.lo)
    +            OverlappedRange(
    +              lo = left.lo,
    +              hi = left.hi,
    +              leftNdv = left.ndv,
    +              rightNdv = right.ndv * rightRatio,
    +              leftNumRows = leftHeight,
    +              rightNumRows = rightHeight * rightRatio
    +            )
    +          }
    +          overlappedRanges += range
    +        }
    +      }
    +    }
    +    overlappedRanges
    +  }
    +
    +  /**
    +   * Given an original bin and a value range [lowerBound, upperBound], returns the trimmed part
    +   * of the bin in that range and its number of rows.
    +   */
    +  def trimBin(bin: HistogramBin, height: Double, lowerBound: Double, upperBound: Double)
    +  : (HistogramBin, Double) = {
    +    val (lo, hi) = if (bin.lo <= lowerBound && bin.hi >= upperBound) {
    +      //       bin.lo          lowerBound     upperBound      bin.hi
    +      // --------+------------------+------------+-------------+------->
    +      (lowerBound, upperBound)
    +    } else if (bin.lo <= lowerBound && bin.hi >= lowerBound) {
    +      //       bin.lo          lowerBound      bin.hi      upperBound
    +      // --------+------------------+------------+-------------+------->
    +      (lowerBound, bin.hi)
    +    } else if (bin.lo <= upperBound && bin.hi >= upperBound) {
    +      //    lowerBound            bin.lo     upperBound       bin.hi
    +      // --------+------------------+------------+-------------+------->
    +      (bin.lo, upperBound)
    +    } else {
    +      //    lowerBound            bin.lo        bin.hi     upperBound
    +      // --------+------------------+------------+-------------+------->
    +      (bin.lo, bin.hi)
    --- End diff --
    
    Add an assert to make sure that, if we reach this branch, the case is the one we expect.
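One way the requested assertion could look is sketched below. The branch structure follows the quoted diff; everything after the branch selection is an assumption, since the diff is cut off before that point: the sketch scales rows and ndv by the kept fraction of the bin width, and treats a bin that collapses to one value as holding `height / ndv` rows. `HistogramBin` is a simplified stand-in for Spark's class.

```scala
object TrimBinSketch {
  // Simplified stand-in for Spark's HistogramBin.
  final case class HistogramBin(lo: Double, hi: Double, ndv: Long)

  def trimBin(
      bin: HistogramBin,
      height: Double,
      lowerBound: Double,
      upperBound: Double): (HistogramBin, Double) = {
    val (lo, hi) = if (bin.lo <= lowerBound && bin.hi >= upperBound) {
      // The bin contains the whole range.
      (lowerBound, upperBound)
    } else if (bin.lo <= lowerBound && bin.hi >= lowerBound) {
      // The bin straddles lowerBound.
      (lowerBound, bin.hi)
    } else if (bin.lo <= upperBound && bin.hi >= upperBound) {
      // The bin straddles upperBound.
      (bin.lo, upperBound)
    } else {
      // Requested check: the only valid remaining case is a bin inside the range.
      assert(bin.lo >= lowerBound && bin.hi <= upperBound,
        s"bin [${bin.lo}, ${bin.hi}] is not inside [$lowerBound, $upperBound]")
      (bin.lo, bin.hi)
    }

    if (hi == lo) {
      // Assumption: a single remaining value holds an average share of the bin's rows.
      (HistogramBin(lo, hi, 1L), height / bin.ndv)
    } else {
      // Assumption: rows and ndv shrink in proportion to the kept width.
      val ratio = (hi - lo) / (bin.hi - bin.lo)
      (HistogramBin(lo, hi, math.ceil(bin.ndv * ratio).toLong), height * ratio)
    }
  }
}
```

The assertion only holds if callers pass bins that intersect `[lowerBound, upperBound]`, which is what the filter in `getOverlappedRanges` does; a bin entirely outside the range would fail it instead of being silently returned untrimmed.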


---



[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

Posted by ron8hu <gi...@git.apache.org>.
Github user ron8hu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19594#discussion_r153665547
  
    --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala ---
    @@ -67,6 +68,205 @@ class JoinEstimationSuite extends StatsEstimationTestBase {
         rowCount = 2,
         attributeStats = AttributeMap(Seq("key-1-2", "key-2-3").map(nameToColInfo)))
     
    +  private def estimateByHistogram(
    +      histogram1: Histogram,
    +      histogram2: Histogram,
    +      expectedMin: Double,
    +      expectedMax: Double,
    +      expectedNdv: Long,
    +      expectedRows: Long): Unit = {
    +    val col1 = attr("key1")
    +    val col2 = attr("key2")
    +    val c1 = generateJoinChild(col1, histogram1, expectedMin, expectedMax)
    +    val c2 = generateJoinChild(col2, histogram2, expectedMin, expectedMax)
    +
    +    val c1JoinC2 = Join(c1, c2, Inner, Some(EqualTo(col1, col2)))
    +    val c2JoinC1 = Join(c2, c1, Inner, Some(EqualTo(col2, col1)))
    +    val expectedStatsAfterJoin = Statistics(
    +      sizeInBytes = expectedRows * (8 + 2 * 4),
    +      rowCount = Some(expectedRows),
    +      attributeStats = AttributeMap(Seq(
    +        col1 -> c1.stats.attributeStats(col1).copy(
    +          distinctCount = expectedNdv, min = Some(expectedMin), max = Some(expectedMax)),
    +        col2 -> c2.stats.attributeStats(col2).copy(
    +          distinctCount = expectedNdv, min = Some(expectedMin), max = Some(expectedMax))))
    +    )
    +
    +    // Join order should not affect estimation result.
    +    Seq(c1JoinC2, c2JoinC1).foreach { join =>
    +      assert(join.stats == expectedStatsAfterJoin)
    +    }
    +  }
    +
    +  private def generateJoinChild(
    +      col: Attribute,
    +      histogram: Histogram,
    +      expectedMin: Double,
    +      expectedMax: Double): LogicalPlan = {
    +    val colStat = inferColumnStat(histogram)
    +    val t = StatsTestPlan(
    +      outputList = Seq(col),
    +      rowCount = (histogram.height * histogram.bins.length).toLong,
    +      attributeStats = AttributeMap(Seq(col -> colStat)))
    +
    +    val filterCondition = new ArrayBuffer[Expression]()
    +    if (expectedMin > colStat.min.get.toString.toDouble) {
    +      filterCondition += GreaterThanOrEqual(col, Literal(expectedMin))
    +    }
    +    if (expectedMax < colStat.max.get.toString.toDouble) {
    +      filterCondition += LessThanOrEqual(col, Literal(expectedMax))
    +    }
    +    if (filterCondition.isEmpty) t else Filter(filterCondition.reduce(And), t)
    +  }
    +
    +  private def inferColumnStat(histogram: Histogram): ColumnStat = {
    +    var ndv = 0L
    +    for (i <- histogram.bins.indices) {
    +      val bin = histogram.bins(i)
    +      if (i == 0 || bin.hi != histogram.bins(i - 1).hi) {
    +        ndv += bin.ndv
    +      }
    +    }
    +    ColumnStat(distinctCount = ndv, min = Some(histogram.bins.head.lo),
    +      max = Some(histogram.bins.last.hi), nullCount = 0, avgLen = 4, maxLen = 4,
    +      histogram = Some(histogram))
    +  }
    +
    +  test("equi-height histograms: a bin is contained by another one") {
    +    val histogram1 = Histogram(height = 300, Array(
    +      HistogramBin(lo = 10, hi = 30, ndv = 10), HistogramBin(lo = 30, hi = 60, ndv = 30)))
    +    val histogram2 = Histogram(height = 100, Array(
    +      HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 100, ndv = 40)))
    +    // test bin trimming
    +    val (t1, h1) = trimBin(histogram2.bins(0), height = 100, min = 10, max = 60)
    +    assert(t1 == HistogramBin(lo = 10, hi = 50, ndv = 40) && h1 == 80)
    +    val (t2, h2) = trimBin(histogram2.bins(1), height = 100, min = 10, max = 60)
    +    assert(t2 == HistogramBin(lo = 50, hi = 60, ndv = 8) && h2 == 20)
    +
    +    val expectedRanges = Seq(
    +      OverlappedRange(10, 30, math.min(10, 40*1/2), math.max(10, 40*1/2), 300, 80*1/2),
    +      OverlappedRange(30, 50, math.min(30*2/3, 40*1/2), math.max(30*2/3, 40*1/2), 300*2/3, 80*1/2),
    +      OverlappedRange(50, 60, math.min(30*1/3, 8), math.max(30*1/3, 8), 300*1/3, 20)
    +    )
    +    assert(expectedRanges.equals(
    +      getOverlappedRanges(histogram1, histogram2, newMin = 10D, newMax = 60D)))
    +
    +    estimateByHistogram(
    +      histogram1 = histogram1,
    +      histogram2 = histogram2,
    +      expectedMin = 10D,
    +      expectedMax = 60D,
    +      // 10 + 20 + 8
    +      expectedNdv = 38L,
    +      // 300*40/20 + 200*40/20 + 100*20/10
    +      expectedRows = 1200L)
    +  }
    +
    +  test("equi-height histograms: a bin has only one value") {
    +    val histogram1 = Histogram(height = 300, Array(
    +      HistogramBin(lo = 30, hi = 30, ndv = 1), HistogramBin(lo = 30, hi = 60, ndv = 30)))
    +    val histogram2 = Histogram(height = 100, Array(
    +      HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 100, ndv = 40)))
    +    // test bin trimming
    +    val (t1, h1) = trimBin(histogram2.bins(0), height = 100, min = 30, max = 60)
    +    assert(t1 == HistogramBin(lo = 30, hi = 50, ndv = 20) && h1 == 40)
    +    val (t2, h2) = trimBin(histogram2.bins(1), height = 100, min = 30, max = 60)
    +    assert(t2 ==HistogramBin(lo = 50, hi = 60, ndv = 8) && h2 == 20)
    +
    +    val expectedRanges = Seq(
    +      OverlappedRange(30, 30, 1, 1, 300, 40/20),
    +      OverlappedRange(30, 50, math.min(30*2/3, 20), math.max(30*2/3, 20), 300*2/3, 40),
    +      OverlappedRange(50, 60, math.min(30*1/3, 8), math.max(30*1/3, 8), 300*1/3, 20)
    +    )
    +    assert(expectedRanges.equals(
    +      getOverlappedRanges(histogram1, histogram2, newMin = 30D, newMax = 60D)))
    +
    +    estimateByHistogram(
    +      histogram1 = histogram1,
    +      histogram2 = histogram2,
    +      expectedMin = 30D,
    +      expectedMax = 60D,
    +      // 1 + 20 + 8
    +      expectedNdv = 29L,
    +      // 300*20/1 + 200*40/20 + 100*20/10
    +      expectedRows = 1200L)
    +  }
    +
    +  test("equi-height histograms: a bin has only one value after trimming") {
    +    val histogram1 = Histogram(height = 300, Array(
    +      HistogramBin(lo = 50, hi = 60, ndv = 10), HistogramBin(lo = 60, hi = 75, ndv = 3)))
    +    val histogram2 = Histogram(height = 100, Array(
    +      HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 100, ndv = 40)))
    --- End diff --
    
    In very skewed cases, multiple bins in a histogram may contain the same single distinct value. We could add one more test case to cover this situation.
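To make the skewed situation concrete, here is a small sketch of a histogram in which one value fills two whole bins. The case classes are simplified stand-ins for Spark's, the data is hypothetical, and `inferNdv` mirrors the counting rule of `inferColumnStat` in the quoted diff (a bin is skipped when its `hi` equals the previous bin's `hi`).

```scala
object SkewedHistogramSketch {
  // Simplified stand-ins for Spark's classes.
  final case class HistogramBin(lo: Double, hi: Double, ndv: Long)
  final case class Histogram(height: Double, bins: Array[HistogramBin])

  /** Sums per-bin ndv, skipping a bin whose `hi` repeats the previous bin's `hi`. */
  def inferNdv(histogram: Histogram): Long =
    histogram.bins.indices.foldLeft(0L) { (acc, i) =>
      val bin = histogram.bins(i)
      if (i == 0 || bin.hi != histogram.bins(i - 1).hi) acc + bin.ndv else acc
    }

  // Hypothetical skewed column: the value 30 fills the first two bins entirely.
  val skewed: Histogram = Histogram(
    height = 300.0,
    bins = Array(
      HistogramBin(lo = 30.0, hi = 30.0, ndv = 1L),
      HistogramBin(lo = 30.0, hi = 30.0, ndv = 1L),
      HistogramBin(lo = 30.0, hi = 60.0, ndv = 30L)))
}
```

With this data the repeated single-value bin is counted once, so the inferred ndv is 1 + 30 rather than 1 + 1 + 30. A join test built on such a histogram would exercise the single-value branches of `getOverlappedRanges` more than once for the same value.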


---



[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

Posted by wzhfy <gi...@git.apache.org>.
Github user wzhfy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19594#discussion_r155910232
  
    
    OK, I've added test cases for joins of skewed histograms (same skewed value and different skewed values).


---



[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85106/
    Test PASSed.


---



[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19594#discussion_r157526127
  
    --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala ---
    @@ -67,6 +67,222 @@ class JoinEstimationSuite extends StatsEstimationTestBase {
         rowCount = 2,
         attributeStats = AttributeMap(Seq("key-1-2", "key-2-3").map(nameToColInfo)))
     
    +  private def estimateByHistogram(
    +      leftHistogram: Histogram,
    +      rightHistogram: Histogram,
    +      expectedMin: Double,
    +      expectedMax: Double,
    +      expectedNdv: Long,
    +      expectedRows: Long): Unit = {
    +    val col1 = attr("key1")
    +    val col2 = attr("key2")
    +    val c1 = generateJoinChild(col1, leftHistogram, expectedMin, expectedMax)
    +    val c2 = generateJoinChild(col2, rightHistogram, expectedMin, expectedMax)
    +
    +    val c1JoinC2 = Join(c1, c2, Inner, Some(EqualTo(col1, col2)))
    +    val c2JoinC1 = Join(c2, c1, Inner, Some(EqualTo(col2, col1)))
    +    val expectedStatsAfterJoin = Statistics(
    +      sizeInBytes = expectedRows * (8 + 2 * 4),
    +      rowCount = Some(expectedRows),
    +      attributeStats = AttributeMap(Seq(
    +        col1 -> c1.stats.attributeStats(col1).copy(
    +          distinctCount = expectedNdv, min = Some(expectedMin), max = Some(expectedMax)),
    +        col2 -> c2.stats.attributeStats(col2).copy(
    +          distinctCount = expectedNdv, min = Some(expectedMin), max = Some(expectedMax))))
    +    )
    +
    +    // Join order should not affect estimation result.
    +    Seq(c1JoinC2, c2JoinC1).foreach { join =>
    +      assert(join.stats == expectedStatsAfterJoin)
    +    }
    +  }
    +
    +  private def generateJoinChild(
    +      col: Attribute,
    +      histogram: Histogram,
    +      expectedMin: Double,
    +      expectedMax: Double): LogicalPlan = {
    +    val colStat = inferColumnStat(histogram)
    +    StatsTestPlan(
    +      outputList = Seq(col),
    +      rowCount = (histogram.height * histogram.bins.length).toLong,
    +      attributeStats = AttributeMap(Seq(col -> colStat)))
    +  }
    +
    +  /** Column statistics should be consistent with histograms in tests. */
    +  private def inferColumnStat(histogram: Histogram): ColumnStat = {
    +    var ndv = 0L
    +    for (i <- histogram.bins.indices) {
    +      val bin = histogram.bins(i)
    +      if (i == 0 || bin.hi != histogram.bins(i - 1).hi) {
    +        ndv += bin.ndv
    +      }
    +    }
    +    ColumnStat(distinctCount = ndv, min = Some(histogram.bins.head.lo),
    +      max = Some(histogram.bins.last.hi), nullCount = 0, avgLen = 4, maxLen = 4,
    +      histogram = Some(histogram))
    +  }
    +
    +  test("equi-height histograms: a bin is contained by another one") {
    +    val histogram1 = Histogram(height = 300, Array(
    +      HistogramBin(lo = 10, hi = 30, ndv = 10), HistogramBin(lo = 30, hi = 60, ndv = 30)))
    +    val histogram2 = Histogram(height = 100, Array(
    +      HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 100, ndv = 40)))
    +    // test bin trimming
    +    val (t0, h0) = trimBin(histogram2.bins(0), height = 100, lowerBound = 10, upperBound = 60)
    +    assert(t0 == HistogramBin(lo = 10, hi = 50, ndv = 40) && h0 == 80)
    +    val (t1, h1) = trimBin(histogram2.bins(1), height = 100, lowerBound = 10, upperBound = 60)
    +    assert(t1 == HistogramBin(lo = 50, hi = 60, ndv = 8) && h1 == 20)
    +
    +    val expectedRanges = Seq(
    +      // histogram1.bins(0) overlaps t0
    +      OverlappedRange(10, 30, 10, 40*1/2, 300, 80*1/2),
    +      // histogram1.bins(1) overlaps t0
    +      OverlappedRange(30, 50, 30*2/3, 40*1/2, 300*2/3, 80*1/2),
    +      // histogram1.bins(1) overlaps t1
    +      OverlappedRange(50, 60, 30*1/3, 8, 300*1/3, 20)
    +    )
    +    assert(expectedRanges.equals(
    +      getOverlappedRanges(histogram1, histogram2, lowerBound = 10D, upperBound = 60D)))
    +
    +    estimateByHistogram(
    +      leftHistogram = histogram1,
    +      rightHistogram = histogram2,
    +      expectedMin = 10D,
    +      expectedMax = 60D,
    +      // 10 + 20 + 8
    +      expectedNdv = 38L,
    +      // 300*40/20 + 200*40/20 + 100*20/10
    +      expectedRows = 1200L)
    --- End diff --
    
    ditto
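The expected values in the test quoted above (`10 + 20 + 8` and `300*40/20 + 200*40/20 + 100*20/10`) are consistent with a per-range rule that takes the smaller ndv as the joined ndv and divides the product of the row counts by the larger ndv. The sketch below reproduces that arithmetic under this assumption; it is not the PR's `computeByEquiHeightHistogram`, `estimate` is a hypothetical name, and it ignores any special handling of ranges that share a boundary value.

```scala
object OverlapEstimateSketch {
  // Same field order as the OverlappedRange used in the quoted tests.
  final case class OverlappedRange(
      lo: Double,
      hi: Double,
      leftNdv: Double,
      rightNdv: Double,
      leftNumRows: Double,
      rightNumRows: Double)

  /** Returns (estimated ndv, estimated row count) summed over all ranges. */
  def estimate(ranges: Seq[OverlappedRange]): (Double, Double) = {
    // Assumption: at most min(leftNdv, rightNdv) distinct values can match in a range.
    val ndv = ranges.map(r => math.min(r.leftNdv, r.rightNdv)).sum
    // Assumption: classic equi-join estimate, rows_l * rows_r / max(ndv_l, ndv_r).
    val rows = ranges.map { r =>
      r.leftNumRows * r.rightNumRows / math.max(r.leftNdv, r.rightNdv)
    }.sum
    (ndv, rows)
  }

  // The three ranges expected by the test, with the fractions already evaluated.
  val rangesFromTest: Seq[OverlappedRange] = Seq(
    OverlappedRange(10, 30, 10, 20, 300, 40),
    OverlappedRange(30, 50, 20, 20, 200, 40),
    OverlappedRange(50, 60, 10, 8, 100, 20))
}
```

Applied to `rangesFromTest`, the per-range row estimates are 600, 400 and 200, which sum to the `expectedRows = 1200L` in the test, and the per-range ndv values sum to `expectedNdv = 38L`.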


---



[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85061/


---



[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19594#discussion_r157514422
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala ---
    @@ -212,4 +213,186 @@ object EstimationUtils {
         }
       }
     
    +  /**
    +   * Returns overlapped ranges between two histograms, in the given value range
    +   * [lowerBound, upperBound].
    +   */
    +  def getOverlappedRanges(
    +    leftHistogram: Histogram,
    +    rightHistogram: Histogram,
    +    lowerBound: Double,
    +    upperBound: Double): Seq[OverlappedRange] = {
    +    val overlappedRanges = new ArrayBuffer[OverlappedRange]()
    +    // Only bins whose range intersect [lowerBound, upperBound] have join possibility.
    +    val leftBins = leftHistogram.bins
    +      .filter(b => b.lo <= upperBound && b.hi >= lowerBound)
    +    val rightBins = rightHistogram.bins
    +      .filter(b => b.lo <= upperBound && b.hi >= lowerBound)
    +
    +    leftBins.foreach { lb =>
    +      rightBins.foreach { rb =>
    +        val (left, leftHeight) = trimBin(lb, leftHistogram.height, lowerBound, upperBound)
    +        val (right, rightHeight) = trimBin(rb, rightHistogram.height, lowerBound, upperBound)
    +        // Only collect overlapped ranges.
    +        if (left.lo <= right.hi && left.hi >= right.lo) {
    +          // Collect overlapped ranges.
    +          val range = if (left.lo == left.hi) {
    +            // Case1: the left bin has only one value
    +            OverlappedRange(
    +              lo = left.lo,
    +              hi = left.lo,
    +              leftNdv = 1,
    +              rightNdv = 1,
    +              leftNumRows = leftHeight,
    +              rightNumRows = rightHeight / right.ndv
    +            )
    +          } else if (right.lo == right.hi) {
    +            // Case2: the right bin has only one value
    +            OverlappedRange(
    +              lo = right.lo,
    +              hi = right.lo,
    +              leftNdv = 1,
    +              rightNdv = 1,
    +              leftNumRows = leftHeight / left.ndv,
    +              rightNumRows = rightHeight
    +            )
    +          } else if (right.lo >= left.lo && right.hi >= left.hi) {
    +            // Case3: the left bin is "smaller" than the right bin
    +            //      left.lo            right.lo     left.hi          right.hi
    +            // --------+------------------+------------+----------------+------->
    +            if (left.hi == right.lo) {
    --- End diff --
    
    Yes, this branch is needed; otherwise the ratio would be 0, which leads to a wrong result.
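
To make the zero-ratio problem concrete, here is a minimal sketch of Case3 with and without the `left.hi == right.lo` guard. `ZeroRatioSketch`, `Bin`, `Overlap` and the `guardSinglePoint` flag are hypothetical names used only for illustration; the arithmetic mirrors the quoted diff.

```scala
object ZeroRatioSketch {
  // Hypothetical stand-ins for HistogramBin and OverlappedRange.
  final case class Bin(lo: Double, hi: Double, ndv: Long)
  final case class Overlap(
      lo: Double, hi: Double,
      leftNdv: Double, rightNdv: Double,
      leftNumRows: Double, rightNumRows: Double)

  // Case3: left.lo <= right.lo and left.hi <= right.hi.
  // `guardSinglePoint` switches the special branch on or off for comparison.
  def case3(
      left: Bin, leftHeight: Double,
      right: Bin, rightHeight: Double,
      guardSinglePoint: Boolean): Overlap = {
    if (guardSinglePoint && left.hi == right.lo) {
      // The bins touch at a single value: one distinct value per side,
      // with the average number of rows per distinct value.
      Overlap(right.lo, right.lo, 1, 1, leftHeight / left.ndv, rightHeight / right.ndv)
    } else {
      val leftRatio = (left.hi - right.lo) / (left.hi - left.lo)
      val rightRatio = (left.hi - right.lo) / (right.hi - right.lo)
      Overlap(right.lo, left.hi, left.ndv * leftRatio, right.ndv * rightRatio,
        leftHeight * leftRatio, rightHeight * rightRatio)
    }
  }
}
```

For a left bin [10, 30] and a right bin [30, 60], the unguarded path computes a ratio of 0 on both sides, so the shared value 30 ends up with zero distinct values and zero rows. The guarded path keeps one distinct value and an average row count per side.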


---



[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19594#discussion_r157519831
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala ---
    @@ -191,8 +191,19 @@ case class JoinEstimation(join: Join) extends Logging {
           val rInterval = ValueInterval(rightKeyStat.min, rightKeyStat.max, rightKey.dataType)
           if (ValueInterval.isIntersected(lInterval, rInterval)) {
             val (newMin, newMax) = ValueInterval.intersect(lInterval, rInterval, leftKey.dataType)
    -        val (card, joinStat) = computeByNdv(leftKey, rightKey, newMin, newMax)
    -        keyStatsAfterJoin += (leftKey -> joinStat, rightKey -> joinStat)
    +        val (card, joinStat) = (leftKeyStat.histogram, rightKeyStat.histogram) match {
    +          case (Some(l: Histogram), Some(r: Histogram)) =>
    +            computeByEquiHeightHistogram(leftKey, rightKey, l, r, newMin, newMax)
    +          case _ =>
    +            computeByNdv(leftKey, rightKey, newMin, newMax)
    +        }
    +        keyStatsAfterJoin += (
    +          // Histograms are propagated as unchanged. During future estimation, they should be
    +          // truncated by the updated max/min. In this way, only pointers of the histograms are
    +          // propagated and thus reduce memory consumption.
    +          leftKey -> joinStat.copy(histogram = leftKeyStat.histogram),
    +          rightKey -> joinStat.copy(histogram = rightKeyStat.histogram)
    --- End diff --
    
    shall we do this inside `computeByEquiHeightHistogram`?
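
Wherever this step ends up living, the code comment in the diff relies on `copy` sharing the histogram reference rather than duplicating it. A minimal sketch of that behaviour, using heavily simplified and hypothetical stand-ins for `Histogram` and `ColumnStat` (the real classes have more fields):

```scala
object HistogramSharingSketch {
  // Hypothetical, simplified stand-ins for Spark's Histogram and ColumnStat.
  final case class Histogram(height: Double, binBounds: Vector[(Double, Double)])
  final case class ColumnStat(
      distinctCount: Long,
      min: Double,
      max: Double,
      histogram: Option[Histogram])

  // Attach each side's original histogram to the shared post-join statistics.
  // `copy` reuses the existing Option, so no histogram data is duplicated.
  def withOriginalHistograms(
      joinStat: ColumnStat,
      left: ColumnStat,
      right: ColumnStat): (ColumnStat, ColumnStat) = {
    (joinStat.copy(histogram = left.histogram), joinStat.copy(histogram = right.histogram))
  }
}
```

Because the two join keys carry different histograms, a helper of this shape has to return one statistic per side; that is one design point to weigh when deciding whether to fold the step into the estimation method.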


---



[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    thanks, merging to master!


---



[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19594#discussion_r156389357
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala ---
    @@ -114,4 +115,183 @@ object EstimationUtils {
         }
       }
     
    +  /**
    +   * Returns overlapped ranges between two histograms, in the given value range [newMin, newMax].
    +   */
    +  def getOverlappedRanges(
    +      leftHistogram: Histogram,
    +      rightHistogram: Histogram,
    +      newMin: Double,
    +      newMax: Double): Seq[OverlappedRange] = {
    +    val overlappedRanges = new ArrayBuffer[OverlappedRange]()
    +    // Only bins whose range intersect [newMin, newMax] have join possibility.
    +    val leftBins = leftHistogram.bins
    +      .filter(b => b.lo <= newMax && b.hi >= newMin)
    +    val rightBins = rightHistogram.bins
    +      .filter(b => b.lo <= newMax && b.hi >= newMin)
    +
    +    leftBins.foreach { lb =>
    +      rightBins.foreach { rb =>
    +        val (left, leftHeight) = trimBin(lb, leftHistogram.height, newMin, newMax)
    +        val (right, rightHeight) = trimBin(rb, rightHistogram.height, newMin, newMax)
    +        // Only collect overlapped ranges.
    +        if (left.lo <= right.hi && left.hi >= right.lo) {
    +          // Collect overlapped ranges.
    +          val range = if (left.lo == left.hi) {
    +            // Case1: the left bin has only one value
    +            OverlappedRange(
    +              lo = left.lo,
    +              hi = left.lo,
    +              leftNdv = 1,
    +              rightNdv = 1,
    +              leftNumRows = leftHeight,
    +              rightNumRows = rightHeight / right.ndv
    +            )
    +          } else if (right.lo == right.hi) {
    +            // Case2: the right bin has only one value
    +            OverlappedRange(
    +              lo = right.lo,
    +              hi = right.lo,
    +              leftNdv = 1,
    +              rightNdv = 1,
    +              leftNumRows = leftHeight / left.ndv,
    +              rightNumRows = rightHeight
    +            )
    +          } else if (right.lo >= left.lo && right.hi >= left.hi) {
    +            // Case3: the left bin is "smaller" than the right bin
    +            //      left.lo            right.lo     left.hi          right.hi
    +            // --------+------------------+------------+----------------+------->
    +            val leftRatio = (left.hi - right.lo) / (left.hi - left.lo)
    +            val rightRatio = (left.hi - right.lo) / (right.hi - right.lo)
    +            if (leftRatio == 0) {
    +              // The overlapped range has only one value.
    +              OverlappedRange(
    +                lo = right.lo,
    +                hi = right.lo,
    +                leftNdv = 1,
    +                rightNdv = 1,
    +                leftNumRows = leftHeight / left.ndv,
    +                rightNumRows = rightHeight / right.ndv
    +              )
    +            } else {
    +              OverlappedRange(
    +                lo = right.lo,
    +                hi = left.hi,
    +                leftNdv = left.ndv * leftRatio,
    +                rightNdv = right.ndv * rightRatio,
    +                leftNumRows = leftHeight * leftRatio,
    +                rightNumRows = rightHeight * rightRatio
    +              )
    +            }
    +          } else if (right.lo <= left.lo && right.hi <= left.hi) {
    +            // Case4: the left bin is "larger" than the right bin
    +            //      right.lo           left.lo      right.hi         left.hi
    +            // --------+------------------+------------+----------------+------->
    +            val leftRatio = (right.hi - left.lo) / (left.hi - left.lo)
    +            val rightRatio = (right.hi - left.lo) / (right.hi - right.lo)
    +            if (leftRatio == 0) {
    +              // The overlapped range has only one value.
    +              OverlappedRange(
    +                lo = right.hi,
    +                hi = right.hi,
    +                leftNdv = 1,
    +                rightNdv = 1,
    +                leftNumRows = leftHeight / left.ndv,
    +                rightNumRows = rightHeight / right.ndv
    +              )
    +            } else {
    +              OverlappedRange(
    +                lo = left.lo,
    +                hi = right.hi,
    +                leftNdv = left.ndv * leftRatio,
    +                rightNdv = right.ndv * rightRatio,
    +                leftNumRows = leftHeight * leftRatio,
    +                rightNumRows = rightHeight * rightRatio
    +              )
    +            }
    +          } else if (right.lo >= left.lo && right.hi <= left.hi) {
    +            // Case5: the left bin contains the right bin
    +            //      left.lo            right.lo     right.hi         left.hi
    +            // --------+------------------+------------+----------------+------->
    +            val leftRatio = (right.hi - right.lo) / (left.hi - left.lo)
    +            OverlappedRange(
    +              lo = right.lo,
    +              hi = right.hi,
    +              leftNdv = left.ndv * leftRatio,
    +              rightNdv = right.ndv,
    +              leftNumRows = leftHeight * leftRatio,
    +              rightNumRows = rightHeight
    +            )
    +          } else {
    +            assert(right.lo <= left.lo && right.hi >= left.hi)
    +            // Case6: the right bin contains the left bin
    +            //      right.lo           left.lo      left.hi          right.hi
    +            // --------+------------------+------------+----------------+------->
    +            val rightRatio = (left.hi - left.lo) / (right.hi - right.lo)
    +            OverlappedRange(
    +              lo = left.lo,
    +              hi = left.hi,
    +              leftNdv = left.ndv,
    +              rightNdv = right.ndv * rightRatio,
    +              leftNumRows = leftHeight,
    +              rightNumRows = rightHeight * rightRatio
    +            )
    +          }
    +          overlappedRanges += range
    +        }
    +      }
    +    }
    +    overlappedRanges
    +  }
    +
    +  /**
    +   * Given an original bin and a value range [min, max], returns the trimmed bin and its number of
    +   * rows.
    +   */
    +  def trimBin(bin: HistogramBin, height: Double, min: Double, max: Double)
    +    : (HistogramBin, Double) = {
    +    val (lo, hi) = if (bin.lo <= min && bin.hi >= max) {
    +      //       bin.lo              min          max          bin.hi
    +      // --------+------------------+------------+-------------+------->
    +      (min, max)
    +    } else if (bin.lo <= min && bin.hi >= min) {
    +      //       bin.lo              min        bin.hi
    --- End diff --
    
    What if `max` is greater than `bin.hi`?
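
One way to see that the case where `max` lies beyond `bin.hi` is well defined is to clamp both ends instead of enumerating branches. This is a sketch of an alternative formulation, not the branch-based code in the PR; `TrimBinSketch` and `Bin` are hypothetical, and rounding the trimmed ndv up with `math.ceil` and the handling of degenerate bins are assumptions.

```scala
object TrimBinSketch {
  // Hypothetical stand-in for HistogramBin.
  final case class Bin(lo: Double, hi: Double, ndv: Long)

  // Clamp the bin to [min, max] and scale ndv and height by the kept fraction.
  def trimBin(bin: Bin, height: Double, min: Double, max: Double): (Bin, Double) = {
    require(bin.lo <= max && bin.hi >= min, "bin must intersect [min, max]")
    val lo = math.max(bin.lo, min)
    val hi = math.min(bin.hi, max) // covers max > bin.hi: the upper end stays at bin.hi
    if (bin.hi == bin.lo) {
      // Single-value bin: nothing to trim.
      (bin, height)
    } else if (hi == lo) {
      // Only one point survives: assume one distinct value with an average share of rows.
      (Bin(lo, hi, 1), height / bin.ndv)
    } else {
      val ratio = (hi - lo) / (bin.hi - bin.lo)
      (Bin(lo, hi, math.ceil(bin.ndv * ratio).toLong), height * ratio)
    }
  }
}
```

With the bins from the PR's test, trimming [0, 50] (ndv 50, height 100) to [10, 60] keeps [10, 50] with ndv 40 and height 80, and trimming [50, 100] (ndv 40) keeps [50, 60] with ndv 8 and height 20.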


---



[GitHub] spark issue #19594: [WIP] [SPARK-21984] Join estimation based on equi-height...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    **[Test build #83146 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83146/testReport)** for PR 19594 at commit [`67bd651`](https://github.com/apache/spark/commit/67bd65153bd0afcdddd30c6ef4799caa02a05a19).


---



[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19594#discussion_r157517120
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala ---
    @@ -225,6 +236,43 @@ case class JoinEstimation(join: Join) extends Logging {
         (ceil(card), newStats)
       }
     
    +  /** Compute join cardinality using equi-height histograms. */
    +  private def computeByEquiHeightHistogram(
    --- End diff --
    
    I think it's ok to only say `Histogram` in method names and explain it's equi-height in comments.


---



[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    **[Test build #84989 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84989/testReport)** for PR 19594 at commit [`2a4ee99`](https://github.com/apache/spark/commit/2a4ee99526c654834f3a50ef66e674bda673f926).


---



[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84989/


---



[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

Posted by wzhfy <gi...@git.apache.org>.
Github user wzhfy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19594#discussion_r155910267
  
    --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala ---
    @@ -67,6 +68,205 @@ class JoinEstimationSuite extends StatsEstimationTestBase {
         rowCount = 2,
         attributeStats = AttributeMap(Seq("key-1-2", "key-2-3").map(nameToColInfo)))
     
    +  private def estimateByHistogram(
    +      histogram1: Histogram,
    +      histogram2: Histogram,
    +      expectedMin: Double,
    +      expectedMax: Double,
    +      expectedNdv: Long,
    +      expectedRows: Long): Unit = {
    +    val col1 = attr("key1")
    +    val col2 = attr("key2")
    +    val c1 = generateJoinChild(col1, histogram1, expectedMin, expectedMax)
    +    val c2 = generateJoinChild(col2, histogram2, expectedMin, expectedMax)
    +
    +    val c1JoinC2 = Join(c1, c2, Inner, Some(EqualTo(col1, col2)))
    +    val c2JoinC1 = Join(c2, c1, Inner, Some(EqualTo(col2, col1)))
    +    val expectedStatsAfterJoin = Statistics(
    +      sizeInBytes = expectedRows * (8 + 2 * 4),
    +      rowCount = Some(expectedRows),
    +      attributeStats = AttributeMap(Seq(
    +        col1 -> c1.stats.attributeStats(col1).copy(
    +          distinctCount = expectedNdv, min = Some(expectedMin), max = Some(expectedMax)),
    +        col2 -> c2.stats.attributeStats(col2).copy(
    +          distinctCount = expectedNdv, min = Some(expectedMin), max = Some(expectedMax))))
    +    )
    +
    +    // Join order should not affect estimation result.
    +    Seq(c1JoinC2, c2JoinC1).foreach { join =>
    +      assert(join.stats == expectedStatsAfterJoin)
    +    }
    +  }
    +
    +  private def generateJoinChild(
    +      col: Attribute,
    +      histogram: Histogram,
    +      expectedMin: Double,
    +      expectedMax: Double): LogicalPlan = {
    +    val colStat = inferColumnStat(histogram)
    +    val t = StatsTestPlan(
    +      outputList = Seq(col),
    +      rowCount = (histogram.height * histogram.bins.length).toLong,
    +      attributeStats = AttributeMap(Seq(col -> colStat)))
    +
    +    val filterCondition = new ArrayBuffer[Expression]()
    +    if (expectedMin > colStat.min.get.toString.toDouble) {
    +      filterCondition += GreaterThanOrEqual(col, Literal(expectedMin))
    +    }
    +    if (expectedMax < colStat.max.get.toString.toDouble) {
    +      filterCondition += LessThanOrEqual(col, Literal(expectedMax))
    +    }
    +    if (filterCondition.isEmpty) t else Filter(filterCondition.reduce(And), t)
    +  }
    +
    +  private def inferColumnStat(histogram: Histogram): ColumnStat = {
    +    var ndv = 0L
    +    for (i <- histogram.bins.indices) {
    +      val bin = histogram.bins(i)
    +      if (i == 0 || bin.hi != histogram.bins(i - 1).hi) {
    +        ndv += bin.ndv
    +      }
    +    }
    +    ColumnStat(distinctCount = ndv, min = Some(histogram.bins.head.lo),
    +      max = Some(histogram.bins.last.hi), nullCount = 0, avgLen = 4, maxLen = 4,
    +      histogram = Some(histogram))
    +  }
    +
    +  test("equi-height histograms: a bin is contained by another one") {
    +    val histogram1 = Histogram(height = 300, Array(
    +      HistogramBin(lo = 10, hi = 30, ndv = 10), HistogramBin(lo = 30, hi = 60, ndv = 30)))
    +    val histogram2 = Histogram(height = 100, Array(
    +      HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 100, ndv = 40)))
    +    // test bin trimming
    +    val (t1, h1) = trimBin(histogram2.bins(0), height = 100, min = 10, max = 60)
    +    assert(t1 == HistogramBin(lo = 10, hi = 50, ndv = 40) && h1 == 80)
    +    val (t2, h2) = trimBin(histogram2.bins(1), height = 100, min = 10, max = 60)
    +    assert(t2 == HistogramBin(lo = 50, hi = 60, ndv = 8) && h2 == 20)
    +
    +    val expectedRanges = Seq(
    +      OverlappedRange(10, 30, math.min(10, 40*1/2), math.max(10, 40*1/2), 300, 80*1/2),
    +      OverlappedRange(30, 50, math.min(30*2/3, 40*1/2), math.max(30*2/3, 40*1/2), 300*2/3, 80*1/2),
    +      OverlappedRange(50, 60, math.min(30*1/3, 8), math.max(30*1/3, 8), 300*1/3, 20)
    +    )
    +    assert(expectedRanges.equals(
    +      getOverlappedRanges(histogram1, histogram2, newMin = 10D, newMax = 60D)))
    +
    +    estimateByHistogram(
    +      histogram1 = histogram1,
    +      histogram2 = histogram2,
    +      expectedMin = 10D,
    +      expectedMax = 60D,
    +      // 10 + 20 + 8
    +      expectedNdv = 38L,
    +      // 300*40/20 + 200*40/20 + 100*20/10
    +      expectedRows = 1200L)
    +  }
    +
    +  test("equi-height histograms: a bin has only one value") {
    +    val histogram1 = Histogram(height = 300, Array(
    +      HistogramBin(lo = 30, hi = 30, ndv = 1), HistogramBin(lo = 30, hi = 60, ndv = 30)))
    +    val histogram2 = Histogram(height = 100, Array(
    +      HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 100, ndv = 40)))
    --- End diff --
    
    OK, I've added test cases for joins of skewed histograms (same skewed value and different skewed values).
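
In an equi-height histogram a skewed value typically shows up as a bin whose `lo` equals its `hi`. A minimal sketch of how such a bin is matched against a wider bin, following Case1 of `getOverlappedRanges` as quoted in this thread; `SkewedBinSketch`, `Bin` and `Overlap` are hypothetical stand-ins and the numbers in the usage note are illustrative.

```scala
object SkewedBinSketch {
  // Hypothetical stand-ins for HistogramBin and OverlappedRange.
  final case class Bin(lo: Double, hi: Double, ndv: Long)
  final case class Overlap(
      lo: Double, hi: Double,
      leftNdv: Double, rightNdv: Double,
      leftNumRows: Double, rightNumRows: Double)

  // Case1: the left bin holds a single (skewed) value.
  // All of its rows can match, against an average per-value share of the right bin.
  def singleValueLeft(
      left: Bin, leftHeight: Double,
      right: Bin, rightHeight: Double): Overlap = {
    require(left.lo == left.hi, "left bin must hold exactly one value")
    Overlap(left.lo, left.lo, 1, 1, leftHeight, rightHeight / right.ndv)
  }
}
```

For example, a left bin [30, 30] with 300 rows joined against a right bin already trimmed to [30, 50] with 20 distinct values and 40 rows gives 300 left rows and 40 / 20 = 2 right rows for the value 30.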


---



[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85001/


---



[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    **[Test build #84676 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84676/testReport)** for PR 19594 at commit [`e69e213`](https://github.com/apache/spark/commit/e69e21348b4cde2abaec9dbb46381caf1ed3a1a4).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    **[Test build #84676 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84676/testReport)** for PR 19594 at commit [`e69e213`](https://github.com/apache/spark/commit/e69e21348b4cde2abaec9dbb46381caf1ed3a1a4).


---



[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19594#discussion_r156388437
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala ---
    @@ -114,4 +115,183 @@ object EstimationUtils {
         }
       }
     
    +  /**
    +   * Returns overlapped ranges between two histograms, in the given value range [newMin, newMax].
    +   */
    +  def getOverlappedRanges(
    +      leftHistogram: Histogram,
    +      rightHistogram: Histogram,
    +      newMin: Double,
    +      newMax: Double): Seq[OverlappedRange] = {
    +    val overlappedRanges = new ArrayBuffer[OverlappedRange]()
    +    // Only bins whose range intersect [newMin, newMax] have join possibility.
    +    val leftBins = leftHistogram.bins
    +      .filter(b => b.lo <= newMax && b.hi >= newMin)
    +    val rightBins = rightHistogram.bins
    +      .filter(b => b.lo <= newMax && b.hi >= newMin)
    +
    +    leftBins.foreach { lb =>
    +      rightBins.foreach { rb =>
    --- End diff --
    
    nit:
    ```
    for {
      leftBin <- leftBins
      rightBin <- rightBins
    } yield {
      ...
      OverlappedRange ...
    }
    ```
    Then we can omit `val overlappedRanges = new ArrayBuffer[OverlappedRange]()`
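
A runnable version of this suggestion could look like the sketch below. Since the original loop only collects intersecting pairs, the overlap test has to move into a guard, otherwise `yield` would need a value for every pair. `ForYieldSketch`, `Bin` and the reduced `Overlap` (bounds only) are hypothetical simplifications.

```scala
object ForYieldSketch {
  // Hypothetical stand-in for HistogramBin (ndv omitted for brevity).
  final case class Bin(lo: Double, hi: Double)
  // Hypothetical, reduced OverlappedRange carrying only the bounds.
  final case class Overlap(lo: Double, hi: Double)

  // Builds the result directly, with no mutable ArrayBuffer.
  def overlaps(leftBins: Seq[Bin], rightBins: Seq[Bin]): Seq[Overlap] =
    for {
      left <- leftBins
      right <- rightBins
      if left.lo <= right.hi && left.hi >= right.lo // keep only intersecting pairs
    } yield Overlap(math.max(left.lo, right.lo), math.min(left.hi, right.hi))
}
```

Applied to left bins [10, 30], [30, 60] and right bins [10, 50], [50, 60], this yields the three ranges [10, 30], [30, 50] and [50, 60], in the same order as the nested `foreach` would produce them.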


---



[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84682/


---



[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

Posted by wzhfy <gi...@git.apache.org>.
Github user wzhfy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19594#discussion_r156846872
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala ---
    @@ -114,4 +115,183 @@ object EstimationUtils {
         }
       }
     
    +  /**
    +   * Returns overlapped ranges between two histograms, in the given value range [newMin, newMax].
    +   */
    +  def getOverlappedRanges(
    +      leftHistogram: Histogram,
    +      rightHistogram: Histogram,
    +      newMin: Double,
    +      newMax: Double): Seq[OverlappedRange] = {
    --- End diff --
    
    yea I think `upperBound/lowerBound` is better.


---



[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    **[Test build #84679 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84679/testReport)** for PR 19594 at commit [`e69e213`](https://github.com/apache/spark/commit/e69e21348b4cde2abaec9dbb46381caf1ed3a1a4).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19594#discussion_r157503949
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala ---
    @@ -212,4 +213,186 @@ object EstimationUtils {
         }
       }
     
    +  /**
    +   * Returns overlapped ranges between two histograms, in the given value range
    +   * [lowerBound, upperBound].
    +   */
    +  def getOverlappedRanges(
    +    leftHistogram: Histogram,
    +    rightHistogram: Histogram,
    +    lowerBound: Double,
    +    upperBound: Double): Seq[OverlappedRange] = {
    +    val overlappedRanges = new ArrayBuffer[OverlappedRange]()
    +    // Only bins whose range intersect [lowerBound, upperBound] have join possibility.
    +    val leftBins = leftHistogram.bins
    +      .filter(b => b.lo <= upperBound && b.hi >= lowerBound)
    +    val rightBins = rightHistogram.bins
    +      .filter(b => b.lo <= upperBound && b.hi >= lowerBound)
    +
    +    leftBins.foreach { lb =>
    +      rightBins.foreach { rb =>
    +        val (left, leftHeight) = trimBin(lb, leftHistogram.height, lowerBound, upperBound)
    +        val (right, rightHeight) = trimBin(rb, rightHistogram.height, lowerBound, upperBound)
    +        // Only collect overlapped ranges.
    +        if (left.lo <= right.hi && left.hi >= right.lo) {
    +          // Collect overlapped ranges.
    +          val range = if (left.lo == left.hi) {
    +            // Case1: the left bin has only one value
    +            OverlappedRange(
    +              lo = left.lo,
    +              hi = left.lo,
    +              leftNdv = 1,
    +              rightNdv = 1,
    +              leftNumRows = leftHeight,
    +              rightNumRows = rightHeight / right.ndv
    +            )
    +          } else if (right.lo == right.hi) {
    +            // Case2: the right bin has only one value
    +            OverlappedRange(
    +              lo = right.lo,
    +              hi = right.lo,
    +              leftNdv = 1,
    +              rightNdv = 1,
    +              leftNumRows = leftHeight / left.ndv,
    +              rightNumRows = rightHeight
    +            )
    +          } else if (right.lo >= left.lo && right.hi >= left.hi) {
    +            // Case3: the left bin is "smaller" than the right bin
    +            //      left.lo            right.lo     left.hi          right.hi
    +            // --------+------------------+------------+----------------+------->
    +            if (left.hi == right.lo) {
    +              // The overlapped range has only one value.
    +              OverlappedRange(
    +                lo = right.lo,
    +                hi = right.lo,
    +                leftNdv = 1,
    +                rightNdv = 1,
    +                leftNumRows = leftHeight / left.ndv,
    +                rightNumRows = rightHeight / right.ndv
    +              )
    +            } else {
    +              val leftRatio = (left.hi - right.lo) / (left.hi - left.lo)
    +              val rightRatio = (left.hi - right.lo) / (right.hi - right.lo)
    +              OverlappedRange(
    +                lo = right.lo,
    +                hi = left.hi,
    +                leftNdv = left.ndv * leftRatio,
    +                rightNdv = right.ndv * rightRatio,
    +                leftNumRows = leftHeight * leftRatio,
    +                rightNumRows = rightHeight * rightRatio
    +              )
    +            }
    +          } else if (right.lo <= left.lo && right.hi <= left.hi) {
    +            // Case4: the left bin is "larger" than the right bin
    +            //      right.lo           left.lo      right.hi         left.hi
    +            // --------+------------------+------------+----------------+------->
    +            if (right.hi == left.lo) {
    +              // The overlapped range has only one value.
    +              OverlappedRange(
    +                lo = right.hi,
    +                hi = right.hi,
    +                leftNdv = 1,
    +                rightNdv = 1,
    +                leftNumRows = leftHeight / left.ndv,
    +                rightNumRows = rightHeight / right.ndv
    +              )
    +            } else {
    +              val leftRatio = (right.hi - left.lo) / (left.hi - left.lo)
    +              val rightRatio = (right.hi - left.lo) / (right.hi - right.lo)
    +              OverlappedRange(
    +                lo = left.lo,
    +                hi = right.hi,
    +                leftNdv = left.ndv * leftRatio,
    +                rightNdv = right.ndv * rightRatio,
    +                leftNumRows = leftHeight * leftRatio,
    +                rightNumRows = rightHeight * rightRatio
    +              )
    +            }
    +          } else if (right.lo >= left.lo && right.hi <= left.hi) {
    +            // Case5: the left bin contains the right bin
    +            //      left.lo            right.lo     right.hi         left.hi
    +            // --------+------------------+------------+----------------+------->
    +            val leftRatio = (right.hi - right.lo) / (left.hi - left.lo)
    +            OverlappedRange(
    +              lo = right.lo,
    +              hi = right.hi,
    +              leftNdv = left.ndv * leftRatio,
    +              rightNdv = right.ndv,
    +              leftNumRows = leftHeight * leftRatio,
    +              rightNumRows = rightHeight
    +            )
    +          } else {
    +            assert(right.lo <= left.lo && right.hi >= left.hi)
    +            // Case6: the right bin contains the left bin
    +            //      right.lo           left.lo      left.hi          right.hi
    +            // --------+------------------+------------+----------------+------->
    +            val rightRatio = (left.hi - left.lo) / (right.hi - right.lo)
    +            OverlappedRange(
    +              lo = left.lo,
    +              hi = left.hi,
    +              leftNdv = left.ndv,
    +              rightNdv = right.ndv * rightRatio,
    +              leftNumRows = leftHeight,
    +              rightNumRows = rightHeight * rightRatio
    +            )
    +          }
    +          overlappedRanges += range
    +        }
    +      }
    +    }
    +    overlappedRanges
    +  }
    +
    +  /**
    +   * Given an original bin and a value range [lowerBound, upperBound], returns the trimmed part
    +   * of the bin in that range and its number of rows.
    +   */
    +  def trimBin(bin: HistogramBin, height: Double, lowerBound: Double, upperBound: Double)
    --- End diff --
    
    Maybe explain in the comment that `height` means the average number of rows per bin in an equi-height histogram.
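
A minimal sketch of what `height` stands for, assuming simplified stand-ins for `Histogram` and `HistogramBin` (the case classes below are illustrative, not the Spark classes). The row-count formula mirrors the one used in the test helper of this PR:

```scala
// Simplified stand-ins for illustration; not the Spark classes themselves.
case class HistogramBin(lo: Double, hi: Double, ndv: Long)
case class Histogram(height: Double, bins: Seq[HistogramBin])

// In an equi-height histogram every bin holds roughly `height` rows,
// so bin widths vary while the row count per bin stays the same.
val histogram = Histogram(height = 100, bins = Seq(
  HistogramBin(lo = 0, hi = 50, ndv = 50),
  HistogramBin(lo = 50, hi = 100, ndv = 40)))

// Total row count implied by the histogram: height times number of bins.
val totalRows = (histogram.height * histogram.bins.length).toLong
```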


---



[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83870/
    Test PASSed.


---



[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...

Posted by wzhfy <gi...@git.apache.org>.
Github user wzhfy commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    retest this please


---



[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19594#discussion_r156387841
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala ---
    @@ -114,4 +115,183 @@ object EstimationUtils {
         }
       }
     
    +  /**
    +   * Returns overlapped ranges between two histograms, in the given value range [newMin, newMax].
    +   */
    +  def getOverlappedRanges(
    +      leftHistogram: Histogram,
    +      rightHistogram: Histogram,
    +      newMin: Double,
    +      newMax: Double): Seq[OverlappedRange] = {
    --- End diff --
    
    how about `upperBound`/`lowerBound`? It's hard to understand the meaning of `new` by looking at this method.


---



[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...

Posted by wzhfy <gi...@git.apache.org>.
Github user wzhfy commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    retest this please


---



[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

Posted by wzhfy <gi...@git.apache.org>.
Github user wzhfy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19594#discussion_r156847046
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala ---
    @@ -114,4 +115,183 @@ object EstimationUtils {
         }
       }
     
    +  /**
    +   * Returns overlapped ranges between two histograms, in the given value range [newMin, newMax].
    +   */
    +  def getOverlappedRanges(
    +      leftHistogram: Histogram,
    +      rightHistogram: Histogram,
    +      newMin: Double,
    +      newMax: Double): Seq[OverlappedRange] = {
    +    val overlappedRanges = new ArrayBuffer[OverlappedRange]()
    +    // Only bins whose range intersect [newMin, newMax] have join possibility.
    +    val leftBins = leftHistogram.bins
    +      .filter(b => b.lo <= newMax && b.hi >= newMin)
    +    val rightBins = rightHistogram.bins
    +      .filter(b => b.lo <= newMax && b.hi >= newMin)
    +
    +    leftBins.foreach { lb =>
    +      rightBins.foreach { rb =>
    +        val (left, leftHeight) = trimBin(lb, leftHistogram.height, newMin, newMax)
    +        val (right, rightHeight) = trimBin(rb, rightHistogram.height, newMin, newMax)
    +        // Only collect overlapped ranges.
    +        if (left.lo <= right.hi && left.hi >= right.lo) {
    +          // Collect overlapped ranges.
    +          val range = if (left.lo == left.hi) {
    +            // Case1: the left bin has only one value
    +            OverlappedRange(
    +              lo = left.lo,
    +              hi = left.lo,
    +              leftNdv = 1,
    +              rightNdv = 1,
    +              leftNumRows = leftHeight,
    +              rightNumRows = rightHeight / right.ndv
    +            )
    +          } else if (right.lo == right.hi) {
    +            // Case2: the right bin has only one value
    +            OverlappedRange(
    +              lo = right.lo,
    +              hi = right.lo,
    +              leftNdv = 1,
    +              rightNdv = 1,
    +              leftNumRows = leftHeight / left.ndv,
    +              rightNumRows = rightHeight
    +            )
    +          } else if (right.lo >= left.lo && right.hi >= left.hi) {
    +            // Case3: the left bin is "smaller" than the right bin
    +            //      left.lo            right.lo     left.hi          right.hi
    +            // --------+------------------+------------+----------------+------->
    +            val leftRatio = (left.hi - right.lo) / (left.hi - left.lo)
    +            val rightRatio = (left.hi - right.lo) / (right.hi - right.lo)
    +            if (leftRatio == 0) {
    +              // The overlapped range has only one value.
    +              OverlappedRange(
    +                lo = right.lo,
    +                hi = right.lo,
    +                leftNdv = 1,
    +                rightNdv = 1,
    +                leftNumRows = leftHeight / left.ndv,
    +                rightNumRows = rightHeight / right.ndv
    +              )
    +            } else {
    +              OverlappedRange(
    +                lo = right.lo,
    +                hi = left.hi,
    +                leftNdv = left.ndv * leftRatio,
    +                rightNdv = right.ndv * rightRatio,
    +                leftNumRows = leftHeight * leftRatio,
    +                rightNumRows = rightHeight * rightRatio
    +              )
    +            }
    +          } else if (right.lo <= left.lo && right.hi <= left.hi) {
    +            // Case4: the left bin is "larger" than the right bin
    +            //      right.lo           left.lo      right.hi         left.hi
    +            // --------+------------------+------------+----------------+------->
    +            val leftRatio = (right.hi - left.lo) / (left.hi - left.lo)
    +            val rightRatio = (right.hi - left.lo) / (right.hi - right.lo)
    +            if (leftRatio == 0) {
    +              // The overlapped range has only one value.
    +              OverlappedRange(
    +                lo = right.hi,
    +                hi = right.hi,
    +                leftNdv = 1,
    +                rightNdv = 1,
    +                leftNumRows = leftHeight / left.ndv,
    +                rightNumRows = rightHeight / right.ndv
    +              )
    +            } else {
    +              OverlappedRange(
    +                lo = left.lo,
    +                hi = right.hi,
    +                leftNdv = left.ndv * leftRatio,
    +                rightNdv = right.ndv * rightRatio,
    +                leftNumRows = leftHeight * leftRatio,
    +                rightNumRows = rightHeight * rightRatio
    +              )
    +            }
    +          } else if (right.lo >= left.lo && right.hi <= left.hi) {
    +            // Case5: the left bin contains the right bin
    +            //      left.lo            right.lo     right.hi         left.hi
    +            // --------+------------------+------------+----------------+------->
    +            val leftRatio = (right.hi - right.lo) / (left.hi - left.lo)
    +            OverlappedRange(
    +              lo = right.lo,
    +              hi = right.hi,
    +              leftNdv = left.ndv * leftRatio,
    +              rightNdv = right.ndv,
    +              leftNumRows = leftHeight * leftRatio,
    +              rightNumRows = rightHeight
    +            )
    +          } else {
    +            assert(right.lo <= left.lo && right.hi >= left.hi)
    +            // Case6: the right bin contains the left bin
    +            //      right.lo           left.lo      left.hi          right.hi
    +            // --------+------------------+------------+----------------+------->
    +            val rightRatio = (left.hi - left.lo) / (right.hi - right.lo)
    +            OverlappedRange(
    +              lo = left.lo,
    +              hi = left.hi,
    +              leftNdv = left.ndv,
    +              rightNdv = right.ndv * rightRatio,
    +              leftNumRows = leftHeight,
    +              rightNumRows = rightHeight * rightRatio
    +            )
    +          }
    +          overlappedRanges += range
    +        }
    +      }
    +    }
    +    overlappedRanges
    +  }
    +
    +  /**
    +   * Given an original bin and a value range [min, max], returns the trimmed bin and its number of
    +   * rows.
    +   */
    +  def trimBin(bin: HistogramBin, height: Double, min: Double, max: Double)
    +    : (HistogramBin, Double) = {
    +    val (lo, hi) = if (bin.lo <= min && bin.hi >= max) {
    +      //       bin.lo              min          max          bin.hi
    +      // --------+------------------+------------+-------------+------->
    +      (min, max)
    +    } else if (bin.lo <= min && bin.hi >= min) {
    +      //       bin.lo              min        bin.hi
    --- End diff --
    
    in this case, `max` is after the `bin.hi`, so the trimmed part is `(min, bin.hi)`. I'll update the figure to indicate that.
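
The two cases discussed here can be sketched as follows. This is a simplified illustration that clips only the value range with `math.max`/`math.min` rather than the case-by-case branches in the diff; `trimRange` is a hypothetical helper and it assumes the bin already intersects `[min, max]`:

```scala
// Hypothetical helper: clips the bin range [binLo, binHi] to [min, max].
// Assumes the two ranges intersect, since bins are filtered beforehand.
def trimRange(binLo: Double, binHi: Double, min: Double, max: Double): (Double, Double) =
  (math.max(binLo, min), math.min(binHi, max))

// bin.lo <= min and bin.hi >= max: the trimmed part is (min, max).
val contained = trimRange(binLo = 0, binHi = 100, min = 10, max = 60)

// bin.lo <= min and max is after bin.hi: the trimmed part is (min, bin.hi).
val partial = trimRange(binLo = 0, binHi = 50, min = 10, max = 60)
```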


---



[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    **[Test build #85001 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85001/testReport)** for PR 19594 at commit [`2637429`](https://github.com/apache/spark/commit/263742914e21ba607904acb0ad35ced32aad48ab).
     * This patch **fails SparkR unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84676/
    Test FAILed.


---



[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    **[Test build #84991 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84991/testReport)** for PR 19594 at commit [`2637429`](https://github.com/apache/spark/commit/263742914e21ba607904acb0ad35ced32aad48ab).
     * This patch **fails SparkR unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...

Posted by wzhfy <gi...@git.apache.org>.
Github user wzhfy commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    retest this please


---



[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    **[Test build #84683 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84683/testReport)** for PR 19594 at commit [`e69e213`](https://github.com/apache/spark/commit/e69e21348b4cde2abaec9dbb46381caf1ed3a1a4).


---



[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #19594: [WIP] [SPARK-21984] Join estimation based on equi-height...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

Posted by wzhfy <gi...@git.apache.org>.
Github user wzhfy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19594#discussion_r157331840
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala ---
    @@ -114,4 +115,183 @@ object EstimationUtils {
         }
       }
     
    +  /**
    +   * Returns overlapped ranges between two histograms, in the given value range [newMin, newMax].
    +   */
    +  def getOverlappedRanges(
    +      leftHistogram: Histogram,
    +      rightHistogram: Histogram,
    +      newMin: Double,
    +      newMax: Double): Seq[OverlappedRange] = {
    +    val overlappedRanges = new ArrayBuffer[OverlappedRange]()
    +    // Only bins whose range intersect [newMin, newMax] have join possibility.
    +    val leftBins = leftHistogram.bins
    +      .filter(b => b.lo <= newMax && b.hi >= newMin)
    +    val rightBins = rightHistogram.bins
    +      .filter(b => b.lo <= newMax && b.hi >= newMin)
    +
    +    leftBins.foreach { lb =>
    +      rightBins.foreach { rb =>
    --- End diff --
    
    We only collect an `OverlappedRange` when the [left part and right part intersect](https://github.com/apache/spark/pull/19594/files#diff-56eed9f23127c954d9add0f6c5c93820R237), and that decision depends on some computation, so it is not very convenient to express it as a guard. The `yield` form therefore does not seem very suitable for this case.
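
A rough sketch of the nested `foreach` pattern described above, where the intersection test uses values computed inside the loop body. `Span`, `clip`, and `overlaps` are hypothetical simplifications introduced for illustration; the real code works on `HistogramBin` and `trimBin`:

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified stand-in for a bin's value range; not the Spark class.
case class Span(lo: Double, hi: Double)

// Hypothetical clipping step standing in for trimBin.
def clip(s: Span, lower: Double, upper: Double): Span =
  Span(math.max(s.lo, lower), math.min(s.hi, upper))

def overlaps(lefts: Seq[Span], rights: Seq[Span], lower: Double, upper: Double): Seq[Span] = {
  val result = new ArrayBuffer[Span]()
  lefts.foreach { lb =>
    rights.foreach { rb =>
      // The trimmed ranges are computed first; the test below depends on them.
      val left = clip(lb, lower, upper)
      val right = clip(rb, lower, upper)
      if (left.lo <= right.hi && left.hi >= right.lo) {
        result += Span(math.max(left.lo, right.lo), math.min(left.hi, right.hi))
      }
    }
  }
  result.toSeq
}

val found = overlaps(
  Seq(Span(10, 30), Span(30, 60)),
  Seq(Span(0, 50), Span(50, 100)),
  lower = 10, upper = 60)
```

The inputs reuse the bin boundaries from the test case in this PR, so the collected ranges are [10, 30], [30, 50], and [50, 60].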


---



[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84991/
    Test FAILed.


---



[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    **[Test build #84682 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84682/testReport)** for PR 19594 at commit [`e69e213`](https://github.com/apache/spark/commit/e69e21348b4cde2abaec9dbb46381caf1ed3a1a4).


---



[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19594#discussion_r157517554
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala ---
    @@ -225,6 +236,43 @@ case class JoinEstimation(join: Join) extends Logging {
         (ceil(card), newStats)
       }
     
    +  /** Compute join cardinality using equi-height histograms. */
    +  private def computeByEquiHeightHistogram(
    +      leftKey: AttributeReference,
    +      rightKey: AttributeReference,
    +      leftHistogram: Histogram,
    +      rightHistogram: Histogram,
    +      newMin: Option[Any],
    +      newMax: Option[Any]): (BigInt, ColumnStat) = {
    +    val overlappedRanges = getOverlappedRanges(
    +      leftHistogram = leftHistogram,
    +      rightHistogram = rightHistogram,
    +      // Only numeric values have equi-height histograms.
    +      lowerBound = newMin.get.toString.toDouble,
    +      upperBound = newMax.get.toString.toDouble)
    --- End diff --
    
    If we assume the min/max must be defined here, I think the parameter type should be `Double` instead of `Option[Any]`.
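
One way to read this suggestion, sketched with hypothetical method names (`computeWithOptions` and `computeWithDoubles` are not part of the PR): the caller converts the bounds once and the callee takes plain `Double` values.

```scala
// Hypothetical signatures for illustration; not the actual JoinEstimation methods.
def computeWithOptions(newMin: Option[Any], newMax: Option[Any]): Double = {
  // The callee must unwrap and convert, and fails at runtime if a bound is missing.
  newMax.get.toString.toDouble - newMin.get.toString.toDouble
}

// With Double parameters the signature itself states that both bounds are defined.
def computeWithDoubles(lowerBound: Double, upperBound: Double): Double =
  upperBound - lowerBound

// The caller, which already knows the bounds exist, converts them once.
val newMin: Option[Any] = Some(10)
val newMax: Option[Any] = Some(60)
val width = computeWithDoubles(newMin.get.toString.toDouble, newMax.get.toString.toDouble)
```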


---



[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    **[Test build #84682 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84682/testReport)** for PR 19594 at commit [`e69e213`](https://github.com/apache/spark/commit/e69e21348b4cde2abaec9dbb46381caf1ed3a1a4).
     * This patch **fails PySpark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    **[Test build #85106 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85106/testReport)** for PR 19594 at commit [`16797d2`](https://github.com/apache/spark/commit/16797d2d02565616cf24e4509e43d3233c7a4714).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #19594: [WIP] [SPARK-21984] Join estimation based on equi-height...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    **[Test build #83870 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83870/testReport)** for PR 19594 at commit [`67bd651`](https://github.com/apache/spark/commit/67bd65153bd0afcdddd30c6ef4799caa02a05a19).


---



[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    LGTM except some minor comments


---



[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19594#discussion_r157525695
  
    --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala ---
    @@ -67,6 +67,222 @@ class JoinEstimationSuite extends StatsEstimationTestBase {
         rowCount = 2,
         attributeStats = AttributeMap(Seq("key-1-2", "key-2-3").map(nameToColInfo)))
     
    +  private def estimateByHistogram(
    +      leftHistogram: Histogram,
    +      rightHistogram: Histogram,
    +      expectedMin: Double,
    +      expectedMax: Double,
    +      expectedNdv: Long,
    +      expectedRows: Long): Unit = {
    +    val col1 = attr("key1")
    +    val col2 = attr("key2")
    +    val c1 = generateJoinChild(col1, leftHistogram, expectedMin, expectedMax)
    +    val c2 = generateJoinChild(col2, rightHistogram, expectedMin, expectedMax)
    +
    +    val c1JoinC2 = Join(c1, c2, Inner, Some(EqualTo(col1, col2)))
    +    val c2JoinC1 = Join(c2, c1, Inner, Some(EqualTo(col2, col1)))
    +    val expectedStatsAfterJoin = Statistics(
    +      sizeInBytes = expectedRows * (8 + 2 * 4),
    +      rowCount = Some(expectedRows),
    +      attributeStats = AttributeMap(Seq(
    +        col1 -> c1.stats.attributeStats(col1).copy(
    +          distinctCount = expectedNdv, min = Some(expectedMin), max = Some(expectedMax)),
    +        col2 -> c2.stats.attributeStats(col2).copy(
    +          distinctCount = expectedNdv, min = Some(expectedMin), max = Some(expectedMax))))
    +    )
    +
    +    // Join order should not affect estimation result.
    +    Seq(c1JoinC2, c2JoinC1).foreach { join =>
    +      assert(join.stats == expectedStatsAfterJoin)
    +    }
    +  }
    +
    +  private def generateJoinChild(
    +      col: Attribute,
    +      histogram: Histogram,
    +      expectedMin: Double,
    +      expectedMax: Double): LogicalPlan = {
    +    val colStat = inferColumnStat(histogram)
    +    StatsTestPlan(
    +      outputList = Seq(col),
    +      rowCount = (histogram.height * histogram.bins.length).toLong,
    +      attributeStats = AttributeMap(Seq(col -> colStat)))
    +  }
    +
    +  /** Column statistics should be consistent with histograms in tests. */
    +  private def inferColumnStat(histogram: Histogram): ColumnStat = {
    +    var ndv = 0L
    +    for (i <- histogram.bins.indices) {
    +      val bin = histogram.bins(i)
    +      if (i == 0 || bin.hi != histogram.bins(i - 1).hi) {
    +        ndv += bin.ndv
    +      }
    +    }
    +    ColumnStat(distinctCount = ndv, min = Some(histogram.bins.head.lo),
    +      max = Some(histogram.bins.last.hi), nullCount = 0, avgLen = 4, maxLen = 4,
    +      histogram = Some(histogram))
    +  }
    +
    +  test("equi-height histograms: a bin is contained by another one") {
    +    val histogram1 = Histogram(height = 300, Array(
    +      HistogramBin(lo = 10, hi = 30, ndv = 10), HistogramBin(lo = 30, hi = 60, ndv = 30)))
    +    val histogram2 = Histogram(height = 100, Array(
    +      HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 100, ndv = 40)))
    +    // test bin trimming
    +    val (t0, h0) = trimBin(histogram2.bins(0), height = 100, lowerBound = 10, upperBound = 60)
    +    assert(t0 == HistogramBin(lo = 10, hi = 50, ndv = 40) && h0 == 80)
    +    val (t1, h1) = trimBin(histogram2.bins(1), height = 100, lowerBound = 10, upperBound = 60)
    +    assert(t1 == HistogramBin(lo = 50, hi = 60, ndv = 8) && h1 == 20)
    +
    +    val expectedRanges = Seq(
    +      // histogram1.bins(0) overlaps t0
    +      OverlappedRange(10, 30, 10, 40*1/2, 300, 80*1/2),
    +      // histogram1.bins(1) overlaps t0
    +      OverlappedRange(30, 50, 30*2/3, 40*1/2, 300*2/3, 80*1/2),
    +      // histogram1.bins(1) overlaps t1
    +      OverlappedRange(50, 60, 30*1/3, 8, 300*1/3, 20)
    +    )
    +    assert(expectedRanges.equals(
    +      getOverlappedRanges(histogram1, histogram2, lowerBound = 10D, upperBound = 60D)))
    --- End diff --
    
    actually we can just write `10` right?
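
A small sketch of why the `D` suffix is optional here, assuming a hypothetical method with `Double` parameters: Scala widens an `Int` literal to `Double` when a `Double` is expected.

```scala
// Hypothetical method with Double parameters, mirroring the bounds in the test.
def width(lowerBound: Double, upperBound: Double): Double = upperBound - lowerBound

// An Int literal is widened to Double where a Double is expected,
// so both calls pass the same arguments.
val withSuffix = width(lowerBound = 10D, upperBound = 60D)
val withoutSuffix = width(lowerBound = 10, upperBound = 60)
```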


---



[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    **[Test build #84683 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84683/testReport)** for PR 19594 at commit [`e69e213`](https://github.com/apache/spark/commit/e69e21348b4cde2abaec9dbb46381caf1ed3a1a4).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

Posted by ron8hu <gi...@git.apache.org>.
Github user ron8hu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19594#discussion_r153665092
  
    --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala ---
    @@ -67,6 +68,205 @@ class JoinEstimationSuite extends StatsEstimationTestBase {
         rowCount = 2,
         attributeStats = AttributeMap(Seq("key-1-2", "key-2-3").map(nameToColInfo)))
     
    +  private def estimateByHistogram(
    +      histogram1: Histogram,
    +      histogram2: Histogram,
    +      expectedMin: Double,
    +      expectedMax: Double,
    +      expectedNdv: Long,
    +      expectedRows: Long): Unit = {
    +    val col1 = attr("key1")
    +    val col2 = attr("key2")
    +    val c1 = generateJoinChild(col1, histogram1, expectedMin, expectedMax)
    +    val c2 = generateJoinChild(col2, histogram2, expectedMin, expectedMax)
    +
    +    val c1JoinC2 = Join(c1, c2, Inner, Some(EqualTo(col1, col2)))
    +    val c2JoinC1 = Join(c2, c1, Inner, Some(EqualTo(col2, col1)))
    +    val expectedStatsAfterJoin = Statistics(
    +      sizeInBytes = expectedRows * (8 + 2 * 4),
    +      rowCount = Some(expectedRows),
    +      attributeStats = AttributeMap(Seq(
    +        col1 -> c1.stats.attributeStats(col1).copy(
    +          distinctCount = expectedNdv, min = Some(expectedMin), max = Some(expectedMax)),
    +        col2 -> c2.stats.attributeStats(col2).copy(
    +          distinctCount = expectedNdv, min = Some(expectedMin), max = Some(expectedMax))))
    +    )
    +
    +    // Join order should not affect estimation result.
    +    Seq(c1JoinC2, c2JoinC1).foreach { join =>
    +      assert(join.stats == expectedStatsAfterJoin)
    +    }
    +  }
    +
    +  private def generateJoinChild(
    +      col: Attribute,
    +      histogram: Histogram,
    +      expectedMin: Double,
    +      expectedMax: Double): LogicalPlan = {
    +    val colStat = inferColumnStat(histogram)
    +    val t = StatsTestPlan(
    +      outputList = Seq(col),
    +      rowCount = (histogram.height * histogram.bins.length).toLong,
    +      attributeStats = AttributeMap(Seq(col -> colStat)))
    +
    +    val filterCondition = new ArrayBuffer[Expression]()
    +    if (expectedMin > colStat.min.get.toString.toDouble) {
    +      filterCondition += GreaterThanOrEqual(col, Literal(expectedMin))
    +    }
    +    if (expectedMax < colStat.max.get.toString.toDouble) {
    +      filterCondition += LessThanOrEqual(col, Literal(expectedMax))
    +    }
    +    if (filterCondition.isEmpty) t else Filter(filterCondition.reduce(And), t)
    +  }
    +
    +  private def inferColumnStat(histogram: Histogram): ColumnStat = {
    +    var ndv = 0L
    +    for (i <- histogram.bins.indices) {
    +      val bin = histogram.bins(i)
    +      if (i == 0 || bin.hi != histogram.bins(i - 1).hi) {
    +        ndv += bin.ndv
    +      }
    +    }
    +    ColumnStat(distinctCount = ndv, min = Some(histogram.bins.head.lo),
    +      max = Some(histogram.bins.last.hi), nullCount = 0, avgLen = 4, maxLen = 4,
    +      histogram = Some(histogram))
    +  }
    +
    +  test("equi-height histograms: a bin is contained by another one") {
    +    val histogram1 = Histogram(height = 300, Array(
    +      HistogramBin(lo = 10, hi = 30, ndv = 10), HistogramBin(lo = 30, hi = 60, ndv = 30)))
    +    val histogram2 = Histogram(height = 100, Array(
    +      HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 100, ndv = 40)))
    +    // test bin trimming
    +    val (t1, h1) = trimBin(histogram2.bins(0), height = 100, min = 10, max = 60)
    +    assert(t1 == HistogramBin(lo = 10, hi = 50, ndv = 40) && h1 == 80)
    +    val (t2, h2) = trimBin(histogram2.bins(1), height = 100, min = 10, max = 60)
    +    assert(t2 == HistogramBin(lo = 50, hi = 60, ndv = 8) && h2 == 20)
    +
    +    val expectedRanges = Seq(
    +      OverlappedRange(10, 30, math.min(10, 40*1/2), math.max(10, 40*1/2), 300, 80*1/2),
    +      OverlappedRange(30, 50, math.min(30*2/3, 40*1/2), math.max(30*2/3, 40*1/2), 300*2/3, 80*1/2),
    +      OverlappedRange(50, 60, math.min(30*1/3, 8), math.max(30*1/3, 8), 300*1/3, 20)
    +    )
    +    assert(expectedRanges.equals(
    +      getOverlappedRanges(histogram1, histogram2, newMin = 10D, newMax = 60D)))
    +
    +    estimateByHistogram(
    +      histogram1 = histogram1,
    +      histogram2 = histogram2,
    +      expectedMin = 10D,
    +      expectedMax = 60D,
    +      // 10 + 20 + 8
    +      expectedNdv = 38L,
    +      // 300*40/20 + 200*40/20 + 100*20/10
    +      expectedRows = 1200L)
    +  }
    +
    +  test("equi-height histograms: a bin has only one value") {
    +    val histogram1 = Histogram(height = 300, Array(
    +      HistogramBin(lo = 30, hi = 30, ndv = 1), HistogramBin(lo = 30, hi = 60, ndv = 30)))
    +    val histogram2 = Histogram(height = 100, Array(
    +      HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 100, ndv = 40)))
    --- End diff --
    
    Histograms are supposed to handle skewed distributions effectively. In this test case, histogram1 has a skewed distribution, as one of its bins has only one distinct value. Can you add a test case in which both join columns have skewed distributions? That is, both join columns should each have at least one bin with a single distinct value.
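
    A rough sketch of the kind of input being requested, under stated assumptions: `Bin` and `Hist` are hypothetical stand-ins for Spark's `HistogramBin` and `Histogram`, the bin values are invented for illustration, and the row estimate simply applies the per-range formula from the PR (`leftNumRows * rightNumRows / max(leftNdv, rightNdv)`) to the two single-value bins.

```scala
// Hypothetical stand-ins for HistogramBin / Histogram, for illustration only.
final case class Bin(lo: Double, hi: Double, ndv: Long)
final case class Hist(height: Double, bins: Seq[Bin])

// A bin with lo == hi holds a single distinct value, i.e. a heavily repeated key.
def singleValueBins(h: Hist): Seq[Bin] = h.bins.filter(b => b.lo == b.hi)

// Both sides contain a single-value bin at 30, so both join columns are skewed.
val left = Hist(height = 300, Seq(Bin(30, 30, 1), Bin(30, 60, 30)))
val right = Hist(height = 100, Seq(Bin(0, 30, 30), Bin(30, 30, 1), Bin(30, 100, 40)))

// For the pair of single-value bins, each side contributes a full bin of rows
// and exactly one distinct value.
val skewedRows = left.height * right.height / math.max(1.0, 1.0)
```

    Under these assumptions the two skewed bins alone account for 300 * 100 matched rows, which is the effect such a test would need to capture.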


---



[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    **[Test build #84679 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84679/testReport)** for PR 19594 at commit [`e69e213`](https://github.com/apache/spark/commit/e69e21348b4cde2abaec9dbb46381caf1ed3a1a4).


---



[GitHub] spark issue #19594: [WIP] [SPARK-21984] Join estimation based on equi-height...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    **[Test build #83871 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83871/testReport)** for PR 19594 at commit [`8b2084a`](https://github.com/apache/spark/commit/8b2084a4bec8fdd58cca809b2d2b26bdc939436d).


---



[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    **[Test build #83871 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83871/testReport)** for PR 19594 at commit [`8b2084a`](https://github.com/apache/spark/commit/8b2084a4bec8fdd58cca809b2d2b26bdc939436d).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `  case class OverlappedRange(`


---



[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84683/
    Test PASSed.


---



[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    **[Test build #83870 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83870/testReport)** for PR 19594 at commit [`67bd651`](https://github.com/apache/spark/commit/67bd65153bd0afcdddd30c6ef4799caa02a05a19).
     * This patch passes all tests.
     * This patch **does not merge cleanly**.
     * This patch adds the following public classes _(experimental)_:
      * `  case class OverlappedRange(`


---



[GitHub] spark issue #19594: [WIP] [SPARK-21984] Join estimation based on equi-height...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83146/
    Test PASSed.


---



[GitHub] spark issue #19594: [WIP] [SPARK-21984] Join estimation based on equi-height...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    **[Test build #83825 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83825/testReport)** for PR 19594 at commit [`96776ce`](https://github.com/apache/spark/commit/96776ce73b353ca52df3a05883ba039ac1f7d617).


---



[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83871/
    Test PASSed.


---



[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

Posted by wzhfy <gi...@git.apache.org>.
Github user wzhfy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19594#discussion_r157696227
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala ---
    @@ -225,6 +236,43 @@ case class JoinEstimation(join: Join) extends Logging {
         (ceil(card), newStats)
       }
     
    +  /** Compute join cardinality using equi-height histograms. */
    +  private def computeByEquiHeightHistogram(
    +      leftKey: AttributeReference,
    +      rightKey: AttributeReference,
    +      leftHistogram: Histogram,
    +      rightHistogram: Histogram,
    +      newMin: Option[Any],
    +      newMax: Option[Any]): (BigInt, ColumnStat) = {
    +    val overlappedRanges = getOverlappedRanges(
    +      leftHistogram = leftHistogram,
    +      rightHistogram = rightHistogram,
    +      // Only numeric values have equi-height histograms.
    +      lowerBound = newMin.get.toString.toDouble,
    +      upperBound = newMax.get.toString.toDouble)
    --- End diff --
    
    that's because we need to update the column stats' min and max at the end of the method.
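
    A minimal sketch of this point, assuming the bounds arrive as `Option[Any]` holding the key's original numeric type: the histogram arithmetic works on a `Double` view, while the untouched `Option[Any]` values stay available for the column stats' min and max. `toBound` is a hypothetical helper name; the `.toString.toDouble` conversion is the one used in the quoted diff.

```scala
// Hypothetical helper: Double view of a numeric bound, as in the quoted diff.
def toBound(v: Option[Any]): Double = v.get.toString.toDouble

val newMin: Option[Any] = Some(10)   // e.g. an Int-typed key
val newMax: Option[Any] = Some(60L)  // e.g. a Long-typed key

// Double values used only for the histogram range computation.
val lowerBound = toBound(newMin)
val upperBound = toBound(newMax)
```

    The original `newMin` and `newMax` keep their types, so they can be written back into the column statistics unchanged.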


---



[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    Build finished. Test PASSed.


---



[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...

Posted by wzhfy <gi...@git.apache.org>.
Github user wzhfy commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    ping @cloud-fan 


---



[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...

Posted by wzhfy <gi...@git.apache.org>.
Github user wzhfy commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    retest this please


---



[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

Posted by wzhfy <gi...@git.apache.org>.
Github user wzhfy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19594#discussion_r157698793
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala ---
    @@ -225,6 +236,43 @@ case class JoinEstimation(join: Join) extends Logging {
         (ceil(card), newStats)
       }
     
    +  /** Compute join cardinality using equi-height histograms. */
    +  private def computeByEquiHeightHistogram(
    +      leftKey: AttributeReference,
    +      rightKey: AttributeReference,
    +      leftHistogram: Histogram,
    +      rightHistogram: Histogram,
    +      newMin: Option[Any],
    +      newMax: Option[Any]): (BigInt, ColumnStat) = {
    +    val overlappedRanges = getOverlappedRanges(
    +      leftHistogram = leftHistogram,
    +      rightHistogram = rightHistogram,
    +      // Only numeric values have equi-height histograms.
    +      lowerBound = newMin.get.toString.toDouble,
    +      upperBound = newMax.get.toString.toDouble)
    +
    +    var card: BigDecimal = 0
    +    var totalNdv: Double = 0
    +    for (i <- overlappedRanges.indices) {
    +      val range = overlappedRanges(i)
    +      if (i == 0 || range.hi != overlappedRanges(i - 1).hi) {
    +        // If range.hi == overlappedRanges(i - 1).hi, that means the current range has only one
    +        // value, and this value is already counted in the previous range. So there is no need to
    +        // count it in this range.
    +        totalNdv += math.min(range.leftNdv, range.rightNdv)
    +      }
    +      // Apply the formula in this overlapped range.
    +      card += range.leftNumRows * range.rightNumRows / math.max(range.leftNdv, range.rightNdv)
    +    }
    +
    +    val leftKeyStat = leftStats.attributeStats(leftKey)
    +    val rightKeyStat = rightStats.attributeStats(rightKey)
    +    val newMaxLen = math.min(leftKeyStat.maxLen, rightKeyStat.maxLen)
    +    val newAvgLen = (leftKeyStat.avgLen + rightKeyStat.avgLen) / 2
    --- End diff --
    
    How would we use the left/right numRows to calculate this? Ideally, avgLen is calculated as total length of keys / numRowsAfterJoin. For string types, we don't know the exact length of the matched keys (we don't support string histograms yet); for numeric types, the avgLen of both sides should be the same. So the equation is a fair approximation.
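
    A small sketch of the approximation, mirroring the two expressions in the quoted diff. `KeyLen` is a hypothetical stand-in that carries only the two length fields; it is not Spark's `ColumnStat`.

```scala
// Hypothetical stand-in carrying only the two length fields discussed here.
final case class KeyLen(avgLen: Long, maxLen: Long)

// Same expressions as in the quoted diff: average of the avgLens, minimum of the maxLens.
def joinedLen(l: KeyLen, r: KeyLen): KeyLen =
  KeyLen(avgLen = (l.avgLen + r.avgLen) / 2, maxLen = math.min(l.maxLen, r.maxLen))

// Two join keys of the same numeric type (4 bytes each, as in the test suite).
val intKey = KeyLen(avgLen = 4, maxLen = 4)
val joined = joinedLen(intKey, intKey)
```

    When both sides have the same numeric type, the averaged value equals either side's avgLen, so nothing is lost by not weighting with row counts.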


---



[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19594#discussion_r156861642
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala ---
    @@ -191,8 +191,16 @@ case class JoinEstimation(join: Join) extends Logging {
           val rInterval = ValueInterval(rightKeyStat.min, rightKeyStat.max, rightKey.dataType)
           if (ValueInterval.isIntersected(lInterval, rInterval)) {
             val (newMin, newMax) = ValueInterval.intersect(lInterval, rInterval, leftKey.dataType)
    -        val (card, joinStat) = computeByNdv(leftKey, rightKey, newMin, newMax)
    -        keyStatsAfterJoin += (leftKey -> joinStat, rightKey -> joinStat)
    +        val (card, joinStat) = (leftKeyStat.histogram, rightKeyStat.histogram) match {
    +          case (Some(l: Histogram), Some(r: Histogram)) =>
    +            computeByEquiHeightHistogram(leftKey, rightKey, l, r, newMin, newMax)
    +          case _ =>
    +            computeByNdv(leftKey, rightKey, newMin, newMax)
    +        }
    +        keyStatsAfterJoin += (
    +          leftKey -> joinStat.copy(histogram = leftKeyStat.histogram),
    +          rightKey -> joinStat.copy(histogram = rightKeyStat.histogram)
    --- End diff --
    
    Actually keeping it unchanged is more memory efficient. We just pass around pointers, but updating the histogram means creating a new one.
    
    Let's keep it, and add some comments to explain why.
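
    A minimal sketch of the memory argument, using hypothetical stand-in case classes (`HistRef`, `StatRef`) rather than Spark's own: `copy` on a case class reuses the existing histogram object when that field is not overridden, so only a reference is propagated.

```scala
// Hypothetical stand-ins; the real classes carry more fields.
final case class HistRef(height: Double, bounds: Array[Double])
final case class StatRef(min: Double, max: Double, histogram: Option[HistRef])

val original = StatRef(0.0, 100.0, Some(HistRef(100.0, Array(0.0, 50.0, 100.0))))

// Update only min/max, as the join estimation does; the histogram field is untouched.
val afterJoin = original.copy(min = 10.0, max = 60.0)

// Reference equality: both stats point at the very same histogram instance.
val shared = afterJoin.histogram.get eq original.histogram.get
```

    Truncating the histogram eagerly would instead allocate a new bin array for every estimated operator.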


---



[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19594#discussion_r157524477
  
    --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala ---
    @@ -67,6 +67,222 @@ class JoinEstimationSuite extends StatsEstimationTestBase {
         rowCount = 2,
         attributeStats = AttributeMap(Seq("key-1-2", "key-2-3").map(nameToColInfo)))
     
    +  private def estimateByHistogram(
    +      leftHistogram: Histogram,
    +      rightHistogram: Histogram,
    +      expectedMin: Double,
    +      expectedMax: Double,
    +      expectedNdv: Long,
    +      expectedRows: Long): Unit = {
    +    val col1 = attr("key1")
    +    val col2 = attr("key2")
    +    val c1 = generateJoinChild(col1, leftHistogram, expectedMin, expectedMax)
    +    val c2 = generateJoinChild(col2, rightHistogram, expectedMin, expectedMax)
    +
    +    val c1JoinC2 = Join(c1, c2, Inner, Some(EqualTo(col1, col2)))
    +    val c2JoinC1 = Join(c2, c1, Inner, Some(EqualTo(col2, col1)))
    +    val expectedStatsAfterJoin = Statistics(
    +      sizeInBytes = expectedRows * (8 + 2 * 4),
    +      rowCount = Some(expectedRows),
    +      attributeStats = AttributeMap(Seq(
    +        col1 -> c1.stats.attributeStats(col1).copy(
    +          distinctCount = expectedNdv, min = Some(expectedMin), max = Some(expectedMax)),
    +        col2 -> c2.stats.attributeStats(col2).copy(
    +          distinctCount = expectedNdv, min = Some(expectedMin), max = Some(expectedMax))))
    +    )
    +
    +    // Join order should not affect estimation result.
    +    Seq(c1JoinC2, c2JoinC1).foreach { join =>
    +      assert(join.stats == expectedStatsAfterJoin)
    +    }
    +  }
    +
    +  private def generateJoinChild(
    +      col: Attribute,
    +      histogram: Histogram,
    +      expectedMin: Double,
    +      expectedMax: Double): LogicalPlan = {
    +    val colStat = inferColumnStat(histogram)
    +    StatsTestPlan(
    +      outputList = Seq(col),
    +      rowCount = (histogram.height * histogram.bins.length).toLong,
    +      attributeStats = AttributeMap(Seq(col -> colStat)))
    +  }
    +
    +  /** Column statistics should be consistent with histograms in tests. */
    +  private def inferColumnStat(histogram: Histogram): ColumnStat = {
    +    var ndv = 0L
    +    for (i <- histogram.bins.indices) {
    +      val bin = histogram.bins(i)
    +      if (i == 0 || bin.hi != histogram.bins(i - 1).hi) {
    +        ndv += bin.ndv
    +      }
    +    }
    +    ColumnStat(distinctCount = ndv, min = Some(histogram.bins.head.lo),
    +      max = Some(histogram.bins.last.hi), nullCount = 0, avgLen = 4, maxLen = 4,
    +      histogram = Some(histogram))
    +  }
    +
    +  test("equi-height histograms: a bin is contained by another one") {
    +    val histogram1 = Histogram(height = 300, Array(
    +      HistogramBin(lo = 10, hi = 30, ndv = 10), HistogramBin(lo = 30, hi = 60, ndv = 30)))
    +    val histogram2 = Histogram(height = 100, Array(
    +      HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 100, ndv = 40)))
    +    // test bin trimming
    +    val (t0, h0) = trimBin(histogram2.bins(0), height = 100, lowerBound = 10, upperBound = 60)
    +    assert(t0 == HistogramBin(lo = 10, hi = 50, ndv = 40) && h0 == 80)
    +    val (t1, h1) = trimBin(histogram2.bins(1), height = 100, lowerBound = 10, upperBound = 60)
    +    assert(t1 == HistogramBin(lo = 50, hi = 60, ndv = 8) && h1 == 20)
    +
    +    val expectedRanges = Seq(
    +      // histogram1.bins(0) overlaps t0
    +      OverlappedRange(10, 30, 10, 40*1/2, 300, 80*1/2),
    +      // histogram1.bins(1) overlaps t0
    +      OverlappedRange(30, 50, 30*2/3, 40*1/2, 300*2/3, 80*1/2),
    +      // histogram1.bins(1) overlaps t1
    +      OverlappedRange(50, 60, 30*1/3, 8, 300*1/3, 20)
    +    )
    +    assert(expectedRanges.equals(
    +      getOverlappedRanges(histogram1, histogram2, lowerBound = 10D, upperBound = 60D)))
    --- End diff --
    
    `10D` looks weird; how about `10.0`?
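
    For readers following the quoted test: its bin-trimming assertions are consistent with linear interpolation inside a bin (values assumed uniformly distributed). The sketch below is an assumption about that behaviour, not the PR's `trimBin`; `TBin` and `trimBinSketch` are hypothetical names, and rounding the trimmed ndv is a choice made here.

```scala
// Hypothetical stand-in for HistogramBin.
final case class TBin(lo: Double, hi: Double, ndv: Long)

// Clip the bin to [lowerBound, upperBound] and scale ndv and height by the kept fraction.
def trimBinSketch(
    bin: TBin,
    height: Double,
    lowerBound: Double,
    upperBound: Double): (TBin, Double) = {
  val lo = math.max(bin.lo, lowerBound)
  val hi = math.min(bin.hi, upperBound)
  // A single-value bin (lo == hi) is kept whole.
  val ratio = if (bin.hi == bin.lo) 1.0 else (hi - lo) / (bin.hi - bin.lo)
  (TBin(lo, hi, math.round(bin.ndv * ratio)), height * ratio)
}

// The two bins of histogram2 from the quoted test, trimmed to [10, 60].
val first = trimBinSketch(TBin(0, 50, 50), height = 100, lowerBound = 10, upperBound = 60)
val second = trimBinSketch(TBin(50, 100, 40), height = 100, lowerBound = 10, upperBound = 60)
```

    With these inputs the sketch reproduces the values asserted in the test: a trimmed bin of [10, 50] with ndv 40 and height 80, and a trimmed bin of [50, 60] with ndv 8 and height 20.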


---



[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    **[Test build #85061 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85061/testReport)** for PR 19594 at commit [`2637429`](https://github.com/apache/spark/commit/263742914e21ba607904acb0ad35ced32aad48ab).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    **[Test build #84991 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84991/testReport)** for PR 19594 at commit [`2637429`](https://github.com/apache/spark/commit/263742914e21ba607904acb0ad35ced32aad48ab).


---



[GitHub] spark issue #19594: [WIP] [SPARK-21984] Join estimation based on equi-height...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19594#discussion_r157519924
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala ---
    @@ -191,8 +191,19 @@ case class JoinEstimation(join: Join) extends Logging {
           val rInterval = ValueInterval(rightKeyStat.min, rightKeyStat.max, rightKey.dataType)
           if (ValueInterval.isIntersected(lInterval, rInterval)) {
             val (newMin, newMax) = ValueInterval.intersect(lInterval, rInterval, leftKey.dataType)
    -        val (card, joinStat) = computeByNdv(leftKey, rightKey, newMin, newMax)
    -        keyStatsAfterJoin += (leftKey -> joinStat, rightKey -> joinStat)
    +        val (card, joinStat) = (leftKeyStat.histogram, rightKeyStat.histogram) match {
    +          case (Some(l: Histogram), Some(r: Histogram)) =>
    +            computeByEquiHeightHistogram(leftKey, rightKey, l, r, newMin, newMax)
    +          case _ =>
    +            computeByNdv(leftKey, rightKey, newMin, newMax)
    +        }
    +        keyStatsAfterJoin += (
    +          // Histograms are propagated as unchanged. During future estimation, they should be
    +          // truncated by the updated max/min. In this way, only pointers of the histograms are
    +          // propagated and thus reduce memory consumption.
    +          leftKey -> joinStat.copy(histogram = leftKeyStat.histogram),
    +          rightKey -> joinStat.copy(histogram = rightKeyStat.histogram)
    --- End diff --
    
    i.e. https://github.com/apache/spark/pull/19594/files#diff-6387e7aaeb7d8e0cb1457b9d0fe5cd00R272


---



[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19594#discussion_r157513325
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala ---
    @@ -212,4 +213,186 @@ object EstimationUtils {
         }
       }
     
    +  /**
    +   * Returns overlapped ranges between two histograms, in the given value range
    +   * [lowerBound, upperBound].
    +   */
    +  def getOverlappedRanges(
    +    leftHistogram: Histogram,
    +    rightHistogram: Histogram,
    +    lowerBound: Double,
    +    upperBound: Double): Seq[OverlappedRange] = {
    +    val overlappedRanges = new ArrayBuffer[OverlappedRange]()
    +    // Only bins whose range intersect [lowerBound, upperBound] have join possibility.
    +    val leftBins = leftHistogram.bins
    +      .filter(b => b.lo <= upperBound && b.hi >= lowerBound)
    +    val rightBins = rightHistogram.bins
    +      .filter(b => b.lo <= upperBound && b.hi >= lowerBound)
    +
    +    leftBins.foreach { lb =>
    +      rightBins.foreach { rb =>
    +        val (left, leftHeight) = trimBin(lb, leftHistogram.height, lowerBound, upperBound)
    +        val (right, rightHeight) = trimBin(rb, rightHistogram.height, lowerBound, upperBound)
    +        // Only collect overlapped ranges.
    +        if (left.lo <= right.hi && left.hi >= right.lo) {
    +          // Collect overlapped ranges.
    +          val range = if (left.lo == left.hi) {
    +            // Case1: the left bin has only one value
    +            OverlappedRange(
    +              lo = left.lo,
    +              hi = left.lo,
    +              leftNdv = 1,
    +              rightNdv = 1,
    +              leftNumRows = leftHeight,
    +              rightNumRows = rightHeight / right.ndv
    +            )
    +          } else if (right.lo == right.hi) {
    +            // Case2: the right bin has only one value
    +            OverlappedRange(
    +              lo = right.lo,
    +              hi = right.lo,
    +              leftNdv = 1,
    +              rightNdv = 1,
    +              leftNumRows = leftHeight / left.ndv,
    +              rightNumRows = rightHeight
    +            )
    +          } else if (right.lo >= left.lo && right.hi >= left.hi) {
    +            // Case3: the left bin is "smaller" than the right bin
    +            //      left.lo            right.lo     left.hi          right.hi
    +            // --------+------------------+------------+----------------+------->
    +            if (left.hi == right.lo) {
    +              // The overlapped range has only one value.
    +              OverlappedRange(
    +                lo = right.lo,
    +                hi = right.lo,
    +                leftNdv = 1,
    +                rightNdv = 1,
    +                leftNumRows = leftHeight / left.ndv,
    +                rightNumRows = rightHeight / right.ndv
    +              )
    +            } else {
    +              val leftRatio = (left.hi - right.lo) / (left.hi - left.lo)
    +              val rightRatio = (left.hi - right.lo) / (right.hi - right.lo)
    +              OverlappedRange(
    +                lo = right.lo,
    +                hi = left.hi,
    +                leftNdv = left.ndv * leftRatio,
    +                rightNdv = right.ndv * rightRatio,
    +                leftNumRows = leftHeight * leftRatio,
    +                rightNumRows = rightHeight * rightRatio
    +              )
    +            }
    +          } else if (right.lo <= left.lo && right.hi <= left.hi) {
    +            // Case4: the left bin is "larger" than the right bin
    +            //      right.lo           left.lo      right.hi         left.hi
    +            // --------+------------------+------------+----------------+------->
    +            if (right.hi == left.lo) {
    +              // The overlapped range has only one value.
    +              OverlappedRange(
    +                lo = right.hi,
    +                hi = right.hi,
    +                leftNdv = 1,
    +                rightNdv = 1,
    +                leftNumRows = leftHeight / left.ndv,
    +                rightNumRows = rightHeight / right.ndv
    +              )
    +            } else {
    +              val leftRatio = (right.hi - left.lo) / (left.hi - left.lo)
    +              val rightRatio = (right.hi - left.lo) / (right.hi - right.lo)
    +              OverlappedRange(
    +                lo = left.lo,
    +                hi = right.hi,
    +                leftNdv = left.ndv * leftRatio,
    +                rightNdv = right.ndv * rightRatio,
    +                leftNumRows = leftHeight * leftRatio,
    +                rightNumRows = rightHeight * rightRatio
    +              )
    +            }
    +          } else if (right.lo >= left.lo && right.hi <= left.hi) {
    +            // Case5: the left bin contains the right bin
    +            //      left.lo            right.lo     right.hi         left.hi
    +            // --------+------------------+------------+----------------+------->
    +            val leftRatio = (right.hi - right.lo) / (left.hi - left.lo)
    +            OverlappedRange(
    +              lo = right.lo,
    +              hi = right.hi,
    +              leftNdv = left.ndv * leftRatio,
    +              rightNdv = right.ndv,
    +              leftNumRows = leftHeight * leftRatio,
    +              rightNumRows = rightHeight
    +            )
    +          } else {
    +            assert(right.lo <= left.lo && right.hi >= left.hi)
    +            // Case6: the right bin contains the left bin
    +            //      right.lo           left.lo      left.hi          right.hi
    +            // --------+------------------+------------+----------------+------->
    +            val rightRatio = (left.hi - left.lo) / (right.hi - right.lo)
    +            OverlappedRange(
    +              lo = left.lo,
    +              hi = left.hi,
    +              leftNdv = left.ndv,
    +              rightNdv = right.ndv * rightRatio,
    +              leftNumRows = leftHeight,
    +              rightNumRows = rightHeight * rightRatio
    +            )
    +          }
    +          overlappedRanges += range
    +        }
    +      }
    +    }
    +    overlappedRanges
    +  }
    +
    +  /**
    +   * Given an original bin and a value range [lowerBound, upperBound], returns the trimmed part
    +   * of the bin in that range and its number of rows.
    +   */
    +  def trimBin(bin: HistogramBin, height: Double, lowerBound: Double, upperBound: Double)
    +  : (HistogramBin, Double) = {
    +    val (lo, hi) = if (bin.lo <= lowerBound && bin.hi >= upperBound) {
    +      //       bin.lo          lowerBound     upperBound      bin.hi
    +      // --------+------------------+------------+-------------+------->
    +      (lowerBound, upperBound)
    +    } else if (bin.lo <= lowerBound && bin.hi >= lowerBound) {
    +      //       bin.lo          lowerBound      bin.hi      upperBound
    +      // --------+------------------+------------+-------------+------->
    +      (lowerBound, bin.hi)
    +    } else if (bin.lo <= upperBound && bin.hi >= upperBound) {
    +      //    lowerBound            bin.lo     upperBound       bin.hi
    +      // --------+------------------+------------+-------------+------->
    +      (bin.lo, upperBound)
    +    } else {
    +      //    lowerBound            bin.lo        bin.hi     upperBound
    +      // --------+------------------+------------+-------------+------->
    +      (bin.lo, bin.hi)
    +    }
    +
    +    if (bin.hi == bin.lo) {
    --- End diff --
    
    Do we really need this branch? If we assume `bin.ndv` is always 1 when `bin.hi == bin.lo`, the `else if` branch already covers it.
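
A minimal sketch of how the two branches might collapse into one, under the assumption stated in the comment that a single-value bin always has `ndv == 1`. `Bin` and `trimBinSketch` are hypothetical stand-ins rather than the PR's code, `ndv` is a `Double` here for simplicity, and the bin is assumed to intersect the range already:

```scala
object TrimBinSketch {
  // Hypothetical stand-in for HistogramBin (ndv kept as Double for simplicity).
  final case class Bin(lo: Double, hi: Double, ndv: Double)

  // Trims `bin` to [lowerBound, upperBound] and scales ndv and height by the kept fraction.
  // Assumes the bin intersects the range.
  def trimBinSketch(
      bin: Bin,
      height: Double,
      lowerBound: Double,
      upperBound: Double): (Bin, Double) = {
    val lo = math.max(bin.lo, lowerBound)
    val hi = math.min(bin.hi, upperBound)
    if (hi == lo) {
      // Covers both a trimmed range of one value and a single-value bin.
      // For a single-value bin with ndv == 1 this returns the full height.
      (Bin(lo, hi, 1), height / bin.ndv)
    } else {
      // hi != lo implies bin.hi != bin.lo, so the division is safe.
      val ratio = (hi - lo) / (bin.hi - bin.lo)
      (Bin(lo, hi, bin.ndv * ratio), height * ratio)
    }
  }
}
```

Trimming `Bin(0, 50, 50)` with height 100 to the range [10, 60] keeps four fifths of the bin's width, so both its distinct values and its rows are scaled by that fraction.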


---


[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19594#discussion_r156392538
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala ---
    @@ -114,4 +115,183 @@ object EstimationUtils {
         }
       }
     
    +  /**
    +   * Returns overlapped ranges between two histograms, in the given value range [newMin, newMax].
    +   */
    +  def getOverlappedRanges(
    +      leftHistogram: Histogram,
    +      rightHistogram: Histogram,
    +      newMin: Double,
    +      newMax: Double): Seq[OverlappedRange] = {
    +    val overlappedRanges = new ArrayBuffer[OverlappedRange]()
    +    // Only bins whose range intersect [newMin, newMax] have join possibility.
    +    val leftBins = leftHistogram.bins
    +      .filter(b => b.lo <= newMax && b.hi >= newMin)
    +    val rightBins = rightHistogram.bins
    +      .filter(b => b.lo <= newMax && b.hi >= newMin)
    +
    +    leftBins.foreach { lb =>
    +      rightBins.foreach { rb =>
    +        val (left, leftHeight) = trimBin(lb, leftHistogram.height, newMin, newMax)
    +        val (right, rightHeight) = trimBin(rb, rightHistogram.height, newMin, newMax)
    +        // Only collect overlapped ranges.
    +        if (left.lo <= right.hi && left.hi >= right.lo) {
    +          // Collect overlapped ranges.
    +          val range = if (left.lo == left.hi) {
    +            // Case1: the left bin has only one value
    +            OverlappedRange(
    +              lo = left.lo,
    +              hi = left.lo,
    +              leftNdv = 1,
    +              rightNdv = 1,
    +              leftNumRows = leftHeight,
    +              rightNumRows = rightHeight / right.ndv
    +            )
    +          } else if (right.lo == right.hi) {
    +            // Case2: the right bin has only one value
    +            OverlappedRange(
    +              lo = right.lo,
    +              hi = right.lo,
    +              leftNdv = 1,
    +              rightNdv = 1,
    +              leftNumRows = leftHeight / left.ndv,
    +              rightNumRows = rightHeight
    +            )
    +          } else if (right.lo >= left.lo && right.hi >= left.hi) {
    +            // Case3: the left bin is "smaller" than the right bin
    +            //      left.lo            right.lo     left.hi          right.hi
    +            // --------+------------------+------------+----------------+------->
    +            val leftRatio = (left.hi - right.lo) / (left.hi - left.lo)
    +            val rightRatio = (left.hi - right.lo) / (right.hi - right.lo)
    +            if (leftRatio == 0) {
    --- End diff --
    
    It would be more readable to write `if (right.lo == left.hi)`.


---


[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

Posted by wzhfy <gi...@git.apache.org>.
Github user wzhfy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19594#discussion_r156847785
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala ---
    @@ -191,8 +191,16 @@ case class JoinEstimation(join: Join) extends Logging {
           val rInterval = ValueInterval(rightKeyStat.min, rightKeyStat.max, rightKey.dataType)
           if (ValueInterval.isIntersected(lInterval, rInterval)) {
             val (newMin, newMax) = ValueInterval.intersect(lInterval, rInterval, leftKey.dataType)
    -        val (card, joinStat) = computeByNdv(leftKey, rightKey, newMin, newMax)
    -        keyStatsAfterJoin += (leftKey -> joinStat, rightKey -> joinStat)
    +        val (card, joinStat) = (leftKeyStat.histogram, rightKeyStat.histogram) match {
    +          case (Some(l: Histogram), Some(r: Histogram)) =>
    +            computeByEquiHeightHistogram(leftKey, rightKey, l, r, newMin, newMax)
    +          case _ =>
    +            computeByNdv(leftKey, rightKey, newMin, newMax)
    +        }
    +        keyStatsAfterJoin += (
    +          leftKey -> joinStat.copy(histogram = leftKeyStat.histogram),
    +          rightKey -> joinStat.copy(histogram = rightKeyStat.histogram)
    --- End diff --
    
    Currently we don't update the histogram, because min/max already tell us which bins are still valid, so correctness is not affected. Updating the histogram would, however, reduce memory usage during histogram propagation. We can do this for both filter and join estimation in follow-up PRs.
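
As a rough illustration of what such a follow-up update might look like, the sketch below drops bins that fall entirely outside the new [min, max] range, reusing the intersection predicate from `getOverlappedRanges`. `HistBin` and `pruneBins` are hypothetical names, and a real update would presumably also need to trim the boundary bins:

```scala
object PruneBinsSketch {
  // Hypothetical stand-in for HistogramBin.
  final case class HistBin(lo: Double, hi: Double, ndv: Long)

  // Keeps only the bins whose value range intersects [newMin, newMax].
  def pruneBins(bins: Seq[HistBin], newMin: Double, newMax: Double): Seq[HistBin] =
    bins.filter(b => b.lo <= newMax && b.hi >= newMin)
}
```

Fewer bins need to be carried through later estimation steps, which is where the memory saving mentioned above would come from.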


---


[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19594#discussion_r157523857
  
    --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala ---
    @@ -67,6 +67,222 @@ class JoinEstimationSuite extends StatsEstimationTestBase {
         rowCount = 2,
         attributeStats = AttributeMap(Seq("key-1-2", "key-2-3").map(nameToColInfo)))
     
    +  private def estimateByHistogram(
    +      leftHistogram: Histogram,
    +      rightHistogram: Histogram,
    +      expectedMin: Double,
    +      expectedMax: Double,
    +      expectedNdv: Long,
    +      expectedRows: Long): Unit = {
    +    val col1 = attr("key1")
    +    val col2 = attr("key2")
    +    val c1 = generateJoinChild(col1, leftHistogram, expectedMin, expectedMax)
    +    val c2 = generateJoinChild(col2, rightHistogram, expectedMin, expectedMax)
    +
    +    val c1JoinC2 = Join(c1, c2, Inner, Some(EqualTo(col1, col2)))
    +    val c2JoinC1 = Join(c2, c1, Inner, Some(EqualTo(col2, col1)))
    +    val expectedStatsAfterJoin = Statistics(
    +      sizeInBytes = expectedRows * (8 + 2 * 4),
    +      rowCount = Some(expectedRows),
    +      attributeStats = AttributeMap(Seq(
    +        col1 -> c1.stats.attributeStats(col1).copy(
    +          distinctCount = expectedNdv, min = Some(expectedMin), max = Some(expectedMax)),
    +        col2 -> c2.stats.attributeStats(col2).copy(
    +          distinctCount = expectedNdv, min = Some(expectedMin), max = Some(expectedMax))))
    +    )
    +
    +    // Join order should not affect estimation result.
    +    Seq(c1JoinC2, c2JoinC1).foreach { join =>
    +      assert(join.stats == expectedStatsAfterJoin)
    +    }
    +  }
    +
    +  private def generateJoinChild(
    +      col: Attribute,
    +      histogram: Histogram,
    +      expectedMin: Double,
    +      expectedMax: Double): LogicalPlan = {
    +    val colStat = inferColumnStat(histogram)
    +    StatsTestPlan(
    +      outputList = Seq(col),
    +      rowCount = (histogram.height * histogram.bins.length).toLong,
    +      attributeStats = AttributeMap(Seq(col -> colStat)))
    +  }
    +
    +  /** Column statistics should be consistent with histograms in tests. */
    +  private def inferColumnStat(histogram: Histogram): ColumnStat = {
    +    var ndv = 0L
    +    for (i <- histogram.bins.indices) {
    +      val bin = histogram.bins(i)
    +      if (i == 0 || bin.hi != histogram.bins(i - 1).hi) {
    +        ndv += bin.ndv
    +      }
    +    }
    +    ColumnStat(distinctCount = ndv, min = Some(histogram.bins.head.lo),
    +      max = Some(histogram.bins.last.hi), nullCount = 0, avgLen = 4, maxLen = 4,
    +      histogram = Some(histogram))
    +  }
    +
    +  test("equi-height histograms: a bin is contained by another one") {
    +    val histogram1 = Histogram(height = 300, Array(
    +      HistogramBin(lo = 10, hi = 30, ndv = 10), HistogramBin(lo = 30, hi = 60, ndv = 30)))
    +    val histogram2 = Histogram(height = 100, Array(
    +      HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 100, ndv = 40)))
    +    // test bin trimming
    +    val (t0, h0) = trimBin(histogram2.bins(0), height = 100, lowerBound = 10, upperBound = 60)
    +    assert(t0 == HistogramBin(lo = 10, hi = 50, ndv = 40) && h0 == 80)
    +    val (t1, h1) = trimBin(histogram2.bins(1), height = 100, lowerBound = 10, upperBound = 60)
    +    assert(t1 == HistogramBin(lo = 50, hi = 60, ndv = 8) && h1 == 20)
    +
    +    val expectedRanges = Seq(
    +      // histogram1.bins(0) overlaps t0
    +      OverlappedRange(10, 30, 10, 40*1/2, 300, 80*1/2),
    --- End diff --
    
    Please add spaces around the operators, e.g. `40 * 1 / 2`.


---


[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...

Posted by wzhfy <gi...@git.apache.org>.
Github user wzhfy commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    cc @cloud-fan @gatorsmile @ron8hu 


---


[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    **[Test build #85001 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85001/testReport)** for PR 19594 at commit [`2637429`](https://github.com/apache/spark/commit/263742914e21ba607904acb0ad35ced32aad48ab).


---


[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

Posted by wzhfy <gi...@git.apache.org>.
Github user wzhfy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19594#discussion_r157331711
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala ---
    @@ -191,8 +191,16 @@ case class JoinEstimation(join: Join) extends Logging {
           val rInterval = ValueInterval(rightKeyStat.min, rightKeyStat.max, rightKey.dataType)
           if (ValueInterval.isIntersected(lInterval, rInterval)) {
             val (newMin, newMax) = ValueInterval.intersect(lInterval, rInterval, leftKey.dataType)
    -        val (card, joinStat) = computeByNdv(leftKey, rightKey, newMin, newMax)
    -        keyStatsAfterJoin += (leftKey -> joinStat, rightKey -> joinStat)
    +        val (card, joinStat) = (leftKeyStat.histogram, rightKeyStat.histogram) match {
    +          case (Some(l: Histogram), Some(r: Histogram)) =>
    +            computeByEquiHeightHistogram(leftKey, rightKey, l, r, newMin, newMax)
    +          case _ =>
    +            computeByNdv(leftKey, rightKey, newMin, newMax)
    +        }
    +        keyStatsAfterJoin += (
    +          leftKey -> joinStat.copy(histogram = leftKeyStat.histogram),
    +          rightKey -> joinStat.copy(histogram = rightKeyStat.histogram)
    --- End diff --
    
    ah right, we can keep it.


---


[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19594#discussion_r156393413
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala ---
    @@ -191,8 +191,16 @@ case class JoinEstimation(join: Join) extends Logging {
           val rInterval = ValueInterval(rightKeyStat.min, rightKeyStat.max, rightKey.dataType)
           if (ValueInterval.isIntersected(lInterval, rInterval)) {
             val (newMin, newMax) = ValueInterval.intersect(lInterval, rInterval, leftKey.dataType)
    -        val (card, joinStat) = computeByNdv(leftKey, rightKey, newMin, newMax)
    -        keyStatsAfterJoin += (leftKey -> joinStat, rightKey -> joinStat)
    +        val (card, joinStat) = (leftKeyStat.histogram, rightKeyStat.histogram) match {
    +          case (Some(l: Histogram), Some(r: Histogram)) =>
    +            computeByEquiHeightHistogram(leftKey, rightKey, l, r, newMin, newMax)
    +          case _ =>
    +            computeByNdv(leftKey, rightKey, newMin, newMax)
    +        }
    +        keyStatsAfterJoin += (
    +          leftKey -> joinStat.copy(histogram = leftKeyStat.histogram),
    +          rightKey -> joinStat.copy(histogram = rightKeyStat.histogram)
    --- End diff --
    
    Should we update the histogram after the join?


---


[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    Merged build finished. Test PASSed.


---


[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    **[Test build #85106 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85106/testReport)** for PR 19594 at commit [`16797d2`](https://github.com/apache/spark/commit/16797d2d02565616cf24e4509e43d3233c7a4714).


---


[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19594#discussion_r157513559
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala ---
    @@ -212,4 +213,186 @@ object EstimationUtils {
         }
       }
     
    +  /**
    +   * Returns overlapped ranges between two histograms, in the given value range
    +   * [lowerBound, upperBound].
    +   */
    +  def getOverlappedRanges(
    +    leftHistogram: Histogram,
    +    rightHistogram: Histogram,
    +    lowerBound: Double,
    +    upperBound: Double): Seq[OverlappedRange] = {
    +    val overlappedRanges = new ArrayBuffer[OverlappedRange]()
    +    // Only bins whose range intersect [lowerBound, upperBound] have join possibility.
    +    val leftBins = leftHistogram.bins
    +      .filter(b => b.lo <= upperBound && b.hi >= lowerBound)
    +    val rightBins = rightHistogram.bins
    +      .filter(b => b.lo <= upperBound && b.hi >= lowerBound)
    +
    +    leftBins.foreach { lb =>
    +      rightBins.foreach { rb =>
    +        val (left, leftHeight) = trimBin(lb, leftHistogram.height, lowerBound, upperBound)
    +        val (right, rightHeight) = trimBin(rb, rightHistogram.height, lowerBound, upperBound)
    +        // Only collect overlapped ranges.
    +        if (left.lo <= right.hi && left.hi >= right.lo) {
    +          // Collect overlapped ranges.
    +          val range = if (left.lo == left.hi) {
    +            // Case1: the left bin has only one value
    +            OverlappedRange(
    +              lo = left.lo,
    +              hi = left.lo,
    +              leftNdv = 1,
    +              rightNdv = 1,
    +              leftNumRows = leftHeight,
    +              rightNumRows = rightHeight / right.ndv
    +            )
    +          } else if (right.lo == right.hi) {
    +            // Case2: the right bin has only one value
    --- End diff --
    
    Do we really need cases 1 and 2? Aren't they covered by the branches below?
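
One point that may bear on this question, shown as a sketch rather than as the author's answer: the ratio formulas used in the later branches are not meaningful for a single-value bin, so some guard has to run before them. The helper names below are hypothetical; the arithmetic mirrors the `leftRatio`/`rightRatio` expressions quoted elsewhere in this thread:

```scala
object SingleValueBinSketch {
  // Partial-overlap branch: share of the left bin that lies in [rightLo, leftHi].
  def partialOverlapRatio(leftLo: Double, leftHi: Double, rightLo: Double): Double =
    (leftHi - rightLo) / (leftHi - leftLo)

  // Containment branch: share of the outer bin covered by the inner bin.
  def containmentRatio(
      innerLo: Double,
      innerHi: Double,
      outerLo: Double,
      outerHi: Double): Double =
    (innerHi - innerLo) / (outerHi - outerLo)

  // Left bin holds the single value 30: the ratio is 0.0 / 0.0, i.e. NaN.
  val degenerate: Double = partialOverlapRatio(30.0, 30.0, 30.0)

  // Single-value bin [30, 30] inside [10, 50]: the ratio is 0.0, which would zero out
  // the outer side's ndv and row count.
  val contained: Double = containmentRatio(30.0, 30.0, 10.0, 50.0)
}
```

So if cases 1 and 2 are removed, the remaining branches would need an equivalent check (such as comparing the bounds for equality) before applying a ratio.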


---


[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...

Posted by wzhfy <gi...@git.apache.org>.
Github user wzhfy commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    retest this please..


---


[GitHub] spark issue #19594: [WIP] [SPARK-21984] Join estimation based on equi-height...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    **[Test build #83825 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83825/testReport)** for PR 19594 at commit [`96776ce`](https://github.com/apache/spark/commit/96776ce73b353ca52df3a05883ba039ac1f7d617).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `  case class OverlappedRange(`


---


[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    Merged build finished. Test PASSed.


---


[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19594#discussion_r157525895
  
    --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala ---
    @@ -67,6 +67,222 @@ class JoinEstimationSuite extends StatsEstimationTestBase {
         rowCount = 2,
         attributeStats = AttributeMap(Seq("key-1-2", "key-2-3").map(nameToColInfo)))
     
    +  private def estimateByHistogram(
    +      leftHistogram: Histogram,
    +      rightHistogram: Histogram,
    +      expectedMin: Double,
    +      expectedMax: Double,
    +      expectedNdv: Long,
    +      expectedRows: Long): Unit = {
    +    val col1 = attr("key1")
    +    val col2 = attr("key2")
    +    val c1 = generateJoinChild(col1, leftHistogram, expectedMin, expectedMax)
    +    val c2 = generateJoinChild(col2, rightHistogram, expectedMin, expectedMax)
    +
    +    val c1JoinC2 = Join(c1, c2, Inner, Some(EqualTo(col1, col2)))
    +    val c2JoinC1 = Join(c2, c1, Inner, Some(EqualTo(col2, col1)))
    +    val expectedStatsAfterJoin = Statistics(
    +      sizeInBytes = expectedRows * (8 + 2 * 4),
    +      rowCount = Some(expectedRows),
    +      attributeStats = AttributeMap(Seq(
    +        col1 -> c1.stats.attributeStats(col1).copy(
    +          distinctCount = expectedNdv, min = Some(expectedMin), max = Some(expectedMax)),
    +        col2 -> c2.stats.attributeStats(col2).copy(
    +          distinctCount = expectedNdv, min = Some(expectedMin), max = Some(expectedMax))))
    +    )
    +
    +    // Join order should not affect estimation result.
    +    Seq(c1JoinC2, c2JoinC1).foreach { join =>
    +      assert(join.stats == expectedStatsAfterJoin)
    +    }
    +  }
    +
    +  private def generateJoinChild(
    +      col: Attribute,
    +      histogram: Histogram,
    +      expectedMin: Double,
    +      expectedMax: Double): LogicalPlan = {
    +    val colStat = inferColumnStat(histogram)
    +    StatsTestPlan(
    +      outputList = Seq(col),
    +      rowCount = (histogram.height * histogram.bins.length).toLong,
    +      attributeStats = AttributeMap(Seq(col -> colStat)))
    +  }
    +
    +  /** Column statistics should be consistent with histograms in tests. */
    +  private def inferColumnStat(histogram: Histogram): ColumnStat = {
    +    var ndv = 0L
    +    for (i <- histogram.bins.indices) {
    +      val bin = histogram.bins(i)
    +      if (i == 0 || bin.hi != histogram.bins(i - 1).hi) {
    +        ndv += bin.ndv
    +      }
    +    }
    +    ColumnStat(distinctCount = ndv, min = Some(histogram.bins.head.lo),
    +      max = Some(histogram.bins.last.hi), nullCount = 0, avgLen = 4, maxLen = 4,
    +      histogram = Some(histogram))
    +  }
    +
    +  test("equi-height histograms: a bin is contained by another one") {
    +    val histogram1 = Histogram(height = 300, Array(
    +      HistogramBin(lo = 10, hi = 30, ndv = 10), HistogramBin(lo = 30, hi = 60, ndv = 30)))
    +    val histogram2 = Histogram(height = 100, Array(
    +      HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 100, ndv = 40)))
    +    // test bin trimming
    +    val (t0, h0) = trimBin(histogram2.bins(0), height = 100, lowerBound = 10, upperBound = 60)
    +    assert(t0 == HistogramBin(lo = 10, hi = 50, ndv = 40) && h0 == 80)
    +    val (t1, h1) = trimBin(histogram2.bins(1), height = 100, lowerBound = 10, upperBound = 60)
    +    assert(t1 == HistogramBin(lo = 50, hi = 60, ndv = 8) && h1 == 20)
    +
    +    val expectedRanges = Seq(
    +      // histogram1.bins(0) overlaps t0
    +      OverlappedRange(10, 30, 10, 40*1/2, 300, 80*1/2),
    +      // histogram1.bins(1) overlaps t0
    +      OverlappedRange(30, 50, 30*2/3, 40*1/2, 300*2/3, 80*1/2),
    +      // histogram1.bins(1) overlaps t1
    +      OverlappedRange(50, 60, 30*1/3, 8, 300*1/3, 20)
    +    )
    +    assert(expectedRanges.equals(
    +      getOverlappedRanges(histogram1, histogram2, lowerBound = 10D, upperBound = 60D)))
    +
    +    estimateByHistogram(
    +      leftHistogram = histogram1,
    +      rightHistogram = histogram2,
    +      expectedMin = 10D,
    +      expectedMax = 60D,
    +      // 10 + 20 + 8
    +      expectedNdv = 38L,
    --- End diff --
    
    `expectedNdv = 10 + 20 + 8`?
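
To make the `10 + 20 + 8` explicit, here is a small worked sketch that applies the per-range formulas from `computeByEquiHeightHistogram` (as quoted elsewhere in this thread) to the three expected ranges above. `Overlap` is a hypothetical mirror of `OverlappedRange`, with the arithmetic in the test already evaluated:

```scala
object NdvCardSketch {
  // Hypothetical mirror of the OverlappedRange fields.
  final case class Overlap(
      lo: Double,
      hi: Double,
      leftNdv: Double,
      rightNdv: Double,
      leftNumRows: Double,
      rightNumRows: Double)

  val ranges: Seq[Overlap] = Seq(
    Overlap(10, 30, 10, 20, 300, 40),
    Overlap(30, 50, 20, 20, 200, 40),
    Overlap(50, 60, 10, 8, 100, 20))

  // Sum of min(leftNdv, rightNdv), skipping a range that shares its `hi` with the previous one.
  val totalNdv: Double = ranges.indices.map { i =>
    val r = ranges(i)
    if (i == 0 || r.hi != ranges(i - 1).hi) math.min(r.leftNdv, r.rightNdv) else 0.0
  }.sum

  // Sum of leftNumRows * rightNumRows / max(leftNdv, rightNdv) over all ranges.
  val card: Double = ranges.map { r =>
    r.leftNumRows * r.rightNumRows / math.max(r.leftNdv, r.rightNdv)
  }.sum
}
```

For these inputs `totalNdv` is 10 + 20 + 8 = 38, and the cardinality formula gives 600 + 400 + 200 = 1200.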


---


[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19594#discussion_r157519368
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala ---
    @@ -225,6 +236,43 @@ case class JoinEstimation(join: Join) extends Logging {
         (ceil(card), newStats)
       }
     
    +  /** Compute join cardinality using equi-height histograms. */
    +  private def computeByEquiHeightHistogram(
    +      leftKey: AttributeReference,
    +      rightKey: AttributeReference,
    +      leftHistogram: Histogram,
    +      rightHistogram: Histogram,
    +      newMin: Option[Any],
    +      newMax: Option[Any]): (BigInt, ColumnStat) = {
    +    val overlappedRanges = getOverlappedRanges(
    +      leftHistogram = leftHistogram,
    +      rightHistogram = rightHistogram,
    +      // Only numeric values have equi-height histograms.
    +      lowerBound = newMin.get.toString.toDouble,
    +      upperBound = newMax.get.toString.toDouble)
    +
    +    var card: BigDecimal = 0
    +    var totalNdv: Double = 0
    +    for (i <- overlappedRanges.indices) {
    +      val range = overlappedRanges(i)
    +      if (i == 0 || range.hi != overlappedRanges(i - 1).hi) {
    +        // If range.hi == overlappedRanges(i - 1).hi, that means the current range has only one
    +        // value, and this value is already counted in the previous range. So there is no need to
    +        // count it in this range.
    +        totalNdv += math.min(range.leftNdv, range.rightNdv)
    +      }
    +      // Apply the formula in this overlapped range.
    +      card += range.leftNumRows * range.rightNumRows / math.max(range.leftNdv, range.rightNdv)
    +    }
    +
    +    val leftKeyStat = leftStats.attributeStats(leftKey)
    +    val rightKeyStat = rightStats.attributeStats(rightKey)
    +    val newMaxLen = math.min(leftKeyStat.maxLen, rightKeyStat.maxLen)
    +    val newAvgLen = (leftKeyStat.avgLen + rightKeyStat.avgLen) / 2
    --- End diff --
    
    Shall we take the left/right row counts into account when calculating this?
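
One reading of this suggestion, sketched under assumptions: weight each side's `avgLen` by its row count instead of taking the plain mean. The function names and the `Long`/`BigInt` parameter types are illustrative choices, not taken from the PR:

```scala
object AvgLenSketch {
  // Plain mean of the two average lengths, as in the diff above.
  def plainAvgLen(leftAvgLen: Long, rightAvgLen: Long): Long =
    (leftAvgLen + rightAvgLen) / 2

  // Row-count-weighted mean (hypothetical alternative); falls back to the plain mean
  // when both sides are empty to avoid dividing by zero.
  def weightedAvgLen(
      leftAvgLen: Long,
      leftRows: BigInt,
      rightAvgLen: Long,
      rightRows: BigInt): Long = {
    val totalRows = leftRows + rightRows
    if (totalRows == BigInt(0)) {
      plainAvgLen(leftAvgLen, rightAvgLen)
    } else {
      ((BigInt(leftAvgLen) * leftRows + BigInt(rightAvgLen) * rightRows) / totalRows).toLong
    }
  }
}
```

With an average length of 4 over 100 rows on the left and 8 over 300 rows on the right, the plain mean is 6 while the weighted mean is 7, so the two only diverge when the sides differ in both size and length.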


---


[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    **[Test build #84989 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84989/testReport)** for PR 19594 at commit [`2a4ee99`](https://github.com/apache/spark/commit/2a4ee99526c654834f3a50ef66e674bda673f926).
     * This patch **fails SparkR unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---


[GitHub] spark issue #19594: [WIP] [SPARK-21984] Join estimation based on equi-height...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83825/
    Test FAILed.


---


[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19594#discussion_r156388807
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala ---
    @@ -114,4 +115,183 @@ object EstimationUtils {
         }
       }
     
    +  /**
    +   * Returns overlapped ranges between two histograms, in the given value range [newMin, newMax].
    +   */
    +  def getOverlappedRanges(
    +      leftHistogram: Histogram,
    +      rightHistogram: Histogram,
    +      newMin: Double,
    +      newMax: Double): Seq[OverlappedRange] = {
    --- End diff --
    
    Naming these `min`/`max` is also fine.
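
As context for the value-range parameters discussed here, the sketch below shows how a single bin might be clipped to a range under a uniform-distribution assumption. `BinSketch` and `TrimSketch` are hypothetical stand-ins rather than the PR's classes, and the single-value branch is an assumption modeled on the expectations in the PR's test suite, not a copy of its implementation.

```scala
// Hypothetical stand-in for HistogramBin; not the PR's class.
final case class BinSketch(lo: Double, hi: Double, ndv: Long)

object TrimSketch {
  /**
   * Clips `bin` to [lowerBound, upperBound] and returns the clipped bin together
   * with its scaled height. Assumes the bin intersects the bounds and that values
   * are spread uniformly inside the bin.
   */
  def trimBin(
      bin: BinSketch,
      height: Double,
      lowerBound: Double,
      upperBound: Double): (BinSketch, Double) = {
    val lo = math.max(bin.lo, lowerBound)
    val hi = math.min(bin.hi, upperBound)
    require(lo <= hi, "bin does not intersect [lowerBound, upperBound]")
    if (lo == hi) {
      // Only one value survives: keep one distinct value and its share of the rows.
      (BinSketch(lo, hi, 1L), height / bin.ndv)
    } else {
      // Scale ndv and height by the fraction of the bin's width that is kept.
      val ratio = (hi - lo) / (bin.hi - bin.lo)
      (BinSketch(lo, hi, math.ceil(bin.ndv * ratio).toLong), height * ratio)
    }
  }
}
```

For example, clipping a bin [0, 50] with ndv 50 and height 100 to the range [10, 60] keeps 4/5 of its width, giving a bin [10, 50] with ndv 40 and height 80.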


---



[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #19594: [WIP] [SPARK-21984] Join estimation based on equi-height...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    **[Test build #83146 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83146/testReport)** for PR 19594 at commit [`67bd651`](https://github.com/apache/spark/commit/67bd65153bd0afcdddd30c6ef4799caa02a05a19).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `  case class OverlappedRange(`


---



[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/19594


---



[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    **[Test build #85061 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85061/testReport)** for PR 19594 at commit [`2637429`](https://github.com/apache/spark/commit/263742914e21ba607904acb0ad35ced32aad48ab).


---



[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19594#discussion_r157528163
  
    --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala ---
    @@ -67,6 +67,222 @@ class JoinEstimationSuite extends StatsEstimationTestBase {
         rowCount = 2,
         attributeStats = AttributeMap(Seq("key-1-2", "key-2-3").map(nameToColInfo)))
     
    +  private def estimateByHistogram(
    +      leftHistogram: Histogram,
    +      rightHistogram: Histogram,
    +      expectedMin: Double,
    +      expectedMax: Double,
    +      expectedNdv: Long,
    +      expectedRows: Long): Unit = {
    +    val col1 = attr("key1")
    +    val col2 = attr("key2")
    +    val c1 = generateJoinChild(col1, leftHistogram, expectedMin, expectedMax)
    +    val c2 = generateJoinChild(col2, rightHistogram, expectedMin, expectedMax)
    +
    +    val c1JoinC2 = Join(c1, c2, Inner, Some(EqualTo(col1, col2)))
    +    val c2JoinC1 = Join(c2, c1, Inner, Some(EqualTo(col2, col1)))
    +    val expectedStatsAfterJoin = Statistics(
    +      sizeInBytes = expectedRows * (8 + 2 * 4),
    +      rowCount = Some(expectedRows),
    +      attributeStats = AttributeMap(Seq(
    +        col1 -> c1.stats.attributeStats(col1).copy(
    +          distinctCount = expectedNdv, min = Some(expectedMin), max = Some(expectedMax)),
    +        col2 -> c2.stats.attributeStats(col2).copy(
    +          distinctCount = expectedNdv, min = Some(expectedMin), max = Some(expectedMax))))
    +    )
    +
    +    // Join order should not affect estimation result.
    +    Seq(c1JoinC2, c2JoinC1).foreach { join =>
    +      assert(join.stats == expectedStatsAfterJoin)
    +    }
    +  }
    +
    +  private def generateJoinChild(
    +      col: Attribute,
    +      histogram: Histogram,
    +      expectedMin: Double,
    +      expectedMax: Double): LogicalPlan = {
    +    val colStat = inferColumnStat(histogram)
    +    StatsTestPlan(
    +      outputList = Seq(col),
    +      rowCount = (histogram.height * histogram.bins.length).toLong,
    +      attributeStats = AttributeMap(Seq(col -> colStat)))
    +  }
    +
    +  /** Column statistics should be consistent with histograms in tests. */
    +  private def inferColumnStat(histogram: Histogram): ColumnStat = {
    +    var ndv = 0L
    +    for (i <- histogram.bins.indices) {
    +      val bin = histogram.bins(i)
    +      if (i == 0 || bin.hi != histogram.bins(i - 1).hi) {
    +        ndv += bin.ndv
    +      }
    +    }
    +    ColumnStat(distinctCount = ndv, min = Some(histogram.bins.head.lo),
    +      max = Some(histogram.bins.last.hi), nullCount = 0, avgLen = 4, maxLen = 4,
    +      histogram = Some(histogram))
    +  }
    +
    +  test("equi-height histograms: a bin is contained by another one") {
    +    val histogram1 = Histogram(height = 300, Array(
    +      HistogramBin(lo = 10, hi = 30, ndv = 10), HistogramBin(lo = 30, hi = 60, ndv = 30)))
    +    val histogram2 = Histogram(height = 100, Array(
    +      HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 100, ndv = 40)))
    +    // test bin trimming
    +    val (t0, h0) = trimBin(histogram2.bins(0), height = 100, lowerBound = 10, upperBound = 60)
    +    assert(t0 == HistogramBin(lo = 10, hi = 50, ndv = 40) && h0 == 80)
    +    val (t1, h1) = trimBin(histogram2.bins(1), height = 100, lowerBound = 10, upperBound = 60)
    +    assert(t1 == HistogramBin(lo = 50, hi = 60, ndv = 8) && h1 == 20)
    +
    +    val expectedRanges = Seq(
    +      // histogram1.bins(0) overlaps t0
    +      OverlappedRange(10, 30, 10, 40*1/2, 300, 80*1/2),
    +      // histogram1.bins(1) overlaps t0
    +      OverlappedRange(30, 50, 30*2/3, 40*1/2, 300*2/3, 80*1/2),
    +      // histogram1.bins(1) overlaps t1
    +      OverlappedRange(50, 60, 30*1/3, 8, 300*1/3, 20)
    +    )
    +    assert(expectedRanges.equals(
    +      getOverlappedRanges(histogram1, histogram2, lowerBound = 10D, upperBound = 60D)))
    +
    +    estimateByHistogram(
    +      leftHistogram = histogram1,
    +      rightHistogram = histogram2,
    +      expectedMin = 10D,
    +      expectedMax = 60D,
    +      // 10 + 20 + 8
    +      expectedNdv = 38L,
    +      // 300*40/20 + 200*40/20 + 100*20/10
    +      expectedRows = 1200L)
    +  }
    +
    +  test("equi-height histograms: a bin has only one value after trimming") {
    +    val histogram1 = Histogram(height = 300, Array(
    +      HistogramBin(lo = 50, hi = 60, ndv = 10), HistogramBin(lo = 60, hi = 75, ndv = 3)))
    +    val histogram2 = Histogram(height = 100, Array(
    +      HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 100, ndv = 40)))
    +    // test bin trimming
    +    val (t0, h0) = trimBin(histogram2.bins(0), height = 100, lowerBound = 50, upperBound = 75)
    +    assert(t0 == HistogramBin(lo = 50, hi = 50, ndv = 1) && h0 == 2)
    +    val (t1, h1) = trimBin(histogram2.bins(1), height = 100, lowerBound = 50, upperBound = 75)
    +    assert(t1 == HistogramBin(lo = 50, hi = 75, ndv = 20) && h1 == 50)
    +
    +    val expectedRanges = Seq(
    +      // histogram1.bins(0) overlaps t0
    +      OverlappedRange(50, 50, 1, 1, 300/10, 2),
    +      // histogram1.bins(0) overlaps t1
    +      OverlappedRange(50, 60, 10, 20*10/25, 300, 50*10/25),
    +      // histogram1.bins(1) overlaps t1
    +      OverlappedRange(60, 75, 3, 20*15/25, 300, 50*15/25)
    +    )
    +    assert(expectedRanges.equals(
    +      getOverlappedRanges(histogram1, histogram2, lowerBound = 50D, upperBound = 75D)))
    +
    +    estimateByHistogram(
    +      leftHistogram = histogram1,
    +      rightHistogram = histogram2,
    +      expectedMin = 50D,
    +      expectedMax = 75D,
    +      // 1 + 8 + 3
    +      expectedNdv = 12L,
    +      // 30*2/1 + 300*20/10 + 300*30/12
    +      expectedRows = 1410L)
    +  }
    +
    +  test("equi-height histograms: skew distribution (some bins have only one value)") {
    +    val histogram1 = Histogram(height = 300, Array(
    +      HistogramBin(lo = 30, hi = 30, ndv = 1),
    +      HistogramBin(lo = 30, hi = 30, ndv = 1),
    +      HistogramBin(lo = 30, hi = 60, ndv = 30)))
    +    val histogram2 = Histogram(height = 100, Array(
    +      HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 100, ndv = 40)))
    +    // test bin trimming
    +    val (t0, h0) = trimBin(histogram2.bins(0), height = 100, lowerBound = 30, upperBound = 60)
    +    assert(t0 == HistogramBin(lo = 30, hi = 50, ndv = 20) && h0 == 40)
    +    val (t1, h1) = trimBin(histogram2.bins(1), height = 100, lowerBound = 30, upperBound = 60)
    +    assert(t1 == HistogramBin(lo = 50, hi = 60, ndv = 8) && h1 == 20)
    +
    +    val expectedRanges = Seq(
    +      OverlappedRange(30, 30, 1, 1, 300, 40/20),
    +      OverlappedRange(30, 30, 1, 1, 300, 40/20),
    +      OverlappedRange(30, 50, 30*2/3, 20, 300*2/3, 40),
    +      OverlappedRange(50, 60, 30*1/3, 8, 300*1/3, 20)
    --- End diff --
    
    hmm, shouldn't we have 6 overlapped ranges here?
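
One way to sanity-check the count is to enumerate which bin pairs actually intersect. The sketch below is a minimal illustration with hypothetical names (`Interval`, `OverlapCountSketch`), assuming a plain closed-interval intersection test; whether `getOverlappedRanges` applies exactly this rule is an assumption, not something the quoted diff confirms.

```scala
// Hypothetical closed interval [lo, hi]; not a class from the PR.
final case class Interval(lo: Double, hi: Double)

object OverlapCountSketch {
  // Two closed intervals intersect when neither lies strictly beyond the other.
  def intersects(a: Interval, b: Interval): Boolean =
    a.lo <= b.hi && b.lo <= a.hi

  // Index pairs (leftIndex, rightIndex) of all intersecting bins.
  def overlappingPairs(left: Seq[Interval], right: Seq[Interval]): Seq[(Int, Int)] =
    for {
      (l, i) <- left.zipWithIndex
      (r, j) <- right.zipWithIndex
      if intersects(l, r)
    } yield (i, j)
}
```

With the left bins [30, 30], [30, 30], [30, 60] and the trimmed right bins [30, 50], [50, 60] from the test above, this rule yields four pairs out of the six possible: (0, 0), (1, 0), (2, 0) and (2, 1). The two single-value bins at 30 do not reach the right bin that starts at 50.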


---



[GitHub] spark issue #19594: [SPARK-21984] [SQL] Join estimation based on equi-height...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19594
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84679/


---
