Posted to commits@spark.apache.org by me...@apache.org on 2016/02/23 08:31:04 UTC

spark git commit: [SPARK-6761][SQL] Approximate quantile for DataFrame

Repository: spark
Updated Branches:
  refs/heads/master 01e10c9fe -> 4fd199369


[SPARK-6761][SQL] Approximate quantile for DataFrame

JIRA: https://issues.apache.org/jira/browse/SPARK-6761

Compute an approximate quantile for a DataFrame column, based on the paper by Greenwald, Michael and Khanna, Sanjeev, "Space-efficient Online Computation of Quantile Summaries," SIGMOD '01.
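
For reference, a minimal usage sketch of the public API added here (the sample data, column
name, and epsilon value are illustrative, not part of the patch):

    // Illustrative only: a small DataFrame with one numeric column.
    val df = sqlContext.range(0, 1000).selectExpr("cast(id as double) as value")

    // Approximate median of `value`, with a target error of 1%.
    val median: Double = df.stat.approxQuantile("value", 0.5, 0.01)

Several quantiles of one or more columns can be computed in a single pass with the
package-private StatFunctions.multipleApproxQuantiles (see the new test suites below).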

Author: Timothy Hunter <ti...@databricks.com>
Author: Liang-Chi Hsieh <vi...@gmail.com>

Closes #6042 from viirya/approximate_quantile.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4fd19936
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4fd19936
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4fd19936

Branch: refs/heads/master
Commit: 4fd1993692d45a0da0289b8c7669cc1dc3fe0f2b
Parents: 01e10c9
Author: Timothy Hunter <ti...@databricks.com>
Authored: Mon Feb 22 23:31:00 2016 -0800
Committer: Xiangrui Meng <me...@databricks.com>
Committed: Mon Feb 22 23:31:00 2016 -0800

----------------------------------------------------------------------
 .../spark/sql/DataFrameStatFunctions.scala      |  10 +
 .../sql/execution/stat/StatFunctions.scala      | 309 +++++++++++++++++++
 .../apache/spark/sql/DataFrameStatSuite.scala   |  60 ++++
 .../execution/stat/ApproxQuantileSuite.scala    | 129 ++++++++
 4 files changed, 508 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4fd19936/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala
index bb3cc02..7f110c4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala
@@ -37,6 +37,16 @@ import org.apache.spark.util.sketch.{BloomFilter, CountMinSketch}
 final class DataFrameStatFunctions private[sql](df: DataFrame) {
 
   /**
+   * Calculates the approximate quantile of a numerical column of a DataFrame.
+   * @param col the name of the column
+   * @param quantile the target quantile, a value in the range [0.0, 1.0]
+   * @param epsilon the relative target precision of the approximation
+   */
+  def approxQuantile(col: String, quantile: Double, epsilon: Double): Double = {
+    StatFunctions.approxQuantile(df, col, quantile, epsilon)
+  }
+
+  /**
    * Calculate the sample covariance of two numerical columns of a DataFrame.
    * @param col1 the name of the first column
    * @param col2 the name of the second column

http://git-wip-us.apache.org/repos/asf/spark/blob/4fd19936/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
index 7d70194..eb056d5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.execution.stat
 
+import scala.annotation.tailrec
+import scala.collection.mutable.ArrayBuffer
+
 import org.apache.spark.Logging
 import org.apache.spark.sql.{Column, DataFrame, Row}
 import org.apache.spark.sql.catalyst.expressions.{Cast, GenericMutableRow}
@@ -27,6 +30,312 @@ import org.apache.spark.unsafe.types.UTF8String
 
 private[sql] object StatFunctions extends Logging {
 
+  import QuantileSummaries.Stats
+
+  /**
+   * Calculates the approximate quantile for the given column.
+   *
+   * If you need to compute multiple quantiles at once, you should use [[multipleApproxQuantiles]].
+   *
+   * Note on the target error.
+   *
+   * The result of this algorithm has the following deterministic bound:
+   * if the DataFrame has N elements and if we request the quantile `phi` up to error `epsi`,
+   * then the algorithm will return a sample `x` from the DataFrame so that the *exact* rank
+   * of `x` is close to (phi * N). More precisely:
+   *
+   *   floor((phi - epsi) * N) <= rank(x) <= ceil((phi + epsi) * N)
+   *
+   * Note on the algorithm used.
+   *
+   * This method implements a variation of the Greenwald-Khanna algorithm
+   * (with some speed optimizations). The algorithm was first presented in the following article:
+   * "Space-efficient Online Computation of Quantile Summaries" by Greenwald, Michael
+   * and Khanna, Sanjeev. (http://dl.acm.org/citation.cfm?id=375670)
+   *
+   * The performance optimizations are detailed in the comments of the implementation.
+   *
+   * @param df the dataframe to estimate quantiles on
+   * @param col the name of the column
+   * @param quantile the target quantile of interest
+   * @param epsilon the target error. Should be >= 0.
+   */
+  def approxQuantile(
+      df: DataFrame,
+      col: String,
+      quantile: Double,
+      epsilon: Double = QuantileSummaries.defaultEpsilon): Double = {
+    require(quantile >= 0.0 && quantile <= 1.0, "Quantile must be in the range [0.0, 1.0].")
+    val Seq(Seq(res)) = multipleApproxQuantiles(df, Seq(col), Seq(quantile), epsilon)
+    res
+  }
+
+  /**
+   * Runs multiple quantile computations in a single pass, with the same target error.
+   *
+   * See [[approxQuantile]] for more details on the approximation guarantees.
+   *
+   * @param df the dataframe
+   * @param cols columns of the dataframe
+   * @param quantiles target quantiles to compute
+   * @param epsilon the precision to achieve
+   * @return for each column, the requested quantile approximations
+   */
+  def multipleApproxQuantiles(
+      df: DataFrame,
+      cols: Seq[String],
+      quantiles: Seq[Double],
+      epsilon: Double): Seq[Seq[Double]] = {
+    val columns: Seq[Column] = cols.map { colName =>
+      val field = df.schema(colName)
+      require(field.dataType.isInstanceOf[NumericType],
+        s"Quantile calculation for column $colName with data type ${field.dataType}" +
+        " is not supported.")
+      Column(Cast(Column(colName).expr, DoubleType))
+    }
+    val emptySummaries = Array.fill(cols.size)(
+      new QuantileSummaries(QuantileSummaries.defaultCompressThreshold, epsilon))
+
+    // Note that this works somewhat by accident: `apply` is not a pure function, as it
+    // returns the same array it was given as input (`aggregate` reuses the same argument
+    // between calls) after mutating the summaries in place.
+    def apply(summaries: Array[QuantileSummaries], row: Row): Array[QuantileSummaries] = {
+      var i = 0
+      while (i < summaries.length) {
+        summaries(i) = summaries(i).insert(row.getDouble(i))
+        i += 1
+      }
+      summaries
+    }
+
+    def merge(
+        sum1: Array[QuantileSummaries],
+        sum2: Array[QuantileSummaries]): Array[QuantileSummaries] = {
+      sum1.zip(sum2).map { case (s1, s2) => s1.compress().merge(s2.compress()) }
+    }
+    val summaries = df.select(columns: _*).rdd.aggregate(emptySummaries)(apply, merge)
+
+    summaries.map { summary => quantiles.map(summary.query) }
+  }
+
+  /**
+   * Helper class to compute approximate quantile summary.
+   * This implementation is based on the algorithm proposed in the paper:
+   * "Space-efficient Online Computation of Quantile Summaries" by Greenwald, Michael
+   * and Khanna, Sanjeev. (http://dl.acm.org/citation.cfm?id=375670)
+   *
+   * In order to optimize for speed, it maintains an internal buffer of the last seen samples,
+   * and only inserts them after crossing a certain size threshold. This gives a near-constant
+   * amortized insertion cost compared to the original algorithm.
+   *
+   * @param compressThreshold the compression threshold: after the internal buffer of statistics
+   *                          crosses this size, it attempts to compress the statistics together
+   * @param epsilon the target precision
+   * @param sampled a buffer of quantile statistics. See the G-K article for more details
+   * @param count the count of all the elements *inserted in the sampled buffer*
+   *              (excluding the head buffer)
+   * @param headSampled a buffer of latest samples seen so far
+   */
+  class QuantileSummaries(
+      val compressThreshold: Int,
+      val epsilon: Double,
+      val sampled: ArrayBuffer[Stats] = ArrayBuffer.empty,
+      private[stat] var count: Long = 0L,
+      val headSampled: ArrayBuffer[Double] = ArrayBuffer.empty) extends Serializable {
+
+    import QuantileSummaries._
+
+    def insert(x: Double): QuantileSummaries = {
+      headSampled.append(x)
+      if (headSampled.size >= defaultHeadSize) {
+        this.withHeadInserted
+      } else {
+        this
+      }
+    }
+
+    /**
+     * Inserts an array of (unsorted) samples in a batch, sorting the array first to traverse
+     * the summary statistics in a single batch.
+     *
+     * This method does not modify the current object; if necessary, it returns a new copy.
+     *
+     * @return a new quantile summary object.
+     */
+    private def withHeadInserted: QuantileSummaries = {
+      if (headSampled.isEmpty) {
+        return this
+      }
+      var currentCount = count
+      val sorted = headSampled.toArray.sorted
+      val newSamples: ArrayBuffer[Stats] = new ArrayBuffer[Stats]()
+      // The index of the next element to read from the existing sampled buffer
+      var sampleIdx = 0
+      // The index of the new (sorted) sample currently being inserted
+      var opsIdx: Int = 0
+      while (opsIdx < sorted.length) {
+        val currentSample = sorted(opsIdx)
+        // Add all the samples before the next observation.
+        while (sampleIdx < sampled.size && sampled(sampleIdx).value <= currentSample) {
+          newSamples.append(sampled(sampleIdx))
+          sampleIdx += 1
+        }
+
+        // If it is the first one to insert, or if it is the last one
+        currentCount += 1
+        val delta =
+          if (newSamples.isEmpty || (sampleIdx == sampled.size && opsIdx == sorted.length - 1)) {
+            0
+          } else {
+            math.floor(2 * epsilon * currentCount).toInt
+          }
+
+        val tuple = Stats(currentSample, 1, delta)
+        newSamples.append(tuple)
+        opsIdx += 1
+      }
+
+      // Add all the remaining existing samples
+      while (sampleIdx < sampled.size) {
+        newSamples.append(sampled(sampleIdx))
+        sampleIdx += 1
+      }
+      new QuantileSummaries(compressThreshold, epsilon, newSamples, currentCount)
+    }
+
+    def compress(): QuantileSummaries = {
+      // Inserts all the elements first
+      val inserted = this.withHeadInserted
+      assert(inserted.headSampled.isEmpty)
+      assert(inserted.count == count + headSampled.size)
+      val compressed =
+        compressImmut(inserted.sampled, mergeThreshold = 2 * epsilon * inserted.count)
+      new QuantileSummaries(compressThreshold, epsilon, compressed, inserted.count)
+    }
+
+    def merge(other: QuantileSummaries): QuantileSummaries = {
+      if (other.count == 0) {
+        this
+      } else if (count == 0) {
+        other
+      } else {
+        // We rely on the fact that they are ordered to efficiently interleave them.
+        val thisSampled = sampled.toList
+        val otherSampled = other.sampled.toList
+        val res: ArrayBuffer[Stats] = ArrayBuffer.empty
+
+        @tailrec
+        def mergeCurrent(
+            thisList: List[Stats],
+            otherList: List[Stats]): Unit = (thisList, otherList) match {
+          case (Nil, l) =>
+            res.appendAll(l)
+          case (l, Nil) =>
+            res.appendAll(l)
+          case (h1 :: t1, h2 :: t2) if h1.value > h2.value =>
+            mergeCurrent(otherList, thisList)
+          case (h1 :: t1, l) =>
+            // We know that h1.value <= all values in l
+            // TODO(thunterdb) do we need to adjust g and delta?
+            res.append(h1)
+            mergeCurrent(t1, l)
+        }
+
+        mergeCurrent(thisSampled, otherSampled)
+        val comp = compressImmut(res, mergeThreshold = 2 * epsilon * count)
+        new QuantileSummaries(other.compressThreshold, other.epsilon, comp, other.count + count)
+      }
+    }
+
+    def query(quantile: Double): Double = {
+      require(quantile >= 0 && quantile <= 1.0, "quantile should be in the range [0.0, 1.0]")
+
+      if (quantile <= epsilon) {
+        return sampled.head.value
+      }
+
+      if (quantile >= 1 - epsilon) {
+        return sampled.last.value
+      }
+
+      // Target rank
+      val rank = math.ceil(quantile * count).toInt
+      val targetError = math.ceil(epsilon * count)
+      // Minimum rank at current sample
+      var minRank = 0
+      var i = 1
+      while (i < sampled.size - 1) {
+        val curSample = sampled(i)
+        minRank += curSample.g
+        val maxRank = minRank + curSample.delta
+        if (maxRank - targetError <= rank && rank <= minRank + targetError) {
+          return curSample.value
+        }
+        i += 1
+      }
+      sampled.last.value
+    }
+  }
+
+  object QuantileSummaries {
+    // TODO(tjhunter) more tuning could be done on the constants here, but for now
+    // the main cost of the algorithm is accessing the data in SQL.
+    /**
+     * The default value for the compression threshold.
+     */
+    val defaultCompressThreshold: Int = 10000
+
+    /**
+     * The size of the head buffer.
+     */
+    val defaultHeadSize: Int = 50000
+
+    /**
+     * The default value for epsilon.
+     */
+    val defaultEpsilon: Double = 0.01
+
+    /**
+     * Statistics from the Greenwald-Khanna paper.
+     * @param value the sampled value
+     * @param g the minimum rank jump from the previous value's minimum rank
+     * @param delta the maximum span of the rank.
+     */
+    case class Stats(value: Double, g: Int, delta: Int)
+
+    private def compressImmut(
+        currentSamples: IndexedSeq[Stats],
+        mergeThreshold: Double): ArrayBuffer[Stats] = {
+      val res: ArrayBuffer[Stats] = ArrayBuffer.empty
+      if (currentSamples.isEmpty) {
+        return res
+      }
+      // Start from the last element, which is always part of the set.
+      // `head` holds the current candidate, which may be merged with the current element.
+      var head = currentSamples.last
+      var i = currentSamples.size - 2
+      // Do not compress the first element (the minimum is always kept)
+      while (i >= 1) {
+        // The current sample:
+        val sample1 = currentSamples(i)
+        // Do we need to compress?
+        if (sample1.g + head.g + head.delta < mergeThreshold) {
+          // Do not insert yet, just merge the current element into the head.
+          head = head.copy(g = head.g + sample1.g)
+        } else {
+          // Prepend the current head, and keep the current sample as target for merging.
+          res.prepend(head)
+          head = sample1
+        }
+        i -= 1
+      }
+      res.prepend(head)
+      // If necessary, add the minimum element:
+      res.prepend(currentSamples.head)
+      res
+    }
+  }
+
   /** Calculate the Pearson Correlation Coefficient for the given columns */
   private[sql] def pearsonCorrelation(df: DataFrame, cols: Seq[String]): Double = {
     val counts = collectStatisticalData(df, cols, "correlation")

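As an illustration of the deterministic bound documented above (not part of the patch), the
sketch below feeds a known sequence into QuantileSummaries and checks that the exact rank of
the returned sample falls inside the promised interval. It assumes access to the
package-private class, as the test suites below have:

    import org.apache.spark.sql.execution.stat.StatFunctions.QuantileSummaries

    // Build a summary over 10000 known values with a 1% target error.
    val data = (1 to 10000).map(_.toDouble)
    val epsi = 0.01
    var summary = new QuantileSummaries(QuantileSummaries.defaultCompressThreshold, epsi)
    data.foreach { x => summary = summary.insert(x) }
    summary = summary.compress()

    // Query phi = 0.9 and verify floor((phi - epsi) * N) <= rank(x) <= ceil((phi + epsi) * N).
    val phi = 0.9
    val x = summary.query(phi)
    val rank = data.count(_ < x) // exact rank of the returned sample
    assert(rank >= math.floor((phi - epsi) * data.size))
    assert(rank <= math.ceil((phi + epsi) * data.size))
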
http://git-wip-us.apache.org/repos/asf/spark/blob/4fd19936/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
index f01f126..7f92292 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
@@ -21,6 +21,8 @@ import java.util.Random
 
 import org.scalatest.Matchers._
 
+import org.apache.spark.Logging
+import org.apache.spark.sql.execution.stat.StatFunctions
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.DoubleType
@@ -123,6 +125,26 @@ class DataFrameStatSuite extends QueryTest with SharedSQLContext {
     assert(math.abs(decimalRes) < 1e-12)
   }
 
+  test("approximate quantile") {
+    val df = Seq.tabulate(1000)(i => (i, 2.0 * i)).toDF("singles", "doubles")
+
+    val expected_1 = 500.0
+    val expected_2 = 1600.0
+
+    val epsilons = List(0.1, 0.05, 0.001)
+
+    for (epsilon <- epsilons) {
+      val result1 = df.stat.approxQuantile("singles", 0.5, epsilon)
+      val result2 = df.stat.approxQuantile("doubles", 0.8, epsilon)
+
+      val error_1 = 2 * 1000 * epsilon
+      val error_2 = 2 * 2000 * epsilon
+
+      assert(math.abs(result1 - expected_1) < error_1)
+      assert(math.abs(result2 - expected_2) < error_2)
+    }
+  }
+
   test("crosstab") {
     val rng = new Random()
     val data = Seq.tabulate(25)(i => (rng.nextInt(5), rng.nextInt(10)))
@@ -269,3 +291,41 @@ class DataFrameStatSuite extends QueryTest with SharedSQLContext {
     assert(0.until(1000).forall(i => filter4.mightContain(i * 3)))
   }
 }
+
+
+class DataFrameStatPerfSuite extends QueryTest with SharedSQLContext with Logging {
+
+  // Turn on this test if you want to test the performance of approximate quantiles.
+  ignore("describe() should not be slowed down too much by quantiles") {
+    val df = sqlContext.range(5000000L).toDF("col1").cache()
+    def millis(f: => Any): Double = {
+      // Do some warmup
+      logDebug("warmup...")
+      for (i <- 1 to 10) {
+        df.count()
+        f
+      }
+      logDebug("execute...")
+      // Run it 10 times and report the average
+      val times = (1 to 10).map { i =>
+        val start = System.nanoTime()
+        f
+        val end = System.nanoTime()
+        (end - start) / 1e6 // elapsed milliseconds
+      }
+      logDebug("execute done")
+      times.sum.toDouble / times.length.toDouble
+
+    }
+
+    logDebug("*** Normal describe ***")
+    val t1 = millis { df.describe() }
+    logDebug(s"T1 = $t1")
+    logDebug("*** Just quantiles ***")
+    val t2 = millis {
+      StatFunctions.multipleApproxQuantiles(df, Seq("col1"), Seq(0.1, 0.25, 0.5, 0.75, 0.9), 0.01)
+    }
+    logDebug(s"T1 = $t1, T2 = $t2")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/4fd19936/sql/core/src/test/scala/org/apache/spark/sql/execution/stat/ApproxQuantileSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/stat/ApproxQuantileSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/stat/ApproxQuantileSuite.scala
new file mode 100644
index 0000000..6992b4c
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/stat/ApproxQuantileSuite.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.stat
+
+import scala.util.Random
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.execution.stat.StatFunctions.QuantileSummaries
+
+
+class ApproxQuantileSuite extends SparkFunSuite {
+
+  private val r = new Random(1)
+  private val n = 100
+  private val increasing = "increasing" -> (0 until n).map(_.toDouble)
+  private val decreasing = "decreasing" -> (n until 0 by -1).map(_.toDouble)
+  private val random = "random" -> Seq.fill(n)(math.ceil(r.nextDouble() * 1000))
+
+  private def buildSummary(
+      data: Seq[Double],
+      epsi: Double,
+      threshold: Int): QuantileSummaries = {
+    var summary = new QuantileSummaries(threshold, epsi)
+    data.foreach { x =>
+      summary = summary.insert(x)
+    }
+    summary.compress()
+  }
+
+  private def checkQuantile(quant: Double, data: Seq[Double], summary: QuantileSummaries): Unit = {
+    val approx = summary.query(quant)
+    // The rank of the approximation.
+    val rank = data.count(_ < approx) // has to be <, not <= to be exact
+    val lower = math.floor((quant - summary.epsilon) * data.size)
+    assert(rank >= lower,
+      s"approx_rank: $rank ! >= $lower, requested quantile = $quant")
+    val upper = math.ceil((quant + summary.epsilon) * data.size)
+    assert(rank <= upper,
+      s"approx_rank: $rank ! <= $upper, requested quantile = $quant")
+  }
+
+  for {
+    (seq_name, data) <- Seq(increasing, decreasing, random)
+    epsi <- Seq(0.1, 0.0001)
+    compression <- Seq(1000, 10)
+  } {
+
+    test(s"Extrema with epsi=$epsi and seq=$seq_name, compression=$compression") {
+      val s = buildSummary(data, epsi, compression)
+      val min_approx = s.query(0.0)
+      assert(min_approx == data.min, s"Did not return the min: min=${data.min}, got $min_approx")
+      val max_approx = s.query(1.0)
+      assert(max_approx == data.max, s"Did not return the max: max=${data.max}, got $max_approx")
+    }
+
+    test(s"Some quantile values with epsi=$epsi and seq=$seq_name, compression=$compression") {
+      val s = buildSummary(data, epsi, compression)
+      assert(s.count == data.size, s"Found count=${s.count} but data size=${data.size}")
+      checkQuantile(0.9999, data, s)
+      checkQuantile(0.9, data, s)
+      checkQuantile(0.5, data, s)
+      checkQuantile(0.1, data, s)
+      checkQuantile(0.001, data, s)
+    }
+  }
+
+  // Tests for merging procedure
+  for {
+    (seq_name, data) <- Seq(increasing, decreasing, random)
+    epsi <- Seq(0.1, 0.0001)
+    compression <- Seq(1000, 10)
+  } {
+
+    val (data1, data2) = {
+      val l = data.size
+      data.take(l / 2) -> data.drop(l / 2)
+    }
+
+    test(s"Merging ordered lists with epsi=$epsi and seq=$seq_name, compression=$compression") {
+      val s1 = buildSummary(data1, epsi, compression)
+      val s2 = buildSummary(data2, epsi, compression)
+      val s = s1.merge(s2)
+      val min_approx = s.query(0.0)
+      assert(min_approx == data.min, s"Did not return the min: min=${data.min}, got $min_approx")
+      val max_approx = s.query(1.0)
+      assert(max_approx == data.max, s"Did not return the max: max=${data.max}, got $max_approx")
+      checkQuantile(0.9999, data, s)
+      checkQuantile(0.9, data, s)
+      checkQuantile(0.5, data, s)
+      checkQuantile(0.1, data, s)
+      checkQuantile(0.001, data, s)
+    }
+
+    val (data11, data12) = {
+      data.sliding(2).map(_.head).toSeq -> data.sliding(2).map(_.last).toSeq
+    }
+
+    test(s"Merging interleaved lists with epsi=$epsi and seq=$seq_name, compression=$compression") {
+      val s1 = buildSummary(data11, epsi, compression)
+      val s2 = buildSummary(data12, epsi, compression)
+      val s = s1.merge(s2)
+      val min_approx = s.query(0.0)
+      assert(min_approx == data.min, s"Did not return the min: min=${data.min}, got $min_approx")
+      val max_approx = s.query(1.0)
+      assert(max_approx == data.max, s"Did not return the max: max=${data.max}, got $max_approx")
+      checkQuantile(0.9999, data, s)
+      checkQuantile(0.9, data, s)
+      checkQuantile(0.5, data, s)
+      checkQuantile(0.1, data, s)
+      checkQuantile(0.001, data, s)
+    }
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org