You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by wzhfy <gi...@git.apache.org> on 2017/12/09 02:08:18 UTC
[GitHub] spark pull request #19594: [SPARK-21984] [SQL] Join estimation based on equi...
Github user wzhfy commented on a diff in the pull request:
https://github.com/apache/spark/pull/19594#discussion_r155910232
--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala ---
@@ -67,6 +68,205 @@ class JoinEstimationSuite extends StatsEstimationTestBase {
rowCount = 2,
attributeStats = AttributeMap(Seq("key-1-2", "key-2-3").map(nameToColInfo)))
+ /**
+  * Builds one single-column child per histogram, joins them on their keys in both orders and
+  * asserts that each join's estimated statistics match the expected range, ndv and row count.
+  */
+ private def estimateByHistogram(
+     histogram1: Histogram,
+     histogram2: Histogram,
+     expectedMin: Double,
+     expectedMax: Double,
+     expectedNdv: Long,
+     expectedRows: Long): Unit = {
+   val (leftKey, rightKey) = (attr("key1"), attr("key2"))
+   val left = generateJoinChild(leftKey, histogram1, expectedMin, expectedMax)
+   val right = generateJoinChild(rightKey, histogram2, expectedMin, expectedMax)
+
+   // After the join both key columns share the same ndv and the same [min, max] range.
+   def expectedColStat(child: LogicalPlan, key: Attribute): ColumnStat =
+     child.stats.attributeStats(key).copy(
+       distinctCount = expectedNdv, min = Some(expectedMin), max = Some(expectedMax))
+
+   val expected = Statistics(
+     // Per output row: 8 bytes (presumably the estimator's fixed row overhead -- confirm)
+     // plus two columns whose avgLen is 4 (see inferColumnStat).
+     sizeInBytes = expectedRows * (8 + 2 * 4),
+     rowCount = Some(expectedRows),
+     attributeStats = AttributeMap(Seq(
+       leftKey -> expectedColStat(left, leftKey),
+       rightKey -> expectedColStat(right, rightKey))))
+
+   // Join order should not affect estimation result.
+   val joins = Seq(
+     Join(left, right, Inner, Some(EqualTo(leftKey, rightKey))),
+     Join(right, left, Inner, Some(EqualTo(rightKey, leftKey))))
+   joins.foreach(join => assert(join.stats == expected))
+ }
+
+ /**
+  * Builds a single-column test plan whose statistics are inferred from `histogram`.
+  * If [expectedMin, expectedMax] is narrower than the histogram's own [min, max], the plan
+  * is wrapped in a Filter that applies only the bound(s) that actually narrow the range.
+  */
+ private def generateJoinChild(
+     col: Attribute,
+     histogram: Histogram,
+     expectedMin: Double,
+     expectedMax: Double): LogicalPlan = {
+   val colStat = inferColumnStat(histogram)
+   val plan = StatsTestPlan(
+     outputList = Seq(col),
+     rowCount = (histogram.height * histogram.bins.length).toLong,
+     attributeStats = AttributeMap(Seq(col -> colStat)))
+
+   val statMin = colStat.min.get.toString.toDouble
+   val statMax = colStat.max.get.toString.toDouble
+   val bounds: Seq[Expression] = Seq(
+     if (expectedMin > statMin) Some(GreaterThanOrEqual(col, Literal(expectedMin))) else None,
+     if (expectedMax < statMax) Some(LessThanOrEqual(col, Literal(expectedMax))) else None
+   ).flatten
+
+   // No narrowing bound means the unfiltered plan is returned as-is.
+   bounds.reduceOption(And).map(Filter(_, plan)).getOrElse(plan)
+ }
+
+ /**
+  * Derives column statistics from an equi-height histogram.
+  * min/max come from the first bin's lo and the last bin's hi. The distinct count sums the
+  * bins' ndv, skipping any bin whose hi equals the previous bin's hi (presumably so a value
+  * that spans several bins is counted only once). Sizes are fixed at 4 bytes, with no nulls.
+  */
+ private def inferColumnStat(histogram: Histogram): ColumnStat = {
+   val bins = histogram.bins
+   val ndv = bins.indices.collect {
+     case i if i == 0 || bins(i).hi != bins(i - 1).hi => bins(i).ndv
+   }.sum
+   ColumnStat(
+     distinctCount = ndv,
+     min = Some(bins.head.lo),
+     max = Some(bins.last.hi),
+     nullCount = 0,
+     avgLen = 4,
+     maxLen = 4,
+     histogram = Some(histogram))
+ }
+
+ test("equi-height histograms: a bin is contained by another one") {
+   val histogram1 = Histogram(height = 300, Array(
+     HistogramBin(lo = 10, hi = 30, ndv = 10), HistogramBin(lo = 30, hi = 60, ndv = 30)))
+   val histogram2 = Histogram(height = 100, Array(
+     HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 100, ndv = 40)))
+
+   // Trimming histogram2's bins to [10, 60] scales both ndv and height by the kept fraction
+   // (4/5 of the first bin, 1/5 of the second).
+   val (lowBin, lowHeight) = trimBin(histogram2.bins(0), height = 100, min = 10, max = 60)
+   assert(lowBin == HistogramBin(lo = 10, hi = 50, ndv = 40) && lowHeight == 80)
+   val (highBin, highHeight) = trimBin(histogram2.bins(1), height = 100, min = 10, max = 60)
+   assert(highBin == HistogramBin(lo = 50, hi = 60, ndv = 8) && highHeight == 20)
+
+   // Each range pairs a slice of a histogram1 bin with a slice of a trimmed histogram2 bin:
+   // (lo, hi, smaller ndv, larger ndv, histogram1-side rows, histogram2-side rows).
+   val expectedRanges = Seq(
+     // [10, 30]: all of histogram1.bins(0), half of the trimmed first bin
+     OverlappedRange(10, 30, math.min(10, 40*1/2), math.max(10, 40*1/2), 300, 80*1/2),
+     // [30, 50]: 2/3 of histogram1.bins(1), the other half of the trimmed first bin
+     OverlappedRange(30, 50, math.min(30*2/3, 40*1/2), math.max(30*2/3, 40*1/2),
+       300*2/3, 80*1/2),
+     // [50, 60]: the last 1/3 of histogram1.bins(1), all of the trimmed second bin
+     OverlappedRange(50, 60, math.min(30*1/3, 8), math.max(30*1/3, 8), 300*1/3, 20)
+   )
+   val actualRanges = getOverlappedRanges(histogram1, histogram2, newMin = 10D, newMax = 60D)
+   assert(expectedRanges.equals(actualRanges))
+
+   estimateByHistogram(
+     histogram1 = histogram1,
+     histogram2 = histogram2,
+     expectedMin = 10D,
+     expectedMax = 60D,
+     // sum of each range's smaller ndv: 10 + 20 + 8
+     expectedNdv = 38L,
+     // sum of leftRows * rightRows / larger ndv: 300*40/20 + 200*40/20 + 100*20/10
+     expectedRows = 1200L)
+ }
+
+ test("equi-height histograms: a bin has only one value") {
+   // histogram1's first bin holds the single value 30.
+   val histogram1 = Histogram(height = 300, Array(
+     HistogramBin(lo = 30, hi = 30, ndv = 1), HistogramBin(lo = 30, hi = 60, ndv = 30)))
+   val histogram2 = Histogram(height = 100, Array(
+     HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 100, ndv = 40)))
+   // test bin trimming: histogram2's bins are cut down to [30, 60]
+   val (t1, h1) = trimBin(histogram2.bins(0), height = 100, min = 30, max = 60)
+   assert(t1 == HistogramBin(lo = 30, hi = 50, ndv = 20) && h1 == 40)
+   val (t2, h2) = trimBin(histogram2.bins(1), height = 100, min = 30, max = 60)
+   assert(t2 == HistogramBin(lo = 50, hi = 60, ndv = 8) && h2 == 20)
+
+   val expectedRanges = Seq(
+     // [30, 30]: histogram1.bins(0) against t1, which contributes 40/20 = 2 rows per value
+     OverlappedRange(30, 30, 1, 1, 300, 40/20),
+     // [30, 50]: 2/3 of histogram1.bins(1) against all of t1
+     OverlappedRange(30, 50, math.min(30*2/3, 20), math.max(30*2/3, 20), 300*2/3, 40),
+     // [50, 60]: the last 1/3 of histogram1.bins(1) against all of t2
+     OverlappedRange(50, 60, math.min(30*1/3, 8), math.max(30*1/3, 8), 300*1/3, 20)
+   )
+   assert(expectedRanges.equals(
+     getOverlappedRanges(histogram1, histogram2, newMin = 30D, newMax = 60D)))
+
+   estimateByHistogram(
+     histogram1 = histogram1,
+     histogram2 = histogram2,
+     expectedMin = 30D,
+     expectedMax = 60D,
+     // 1 + 20 + 8
+     expectedNdv = 29L,
+     // 300*2/1 + 200*40/20 + 100*20/10 (the first range's right side has 40/20 = 2 rows)
+     expectedRows = 1200L)
+ }
+
+ test("equi-height histograms: a bin has only one value after trimming") {
+ val histogram1 = Histogram(height = 300, Array(
+ HistogramBin(lo = 50, hi = 60, ndv = 10), HistogramBin(lo = 60, hi = 75, ndv = 3)))
+ val histogram2 = Histogram(height = 100, Array(
+ HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 100, ndv = 40)))
--- End diff --
OK, I've added test cases for joins of skewed histograms (same skewed value and different skewed values).
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org