Posted to commits@spark.apache.org by we...@apache.org on 2016/08/23 06:57:07 UTC

spark git commit: [SPARK-17188][SQL] Moves class QuantileSummaries to project catalyst for implementing percentile_approx

Repository: spark
Updated Branches:
  refs/heads/master d2b3d3e63 -> cc33460a5


[SPARK-17188][SQL] Moves class QuantileSummaries to project catalyst for implementing percentile_approx

## What changes were proposed in this pull request?

This is a sub-task of [SPARK-16283](https://issues.apache.org/jira/browse/SPARK-16283) (Implement percentile_approx SQL function), which moves class QuantileSummaries to project catalyst so that it can be reused when implementing aggregation function `percentile_approx`.
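
For context, a minimal usage sketch (illustrative only, not part of this commit; the calls are based on the API visible in the diff below):

```scala
import org.apache.spark.sql.catalyst.util.QuantileSummaries

// Build a summary with the default tuning constants (threshold 10000, 1% relative error).
var summary = new QuantileSummaries(
  QuantileSummaries.defaultCompressThreshold,
  QuantileSummaries.defaultRelativeError)
// insert() may return a new object, so re-assign on every call.
(1 to 10000).foreach { i => summary = summary.insert(i.toDouble) }
// query() can only be called on a compressed summary.
val median = summary.compress().query(0.5)  // ~5000, within 1% relative error
```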

## How was this patch tested?

This PR only relocates the class; the implementation itself is unchanged.
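
Since the end goal is a distributed aggregate, note that partial summaries can be combined via `merge`. A hedged sketch of the partial-aggregation pattern (the `summarize` helper is hypothetical; only the QuantileSummaries calls come from the diff):

```scala
import org.apache.spark.sql.catalyst.util.QuantileSummaries

// Hypothetical helper: summarize one partition's values.
def summarize(values: Iterator[Double]): QuantileSummaries = {
  var s = new QuantileSummaries(
    QuantileSummaries.defaultCompressThreshold,
    QuantileSummaries.defaultRelativeError)
  values.foreach { x => s = s.insert(x) }
  s.compress()  // merge() requires both sides to be compressed
}

val left  = summarize((1 to 5000).iterator.map(_.toDouble))
val right = summarize((5001 to 10000).iterator.map(_.toDouble))
val p90 = left.merge(right).query(0.9)  // ~9000
```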

Author: Sean Zhong <se...@databricks.com>

Closes #14754 from clockfly/move_QuantileSummaries_to_catalyst.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cc33460a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cc33460a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cc33460a

Branch: refs/heads/master
Commit: cc33460a51d2890fe8f50f5b6b87003d6d210f04
Parents: d2b3d3e
Author: Sean Zhong <se...@databricks.com>
Authored: Tue Aug 23 14:57:00 2016 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Tue Aug 23 14:57:00 2016 +0800

----------------------------------------------------------------------
 .../sql/catalyst/util/QuantileSummaries.scala   | 264 +++++++++++++++++++
 .../catalyst/util/QuantileSummariesSuite.scala  | 126 +++++++++
 .../sql/execution/stat/StatFunctions.scala      | 247 +----------------
 .../execution/stat/ApproxQuantileSuite.scala    | 129 ---------
 4 files changed, 391 insertions(+), 375 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cc33460a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
new file mode 100644
index 0000000..493b5fa
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.util.QuantileSummaries.Stats
+
+/**
+ * Helper class to compute approximate quantile summary.
+ * This implementation is based on the algorithm proposed in the paper:
+ * "Space-efficient Online Computation of Quantile Summaries" by Greenwald, Michael
+ * and Khanna, Sanjeev. (http://dx.doi.org/10.1145/375663.375670)
+ *
+ * To optimize for speed, it maintains an internal buffer of the most recently seen samples
+ * and only inserts them once the buffer crosses a size threshold. This amortizes insertion
+ * cost, giving near-constant per-record overhead compared to the original algorithm.
+ *
+ * @param compressThreshold the compression threshold.
+ *   After the internal buffer of statistics crosses this size, it attempts to compress the
+ *   statistics together.
+ * @param relativeError the target relative error.
+ *   It is uniform across the complete range of values.
+ * @param sampled a buffer of quantile statistics.
+ *   See the G-K article for more details.
+ * @param count the count of all the elements *inserted in the sampled buffer*
+ *              (excluding the head buffer)
+ */
+class QuantileSummaries(
+    val compressThreshold: Int,
+    val relativeError: Double,
+    val sampled: Array[Stats] = Array.empty,
+    val count: Long = 0L) extends Serializable {
+
+  // a buffer of latest samples seen so far
+  private val headSampled: ArrayBuffer[Double] = ArrayBuffer.empty
+
+  import QuantileSummaries._
+
+  /**
+   * Returns a summary with the given observation inserted into the summary.
+   * This method may either modify in place the current summary (and return the same summary,
+   * modified in place), or it may create a new summary from scratch if necessary.
+   * @param x the new observation to insert into the summary
+   */
+  def insert(x: Double): QuantileSummaries = {
+    headSampled.append(x)
+    if (headSampled.size >= defaultHeadSize) {
+      this.withHeadBufferInserted
+    } else {
+      this
+    }
+  }
+
+  /**
+   * Inserts an array of (unsorted) samples in a batch, sorting the array first to traverse
+   * the summary statistics in a single batch.
+   *
+   * This method does not modify the current object; it returns a new copy if necessary.
+   *
+   * @return a new quantile summary object.
+   */
+  private def withHeadBufferInserted: QuantileSummaries = {
+    if (headSampled.isEmpty) {
+      return this
+    }
+    var currentCount = count
+    val sorted = headSampled.toArray.sorted
+    val newSamples: ArrayBuffer[Stats] = new ArrayBuffer[Stats]()
+    // The index of the next existing sample to copy from the summary.
+    var sampleIdx = 0
+    // The index of the new observation currently being inserted.
+    var opsIdx: Int = 0
+    while (opsIdx < sorted.length) {
+      val currentSample = sorted(opsIdx)
+      // Add all the samples before the next observation.
+      while (sampleIdx < sampled.size && sampled(sampleIdx).value <= currentSample) {
+        newSamples.append(sampled(sampleIdx))
+        sampleIdx += 1
+      }
+
+      // delta is 0 if this is the first sample to insert, or the last one overall.
+      currentCount += 1
+      val delta =
+        if (newSamples.isEmpty || (sampleIdx == sampled.size && opsIdx == sorted.length - 1)) {
+          0
+        } else {
+          math.floor(2 * relativeError * currentCount).toInt
+        }
+
+      val tuple = Stats(currentSample, 1, delta)
+      newSamples.append(tuple)
+      opsIdx += 1
+    }
+
+    // Add all the remaining existing samples
+    while (sampleIdx < sampled.size) {
+      newSamples.append(sampled(sampleIdx))
+      sampleIdx += 1
+    }
+    new QuantileSummaries(compressThreshold, relativeError, newSamples.toArray, currentCount)
+  }
+
+  /**
+   * Returns a new summary that compresses the summary statistics and the head buffer.
+   *
+   * This implements the COMPRESS function of the GK algorithm. It does not modify the object.
+   *
+   * @return a new summary object with compressed statistics
+   */
+  def compress(): QuantileSummaries = {
+    // Inserts all the elements first
+    val inserted = this.withHeadBufferInserted
+    assert(inserted.headSampled.isEmpty)
+    assert(inserted.count == count + headSampled.size)
+    val compressed =
+      compressImmut(inserted.sampled, mergeThreshold = 2 * relativeError * inserted.count)
+    new QuantileSummaries(compressThreshold, relativeError, compressed, inserted.count)
+  }
+
+  private def shallowCopy: QuantileSummaries = {
+    new QuantileSummaries(compressThreshold, relativeError, sampled, count)
+  }
+
+  /**
+   * Merges two (compressed) summaries together.
+   *
+   * Returns a new summary.
+   */
+  def merge(other: QuantileSummaries): QuantileSummaries = {
+    require(headSampled.isEmpty, "Current buffer needs to be compressed before merge")
+    require(other.headSampled.isEmpty, "Other buffer needs to be compressed before merge")
+    if (other.count == 0) {
+      this.shallowCopy
+    } else if (count == 0) {
+      other.shallowCopy
+    } else {
+      // Merge the two buffers.
+      // The GK algorithm is a bit unclear about it, but it seems there is no need to adjust the
+      // statistics during the merging: the invariants are still respected after the merge.
+      // TODO: could replace full sort by ordered merge, the two lists are known to be sorted
+      // already.
+      val res = (sampled ++ other.sampled).sortBy(_.value)
+      val comp = compressImmut(res, mergeThreshold = 2 * relativeError * count)
+      new QuantileSummaries(
+        other.compressThreshold, other.relativeError, comp, other.count + count)
+    }
+  }
+
+  /**
+   * Runs a query for a given quantile.
+   * The result follows the approximation guarantees detailed above.
+   * The query can only be run on a compressed summary: you need to call compress() before using
+   * it.
+   *
+   * @param quantile the target quantile
+   * @return the approximate value at the given quantile
+   */
+  def query(quantile: Double): Double = {
+    require(quantile >= 0 && quantile <= 1.0, "quantile should be in the range [0.0, 1.0]")
+    require(headSampled.isEmpty,
+      "Cannot operate on an uncompressed summary, call compress() first")
+
+    if (quantile <= relativeError) {
+      return sampled.head.value
+    }
+
+    if (quantile >= 1 - relativeError) {
+      return sampled.last.value
+    }
+
+    // Target rank
+    val rank = math.ceil(quantile * count).toInt
+    val targetError = math.ceil(relativeError * count)
+    // Minimum rank at current sample
+    var minRank = 0
+    var i = 1
+    while (i < sampled.size - 1) {
+      val curSample = sampled(i)
+      minRank += curSample.g
+      val maxRank = minRank + curSample.delta
+      if (maxRank - targetError <= rank && rank <= minRank + targetError) {
+        return curSample.value
+      }
+      i += 1
+    }
+    sampled.last.value
+  }
+}
+
+object QuantileSummaries {
+  // TODO(tjhunter) more tuning could be done on the constants here, but for now
+  // the main cost of the algorithm is accessing the data in SQL.
+  /**
+   * The default value for the compression threshold.
+   */
+  val defaultCompressThreshold: Int = 10000
+
+  /**
+   * The size of the head buffer.
+   */
+  val defaultHeadSize: Int = 50000
+
+  /**
+   * The default value for the relative error (1%).
+   * With this value, the best extreme percentiles that can be approximated are 1% and 99%.
+   */
+  val defaultRelativeError: Double = 0.01
+
+  /**
+   * Statistics from the Greenwald-Khanna paper.
+   * @param value the sampled value
+   * @param g the minimum rank jump from the previous value's minimum rank
+   * @param delta the maximum span of the rank.
+   */
+  case class Stats(value: Double, g: Int, delta: Int)
+
+  private def compressImmut(
+      currentSamples: IndexedSeq[Stats],
+      mergeThreshold: Double): Array[Stats] = {
+    if (currentSamples.isEmpty) {
+      return Array.empty[Stats]
+    }
+    val res: ArrayBuffer[Stats] = ArrayBuffer.empty
+    // Start from the last element, which is always part of the set.
+    // The head holds the current candidate, which may be merged with the current element.
+    var head = currentSamples.last
+    var i = currentSamples.size - 2
+    // The first (minimum) element is never compressed (the loop stops at index 1).
+    while (i >= 1) {
+      // The current sample:
+      val sample1 = currentSamples(i)
+      // Do we need to compress?
+      if (sample1.g + head.g + head.delta < mergeThreshold) {
+        // Do not insert yet, just merge the current element into the head.
+        head = head.copy(g = head.g + sample1.g)
+      } else {
+        // Prepend the current head, and keep the current sample as target for merging.
+        res.prepend(head)
+        head = sample1
+      }
+      i -= 1
+    }
+    res.prepend(head)
+    // The minimum element is always re-added:
+    res.prepend(currentSamples.head)
+    res.toArray
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/cc33460a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala
new file mode 100644
index 0000000..89b2a22
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import scala.util.Random
+
+import org.apache.spark.SparkFunSuite
+
+class QuantileSummariesSuite extends SparkFunSuite {
+
+  private val r = new Random(1)
+  private val n = 100
+  private val increasing = "increasing" -> (0 until n).map(_.toDouble)
+  private val decreasing = "decreasing" -> (n until 0 by -1).map(_.toDouble)
+  private val random = "random" -> Seq.fill(n)(math.ceil(r.nextDouble() * 1000))
+
+  private def buildSummary(
+      data: Seq[Double],
+      epsi: Double,
+      threshold: Int): QuantileSummaries = {
+    var summary = new QuantileSummaries(threshold, epsi)
+    data.foreach { x =>
+      summary = summary.insert(x)
+    }
+    summary.compress()
+  }
+
+  private def checkQuantile(quant: Double, data: Seq[Double], summary: QuantileSummaries): Unit = {
+    val approx = summary.query(quant)
+    // The rank of the approximation.
+    val rank = data.count(_ < approx) // has to be <, not <= to be exact
+    val lower = math.floor((quant - summary.relativeError) * data.size)
+    val upper = math.ceil((quant + summary.relativeError) * data.size)
+    val msg =
+      s"$rank not in [$lower $upper], requested quantile: $quant, approx returned: $approx"
+    assert(rank >= lower, msg)
+    assert(rank <= upper, msg)
+  }
+
+  for {
+    (seq_name, data) <- Seq(increasing, decreasing, random)
+    epsi <- Seq(0.1, 0.0001)
+    compression <- Seq(1000, 10)
+  } {
+
+    test(s"Extremas with epsi=$epsi and seq=$seq_name, compression=$compression") {
+      val s = buildSummary(data, epsi, compression)
+      val min_approx = s.query(0.0)
+      assert(min_approx == data.min, s"Did not return the min: min=${data.min}, got $min_approx")
+      val max_approx = s.query(1.0)
+      assert(max_approx == data.max, s"Did not return the max: max=${data.max}, got $max_approx")
+    }
+
+    test(s"Some quantile values with epsi=$epsi and seq=$seq_name, compression=$compression") {
+      val s = buildSummary(data, epsi, compression)
+      assert(s.count == data.size, s"Found count=${s.count} but data size=${data.size}")
+      checkQuantile(0.9999, data, s)
+      checkQuantile(0.9, data, s)
+      checkQuantile(0.5, data, s)
+      checkQuantile(0.1, data, s)
+      checkQuantile(0.001, data, s)
+    }
+  }
+
+  // Tests for merging procedure
+  for {
+    (seq_name, data) <- Seq(increasing, decreasing, random)
+    epsi <- Seq(0.1, 0.0001)
+    compression <- Seq(1000, 10)
+  } {
+
+    val (data1, data2) = {
+      val l = data.size
+      data.take(l / 2) -> data.drop(l / 2)
+    }
+
+    test(s"Merging ordered lists with epsi=$epsi and seq=$seq_name, compression=$compression") {
+      val s1 = buildSummary(data1, epsi, compression)
+      val s2 = buildSummary(data2, epsi, compression)
+      val s = s1.merge(s2)
+      val min_approx = s.query(0.0)
+      assert(min_approx == data.min, s"Did not return the min: min=${data.min}, got $min_approx")
+      val max_approx = s.query(1.0)
+      assert(max_approx == data.max, s"Did not return the max: max=${data.max}, got $max_approx")
+      checkQuantile(0.9999, data, s)
+      checkQuantile(0.9, data, s)
+      checkQuantile(0.5, data, s)
+      checkQuantile(0.1, data, s)
+      checkQuantile(0.001, data, s)
+    }
+
+    val (data11, data12) = {
+      data.sliding(2).map(_.head).toSeq -> data.sliding(2).map(_.last).toSeq
+    }
+
+    test(s"Merging interleaved lists with epsi=$epsi and seq=$seq_name, compression=$compression") {
+      val s1 = buildSummary(data11, epsi, compression)
+      val s2 = buildSummary(data12, epsi, compression)
+      val s = s1.merge(s2)
+      val min_approx = s.query(0.0)
+      assert(min_approx == data.min, s"Did not return the min: min=${data.min}, got $min_approx")
+      val max_approx = s.query(1.0)
+      assert(max_approx == data.max, s"Did not return the max: max=${data.max}, got $max_approx")
+      checkQuantile(0.9999, data, s)
+      checkQuantile(0.9, data, s)
+      checkQuantile(0.5, data, s)
+      checkQuantile(0.1, data, s)
+      checkQuantile(0.001, data, s)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/cc33460a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
index 7c58c48..822f49e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
@@ -17,20 +17,17 @@
 
 package org.apache.spark.sql.execution.stat
 
-import scala.collection.mutable.ArrayBuffer
-
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Column, DataFrame, Dataset, Row}
 import org.apache.spark.sql.catalyst.expressions.{Cast, GenericMutableRow}
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.catalyst.util.QuantileSummaries
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
 object StatFunctions extends Logging {
 
-  import QuantileSummaries.Stats
-
   /**
    * Calculates the approximate quantiles of multiple numerical columns of a DataFrame in one pass.
    *
@@ -95,248 +92,6 @@ object StatFunctions extends Logging {
     summaries.map { summary => probabilities.map(summary.query) }
   }
 
-  /**
-   * Helper class to compute approximate quantile summary.
-   * This implementation is based on the algorithm proposed in the paper:
-   * "Space-efficient Online Computation of Quantile Summaries" by Greenwald, Michael
-   * and Khanna, Sanjeev. (http://dx.doi.org/10.1145/375663.375670)
-   *
-   * To optimize for speed, it maintains an internal buffer of the most recently seen samples
-   * and only inserts them once the buffer crosses a size threshold. This amortizes insertion
-   * cost, giving near-constant per-record overhead compared to the original algorithm.
-   *
-   * @param compressThreshold the compression threshold.
-   *   After the internal buffer of statistics crosses this size, it attempts to compress the
-   *   statistics together.
-   * @param relativeError the target relative error.
-   *   It is uniform across the complete range of values.
-   * @param sampled a buffer of quantile statistics.
-   *   See the G-K article for more details.
-   * @param count the count of all the elements *inserted in the sampled buffer*
-   *              (excluding the head buffer)
-   */
-  class QuantileSummaries(
-      val compressThreshold: Int,
-      val relativeError: Double,
-      val sampled: Array[Stats] = Array.empty,
-      val count: Long = 0L) extends Serializable {
-
-    // a buffer of latest samples seen so far
-    private val headSampled: ArrayBuffer[Double] = ArrayBuffer.empty
-
-    import QuantileSummaries._
-
-    /**
-     * Returns a summary with the given observation inserted into the summary.
-     * This method may either modify in place the current summary (and return the same summary,
-     * modified in place), or it may create a new summary from scratch if necessary.
-     * @param x the new observation to insert into the summary
-     */
-    def insert(x: Double): QuantileSummaries = {
-      headSampled.append(x)
-      if (headSampled.size >= defaultHeadSize) {
-        this.withHeadBufferInserted
-      } else {
-        this
-      }
-    }
-
-    /**
-     * Inserts an array of (unsorted) samples in a batch, sorting the array first to traverse
-     * the summary statistics in a single batch.
-     *
-     * This method does not modify the current object; it returns a new copy if necessary.
-     *
-     * @return a new quantile summary object.
-     */
-    private def withHeadBufferInserted: QuantileSummaries = {
-      if (headSampled.isEmpty) {
-        return this
-      }
-      var currentCount = count
-      val sorted = headSampled.toArray.sorted
-      val newSamples: ArrayBuffer[Stats] = new ArrayBuffer[Stats]()
-      // The index of the next existing sample to copy from the summary.
-      var sampleIdx = 0
-      // The index of the new observation currently being inserted.
-      var opsIdx: Int = 0
-      while (opsIdx < sorted.length) {
-        val currentSample = sorted(opsIdx)
-        // Add all the samples before the next observation.
-        while (sampleIdx < sampled.size && sampled(sampleIdx).value <= currentSample) {
-          newSamples.append(sampled(sampleIdx))
-          sampleIdx += 1
-        }
-
-        // delta is 0 if this is the first sample to insert, or the last one overall.
-        currentCount += 1
-        val delta =
-          if (newSamples.isEmpty || (sampleIdx == sampled.size && opsIdx == sorted.length - 1)) {
-            0
-          } else {
-            math.floor(2 * relativeError * currentCount).toInt
-          }
-
-        val tuple = Stats(currentSample, 1, delta)
-        newSamples.append(tuple)
-        opsIdx += 1
-      }
-
-      // Add all the remaining existing samples
-      while (sampleIdx < sampled.size) {
-        newSamples.append(sampled(sampleIdx))
-        sampleIdx += 1
-      }
-      new QuantileSummaries(compressThreshold, relativeError, newSamples.toArray, currentCount)
-    }
-
-    /**
-     * Returns a new summary that compresses the summary statistics and the head buffer.
-     *
-     * This implements the COMPRESS function of the GK algorithm. It does not modify the object.
-     *
-     * @return a new summary object with compressed statistics
-     */
-    def compress(): QuantileSummaries = {
-      // Inserts all the elements first
-      val inserted = this.withHeadBufferInserted
-      assert(inserted.headSampled.isEmpty)
-      assert(inserted.count == count + headSampled.size)
-      val compressed =
-        compressImmut(inserted.sampled, mergeThreshold = 2 * relativeError * inserted.count)
-      new QuantileSummaries(compressThreshold, relativeError, compressed, inserted.count)
-    }
-
-    private def shallowCopy: QuantileSummaries = {
-      new QuantileSummaries(compressThreshold, relativeError, sampled, count)
-    }
-
-    /**
-     * Merges two (compressed) summaries together.
-     *
-     * Returns a new summary.
-     */
-    def merge(other: QuantileSummaries): QuantileSummaries = {
-      require(headSampled.isEmpty, "Current buffer needs to be compressed before merge")
-      require(other.headSampled.isEmpty, "Other buffer needs to be compressed before merge")
-      if (other.count == 0) {
-        this.shallowCopy
-      } else if (count == 0) {
-        other.shallowCopy
-      } else {
-        // Merge the two buffers.
-        // The GK algorithm is a bit unclear about it, but it seems there is no need to adjust the
-        // statistics during the merging: the invariants are still respected after the merge.
-        // TODO: could replace full sort by ordered merge, the two lists are known to be sorted
-        // already.
-        val res = (sampled ++ other.sampled).sortBy(_.value)
-        val comp = compressImmut(res, mergeThreshold = 2 * relativeError * count)
-        new QuantileSummaries(
-          other.compressThreshold, other.relativeError, comp, other.count + count)
-      }
-    }
-
-    /**
-     * Runs a query for a given quantile.
-     * The result follows the approximation guarantees detailed above.
-     * The query can only be run on a compressed summary: you need to call compress() before using
-     * it.
-     *
-     * @param quantile the target quantile
-     * @return the approximate value at the given quantile
-     */
-    def query(quantile: Double): Double = {
-      require(quantile >= 0 && quantile <= 1.0, "quantile should be in the range [0.0, 1.0]")
-      require(headSampled.isEmpty,
-        "Cannot operate on an uncompressed summary, call compress() first")
-
-      if (quantile <= relativeError) {
-        return sampled.head.value
-      }
-
-      if (quantile >= 1 - relativeError) {
-        return sampled.last.value
-      }
-
-      // Target rank
-      val rank = math.ceil(quantile * count).toInt
-      val targetError = math.ceil(relativeError * count)
-      // Minimum rank at current sample
-      var minRank = 0
-      var i = 1
-      while (i < sampled.size - 1) {
-        val curSample = sampled(i)
-        minRank += curSample.g
-        val maxRank = minRank + curSample.delta
-        if (maxRank - targetError <= rank && rank <= minRank + targetError) {
-          return curSample.value
-        }
-        i += 1
-      }
-      sampled.last.value
-    }
-  }
-
-  object QuantileSummaries {
-    // TODO(tjhunter) more tuning could be done on the constants here, but for now
-    // the main cost of the algorithm is accessing the data in SQL.
-    /**
-     * The default value for the compression threshold.
-     */
-    val defaultCompressThreshold: Int = 10000
-
-    /**
-     * The size of the head buffer.
-     */
-    val defaultHeadSize: Int = 50000
-
-    /**
-     * The default value for the relative error (1%).
-     * With this value, the best extreme percentiles that can be approximated are 1% and 99%.
-     */
-    val defaultRelativeError: Double = 0.01
-
-    /**
-     * Statistics from the Greenwald-Khanna paper.
-     * @param value the sampled value
-     * @param g the minimum rank jump from the previous value's minimum rank
-     * @param delta the maximum span of the rank.
-     */
-    case class Stats(value: Double, g: Int, delta: Int)
-
-    private def compressImmut(
-        currentSamples: IndexedSeq[Stats],
-        mergeThreshold: Double): Array[Stats] = {
-      if (currentSamples.isEmpty) {
-        return Array.empty[Stats]
-      }
-      val res: ArrayBuffer[Stats] = ArrayBuffer.empty
-      // Start from the last element, which is always part of the set.
-      // The head holds the current candidate, which may be merged with the current element.
-      var head = currentSamples.last
-      var i = currentSamples.size - 2
-      // The first (minimum) element is never compressed (the loop stops at index 1).
-      while (i >= 1) {
-        // The current sample:
-        val sample1 = currentSamples(i)
-        // Do we need to compress?
-        if (sample1.g + head.g + head.delta < mergeThreshold) {
-          // Do not insert yet, just merge the current element into the head.
-          head = head.copy(g = head.g + sample1.g)
-        } else {
-          // Prepend the current head, and keep the current sample as target for merging.
-          res.prepend(head)
-          head = sample1
-        }
-        i -= 1
-      }
-      res.prepend(head)
-      // The minimum element is always re-added:
-      res.prepend(currentSamples.head)
-      res.toArray
-    }
-  }
-
   /** Calculate the Pearson Correlation Coefficient for the given columns */
   def pearsonCorrelation(df: DataFrame, cols: Seq[String]): Double = {
     val counts = collectStatisticalData(df, cols, "correlation")

http://git-wip-us.apache.org/repos/asf/spark/blob/cc33460a/sql/core/src/test/scala/org/apache/spark/sql/execution/stat/ApproxQuantileSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/stat/ApproxQuantileSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/stat/ApproxQuantileSuite.scala
deleted file mode 100644
index 0a989d0..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/stat/ApproxQuantileSuite.scala
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.stat
-
-import scala.util.Random
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.execution.stat.StatFunctions.QuantileSummaries
-
-
-class ApproxQuantileSuite extends SparkFunSuite {
-
-  private val r = new Random(1)
-  private val n = 100
-  private val increasing = "increasing" -> (0 until n).map(_.toDouble)
-  private val decreasing = "decreasing" -> (n until 0 by -1).map(_.toDouble)
-  private val random = "random" -> Seq.fill(n)(math.ceil(r.nextDouble() * 1000))
-
-  private def buildSummary(
-      data: Seq[Double],
-      epsi: Double,
-      threshold: Int): QuantileSummaries = {
-    var summary = new QuantileSummaries(threshold, epsi)
-    data.foreach { x =>
-      summary = summary.insert(x)
-    }
-    summary.compress()
-  }
-
-  private def checkQuantile(quant: Double, data: Seq[Double], summary: QuantileSummaries): Unit = {
-    val approx = summary.query(quant)
-    // The rank of the approximation.
-    val rank = data.count(_ < approx) // has to be <, not <= to be exact
-    val lower = math.floor((quant - summary.relativeError) * data.size)
-    val upper = math.ceil((quant + summary.relativeError) * data.size)
-    val msg =
-      s"$rank not in [$lower $upper], requested quantile: $quant, approx returned: $approx"
-    assert(rank >= lower, msg)
-    assert(rank <= upper, msg)
-  }
-
-  for {
-    (seq_name, data) <- Seq(increasing, decreasing, random)
-    epsi <- Seq(0.1, 0.0001)
-    compression <- Seq(1000, 10)
-  } {
-
-    test(s"Extremas with epsi=$epsi and seq=$seq_name, compression=$compression") {
-      val s = buildSummary(data, epsi, compression)
-      val min_approx = s.query(0.0)
-      assert(min_approx == data.min, s"Did not return the min: min=${data.min}, got $min_approx")
-      val max_approx = s.query(1.0)
-      assert(max_approx == data.max, s"Did not return the max: max=${data.max}, got $max_approx")
-    }
-
-    test(s"Some quantile values with epsi=$epsi and seq=$seq_name, compression=$compression") {
-      val s = buildSummary(data, epsi, compression)
-      assert(s.count == data.size, s"Found count=${s.count} but data size=${data.size}")
-      checkQuantile(0.9999, data, s)
-      checkQuantile(0.9, data, s)
-      checkQuantile(0.5, data, s)
-      checkQuantile(0.1, data, s)
-      checkQuantile(0.001, data, s)
-    }
-  }
-
-  // Tests for merging procedure
-  for {
-    (seq_name, data) <- Seq(increasing, decreasing, random)
-    epsi <- Seq(0.1, 0.0001)
-    compression <- Seq(1000, 10)
-  } {
-
-    val (data1, data2) = {
-      val l = data.size
-      data.take(l / 2) -> data.drop(l / 2)
-    }
-
-    test(s"Merging ordered lists with epsi=$epsi and seq=$seq_name, compression=$compression") {
-      val s1 = buildSummary(data1, epsi, compression)
-      val s2 = buildSummary(data2, epsi, compression)
-      val s = s1.merge(s2)
-      val min_approx = s.query(0.0)
-      assert(min_approx == data.min, s"Did not return the min: min=${data.min}, got $min_approx")
-      val max_approx = s.query(1.0)
-      assert(max_approx == data.max, s"Did not return the max: max=${data.max}, got $max_approx")
-      checkQuantile(0.9999, data, s)
-      checkQuantile(0.9, data, s)
-      checkQuantile(0.5, data, s)
-      checkQuantile(0.1, data, s)
-      checkQuantile(0.001, data, s)
-    }
-
-    val (data11, data12) = {
-      data.sliding(2).map(_.head).toSeq -> data.sliding(2).map(_.last).toSeq
-    }
-
-    test(s"Merging interleaved lists with epsi=$epsi and seq=$seq_name, compression=$compression") {
-      val s1 = buildSummary(data11, epsi, compression)
-      val s2 = buildSummary(data12, epsi, compression)
-      val s = s1.merge(s2)
-      val min_approx = s.query(0.0)
-      assert(min_approx == data.min, s"Did not return the min: min=${data.min}, got $min_approx")
-      val max_approx = s.query(1.0)
-      assert(max_approx == data.max, s"Did not return the max: max=${data.max}, got $max_approx")
-      checkQuantile(0.9999, data, s)
-      checkQuantile(0.9, data, s)
-      checkQuantile(0.5, data, s)
-      checkQuantile(0.1, data, s)
-      checkQuantile(0.001, data, s)
-    }
-  }
-
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org