You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2016/08/23 06:57:07 UTC
spark git commit: [SPARK-17188][SQL] Moves class QuantileSummaries to
project catalyst for implementing percentile_approx
Repository: spark
Updated Branches:
refs/heads/master d2b3d3e63 -> cc33460a5
[SPARK-17188][SQL] Moves class QuantileSummaries to project catalyst for implementing percentile_approx
## What changes were proposed in this pull request?
This is a sub-task of [SPARK-16283](https://issues.apache.org/jira/browse/SPARK-16283) (Implement percentile_approx SQL function), which moves class QuantileSummaries to project catalyst so that it can be reused when implementing aggregation function `percentile_approx`.
## How was this patch tested?
This PR only relocates the class (its test suite ApproxQuantileSuite moves along with it as QuantileSummariesSuite); the class implementation is not changed.
Author: Sean Zhong <se...@databricks.com>
Closes #14754 from clockfly/move_QuantileSummaries_to_catalyst.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cc33460a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cc33460a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cc33460a
Branch: refs/heads/master
Commit: cc33460a51d2890fe8f50f5b6b87003d6d210f04
Parents: d2b3d3e
Author: Sean Zhong <se...@databricks.com>
Authored: Tue Aug 23 14:57:00 2016 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Tue Aug 23 14:57:00 2016 +0800
----------------------------------------------------------------------
.../sql/catalyst/util/QuantileSummaries.scala | 264 +++++++++++++++++++
.../catalyst/util/QuantileSummariesSuite.scala | 126 +++++++++
.../sql/execution/stat/StatFunctions.scala | 247 +----------------
.../execution/stat/ApproxQuantileSuite.scala | 129 ---------
4 files changed, 391 insertions(+), 375 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/cc33460a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
new file mode 100644
index 0000000..493b5fa
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.util.QuantileSummaries.Stats
+
+/**
+ * Helper class to compute approximate quantile summary.
+ * This implementation is based on the algorithm proposed in the paper:
+ * "Space-efficient Online Computation of Quantile Summaries" by Greenwald, Michael
+ * and Khanna, Sanjeev. (http://dx.doi.org/10.1145/375663.375670)
+ *
+ * In order to optimize for speed, it maintains an internal buffer of the last seen samples,
+ * and only inserts them after crossing a certain size threshold. This guarantees a near-constant
+ * runtime complexity compared to the original algorithm.
+ * The buffer is flushed into the statistics once it holds `QuantileSummaries.defaultHeadSize`
+ * values, or when compress() is called.
+ *
+ * query() and merge() require an empty head buffer, i.e. a summary returned by compress().
+ *
+ * @param compressThreshold the compression threshold.
+ * After the internal buffer of statistics crosses this size, it attempts to compress the
+ * statistics together.
+ * NOTE(review): this class only stores this value and forwards it to the summaries it creates;
+ * nothing here reads it to trigger compression (compression happens only in compress()).
+ * @param relativeError the target relative error.
+ * It is uniform across the complete range of values.
+ * @param sampled a buffer of quantile statistics, expected to be sorted by value (every method
+ * here that builds one keeps it sorted).
+ * See the G-K article for more details.
+ * @param count the count of all the elements *inserted in the sampled buffer*
+ * (excluding the head buffer)
+ */
+class QuantileSummaries(
+ val compressThreshold: Int,
+ val relativeError: Double,
+ val sampled: Array[Stats] = Array.empty,
+ val count: Long = 0L) extends Serializable {
+
+ // a buffer of latest samples seen so far
+ // (not yet reflected in `sampled` or `count`; appended to in place by insert())
+ private val headSampled: ArrayBuffer[Double] = ArrayBuffer.empty
+
+ import QuantileSummaries._
+
+ /**
+ * Returns a summary with the given observation inserted into the summary.
+ * This method may either modify in place the current summary (and return the same summary,
+ * modified in place), or it may create a new summary from scratch if necessary.
+ * Callers must therefore continue with the returned summary rather than the receiver.
+ * @param x the new observation to insert into the summary
+ * @return `this` while the head buffer holds fewer than `defaultHeadSize` values, otherwise a
+ * new summary with the buffered values merged into `sampled`
+ */
+ def insert(x: Double): QuantileSummaries = {
+ headSampled.append(x)
+ if (headSampled.size >= defaultHeadSize) {
+ this.withHeadBufferInserted
+ } else {
+ this
+ }
+ }
+
+ /**
+ * Inserts an array of (unsorted samples) in a batch, sorting the array first to traverse
+ * the summary statistics in a single batch.
+ *
+ * This method does not modify the current object and returns if necessary a new copy.
+ * The head buffer of `this` is left as-is; a newly returned summary starts with an empty head
+ * buffer and a `count` increased by the number of buffered values.
+ *
+ * @return a new quantile summary object, or `this` when the head buffer is empty.
+ */
+ private def withHeadBufferInserted: QuantileSummaries = {
+ if (headSampled.isEmpty) {
+ return this
+ }
+ var currentCount = count
+ val sorted = headSampled.toArray.sorted
+ val newSamples: ArrayBuffer[Stats] = new ArrayBuffer[Stats]()
+ // The index of the next existing entry of `sampled` to copy into newSamples
+ var sampleIdx = 0
+ // The index (into `sorted`) of the sample currently being inserted.
+ var opsIdx: Int = 0
+ while(opsIdx < sorted.length) {
+ val currentSample = sorted(opsIdx)
+ // Add all the samples before the next observation.
+ // (existing entries equal to currentSample are copied before it)
+ while(sampleIdx < sampled.size && sampled(sampleIdx).value <= currentSample) {
+ newSamples.append(sampled(sampleIdx))
+ sampleIdx += 1
+ }
+
+ // If it is the first one to insert, or if it is the last one
+ // (nothing precedes it, or it is the last new value and every existing entry has already
+ // been copied), its rank is exact: delta = 0. Otherwise the rank uncertainty is
+ // floor(2 * relativeError * currentCount).
+ currentCount += 1
+ val delta =
+ if (newSamples.isEmpty || (sampleIdx == sampled.size && opsIdx == sorted.length - 1)) {
+ 0
+ } else {
+ math.floor(2 * relativeError * currentCount).toInt
+ }
+
+ val tuple = Stats(currentSample, 1, delta)
+ newSamples.append(tuple)
+ opsIdx += 1
+ }
+
+ // Add all the remaining existing samples
+ while(sampleIdx < sampled.size) {
+ newSamples.append(sampled(sampleIdx))
+ sampleIdx += 1
+ }
+ new QuantileSummaries(compressThreshold, relativeError, newSamples.toArray, currentCount)
+ }
+
+ /**
+ * Returns a new summary that compresses the summary statistics and the head buffer.
+ *
+ * This implements the COMPRESS function of the GK algorithm. It does not modify the object.
+ * Buffered values are inserted first; neighbouring statistics are then merged using a threshold
+ * of `2 * relativeError * count`, where count includes the buffered values.
+ *
+ * @return a new summary object with compressed statistics
+ */
+ def compress(): QuantileSummaries = {
+ // Inserts all the elements first
+ val inserted = this.withHeadBufferInserted
+ assert(inserted.headSampled.isEmpty)
+ assert(inserted.count == count + headSampled.size)
+ val compressed =
+ compressImmut(inserted.sampled, mergeThreshold = 2 * relativeError * inserted.count)
+ new QuantileSummaries(compressThreshold, relativeError, compressed, inserted.count)
+ }
+
+ // A new summary sharing this summary's `sampled` array; the head buffer is not carried over.
+ private def shallowCopy: QuantileSummaries = {
+ new QuantileSummaries(compressThreshold, relativeError, sampled, count)
+ }
+
+ /**
+ * Merges two (compressed) summaries together.
+ *
+ * Returns a new summary.
+ * When both summaries are non-empty, the result takes `compressThreshold` and `relativeError`
+ * from `other`, and its count is the sum of both counts.
+ *
+ * @throws IllegalArgumentException if either summary still has buffered (uncompressed) values
+ */
+ def merge(other: QuantileSummaries): QuantileSummaries = {
+ require(headSampled.isEmpty, "Current buffer needs to be compressed before merge")
+ require(other.headSampled.isEmpty, "Other buffer needs to be compressed before merge")
+ if (other.count == 0) {
+ this.shallowCopy
+ } else if (count == 0) {
+ other.shallowCopy
+ } else {
+ // Merge the two buffers.
+ // The GK algorithm is a bit unclear about it, but it seems there is no need to adjust the
+ // statistics during the merging: the invariants are still respected after the merge.
+ // NOTE(review): the claim above is unverified; deltas are not adjusted here, so confirm
+ // the relative-error bound still holds for merged summaries.
+ // TODO: could replace full sort by ordered merge, the two lists are known to be sorted
+ // already.
+ val res = (sampled ++ other.sampled).sortBy(_.value)
+ // NOTE(review): the threshold uses only this summary's `count` and `relativeError`
+ // (not the merged total), so this compresses less than compress() would on the result.
+ val comp = compressImmut(res, mergeThreshold = 2 * relativeError * count)
+ new QuantileSummaries(
+ other.compressThreshold, other.relativeError, comp, other.count + count)
+ }
+ }
+
+ /**
+ * Runs a query for a given quantile.
+ * The result follows the approximation guarantees detailed above.
+ * The query can only be run on a compressed summary: you need to call compress() before using
+ * it.
+ *
+ * @param quantile the target quantile
+ * @return the value of the stored sample selected for that quantile; quantiles within
+ * `relativeError` of 0 or 1 return the first or last stored sample
+ * @throws IllegalArgumentException if `quantile` is outside [0, 1] or the head buffer is not
+ * empty
+ * @throws NoSuchElementException if the summary holds no samples (`sampled` is empty)
+ */
+ def query(quantile: Double): Double = {
+ require(quantile >= 0 && quantile <= 1.0, "quantile should be in the range [0.0, 1.0]")
+ require(headSampled.isEmpty,
+ "Cannot operate on an uncompressed summary, call compress() first")
+
+ if (quantile <= relativeError) {
+ return sampled.head.value
+ }
+
+ if (quantile >= 1 - relativeError) {
+ return sampled.last.value
+ }
+
+ // Target rank
+ val rank = math.ceil(quantile * count).toInt
+ val targetError = math.ceil(relativeError * count)
+ // Minimum rank at current sample
+ // NOTE(review): the scan starts at i = 1 with minRank = 0, so sampled(0).g is never added and
+ // minRank understates the GK minimum rank of sampled(i) by that amount. Changing this shifts
+ // the returned quantiles, so confirm the intended rank convention before altering it.
+ var minRank = 0
+ var i = 1
+ while (i < sampled.size - 1) {
+ val curSample = sampled(i)
+ minRank += curSample.g
+ val maxRank = minRank + curSample.delta
+ // Return the first sample whose rank interval lies within targetError of the target rank.
+ if (maxRank - targetError <= rank && rank <= minRank + targetError) {
+ return curSample.value
+ }
+ i += 1
+ }
+ sampled.last.value
+ }
+}
+
+object QuantileSummaries {
+  // TODO(tjhunter) more tuning could be done on the constants here, but for now
+  // the main cost of the algorithm is accessing the data in SQL.
+  /**
+   * The default value for the compression threshold.
+   */
+  val defaultCompressThreshold: Int = 10000
+
+  /**
+   * The size of the head buffer.
+   */
+  val defaultHeadSize: Int = 50000
+
+  /**
+   * The default value for the relative error (1%).
+   * With this value, the best extreme percentiles that can be approximated are 1% and 99%.
+   */
+  val defaultRelativeError: Double = 0.01
+
+  /**
+   * Statistics from the Greenwald-Khanna paper.
+   * @param value the sampled value
+   * @param g the minimum rank jump from the previous value's minimum rank
+   * @param delta the maximum span of the rank.
+   */
+  case class Stats(value: Double, g: Int, delta: Int)
+
+  /**
+   * Implements the COMPRESS step of the GK algorithm on statistics sorted by value.
+   *
+   * Walking from the second-to-last entry down to index 1, each entry is folded into the entry
+   * kept after it (its `g` is added to that entry's `g`) whenever
+   * `g_current + g_kept + delta_kept < mergeThreshold`; otherwise the kept entry is emitted and
+   * the current one becomes the new merge target. The first and last entries are never merged
+   * away, and each input entry appears at most once in the output.
+   *
+   * @param currentSamples statistics sorted by `value`
+   * @param mergeThreshold exclusive upper bound on the combined `g`/`delta` for a merge
+   * @return the compressed statistics, still sorted by `value`
+   */
+  private def compressImmut(
+      currentSamples: IndexedSeq[Stats],
+      mergeThreshold: Double): Array[Stats] = {
+    if (currentSamples.isEmpty) {
+      Array.empty[Stats]
+    } else {
+      // Kept entries are collected back-to-front and reversed once at the end: prepending to an
+      // ArrayBuffer shifts every element, which made this loop quadratic.
+      val keptReversed = ArrayBuffer.empty[Stats]
+      // Start from the last element, which is always part of the set. `head` is the pending
+      // entry that the current element may be merged into.
+      var head = currentSamples.last
+      var i = currentSamples.size - 2
+      // Index 0 (the minimum) is handled separately below and is never merged.
+      while (i >= 1) {
+        val sample = currentSamples(i)
+        if (sample.g + head.g + head.delta < mergeThreshold) {
+          // Do not emit yet, just fold the current element into the pending head.
+          head = head.copy(g = head.g + sample.g)
+        } else {
+          // Emit the pending head, and keep the current sample as target for merging.
+          keptReversed.append(head)
+          head = sample
+        }
+        i -= 1
+      }
+      keptReversed.append(head)
+      // Keep the minimum element. With a single input element `head` already is that element;
+      // appending it again would duplicate it and double-count its `g`.
+      if (currentSamples.size > 1) {
+        keptReversed.append(currentSamples.head)
+      }
+      keptReversed.reverse.toArray
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/cc33460a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala
new file mode 100644
index 0000000..89b2a22
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import scala.util.Random
+
+import org.apache.spark.SparkFunSuite
+
+/**
+ * Tests for QuantileSummaries: exact extremes, a few intermediate quantiles, and merging of two
+ * summaries, over increasing / decreasing / seeded-random data, two relative errors and two
+ * compression thresholds.
+ */
+class QuantileSummariesSuite extends SparkFunSuite {
+
+ private val r = new Random(1)
+ private val n = 100
+ // 0.0, 1.0, ..., 99.0
+ private val increasing = "increasing" -> (0 until n).map(_.toDouble)
+ // 100.0, 99.0, ..., 1.0
+ private val decreasing = "decreasing" -> (n until 0 by -1).map(_.toDouble)
+ // n values from a fixed seed, rounded up to whole numbers in [0, 1000]
+ private val random = "random" -> Seq.fill(n)(math.ceil(r.nextDouble() * 1000))
+
+ // Inserts every value one by one, then returns the compressed summary.
+ // `threshold` is passed as the summary's compressThreshold.
+ private def buildSummary(
+ data: Seq[Double],
+ epsi: Double,
+ threshold: Int): QuantileSummaries = {
+ var summary = new QuantileSummaries(threshold, epsi)
+ data.foreach { x =>
+ summary = summary.insert(x)
+ }
+ summary.compress()
+ }
+
+ // Asserts that the number of data points strictly below the approximate quantile lies in
+ // [floor((quant - relativeError) * N), ceil((quant + relativeError) * N)].
+ private def checkQuantile(quant: Double, data: Seq[Double], summary: QuantileSummaries): Unit = {
+ val approx = summary.query(quant)
+ // The rank of the approximation.
+ val rank = data.count(_ < approx) // has to be <, not <= to be exact
+ val lower = math.floor((quant - summary.relativeError) * data.size)
+ val upper = math.ceil((quant + summary.relativeError) * data.size)
+ val msg =
+ s"$rank not in [$lower $upper], requested quantile: $quant, approx returned: $approx"
+ assert(rank >= lower, msg)
+ assert(rank <= upper, msg)
+ }
+
+ // One pair of tests per (data set, relative error, compression threshold) combination.
+ for {
+ (seq_name, data) <- Seq(increasing, decreasing, random)
+ epsi <- Seq(0.1, 0.0001)
+ compression <- Seq(1000, 10)
+ } {
+
+ // query(0.0) and query(1.0) must return the exact minimum and maximum.
+ test(s"Extremas with epsi=$epsi and seq=$seq_name, compression=$compression") {
+ val s = buildSummary(data, epsi, compression)
+ val min_approx = s.query(0.0)
+ assert(min_approx == data.min, s"Did not return the min: min=${data.min}, got $min_approx")
+ val max_approx = s.query(1.0)
+ assert(max_approx == data.max, s"Did not return the max: max=${data.max}, got $max_approx")
+ }
+
+ test(s"Some quantile values with epsi=$epsi and seq=$seq_name, compression=$compression") {
+ val s = buildSummary(data, epsi, compression)
+ assert(s.count == data.size, s"Found count=${s.count} but data size=${data.size}")
+ checkQuantile(0.9999, data, s)
+ checkQuantile(0.9, data, s)
+ checkQuantile(0.5, data, s)
+ checkQuantile(0.1, data, s)
+ checkQuantile(0.001, data, s)
+ }
+ }
+
+ // Tests for merging procedure
+ for {
+ (seq_name, data) <- Seq(increasing, decreasing, random)
+ epsi <- Seq(0.1, 0.0001)
+ compression <- Seq(1000, 10)
+ } {
+
+ // First half and second half of the data.
+ val (data1, data2) = {
+ val l = data.size
+ data.take(l / 2) -> data.drop(l / 2)
+ }
+
+ test(s"Merging ordered lists with epsi=$epsi and seq=$seq_name, compression=$compression") {
+ val s1 = buildSummary(data1, epsi, compression)
+ val s2 = buildSummary(data2, epsi, compression)
+ val s = s1.merge(s2)
+ val min_approx = s.query(0.0)
+ assert(min_approx == data.min, s"Did not return the min: min=${data.min}, got $min_approx")
+ val max_approx = s.query(1.0)
+ assert(max_approx == data.max, s"Did not return the max: max=${data.max}, got $max_approx")
+ checkQuantile(0.9999, data, s)
+ checkQuantile(0.9, data, s)
+ checkQuantile(0.5, data, s)
+ checkQuantile(0.1, data, s)
+ checkQuantile(0.001, data, s)
+ }
+
+ // NOTE(review): despite the test name below, this is not an interleaved split.
+ // sliding(2).map(_.head) yields data(0) .. data(n-2) and sliding(2).map(_.last) yields
+ // data(1) .. data(n-1): two overlapping runs of n-1 values each. The merged summary therefore
+ // covers 2n-2 values and is checked against `data` only approximately.
+ val (data11, data12) = {
+ data.sliding(2).map(_.head).toSeq -> data.sliding(2).map(_.last).toSeq
+ }
+
+ test(s"Merging interleaved lists with epsi=$epsi and seq=$seq_name, compression=$compression") {
+ val s1 = buildSummary(data11, epsi, compression)
+ val s2 = buildSummary(data12, epsi, compression)
+ val s = s1.merge(s2)
+ val min_approx = s.query(0.0)
+ assert(min_approx == data.min, s"Did not return the min: min=${data.min}, got $min_approx")
+ val max_approx = s.query(1.0)
+ assert(max_approx == data.max, s"Did not return the max: max=${data.max}, got $max_approx")
+ checkQuantile(0.9999, data, s)
+ checkQuantile(0.9, data, s)
+ checkQuantile(0.5, data, s)
+ checkQuantile(0.1, data, s)
+ checkQuantile(0.001, data, s)
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/cc33460a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
index 7c58c48..822f49e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala
@@ -17,20 +17,17 @@
package org.apache.spark.sql.execution.stat
-import scala.collection.mutable.ArrayBuffer
-
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Column, DataFrame, Dataset, Row}
import org.apache.spark.sql.catalyst.expressions.{Cast, GenericMutableRow}
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.catalyst.util.QuantileSummaries
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
object StatFunctions extends Logging {
- import QuantileSummaries.Stats
-
/**
* Calculates the approximate quantiles of multiple numerical columns of a DataFrame in one pass.
*
@@ -95,248 +92,6 @@ object StatFunctions extends Logging {
summaries.map { summary => probabilities.map(summary.query) }
}
- /**
- * Helper class to compute approximate quantile summary.
- * This implementation is based on the algorithm proposed in the paper:
- * "Space-efficient Online Computation of Quantile Summaries" by Greenwald, Michael
- * and Khanna, Sanjeev. (http://dx.doi.org/10.1145/375663.375670)
- *
- * In order to optimize for speed, it maintains an internal buffer of the last seen samples,
- * and only inserts them after crossing a certain size threshold. This guarantees a near-constant
- * runtime complexity compared to the original algorithm.
- *
- * @param compressThreshold the compression threshold (stored and forwarded; not read here).
- * After the internal buffer of statistics crosses this size, it attempts to compress the
- * statistics together.
- * @param relativeError the target relative error.
- * It is uniform across the complete range of values.
- * @param sampled a buffer of quantile statistics.
- * See the G-K article for more details.
- * @param count the count of all the elements *inserted in the sampled buffer*
- * (excluding the head buffer)
- */
- class QuantileSummaries(
- val compressThreshold: Int,
- val relativeError: Double,
- val sampled: Array[Stats] = Array.empty,
- val count: Long = 0L) extends Serializable {
-
- // a buffer of latest samples seen so far
- private val headSampled: ArrayBuffer[Double] = ArrayBuffer.empty
-
- import QuantileSummaries._
-
- /**
- * Returns a summary with the given observation inserted into the summary.
- * This method may either modify in place the current summary (and return the same summary,
- * modified in place), or it may create a new summary from scratch if necessary.
- * @param x the new observation to insert into the summary
- */
- def insert(x: Double): QuantileSummaries = {
- headSampled.append(x)
- if (headSampled.size >= defaultHeadSize) {
- this.withHeadBufferInserted
- } else {
- this
- }
- }
-
- /**
- * Inserts an array of (unsorted samples) in a batch, sorting the array first to traverse
- * the summary statistics in a single batch.
- *
- * This method does not modify the current object and returns if necessary a new copy.
- *
- * @return a new quantile summary object.
- */
- private def withHeadBufferInserted: QuantileSummaries = {
- if (headSampled.isEmpty) {
- return this
- }
- var currentCount = count
- val sorted = headSampled.toArray.sorted
- val newSamples: ArrayBuffer[Stats] = new ArrayBuffer[Stats]()
- // The index of the next existing entry of `sampled` to copy
- var sampleIdx = 0
- // The index of the sample currently being inserted.
- var opsIdx: Int = 0
- while(opsIdx < sorted.length) {
- val currentSample = sorted(opsIdx)
- // Add all the samples before the next observation.
- while(sampleIdx < sampled.size && sampled(sampleIdx).value <= currentSample) {
- newSamples.append(sampled(sampleIdx))
- sampleIdx += 1
- }
-
- // If it is the first one to insert, or if it is the last one, its rank is exact (delta 0)
- currentCount += 1
- val delta =
- if (newSamples.isEmpty || (sampleIdx == sampled.size && opsIdx == sorted.length - 1)) {
- 0
- } else {
- math.floor(2 * relativeError * currentCount).toInt
- }
-
- val tuple = Stats(currentSample, 1, delta)
- newSamples.append(tuple)
- opsIdx += 1
- }
-
- // Add all the remaining existing samples
- while(sampleIdx < sampled.size) {
- newSamples.append(sampled(sampleIdx))
- sampleIdx += 1
- }
- new QuantileSummaries(compressThreshold, relativeError, newSamples.toArray, currentCount)
- }
-
- /**
- * Returns a new summary that compresses the summary statistics and the head buffer.
- *
- * This implements the COMPRESS function of the GK algorithm. It does not modify the object.
- *
- * @return a new summary object with compressed statistics
- */
- def compress(): QuantileSummaries = {
- // Inserts all the elements first
- val inserted = this.withHeadBufferInserted
- assert(inserted.headSampled.isEmpty)
- assert(inserted.count == count + headSampled.size)
- val compressed =
- compressImmut(inserted.sampled, mergeThreshold = 2 * relativeError * inserted.count)
- new QuantileSummaries(compressThreshold, relativeError, compressed, inserted.count)
- }
-
- private def shallowCopy: QuantileSummaries = {
- new QuantileSummaries(compressThreshold, relativeError, sampled, count)
- }
-
- /**
- * Merges two (compressed) summaries together.
- *
- * Returns a new summary.
- */
- def merge(other: QuantileSummaries): QuantileSummaries = {
- require(headSampled.isEmpty, "Current buffer needs to be compressed before merge")
- require(other.headSampled.isEmpty, "Other buffer needs to be compressed before merge")
- if (other.count == 0) {
- this.shallowCopy
- } else if (count == 0) {
- other.shallowCopy
- } else {
- // Merge the two buffers.
- // The GK algorithm is a bit unclear about it, but it seems there is no need to adjust the
- // statistics during the merging: the invariants are still respected after the merge.
- // TODO: could replace full sort by ordered merge, the two lists are known to be sorted
- // already.
- val res = (sampled ++ other.sampled).sortBy(_.value)
- val comp = compressImmut(res, mergeThreshold = 2 * relativeError * count)
- new QuantileSummaries(
- other.compressThreshold, other.relativeError, comp, other.count + count)
- }
- }
-
- /**
- * Runs a query for a given quantile.
- * The result follows the approximation guarantees detailed above.
- * The query can only be run on a compressed summary: you need to call compress() before using
- * it.
- *
- * @param quantile the target quantile
- * @return the value of the stored sample selected for that quantile
- */
- def query(quantile: Double): Double = {
- require(quantile >= 0 && quantile <= 1.0, "quantile should be in the range [0.0, 1.0]")
- require(headSampled.isEmpty,
- "Cannot operate on an uncompressed summary, call compress() first")
-
- if (quantile <= relativeError) {
- return sampled.head.value
- }
-
- if (quantile >= 1 - relativeError) {
- return sampled.last.value
- }
-
- // Target rank
- val rank = math.ceil(quantile * count).toInt
- val targetError = math.ceil(relativeError * count)
- // Minimum rank at current sample (NOTE(review): sampled(0).g is never added; scan starts at 1)
- var minRank = 0
- var i = 1
- while (i < sampled.size - 1) {
- val curSample = sampled(i)
- minRank += curSample.g
- val maxRank = minRank + curSample.delta
- if (maxRank - targetError <= rank && rank <= minRank + targetError) {
- return curSample.value
- }
- i += 1
- }
- sampled.last.value
- }
- }
-
- object QuantileSummaries {
- // TODO(tjhunter) more tuning could be done on the constants here, but for now
- // the main cost of the algorithm is accessing the data in SQL.
- /**
- * The default value for the compression threshold.
- */
- val defaultCompressThreshold: Int = 10000
-
- /**
- * The size of the head buffer.
- */
- val defaultHeadSize: Int = 50000
-
- /**
- * The default value for the relative error (1%).
- * With this value, the best extreme percentiles that can be approximated are 1% and 99%.
- */
- val defaultRelativeError: Double = 0.01
-
- /**
- * Statistics from the Greenwald-Khanna paper.
- * @param value the sampled value
- * @param g the minimum rank jump from the previous value's minimum rank
- * @param delta the maximum span of the rank.
- */
- case class Stats(value: Double, g: Int, delta: Int)
-
- private def compressImmut(
- currentSamples: IndexedSeq[Stats],
- mergeThreshold: Double): Array[Stats] = {
- if (currentSamples.isEmpty) {
- return Array.empty[Stats]
- }
- val res: ArrayBuffer[Stats] = ArrayBuffer.empty
- // Start from the last element, which is always part of the set.
- // The head contains the current new head, that may be merged with the current element.
- var head = currentSamples.last
- var i = currentSamples.size - 2
- // Index 0 (the minimum) is never merged; it is added back after the loop
- while (i >= 1) {
- // The current sample:
- val sample1 = currentSamples(i)
- // Do we need to compress?
- if (sample1.g + head.g + head.delta < mergeThreshold) {
- // Do not insert yet, just merge the current element into the head.
- head = head.copy(g = head.g + sample1.g)
- } else {
- // Prepend the current head, and keep the current sample as target for merging.
- res.prepend(head)
- head = sample1
- }
- i -= 1
- }
- res.prepend(head)
- // Add the minimum element (unconditional: duplicates the sole entry when size == 1)
- res.prepend(currentSamples.head)
- res.toArray
- }
- }
-
/** Calculate the Pearson Correlation Coefficient for the given columns */
def pearsonCorrelation(df: DataFrame, cols: Seq[String]): Double = {
val counts = collectStatisticalData(df, cols, "correlation")
http://git-wip-us.apache.org/repos/asf/spark/blob/cc33460a/sql/core/src/test/scala/org/apache/spark/sql/execution/stat/ApproxQuantileSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/stat/ApproxQuantileSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/stat/ApproxQuantileSuite.scala
deleted file mode 100644
index 0a989d0..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/stat/ApproxQuantileSuite.scala
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.stat
-
-import scala.util.Random
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.execution.stat.StatFunctions.QuantileSummaries
-
-
-class ApproxQuantileSuite extends SparkFunSuite {
-
- private val r = new Random(1)
- private val n = 100
- private val increasing = "increasing" -> (0 until n).map(_.toDouble)
- private val decreasing = "decreasing" -> (n until 0 by -1).map(_.toDouble)
- private val random = "random" -> Seq.fill(n)(math.ceil(r.nextDouble() * 1000))
-
- private def buildSummary(
- data: Seq[Double],
- epsi: Double,
- threshold: Int): QuantileSummaries = {
- var summary = new QuantileSummaries(threshold, epsi)
- data.foreach { x =>
- summary = summary.insert(x)
- }
- summary.compress()
- }
-
- private def checkQuantile(quant: Double, data: Seq[Double], summary: QuantileSummaries): Unit = {
- val approx = summary.query(quant)
- // The rank of the approximation (number of data points strictly below it).
- val rank = data.count(_ < approx) // has to be <, not <= to be exact
- val lower = math.floor((quant - summary.relativeError) * data.size)
- val upper = math.ceil((quant + summary.relativeError) * data.size)
- val msg =
- s"$rank not in [$lower $upper], requested quantile: $quant, approx returned: $approx"
- assert(rank >= lower, msg)
- assert(rank <= upper, msg)
- }
-
- for {
- (seq_name, data) <- Seq(increasing, decreasing, random)
- epsi <- Seq(0.1, 0.0001)
- compression <- Seq(1000, 10)
- } {
-
- test(s"Extremas with epsi=$epsi and seq=$seq_name, compression=$compression") {
- val s = buildSummary(data, epsi, compression)
- val min_approx = s.query(0.0)
- assert(min_approx == data.min, s"Did not return the min: min=${data.min}, got $min_approx")
- val max_approx = s.query(1.0)
- assert(max_approx == data.max, s"Did not return the max: max=${data.max}, got $max_approx")
- }
-
- test(s"Some quantile values with epsi=$epsi and seq=$seq_name, compression=$compression") {
- val s = buildSummary(data, epsi, compression)
- assert(s.count == data.size, s"Found count=${s.count} but data size=${data.size}")
- checkQuantile(0.9999, data, s)
- checkQuantile(0.9, data, s)
- checkQuantile(0.5, data, s)
- checkQuantile(0.1, data, s)
- checkQuantile(0.001, data, s)
- }
- }
-
- // Tests for merging procedure (the "interleaved" case uses two overlapping sliding(2) runs)
- for {
- (seq_name, data) <- Seq(increasing, decreasing, random)
- epsi <- Seq(0.1, 0.0001)
- compression <- Seq(1000, 10)
- } {
-
- val (data1, data2) = {
- val l = data.size
- data.take(l / 2) -> data.drop(l / 2)
- }
-
- test(s"Merging ordered lists with epsi=$epsi and seq=$seq_name, compression=$compression") {
- val s1 = buildSummary(data1, epsi, compression)
- val s2 = buildSummary(data2, epsi, compression)
- val s = s1.merge(s2)
- val min_approx = s.query(0.0)
- assert(min_approx == data.min, s"Did not return the min: min=${data.min}, got $min_approx")
- val max_approx = s.query(1.0)
- assert(max_approx == data.max, s"Did not return the max: max=${data.max}, got $max_approx")
- checkQuantile(0.9999, data, s)
- checkQuantile(0.9, data, s)
- checkQuantile(0.5, data, s)
- checkQuantile(0.1, data, s)
- checkQuantile(0.001, data, s)
- }
-
- val (data11, data12) = {
- data.sliding(2).map(_.head).toSeq -> data.sliding(2).map(_.last).toSeq
- }
-
- test(s"Merging interleaved lists with epsi=$epsi and seq=$seq_name, compression=$compression") {
- val s1 = buildSummary(data11, epsi, compression)
- val s2 = buildSummary(data12, epsi, compression)
- val s = s1.merge(s2)
- val min_approx = s.query(0.0)
- assert(min_approx == data.min, s"Did not return the min: min=${data.min}, got $min_approx")
- val max_approx = s.query(1.0)
- assert(max_approx == data.max, s"Did not return the max: max=${data.max}, got $max_approx")
- checkQuantile(0.9999, data, s)
- checkQuantile(0.9, data, s)
- checkQuantile(0.5, data, s)
- checkQuantile(0.1, data, s)
- checkQuantile(0.001, data, s)
- }
- }
-
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org