You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2018/05/02 18:58:58 UTC
spark git commit: [SPARK-24013][SQL] Remove unneeded compress in
ApproximatePercentile
Repository: spark
Updated Branches:
refs/heads/master 152eaf6ae -> 8dbf56c05
[SPARK-24013][SQL] Remove unneeded compress in ApproximatePercentile
## What changes were proposed in this pull request?
`ApproximatePercentile` contains a workaround logic to compress the samples since at the beginning `QuantileSummaries` was ignoring the compression threshold. This problem was fixed in SPARK-17439, but the workaround logic was not removed. So we are compressing the samples far more often than needed, which can lead to critical performance degradation.
This can create serious performance issues in queries like:
```
select approx_percentile(id, array(0.1)) from range(10000000)
```
## How was this patch tested?
Added a unit test.
Author: Marco Gaido <ma...@gmail.com>
Closes #21133 from mgaido91/SPARK-24013.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8dbf56c0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8dbf56c0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8dbf56c0
Branch: refs/heads/master
Commit: 8dbf56c055218ff0f3fabae84b63c022f43afbfd
Parents: 152eaf6
Author: Marco Gaido <ma...@gmail.com>
Authored: Wed May 2 11:58:55 2018 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Wed May 2 11:58:55 2018 -0700
----------------------------------------------------------------------
.../aggregate/ApproximatePercentile.scala | 33 ++++----------------
.../sql/catalyst/util/QuantileSummaries.scala | 11 ++++---
.../sql/ApproximatePercentileQuerySuite.scala | 13 ++++++++
3 files changed, 26 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/8dbf56c0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala
index a45854a..f1bbbda 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala
@@ -206,27 +206,15 @@ object ApproximatePercentile {
* with limited memory. PercentileDigest is backed by [[QuantileSummaries]].
*
* @param summaries underlying probabilistic data structure [[QuantileSummaries]].
- * @param isCompressed An internal flag from class [[QuantileSummaries]] to indicate whether the
- * underlying quantileSummaries is compressed.
*/
- class PercentileDigest(
- private var summaries: QuantileSummaries,
- private var isCompressed: Boolean) {
-
- // Trigger compression if the QuantileSummaries's buffer length exceeds
- // compressThresHoldBufferLength. The buffer length can be get by
- // quantileSummaries.sampled.length
- private[this] final val compressThresHoldBufferLength: Int = {
- // Max buffer length after compression.
- val maxBufferLengthAfterCompression: Int = (1 / summaries.relativeError).toInt * 2
- // A safe upper bound for buffer length before compression
- maxBufferLengthAfterCompression * 2
- }
+ class PercentileDigest(private var summaries: QuantileSummaries) {
def this(relativeError: Double) = {
- this(new QuantileSummaries(defaultCompressThreshold, relativeError), isCompressed = true)
+ this(new QuantileSummaries(defaultCompressThreshold, relativeError, compressed = true))
}
+ private[sql] def isCompressed: Boolean = summaries.compressed
+
/** Returns compressed object of [[QuantileSummaries]] */
def quantileSummaries: QuantileSummaries = {
if (!isCompressed) compress()
@@ -236,14 +224,6 @@ object ApproximatePercentile {
/** Insert an observation value into the PercentileDigest data structure. */
def add(value: Double): Unit = {
summaries = summaries.insert(value)
- // The result of QuantileSummaries.insert is un-compressed
- isCompressed = false
-
- // Currently, QuantileSummaries ignores the construction parameter compressThresHold,
- // which may cause QuantileSummaries to occupy unbounded memory. We have to hack around here
- // to make sure QuantileSummaries doesn't occupy infinite memory.
- // TODO: Figure out why QuantileSummaries ignores construction parameter compressThresHold
- if (summaries.sampled.length >= compressThresHoldBufferLength) compress()
}
/** In-place merges in another PercentileDigest. */
@@ -280,7 +260,6 @@ object ApproximatePercentile {
private final def compress(): Unit = {
summaries = summaries.compress()
- isCompressed = true
}
}
@@ -335,8 +314,8 @@ object ApproximatePercentile {
sampled(i) = Stats(value, g, delta)
i += 1
}
- val summary = new QuantileSummaries(compressThreshold, relativeError, sampled, count)
- new PercentileDigest(summary, isCompressed = true)
+ val summary = new QuantileSummaries(compressThreshold, relativeError, sampled, count, true)
+ new PercentileDigest(summary)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/8dbf56c0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
index b013add..3190e51 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
@@ -40,12 +40,14 @@ import org.apache.spark.sql.catalyst.util.QuantileSummaries.Stats
* See the G-K article for more details.
* @param count the count of all the elements *inserted in the sampled buffer*
* (excluding the head buffer)
+ * @param compressed whether the statistics have been compressed
*/
class QuantileSummaries(
val compressThreshold: Int,
val relativeError: Double,
val sampled: Array[Stats] = Array.empty,
- val count: Long = 0L) extends Serializable {
+ val count: Long = 0L,
+ var compressed: Boolean = false) extends Serializable {
// a buffer of latest samples seen so far
private val headSampled: ArrayBuffer[Double] = ArrayBuffer.empty
@@ -60,6 +62,7 @@ class QuantileSummaries(
*/
def insert(x: Double): QuantileSummaries = {
headSampled += x
+ compressed = false
if (headSampled.size >= defaultHeadSize) {
val result = this.withHeadBufferInserted
if (result.sampled.length >= compressThreshold) {
@@ -135,11 +138,11 @@ class QuantileSummaries(
assert(inserted.count == count + headSampled.size)
val compressed =
compressImmut(inserted.sampled, mergeThreshold = 2 * relativeError * inserted.count)
- new QuantileSummaries(compressThreshold, relativeError, compressed, inserted.count)
+ new QuantileSummaries(compressThreshold, relativeError, compressed, inserted.count, true)
}
private def shallowCopy: QuantileSummaries = {
- new QuantileSummaries(compressThreshold, relativeError, sampled, count)
+ new QuantileSummaries(compressThreshold, relativeError, sampled, count, compressed)
}
/**
@@ -163,7 +166,7 @@ class QuantileSummaries(
val res = (sampled ++ other.sampled).sortBy(_.value)
val comp = compressImmut(res, mergeThreshold = 2 * relativeError * count)
new QuantileSummaries(
- other.compressThreshold, other.relativeError, comp, other.count + count)
+ other.compressThreshold, other.relativeError, comp, other.count + count, true)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/8dbf56c0/sql/core/src/test/scala/org/apache/spark/sql/ApproximatePercentileQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ApproximatePercentileQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ApproximatePercentileQuerySuite.scala
index 137c5be..d635912 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ApproximatePercentileQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ApproximatePercentileQuerySuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql
import java.sql.{Date, Timestamp}
+import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile
import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile.DEFAULT_PERCENTILE_ACCURACY
import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile.PercentileDigest
import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -279,4 +280,16 @@ class ApproximatePercentileQuerySuite extends QueryTest with SharedSQLContext {
checkAnswer(query, expected)
}
}
+
+ test("SPARK-24013: unneeded compress can cause performance issues with sorted input") {
+ val buffer = new PercentileDigest(1.0D / ApproximatePercentile.DEFAULT_PERCENTILE_ACCURACY)
+ var compressCounts = 0
+ (1 to 10000000).foreach { i =>
+ buffer.add(i)
+ if (buffer.isCompressed) compressCounts += 1
+ }
+ assert(compressCounts > 0)
+ buffer.quantileSummaries
+ assert(buffer.isCompressed)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org