You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2016/09/11 07:03:48 UTC

spark git commit: [SPARK-17439][SQL] Fixing compression issues with approximate quantiles and adding more tests

Repository: spark
Updated Branches:
  refs/heads/master 29ba9578f -> 180796ecb


[SPARK-17439][SQL] Fixing compression issues with approximate quantiles and adding more tests

## What changes were proposed in this pull request?

This PR builds on #14976 and fixes a correctness bug that would cause the wrong quantile to be returned for small target errors.

## How was this patch tested?

This PR adds 8 unit tests that were failing without the fix.

Author: Timothy Hunter <ti...@databricks.com>
Author: Sean Owen <so...@cloudera.com>

Closes #15002 from thunterdb/ml-1783.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/180796ec
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/180796ec
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/180796ec

Branch: refs/heads/master
Commit: 180796ecb3a00facde2d98affdb5aa38dd258875
Parents: 29ba957
Author: Timothy Hunter <ti...@databricks.com>
Authored: Sun Sep 11 08:03:45 2016 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Sun Sep 11 08:03:45 2016 +0100

----------------------------------------------------------------------
 .../sql/catalyst/util/QuantileSummaries.scala   | 16 ++++++++---
 .../catalyst/util/QuantileSummariesSuite.scala  | 29 ++++++++++++++++++--
 2 files changed, 39 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/180796ec/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
index 7512ace..fd62bd5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.util
 
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 
 import org.apache.spark.sql.catalyst.util.QuantileSummaries.Stats
 
@@ -61,7 +61,12 @@ class QuantileSummaries(
   def insert(x: Double): QuantileSummaries = {
     headSampled += x
     if (headSampled.size >= defaultHeadSize) {
-      this.withHeadBufferInserted
+      val result = this.withHeadBufferInserted
+      if (result.sampled.length >= compressThreshold) {
+        result.compress()
+      } else {
+        result
+      }
     } else {
       this
     }
@@ -236,7 +241,7 @@ object QuantileSummaries {
     if (currentSamples.isEmpty) {
       return Array.empty[Stats]
     }
-    val res: ArrayBuffer[Stats] = ArrayBuffer.empty
+    val res = ListBuffer.empty[Stats]
     // Start for the last element, which is always part of the set.
     // The head contains the current new head, that may be merged with the current element.
     var head = currentSamples.last
@@ -258,7 +263,10 @@ object QuantileSummaries {
     }
     res.prepend(head)
     // If necessary, add the minimum element:
-    res.prepend(currentSamples.head)
+    val currHead = currentSamples.head
+    if (currHead.value < head.value) {
+      res.prepend(currentSamples.head)
+    }
     res.toArray
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/180796ec/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala
index 89b2a22..5e90970 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala
@@ -40,6 +40,20 @@ class QuantileSummariesSuite extends SparkFunSuite {
     summary.compress()
   }
 
+  /**
+   * Interleaves compression and insertions.
+   */
+  private def buildCompressSummary(
+      data: Seq[Double],
+      epsi: Double,
+      threshold: Int): QuantileSummaries = {
+    var summary = new QuantileSummaries(threshold, epsi)
+    data.foreach { x =>
+      summary = summary.insert(x).compress()
+    }
+    summary
+  }
+
   private def checkQuantile(quant: Double, data: Seq[Double], summary: QuantileSummaries): Unit = {
     val approx = summary.query(quant)
     // The rank of the approximation.
@@ -54,8 +68,8 @@ class QuantileSummariesSuite extends SparkFunSuite {
 
   for {
     (seq_name, data) <- Seq(increasing, decreasing, random)
-    epsi <- Seq(0.1, 0.0001)
-    compression <- Seq(1000, 10)
+    epsi <- Seq(0.1, 0.0001) // With a significant value and with full precision
+    compression <- Seq(1000, 10) // This interleaves n so that we test without and with compression
   } {
 
     test(s"Extremas with epsi=$epsi and seq=$seq_name, compression=$compression") {
@@ -75,6 +89,17 @@ class QuantileSummariesSuite extends SparkFunSuite {
       checkQuantile(0.1, data, s)
       checkQuantile(0.001, data, s)
     }
+
+    test(s"Some quantile values with epsi=$epsi and seq=$seq_name, compression=$compression " +
+      s"(interleaved)") {
+      val s = buildCompressSummary(data, epsi, compression)
+      assert(s.count == data.size, s"Found count=${s.count} but data size=${data.size}")
+      checkQuantile(0.9999, data, s)
+      checkQuantile(0.9, data, s)
+      checkQuantile(0.5, data, s)
+      checkQuantile(0.1, data, s)
+      checkQuantile(0.001, data, s)
+    }
   }
 
   // Tests for merging procedure


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org