Posted to commits@spark.apache.org by li...@apache.org on 2018/05/02 18:58:58 UTC

spark git commit: [SPARK-24013][SQL] Remove unneeded compress in ApproximatePercentile

Repository: spark
Updated Branches:
  refs/heads/master 152eaf6ae -> 8dbf56c05


[SPARK-24013][SQL] Remove unneeded compress in ApproximatePercentile

## What changes were proposed in this pull request?

`ApproximatePercentile` contains workaround logic that compresses the samples itself, because `QuantileSummaries` originally ignored its compression threshold. That problem was fixed in SPARK-17439, but the workaround was never removed, so the samples are compressed many more times than necessary; this can lead to critical performance degradation.

This can create serious performance issues in queries like:
```
select approx_percentile(id, array(0.1)) from range(10000000)
```
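
For context, a minimal sketch (not part of the patch) that drives the same sorted-input hot path through `PercentileDigest` directly; it assumes the `spark-catalyst` classes touched by this diff are on the classpath, and the object name is made up for illustration:
```
// Rough micro-benchmark sketch, assuming spark-catalyst on the classpath.
// Before this patch, nearly every add() on sorted input triggered an extra
// compression pass; after it, compression happens only inside
// QuantileSummaries once its own thresholds are crossed.
import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile
import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile.PercentileDigest

object ApproxPercentileHotPath {  // hypothetical driver, not in the patch
  def main(args: Array[String]): Unit = {
    val digest = new PercentileDigest(
      1.0D / ApproximatePercentile.DEFAULT_PERCENTILE_ACCURACY)
    val start = System.nanoTime()
    var i = 1
    while (i <= 10000000) {
      digest.add(i)  // Int widens to Double
      i += 1
    }
    digest.quantileSummaries  // force the final compression, as the aggregate does
    println(s"1e7 inserts took ${(System.nanoTime() - start) / 1e6} ms")
  }
}
```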

## How was this patch tested?

Added a unit test.

Author: Marco Gaido <ma...@gmail.com>

Closes #21133 from mgaido91/SPARK-24013.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8dbf56c0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8dbf56c0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8dbf56c0

Branch: refs/heads/master
Commit: 8dbf56c055218ff0f3fabae84b63c022f43afbfd
Parents: 152eaf6
Author: Marco Gaido <ma...@gmail.com>
Authored: Wed May 2 11:58:55 2018 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Wed May 2 11:58:55 2018 -0700

----------------------------------------------------------------------
 .../aggregate/ApproximatePercentile.scala       | 33 ++++----------------
 .../sql/catalyst/util/QuantileSummaries.scala   | 11 ++++---
 .../sql/ApproximatePercentileQuerySuite.scala   | 13 ++++++++
 3 files changed, 26 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8dbf56c0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala
index a45854a..f1bbbda 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala
@@ -206,27 +206,15 @@ object ApproximatePercentile {
    * with limited memory. PercentileDigest is backed by [[QuantileSummaries]].
    *
    * @param summaries underlying probabilistic data structure [[QuantileSummaries]].
-   * @param isCompressed An internal flag from class [[QuantileSummaries]] to indicate whether the
-   *                   underlying quantileSummaries is compressed.
    */
-  class PercentileDigest(
-      private var summaries: QuantileSummaries,
-      private var isCompressed: Boolean) {
-
-    // Trigger compression if the QuantileSummaries's buffer length exceeds
-    // compressThresHoldBufferLength. The buffer length can be get by
-    // quantileSummaries.sampled.length
-    private[this] final val compressThresHoldBufferLength: Int = {
-      // Max buffer length after compression.
-      val maxBufferLengthAfterCompression: Int = (1 / summaries.relativeError).toInt * 2
-      // A safe upper bound for buffer length before compression
-      maxBufferLengthAfterCompression * 2
-    }
+  class PercentileDigest(private var summaries: QuantileSummaries) {
 
     def this(relativeError: Double) = {
-      this(new QuantileSummaries(defaultCompressThreshold, relativeError), isCompressed = true)
+      this(new QuantileSummaries(defaultCompressThreshold, relativeError, compressed = true))
     }
 
+    private[sql] def isCompressed: Boolean = summaries.compressed
+
     /** Returns compressed object of [[QuantileSummaries]] */
     def quantileSummaries: QuantileSummaries = {
       if (!isCompressed) compress()
@@ -236,14 +224,6 @@ object ApproximatePercentile {
     /** Insert an observation value into the PercentileDigest data structure. */
     def add(value: Double): Unit = {
       summaries = summaries.insert(value)
-      // The result of QuantileSummaries.insert is un-compressed
-      isCompressed = false
-
-      // Currently, QuantileSummaries ignores the construction parameter compressThresHold,
-      // which may cause QuantileSummaries to occupy unbounded memory. We have to hack around here
-      // to make sure QuantileSummaries doesn't occupy infinite memory.
-      // TODO: Figure out why QuantileSummaries ignores construction parameter compressThresHold
-      if (summaries.sampled.length >= compressThresHoldBufferLength) compress()
     }
 
     /** In-place merges in another PercentileDigest. */
@@ -280,7 +260,6 @@ object ApproximatePercentile {
 
     private final def compress(): Unit = {
       summaries = summaries.compress()
-      isCompressed = true
     }
   }
 
@@ -335,8 +314,8 @@ object ApproximatePercentile {
         sampled(i) = Stats(value, g, delta)
         i += 1
       }
-      val summary = new QuantileSummaries(compressThreshold, relativeError, sampled, count)
-      new PercentileDigest(summary, isCompressed = true)
+      val summary = new QuantileSummaries(compressThreshold, relativeError, sampled, count, true)
+      new PercentileDigest(summary)
     }
   }
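
The net effect of the hunk above is that `PercentileDigest` no longer tracks compression itself; it delegates both the flag and the triggering to `QuantileSummaries`. A minimal sketch of the resulting contract, using only names from this diff (`isCompressed` is `private[sql]`, so the sketch must live under the `org.apache.spark.sql` package, just like the new test below):
```
package org.apache.spark.sql

import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile.PercentileDigest

object PercentileDigestSketch {  // hypothetical, for illustration only
  def main(args: Array[String]): Unit = {
    val digest = new PercentileDigest(0.01)  // relativeError = 0.01
    digest.add(1.0)
    // add() no longer compresses eagerly; QuantileSummaries.insert now
    // compresses itself once its buffers cross their thresholds.
    digest.quantileSummaries  // reading still forces a final compress()
    assert(digest.isCompressed)
  }
}
```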
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8dbf56c0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
index b013add..3190e51 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
@@ -40,12 +40,14 @@ import org.apache.spark.sql.catalyst.util.QuantileSummaries.Stats
  *   See the G-K article for more details.
  * @param count the count of all the elements *inserted in the sampled buffer*
  *              (excluding the head buffer)
+ * @param compressed whether the statistics have been compressed
  */
 class QuantileSummaries(
     val compressThreshold: Int,
     val relativeError: Double,
     val sampled: Array[Stats] = Array.empty,
-    val count: Long = 0L) extends Serializable {
+    val count: Long = 0L,
+    var compressed: Boolean = false) extends Serializable {
 
   // a buffer of latest samples seen so far
   private val headSampled: ArrayBuffer[Double] = ArrayBuffer.empty
@@ -60,6 +62,7 @@ class QuantileSummaries(
    */
   def insert(x: Double): QuantileSummaries = {
     headSampled += x
+    compressed = false
     if (headSampled.size >= defaultHeadSize) {
       val result = this.withHeadBufferInserted
       if (result.sampled.length >= compressThreshold) {
@@ -135,11 +138,11 @@ class QuantileSummaries(
     assert(inserted.count == count + headSampled.size)
     val compressed =
       compressImmut(inserted.sampled, mergeThreshold = 2 * relativeError * inserted.count)
-    new QuantileSummaries(compressThreshold, relativeError, compressed, inserted.count)
+    new QuantileSummaries(compressThreshold, relativeError, compressed, inserted.count, true)
   }
 
   private def shallowCopy: QuantileSummaries = {
-    new QuantileSummaries(compressThreshold, relativeError, sampled, count)
+    new QuantileSummaries(compressThreshold, relativeError, sampled, count, compressed)
   }
 
   /**
@@ -163,7 +166,7 @@ class QuantileSummaries(
       val res = (sampled ++ other.sampled).sortBy(_.value)
       val comp = compressImmut(res, mergeThreshold = 2 * relativeError * count)
       new QuantileSummaries(
-        other.compressThreshold, other.relativeError, comp, other.count + count)
+        other.compressThreshold, other.relativeError, comp, other.count + count, true)
     }
   }
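
In short, the `compressed` flag now lives with the data it describes: `insert` clears it, while `compress` and `merge` return summaries with it set. A minimal sketch of that lifecycle (again assuming spark-catalyst on the classpath; the object name is made up):
```
import org.apache.spark.sql.catalyst.util.QuantileSummaries

object CompressedFlagSketch {  // hypothetical, for illustration only
  def main(args: Array[String]): Unit = {
    var s = new QuantileSummaries(
      QuantileSummaries.defaultCompressThreshold, relativeError = 0.01,
      compressed = true)
    s = s.insert(42.0)  // insert always marks the summary un-compressed
    assert(!s.compressed)
    s = s.compress()    // compress() returns a summary flagged compressed = true
    assert(s.compressed)
  }
}
```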
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8dbf56c0/sql/core/src/test/scala/org/apache/spark/sql/ApproximatePercentileQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ApproximatePercentileQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ApproximatePercentileQuerySuite.scala
index 137c5be..d635912 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ApproximatePercentileQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ApproximatePercentileQuerySuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql
 
 import java.sql.{Date, Timestamp}
 
+import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile
 import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile.DEFAULT_PERCENTILE_ACCURACY
 import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile.PercentileDigest
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -279,4 +280,16 @@ class ApproximatePercentileQuerySuite extends QueryTest with SharedSQLContext {
       checkAnswer(query, expected)
     }
   }
+
+  test("SPARK-24013: unneeded compress can cause performance issues with sorted input") {
+    val buffer = new PercentileDigest(1.0D / ApproximatePercentile.DEFAULT_PERCENTILE_ACCURACY)
+    var compressCounts = 0
+    (1 to 10000000).foreach { i =>
+      buffer.add(i)
+      if (buffer.isCompressed) compressCounts += 1
+    }
+    assert(compressCounts > 0)
+    buffer.quantileSummaries
+    assert(buffer.isCompressed)
+  }
 }

